mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
11.19.2007 a714dc56fbe8419a6f0e4e8ffd36384009a89557
opendj-sdk/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -304,4 +304,25 @@
  {
    return list.keySet().iterator();
  }
  /**
   * Check that all the ChangeNumbers in the covered serverState are also in
   * this serverState.
   *
   * @param covered The ServerState that needs to be checked.
   * @return A boolean indicating if this ServerState covers the ServerState
   *         given in parameter.
   */
  public boolean cover(ServerState covered)
  {
    for (ChangeNumber coveredChange : covered.list.values())
    {
      ChangeNumber change = this.list.get(coveredChange.getServerId());
      if ((change == null) || (change.older(coveredChange)))
      {
        return false;
      }
    }
    return true;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -82,6 +82,68 @@
  /**
   * Finds the domain for a given DN.
   *
   * @param dn   The DN for which the domain must be returned.
   * @param op   An optional operation for which the check is done.
   *             Can be null is the request has no associated operation.
   * @return     The domain for this DN.
   */
  public static ReplicationDomain findDomain(DN dn, Operation op)
  {
    /*
     * Don't run the special replication code on Operation that are
     * specifically marked as don't synchronize.
     */
    if ((op != null) && op.dontSynchronize())
      return null;
    ReplicationDomain domain = null;
    DN temp = dn;
    do
    {
      domain = domains.get(temp);
      temp = temp.getParentDNInSuffix();
      if (temp == null)
      {
        break;
      }
    } while (domain == null);
    return domain;
  }
  /**
   * Creates a new domain from its configEntry, do the
   * necessary initialization and starts it so that it is
   * fully operational when this method returns.
   * @param configuration The entry whith the configuration of this domain.
   * @return The domain created.
   * @throws ConfigException When the configuration is not valid.
   */
  public static ReplicationDomain createNewDomain(
      MultimasterDomainCfg configuration)
      throws ConfigException
  {
    ReplicationDomain domain;
    domain = new ReplicationDomain(configuration);
    domains.put(domain.getBaseDN(), domain);
    domain.start();
    return domain;
  }
  /**
   * Deletes a domain.
   * @param dn : the base DN of the domain to delete.
   */
  public static void deleteDomain(DN dn)
  {
    ReplicationDomain domain = domains.remove(dn);
    if (domain != null)
      domain.shutdown();
  }
  /**
   * {@inheritDoc}
   */
  @Override
@@ -149,23 +211,6 @@
  }
  /**
   * Creates a new domain from its configEntry, do the
   * necessary initialization and starts it so that it is
   * fully operational when this method returns.
   * @param configuration The entry whith the configuration of this domain.
   * @throws ConfigException When the configuration is not valid.
   */
  private void createNewDomain(
      MultimasterDomainCfg configuration)
      throws ConfigException
  {
    ReplicationDomain domain;
    domain = new ReplicationDomain(configuration);
    domains.put(domain.getBaseDN(), domain);
    domain.start();
  }
  /**
   * {@inheritDoc}
   */
  @Override
@@ -354,55 +399,6 @@
  }
  /**
   * Finds the domain for a given DN.
   *
   * @param dn   The DN for which the domain must be returned.
   * @param op   An optional operation for which the check is done.
   *             Can be null is the request has no associated operation.
   * @return     The domain for this DN.
   */
  public static ReplicationDomain findDomain(DN dn, Operation op)
  {
    /*
     * Don't run the special replication code on Operation that are
     * specifically marked as don't synchronize.
     */
    if ((op != null) && op.dontSynchronize())
      return null;
    ReplicationDomain domain = null;
    DN temp = dn;
    do
    {
      domain = domains.get(temp);
      temp = temp.getParentDNInSuffix();
      if (temp == null)
      {
        break;
      }
    } while (domain == null);
    return domain;
  }
  /**
   * Generic code for all the postOperation entry point.
   *
   * @param operation The Operation for which the post-operation is called.
   * @param dn The Dn for which the post-operation is called.
   */
  private void genericPostOperation(Operation operation, DN dn)
  {
    ReplicationDomain domain = findDomain(dn, operation);
    if (domain == null)
      return;
    domain.synchronize(operation);
    return;
  }
  /**
   * This method is called whenever the server detects a modification
   * of the schema done by directly modifying the backing files
   * of the schema backend.
@@ -535,10 +531,7 @@
  public ConfigChangeResult applyConfigurationDelete(
      MultimasterDomainCfg configuration)
  {
    DN dn = configuration.getReplicationDN();
    ReplicationDomain domain = domains.remove(dn);
    if (domain != null)
      domain.shutdown();
    deleteDomain(configuration.getReplicationDN());
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
@@ -551,6 +544,23 @@
  {
    return true;
  }
  /**
   * Generic code for all the postOperation entry point.
   *
   * @param operation The Operation for which the post-operation is called.
   * @param dn The Dn for which the post-operation is called.
   */
  private void genericPostOperation(Operation operation, DN dn)
  {
    ReplicationDomain domain = findDomain(dn, operation);
    if (domain == null)
      return;
    domain.synchronize(operation);
    return;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -27,19 +27,24 @@
package org.opends.server.replication.plugin;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Operation;
/**
 * This class is use to store the list of operations currently
 * This class is use to store an operation currently
 * in progress and not yet committed in the database.
 */
public class PendingChange
public class PendingChange implements Comparable<PendingChange>
{
  private ChangeNumber changeNumber;
  private boolean committed;
  private UpdateMessage msg;
  private Operation op;
  private ServerState dependencyState = null;
  private DN targetDN = null;
  /**
   * Construct a new PendingChange.
@@ -121,4 +126,66 @@
    this.op = op;
  }
  /**
   * Add the given ChangeNumber in the list of dependencies of this
   * PendingChange.
   *
   * @param changeNumber The ChangeNumber to add in the list of dependencies
   *                     of this PendingChange.
   */
  public void addDependency(ChangeNumber changeNumber)
  {
    if (dependencyState == null)
    {
      dependencyState = new ServerState();
    }
    dependencyState.update(changeNumber);
  }
  /**
   * Check if the given ServerState covers the dependencies of this
   * PendingChange.
   *
   * @param state The ServerState for which dependencies must be checked,
   *
   * @return A boolean indicating if the given ServerState covers the
   *         dependencies of this PendingChange.
   */
  public boolean dependenciesIsCovered(ServerState state)
  {
    return state.cover(dependencyState);
  }
  /**
   * Get the Target DN of this message.
   *
   * @return The target DN of this message.
   */
  public DN getTargetDN()
  {
    synchronized (this)
    {
      if (targetDN != null)
        return targetDN;
      else
      {
        try
        {
          targetDN = DN.decode(msg.getDn());
        }
        catch (DirectoryException e)
        {
        }
      }
      return targetDN;
    }
  }
  /**
   * {@inheritDoc}
   */
  public int compareTo(PendingChange o)
  {
    return this.getChangeNumber().compareTo(o.getChangeNumber());
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
New file
@@ -0,0 +1,559 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Operation;
/**
 *
 * This class is use to store the list of operations currently
 * in progress and not yet committed in the database.
 */
public class PendingChanges
{
  /**
   * A map used to store the pending changes.
   */
  private SortedMap<ChangeNumber, PendingChange> pendingChanges =
    new TreeMap<ChangeNumber, PendingChange>();
  /**
   * A sorted set containing the list of PendingChanges that have
   * not been replayed correctly because they are dependent on
   * another change to be completed.
   */
  private SortedSet<PendingChange> dependentChanges =
    new TreeSet<PendingChange>();
  /**
   * The ChangeNumberGenerator to use to create new unique ChangeNumbers
   * for each operation done on the replication domain.
   */
  private ChangeNumberGenerator changeNumberGenerator;
  /**
   * The Replicationbroker that will be used to send UpdateMessage.
   */
  private ReplicationBroker broker;
  /**
   * The ServerState that will be updated when UpdateMessage are committed.
   */
  private ServerState state;
  /**
   * Creates a new PendingChanges using the provided ChangeNumberGenerator.
   *
   * @param changeNumberGenerator The ChangeNumberGenerator to use to create
   *                               new unique ChangeNumbers.
   * @param broker  The Replicationbroker that will be used to send
   *                UpdateMessage.
   * @param state   The ServerState that will be updated when UpdateMessage
   *                are committed.
   */
  public PendingChanges(
      ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
      ServerState state)
  {
    this.changeNumberGenerator = changeNumberGenerator;
    this.broker = broker;
    this.state = state;
  }
  /**
   * Remove and return an update form the pending changes list.
   *
   * @param changeNumber The ChangeNumber of the update to remove.
   *
   * @return The UpdateMessage that was just removed.
   */
  public synchronized UpdateMessage remove(ChangeNumber changeNumber)
  {
    return pendingChanges.remove(changeNumber).getMsg();
  }
  /**
   * Returns the number of update currently in the list.
   *
   * @return The number of update currently in the list.
   */
  public synchronized int size()
  {
    return pendingChanges.size();
  }
  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param op           The Operation associated of the update when the
   *                     update is a local update.
   * @param msg          The message associated to the update when the update
   *                     was received from a replication server.
   */
  public synchronized void commit(ChangeNumber changeNumber,
      Operation op, UpdateMessage msg)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    if (curChange == null)
    {
      throw new NoSuchElementException();
    }
    curChange.setCommitted(true);
    if (op.isSynchronizationOperation())
      curChange.setOp(op);
    else
      curChange.setMsg(msg);
  }
  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   */
  public synchronized void commit(ChangeNumber changeNumber)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    if (curChange == null)
    {
      throw new NoSuchElementException();
    }
    curChange.setCommitted(true);
  }
  /**
   * Add a new UpdateMessage to the pending list from the provided local
   * operation.
   *
   * @param operation The local operation for which an UpdateMessage mus
   *                  be added in the pending list.
   * @return The ChangeNumber now associated to the operation.
   */
  public synchronized ChangeNumber putLocalOperation(Operation operation)
  {
    ChangeNumber changeNumber;
    changeNumber = changeNumberGenerator.NewChangeNumber();
    PendingChange change = new PendingChange(changeNumber, operation, null);
    pendingChanges.put(changeNumber, change);
    return changeNumber;
  }
  /**
   * Add a new UpdateMessage that was received from the replication server
   * to the pendingList.
   *
   * @param update The UpdateMessage that was received from the replication
   *               server and that will be added to the pending list.
   */
  public synchronized void putRemoteUpdate(UpdateMessage update)
  {
    ChangeNumber changeNumber = update.getChangeNumber();
    changeNumberGenerator.adjust(changeNumber);
    pendingChanges.put(changeNumber, new PendingChange(changeNumber, null,
                                                        update));
  }
  /**
   * Push all committed local changes to the replicationServer service.
   *
   * @return The number of pushed updates.
   */
  public synchronized int pushCommittedChanges()
  {
    int numSentUpdates = 0;
    if (pendingChanges.isEmpty())
      return numSentUpdates;
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if ((firstChange.getOp() != null ) &&
          (firstChange.getOp().isSynchronizationOperation() == false))
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
      }
    }
    return numSentUpdates;
  }
  /**
   * Get the first update in the list that have some dependencies cleared.
   *
   * @return The UpdateMessage to be handled.
   */
  public synchronized UpdateMessage getNextUpdate()
  {
    /*
     * Parse the list of Update with dependencies and check if the dependencies
     * are now cleared until an Update withour dependencies is found.
     */
    for (PendingChange change : dependentChanges)
    {
      if (change.dependenciesIsCovered(state))
      {
        dependentChanges.remove(change);
        return change.getMsg();
      }
    }
    return null;
  }
  /**
   * Check if the given AddOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   * AddOperation depends on
   *
   * - DeleteOperation done on the same DN
   * - ModifyDnOperation with the same target DN as the ADD DN
   * - ModifyDnOperation with new DN equals to the ADD DN parent
   * - AddOperation done on the parent DN of the ADD DN
   *
   * @param op The AddOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(AddOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      {
        UpdateMessage pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            /*
             * Check is the operation to be run is a deleteOperation on the
             * same DN.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * parent of the current AddOperation.
             */
            if (pendingChange.getTargetDN().isAncestorOf(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            /*
             * Check if the operation to be run is ModifyDnOperation with
             * the same target DN as the ADD DN
             * or a ModifyDnOperation with new DN equals to the ADD DN parent
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            else
            {
              ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
              if (pendingModDn.newDNIsParent(targetDn))
              {
                hasDependencies = true;
                addDependency(change, pendingChange);
              }
            }
          }
        }
      }
    }
    return hasDependencies;
  }
  /**
   * Check if the given ModifyOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * ModifyOperation depends on
   * - AddOperation done on the same DN
   *
   * @param op The ModifyOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      {
        UpdateMessage pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof AddMsg)
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * same DN.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
        }
      }
    }
    return hasDependencies;
  }
  /**
   * Mark the first pendingChange as dependent on the second PendingChange.
   * @param dependentChange The PendingChange that depend on the second
   *                        PendingChange.
   * @param pendingChange   The PendingChange on which the first PendingChange
   *                        is dependent.
   */
  private void addDependency(
      PendingChange dependentChange, PendingChange pendingChange)
  {
    dependentChange.addDependency(pendingChange.getChangeNumber());
    dependentChanges.add(dependentChange);
  }
  /**
   * Check if the given ModifyDNMsg has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * Modify DN Operation depends on
   * - AddOperation done on the same DN as the target DN of the MODDN operation
   * - AddOperation done on the new parent of the MODDN  operation
   * - DeleteOperation done on the new DN of the MODDN operation
   * - ModifyDNOperation done from the new DN of the MODDN operation
   *
   * @param msg The ModifyDNMsg to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyDNMsg msg)
  {
    boolean hasDependencies = false;
    ChangeNumber changeNumber = msg.getChangeNumber();
    PendingChange change = pendingChanges.get(changeNumber);
    if (change == null)
      return false;
    DN targetDn = change.getTargetDN();
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      {
        UpdateMessage pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            // Check if the target of the Delete is the same
            // as the new DN of this ModifyDN
            if (msg.newDNIsEqual(pendingChange.getTargetDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            // Check if the Add Operation was done on the new parent of
            // the MODDN  operation
            if (msg.newParentIsEqual(pendingChange.getTargetDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            // Check if the AddOperation was done on the same DN as the
            // target DN of the MODDN operation
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            // Check if the ModifyDNOperation was done from the new DN of
            // the MODDN operation
            if (msg.newDNIsEqual(pendingChange.getTargetDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
        }
      }
    }
    return hasDependencies;
  }
  /**
   * Check if the given DeleteOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * DeleteOperation depends on
   * - DeleteOperation done on children DN
   * - ModifyDnOperation with target DN that are children of the DEL DN
   * - AddOperation done on the same DN
   *
   *
   * @param op The DeleteOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(DeleteOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      {
        UpdateMessage pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            /*
             * Check if the operation to be run is a deleteOperation on a
             * children of the current DeleteOperation.
             */
            if (pendingChange.getTargetDN().isDescendantOf(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * parent of the current DeleteOperation.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            /*
             * Check if the operation to be run is an ModifyDNOperation
             * on a children of the current DeleteOperation
             */
            if (pendingChange.getTargetDN().isDescendantOf(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
        }
      }
    }
    return hasDependencies;
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -49,6 +49,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -128,17 +129,14 @@
{
  private ReplicationMonitor monitor;
  private ChangeNumberGenerator changeNumberGenerator;
  private ReplicationBroker broker;
  private List<ListenerThread> synchroThreads =
    new ArrayList<ListenerThread>();
  private final SortedMap<ChangeNumber, PendingChange> pendingChanges =
    new TreeMap<ChangeNumber, PendingChange>();
  private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMessage>();
  private int numRcvdUpdates = 0;
  private int numSentUpdates = 0;
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
  private AtomicInteger numSentUpdates = new AtomicInteger(0);
  private AtomicInteger numProcessedUpdates = new AtomicInteger();
  private int debugCount = 0;
  private PersistentServerState state;
@@ -150,6 +148,19 @@
  private int maxSendDelay = 0;
  /**
   * This object is used to store the list of update currently being
   * done on the local database.
   * It contain both the update that are done directly on this server
   * and the updates that was done on another server, transmitted
   * by the replication server and that are currently replayed.
   * It is usefull to make sure that dependencies between operations
   * are correctly fullfilled, that the local operations are sent in a
   * correct order to the replication server and that the ServerState
   * is not updated too early.
   */
  private PendingChanges pendingChanges;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
@@ -245,7 +256,7 @@
  private ConfigEntry backendConfigEntry;
  private List<DN> branches = new ArrayList<DN>(0);
  private int listenerThreadNumber = 1;
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
  private Collection<String> replicationServers;
@@ -317,12 +328,6 @@
    DirectoryServer.registerMonitorProvider(monitor);
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     */
    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
    /*
     * create the broker object used to publish and receive changes
     */
    try
@@ -348,6 +353,14 @@
      */
    }
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     */
    pendingChanges =
      new PendingChanges(new ChangeNumberGenerator(serverId, state),
                         broker, state);
    // listen for changes on the configuration
    configuration.addChangeListener(this);
  }
@@ -444,23 +457,37 @@
       * of the parent entry
       */
      // There is a potential of perfs improvement here
      // if we could avoid the following parent entry retrieval
      DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
      if (parentDnFromCtx != null)
      String parentUid = ctx.getParentUid();
      // root entry have no parent,
      // there is no need to check for it.
      if (parentUid != null)
      {
        DN entryDN = addOperation.getEntryDN();
        DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
        if ((parentDnFromEntryDn != null)
            && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
        // There is a potential of perfs improvement here
        // if we could avoid the following parent entry retrieval
        DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
        if (parentDnFromCtx == null)
        {
          // parentEntry has been renamed
          // replication name conflict resolution is expected to fix that
          // later in the flow
          // The parent does not exist with the specified unique id
          // stop the operation with NO_SUCH_OBJECT and let the
          // conflict resolution or the dependency resolution solve this.
          addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
          return new SynchronizationProviderResult(false);
        }
        else
        {
          DN entryDN = addOperation.getEntryDN();
          DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
          if ((parentDnFromEntryDn != null)
              && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
          {
            // parentEntry has been renamed
            // replication name conflict resolution is expected to fix that
            // later in the flow
            addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
            return new SynchronizationProviderResult(false);
          }
        }
      }
    }
    return new SynchronizationProviderResult(true);
@@ -633,89 +660,96 @@
   */
  public UpdateMessage receive()
  {
    synchronized (broker)
    UpdateMessage update = pendingChanges.getNextUpdate();
    if (update == null)
    {
      UpdateMessage update = null;
      while (update == null)
      synchronized (broker)
      {
        ReplicationMessage msg;
        try
        while (update == null)
        {
          msg = broker.receive();
          if (msg == null)
          ReplicationMessage msg;
          try
          {
            // The server is in the shutdown process
            return null;
          }
          log("Broker received message :" + msg);
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
            receiveAck(ack);
          }
          else if (msg instanceof UpdateMessage)
          {
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
            try
            msg = broker.receive();
            if (msg == null)
            {
              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                  null);
              // The server is in the shutdown process
              return null;
            }
            catch(DirectoryException de)
            log("Broker received message :" + msg);
            if (msg instanceof AckMessage)
            {
              // Returns an error message to notify the sender
              int msgID = de.getMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              broker.publish(errorMsg);
              AckMessage ack = (AckMessage) msg;
              receiveAck(ack);
            }
          }
          else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            else if (msg instanceof InitializeRequestMessage)
            {
              // Another server requests us to provide entries
              // for a total update
              InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
              try
              {
                initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                                 null);
              }
              catch(DirectoryException de)
              {
                // Returns an error message to notify the sender
                int msgID = de.getMessageID();
                ErrorMessage errorMsg =
                  new ErrorMessage(initMsg.getsenderID(),
                                   msgID, de.getMessage());
                broker.publish(errorMsg);
              }
            }
            else if (msg instanceof InitializeTargetMessage)
            {
              // Another server is exporting its entries to us
              InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            try
            {
              importBackend(initMsg);
              try
              {
                importBackend(initMsg);
              }
              catch(DirectoryException de)
              {
                // Return an error message to notify the sender
                int msgID = de.getMessageID();
                ErrorMessage errorMsg =
                  new ErrorMessage(initMsg.getsenderID(),
                                   msgID, de.getMessage());
                log(getMessage(msgID,
                               backend.getBackendID()) + de.getMessage());
                broker.publish(errorMsg);
              }
            }
            catch(DirectoryException de)
            else if (msg instanceof ErrorMessage)
            {
              // Return an error message to notify the sender
              int msgID = de.getMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
              broker.publish(errorMsg);
              if (ieContext != null)
              {
                // This is an error termination for the 2 following cases :
                // - either during an export
                // - or before an import really started
                //   For example, when we publish a request and the
                //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
            }
            else if (msg instanceof UpdateMessage)
            {
              update = (UpdateMessage) msg;
              receiveUpdate(update);
            }
          }
          else if (msg instanceof ErrorMessage)
          catch (SocketTimeoutException e)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  replicationServer did not find any import source.
              abandonImportExport((ErrorMessage)msg);
            }
            // just retry
          }
        } catch (SocketTimeoutException e)
        {
          // just retry
        }
      }
      return update;
    }
    return update;
  }
  /**
@@ -725,21 +759,8 @@
   */
  public void receiveUpdate(UpdateMessage update)
  {
    ChangeNumber changeNumber = update.getChangeNumber();
    synchronized (pendingChanges)
    {
      if (pendingChanges.containsKey(changeNumber))
      {
        /*
         * This should never happen,
         * TODO log error and throw exception
         */
      }
      pendingChanges.put(changeNumber,
          new PendingChange(changeNumber, null, update));
      numRcvdUpdates++;
    }
    pendingChanges.putRemoteUpdate(update);
    numRcvdUpdates.incrementAndGet();
  }
  /**
@@ -752,10 +773,9 @@
    UpdateMessage update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    synchronized (pendingChanges)
    synchronized (waitingAckMsgs)
    {
      update = waitingAckMsgs.get(changeNumber);
      waitingAckMsgs.remove(changeNumber);
      update = waitingAckMsgs.remove(changeNumber);
    }
    if (update != null)
    {
@@ -798,61 +818,55 @@
         * This is an operation type that we do not know about
         * It should never happen.
         */
        synchronized (pendingChanges)
        {
          pendingChanges.remove(curChangeNumber);
          int    msgID   = MSGID_UNKNOWN_TYPE;
          String message = getMessage(msgID, op.getOperationType().toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          return;
        }
        pendingChanges.remove(curChangeNumber);
        int    msgID   = MSGID_UNKNOWN_TYPE;
        String message = getMessage(msgID, op.getOperationType().toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        return;
      }
    }
    synchronized(pendingChanges)
    if (result == ResultCode.SUCCESS)
    {
      if (result == ResultCode.SUCCESS)
      try
      {
        PendingChange curChange = pendingChanges.get(curChangeNumber);
        if (curChange == null)
        {
          // This should never happen
          int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
          String message = getMessage(msgID, curChangeNumber.toString(),
              op.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          return;
        }
        curChange.setCommitted(true);
        pendingChanges.commit(curChangeNumber, op, msg);
      }
      catch  (NoSuchElementException e)
      {
        int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
        String message = getMessage(msgID, curChangeNumber.toString(),
                                    op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        return;
      }
        if (op.isSynchronizationOperation())
          curChange.setOp(op);
        else
          curChange.setMsg(msg);
        if (msg != null && isAssured)
      if (msg != null && isAssured)
      {
        synchronized (waitingAckMsgs)
        {
          // Add the assured message to the list of those whose acknowledgements
          // we are awaiting.
          // Add the assured message to the list of update that are
          // waiting acknowledgements
          waitingAckMsgs.put(curChangeNumber, msg);
        }
      }
      else if (!op.isSynchronizationOperation())
      {
        // Remove an unsuccessful non-replication operation from the pending
        // changes list.
        if (curChangeNumber != null)
        {
          pendingChanges.remove(curChangeNumber);
        }
      }
      pushCommittedChanges();
    }
    else if (!op.isSynchronizationOperation())
    {
      // Remove an unsuccessful non-replication operation from the pending
      // changes list.
      if (curChangeNumber != null)
      {
        pendingChanges.remove(curChangeNumber);
      }
    }
    int pushedChanges = pendingChanges.pushCommittedChanges();
    numSentUpdates.addAndGet(pushedChanges);
    // Wait for acknowledgement of an assured message.
    if (msg != null && isAssured)
@@ -880,7 +894,7 @@
   */
  public int getNumRcvdUpdates()
  {
    return numRcvdUpdates;
    return numRcvdUpdates.get();
  }
  /**
@@ -890,11 +904,11 @@
   */
  public int getNumSentUpdates()
  {
    return numSentUpdates;
    return numSentUpdates.get();
  }
  /**
   * get the number of updates in the pending list.
   * Get the number of updates in the pending list.
   *
   * @return The number of updates in the pending list
   */
@@ -1052,45 +1066,63 @@
  {
    Operation op = null;
    boolean done = false;
    boolean dependency = false;
    ChangeNumber changeNumber = null;
    int retryCount = 10;
    boolean firstTry = true;
    try
    {
      while (!done && retryCount-- > 0)
      while ((!dependency) && (!done) && (retryCount-- > 0))
      {
        op = msg.createOperation(conn);
        op.setInternalOperation(true);
        op.setSynchronizationOperation(true);
        changeNumber = OperationContext.getChangeNumber(op);
        if (changeNumber != null)
          changeNumberGenerator.adjust(changeNumber);
        op.run();
        ResultCode result = op.getResultCode();
        if (result != ResultCode.SUCCESS)
        {
          if (op instanceof ModifyOperation)
          {
            ModifyOperation newOp = (ModifyOperation) op;
            done = solveNamingConflict(newOp, msg);
            dependency = pendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof DeleteOperation)
          {
            DeleteOperation newOp = (DeleteOperation) op;
            done = solveNamingConflict(newOp, msg);
            dependency = pendingChanges.checkDependencies(newOp);
            if ((!dependency) && (!firstTry))
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof AddOperation)
          {
            AddOperation newOp = (AddOperation) op;
            done = solveNamingConflict(newOp, msg);
          } else if (op instanceof ModifyDNOperation)
            dependency = pendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof ModifyDNOperation)
          {
            ModifyDNOperation newOp = (ModifyDNOperation) op;
            done = solveNamingConflict(newOp, msg);
            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
            dependency = pendingChanges.checkDependencies(newMsg);
            if (!dependency)
            {
              ModifyDNOperation newOp = (ModifyDNOperation) op;
              done = solveNamingConflict(newOp, msg);
            }
          }
          else
          {
@@ -1108,9 +1140,10 @@
        {
          done = true;
        }
        firstTry = false;
      }
      if (!done)
      if (!done && !dependency)
      {
        // Continue with the next change but the servers could now become
        // inconsistent.
@@ -1119,6 +1152,7 @@
        String message = getMessage(msgID, op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
        updateError(changeNumber);
      }
    }
@@ -1177,9 +1211,12 @@
    }
    finally
    {
      if (msg.isAssured())
        ack(msg.getChangeNumber());
      incProcessedUpdates();
      if (!dependency)
      {
        if (msg.isAssured())
          ack(msg.getChangeNumber());
        incProcessedUpdates();
      }
    }
  }
@@ -1193,12 +1230,9 @@
   */
  public void updateError(ChangeNumber changeNumber)
  {
    synchronized (pendingChanges)
    {
      PendingChange change = pendingChanges.get(changeNumber);
      change.setCommitted(true);
      pushCommittedChanges();
    }
    pendingChanges.commit(changeNumber);
    int pushedChanges = pendingChanges.pushCommittedChanges();
    numSentUpdates.addAndGet(pushedChanges);
  }
  /**
@@ -1210,15 +1244,7 @@
   */
  private ChangeNumber generateChangeNumber(Operation operation)
  {
    ChangeNumber changeNumber;
    changeNumber = changeNumberGenerator.NewChangeNumber();
    PendingChange change = new PendingChange(changeNumber, operation, null);
    synchronized(pendingChanges)
    {
      pendingChanges.put(changeNumber, change);
    }
    return changeNumber;
    return pendingChanges.putLocalOperation(operation);
  }
@@ -1604,42 +1630,6 @@
  }
  /**
   * Push all committed local changes to the replicationServer service.
   * PRECONDITION : The pendingChanges lock must be held before calling
   * this method.
   */
  private void pushCommittedChanges()
  {
    if (pendingChanges.isEmpty())
      return;
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if ((firstChange.getOp() != null ) &&
          (firstChange.getOp().isSynchronizationOperation() == false))
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
      }
    }
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -139,8 +139,9 @@
    for (Attribute a : userAttributes)
      elems.add(new LDAPAttribute(a).encode());
    for (Attribute a : operationalAttributes)
      elems.add(new LDAPAttribute(a).encode());
    if (operationalAttributes != null)
      for (Attribute a : operationalAttributes)
        elems.add(new LDAPAttribute(a).encode());
    encodedAttributes = ASN1Element.encodeValue(elems);
  }
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -35,6 +35,8 @@
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Operation;
/**
@@ -267,4 +269,86 @@
    this.newRDN = newRDN;
  }
  /**
   * Check if this MSG will change the DN of the target entry to be
   * the same as the dn given as a parameter.
   * @param targetDn the DN to use when checking if this MSG will change
   *                 the DN of the entry to a given DN.
   * @return A boolean indicating if the modify DN MSG will change the DN of
   *         the target entry to be the same as the dn given as a parameter.
   */
  public boolean newDNIsParent(DN targetDn)
  {
    try
    {
      String newStringDN = newRDN + "," + newSuperior;
      DN newDN = DN.decode(newStringDN);
      if (newDN.isAncestorOf(targetDn))
        return true;
      else
        return false;
    } catch (DirectoryException e)
    {
      // The DN was not a correct DN, and therefore does not a parent of the
      // DN given as a parameter.
      return false;
    }
  }
  /**
   * Check if the new dn of this ModifyDNMsg is the same as the targetDN
   * given in parameter.
   *
   * @param targetDN The targetDN to use to check for equality.
   *
   * @return A boolean indicating if the targetDN if the same as the new DN of
   *         the ModifyDNMsg.
   */
  public boolean newDNIsEqual(DN targetDN)
  {
    try
    {
      String newStringDN = newRDN + "," + newSuperior;
      DN newDN = DN.decode(newStringDN);
      if (newDN.equals(targetDN))
        return true;
      else
        return false;
    } catch (DirectoryException e)
    {
      // The DN was not a correct DN, and therefore does not match the
      // DN given as a parameter.
      return false;
    }
  }
  /**
   * Check if the new parent of the modifyDNMsg is the same as the targetDN
   * given in parameter.
   *
   * @param targetDN the targetDN to use when checking equality.
   *
   * @return A boolean indicating if the new parent of the modifyDNMsg is the
   *         same as the targetDN.
   */
  public boolean newParentIsEqual(DN targetDN)
  {
    try
    {
      DN newSuperiorDN = DN.decode(newSuperior);
      if (newSuperiorDN.equals(targetDN))
        return true;
      else
        return false;
    } catch (DirectoryException e)
    {
      // The newsuperior was not a correct DN, and therefore does not match the
      // DN given as a parameter.
      return false;
    }
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -90,7 +90,7 @@
   * @throws DataFormatException if the encoded byte array is not valid.
   * @throws UnsupportedEncodingException if UTF-8 is not supprted.
   */
  public UpdateMessage(byte[] in) throws DataFormatException,
  protected UpdateMessage(byte[] in) throws DataFormatException,
                                         UnsupportedEncodingException
  {
    /* read the changeNumber */
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -238,10 +238,13 @@
      {}
    }
    if ( (recentChangeNumber != null) &&
         (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2))
    if ( (recentChangeNumber != null) && (changeNumber != null))
    {
      flush();
      if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
         ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
      {
        flush();
      }
    }
    return new ReplicationIterator(serverId, db, changeNumber);
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -221,6 +221,7 @@
        newSocket =  listenSocket.accept();
        newSocket.setReceiveBufferSize(1000000);
        newSocket.setTcpNoDelay(true);
        newSocket.setKeepAlive(true);
        ServerHandler handler = new ServerHandler(
                                     new SocketSession(newSocket), queueSize);
        handler.start(null, serverId, serverURL, rcvWindow, this);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
New file
@@ -0,0 +1,553 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
import static org.testng.Assert.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.DomainFakeCfg;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.util.TimeThread;
import org.testng.annotations.*;
/**
 * Test that the dependencies are computed correctly when replaying
 * sequences of operations that requires to follow a given order
 * such as : ADD an entry, ADD a children entry.
 */
public class DependencyTest extends ReplicationTestCase
{
  private static final String BASEDN_STRING = "dc=example,dc=com";
  /**
   * Check that a sequence of dependents adds and mods is correctly ordered:
   * Using a deep dit :
   * dc=example,dc=com
   *       |
   *    dc=dependency1
   *       |
   *    dc=dependency2
   *       |
   *    dc=dependency2
   *       |
   *
   *       |
   *    dc=dependencyN
   * This test sends a sequence of interleaved ADD operations and MODIFY
   * operations to build such a dit.
   *
   * Then test that the sequence of Delete necessary to remove
   * all those entries is also correctly ordered.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addModDelDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;
    cleanDB();
    try
    {
      /*
       * FIRST PART :
       * Check that a sequence of dependent ADD is correctly ordered.
       *
       * - Create replication server
       * - Send sequence of ADD messages to the replication server
       * - Configure replication server
       * - check that the last entry has been correctly added
       */
      String entryldif =
        "dn:" + BASEDN_STRING + "\n"
         + "objectClass: top\n"
         + "objectClass: domain\n"
         + "entryuuid: " + stringUID(1) + "\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      // find  a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();
      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "addModDeldependency",
                                        0, replServerId, 0,
                                        AddSequenceLength*5+100, null);
      replServer = new ReplicationServer(conf);
      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 1000, replServerPort, 1000,
                               false);
      TimeThread.sleep(2000);
      // send a sequence of add operation
      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + addDn;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(sequence),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        ModifyMsg modifyMsg =
          new ModifyMsg(gen.NewChangeNumber(), DN.decode(addDn),
                        generatemods("description", "test"),
                        stringUID(sequence+1));
        broker.publish(modifyMsg);
      }
      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      domainConf.setHeartbeatInterval(100000);
      domain = MultimasterReplication.createNewDomain(domainConf);
      // check that last entry in sequence got added.
      Entry lastEntry = getEntry(DN.decode(addDn), 30000, true);
      assertNotNull(lastEntry,
                    "The last entry of the ADD sequence was not added.");
      // Check that all the modify have been replayed
      // (all the entries should have a description).
      addDn = BASEDN_STRING;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=dependency" + sequence + "," + addDn;
        boolean found =
          checkEntryHasAttribute(DN.decode(addDn), "description", "test",
                                 10000, true);
        if (!found)
        {
          fail("The modification was not replayed on entry " + addDn);
        }
      }
      /*
       * SECOND PART
       *
       * Now check that the dependencies between delete are correctly
       * managed.
       *
       * disable the domain while we publish the delete message to
       * to replication server so that when we enable it it receives the
       * delete operation in bulk.
       */
      domain.disable();
      Thread.sleep(2000);  // necesary because disable does not wait
                           // for full termination of all threads. (issue 1571)
      DN deleteDN = DN.decode(addDn);
      while (sequence-->1)
      {
        DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1));
        broker.publish(delMsg);
        deleteDN = deleteDN.getParent();
      }
      domain.enable();
      // check that entry just below the base entry was deleted.
      // (we can't delete the base entry because some other tests might
      // have added other children)
      DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
      Entry baseEntry = getEntry(node1, 30000, false);
      assertNull(baseEntry,
                 "The last entry of the DEL sequence was not deleted.");
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();
      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);
      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }
  /**
   * Clean the database and replace with a single entry.
   *
   * @throws FileNotFoundException
   * @throws IOException
   * @throws Exception
   */
  private void cleanDB() throws FileNotFoundException, IOException, Exception
  {
    String baseentryldif =
      "dn:" + BASEDN_STRING + "\n"
       + "objectClass: top\n"
       + "objectClass: domain\n"
       + "dc: example\n"
       + "entryuuid: " + stringUID(1) + "\n";
      // Initialization :
      // Load the database with a single entry :
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String path = buildRoot + File.separator + "build" +
                    File.separator + "unit-tests" + File.separator +
                    "package"+ File.separator + "addModDelDependencyTest";
      OutputStream out = new FileOutputStream(new File(path));
      out.write(baseentryldif.getBytes());
      task("dn: ds-task-id=" + UUID.randomUUID()
          + ",cn=Scheduled Tasks,cn=Tasks\n"
          + "objectclass: top\n"
          + "objectclass: ds-task\n"
          + "objectclass: ds-task-import\n"
          + "ds-task-class-name: org.opends.server.tasks.ImportTask\n"
          + "ds-task-import-backend-id: userRoot\n"
          + "ds-task-import-ldif-file: " + path + "\n"
          + "ds-task-import-reject-file: " + path + "reject\n");
  }
  /**
   * Check that after a sequence of add/del/add done on the same DN
   * the second entry is in the database.
   * The unique id of the entry is used to check that the correct entry
   * has been added.
   * To increase the risks of failures a loop of add/del/add is done.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addDelAddDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;
    cleanDB();
    try
    {
      String entryldif = "dn:" + BASEDN_STRING + "\n"
      + "objectClass: top\n"
      + "objectClass: domain\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();
      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "addDelAdddependency", 0,
                                        replServerId,
                                        0, 5*AddSequenceLength+100, null);
      replServer = new ReplicationServer(conf);
      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
                               false);
      // send a sequence of add/del/add operations
      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        // add the entry a first time
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + addDn;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(sequence == 1 ? sequence : sequence +1000),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        // delete the entry
        DeleteMsg delMsg = new DeleteMsg(addDn, gen.NewChangeNumber(),
                                         stringUID(sequence+1));
        broker.publish(delMsg);
        // add again the entry with a new entryuuid.
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1001)),
                           new LinkedList<AttributeValue>());
        addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1001),
                     stringUID(sequence == 1 ? sequence : sequence +1000),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
      }
      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      domain = MultimasterReplication.createNewDomain(domainConf);
      // check that all entries have been deleted and added
      // again by checking that they do have the correct entryuuid
      addDn = BASEDN_STRING;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=dependency" + sequence + "," + addDn;
        boolean found =
          checkEntryHasAttribute(DN.decode(addDn), "entryuuid",
                                 stringUID(sequence+1001),
                                 30000, true);
        if (!found)
        {
          fail("The second add was not replayed on entry " + addDn);
        }
      }
      DN deleteDN = DN.decode(addDn);
      while (sequence-->1)
      {
        DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1001));
        broker.publish(delMsg);
        deleteDN = deleteDN.getParent();
      }
      // check that the database was cleaned successfully
      DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
      Entry baseEntry = getEntry(node1, 30000, false);
      assertNull(baseEntry,
        "The entry were not removed succesfully after test completion.");
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();
      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);
      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }
  /**
   * Check that the dependency of moddn operation are working by
   * issuing a set of Add operation followed by a modrdn of the added entry.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addModdnDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;
    cleanDB();
    try
    {
      String entryldif = "dn:" + BASEDN_STRING + "\n"
      + "objectClass: top\n"
      + "objectClass: domain\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();
      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "addModdndependency", 0,
                                        replServerId,
                                        0, 5*AddSequenceLength+100, null);
      replServer = new ReplicationServer(conf);
      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
                               false);
      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
      // send a sequence of add/modrdn operations
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        // add the entry
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + BASEDN_STRING;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(1),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        // rename the entry
        ModifyDNMsg moddnMsg =
          new ModifyDNMsg(addDn, gen.NewChangeNumber(), stringUID(sequence+1),
                          stringUID(1), true, null, "dc=new_dep" + sequence);
        broker.publish(moddnMsg);
      }
      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      domain = MultimasterReplication.createNewDomain(domainConf);
      // check that all entries have been renamed
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
        Entry baseEntry = getEntry(DN.decode(addDn), 30000, true);
        assertNotNull(baseEntry,
          "The rename was not applied correctly on :" + addDn);
      }
      // delete the entries to clean the database.
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
        DeleteMsg delMsg = new DeleteMsg(addDn.toString(),
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1001));
        broker.publish(delMsg);
      }
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();
      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);
      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }
  /**
   * Builds and return a uuid from an integer.
   * This methods assume that unique integers are used and does not make any
   * unicity checks. It is only responsible for generating a uid with a
   * correct syntax.
   */
  private String stringUID(int i)
  {
    return String.format("11111111-1111-1111-1111-%012x", i);
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -153,10 +153,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    baseDn = DN.decode("dc=example,dc=com");
    updatedEntries = newLDIFEntries();
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -32,8 +32,6 @@
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.TestCaseUtils;
@@ -47,16 +45,12 @@
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.LDAPException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
@@ -242,10 +236,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
@@ -325,22 +315,6 @@
    configureReplication();
  }
  /**
   * @return
   */
  private List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  private void processModify(int count)
  {
    while (count>0)
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -26,9 +26,6 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.io.File;
@@ -36,18 +33,12 @@
import java.util.UUID;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -113,9 +104,9 @@
    configureReplication();
    // Give some time to the replication to setup
    // Give some time to the replication to setup
    Thread.sleep(1000);
    // Create a dummy entry
    addEntry("dn: dc=dummy, dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: domain\n");
@@ -209,7 +200,7 @@
    String path = buildRoot + File.separator + "build" +
                  File.separator + "unit-tests" + File.separator +
                  "package"+ File.separator + "ReSynchTest";
    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
@@ -237,73 +228,4 @@
  }
  /**
   * Utility method to create, run a task and check its result.
   */
  private void task(String task) throws Exception
  {
    Entry taskEntry = TestCaseUtils.makeEntry(task);
    InternalClientConnection connection =
         InternalClientConnection.getRootConnection();
    // Add the task.
    AddOperation addOperation =
         connection.processAdd(taskEntry.getDN(),
                               taskEntry.getObjectClasses(),
                               taskEntry.getUserAttributes(),
                               taskEntry.getOperationalAttributes());
    assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
                 "Add of the task definition was not successful");
    // Wait until the task completes.
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
         ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    String completionTime = null;
    long startMillisecs = System.currentTimeMillis();
    do
    {
      InternalSearchOperation searchOperation =
           connection.processSearch(taskEntry.getDN(),
                                    SearchScope.BASE_OBJECT,
                                    filter);
      try
      {
        resultEntry = searchOperation.getSearchEntries().getFirst();
      } catch (Exception e)
      {
        continue;
      }
      completionTime =
           resultEntry.getAttributeValue(completionTimeType,
                                         DirectoryStringSyntax.DECODER);
      if (completionTime == null)
      {
        if (System.currentTimeMillis() - startMillisecs > 1000*30)
        {
          break;
        }
        Thread.sleep(10);
      }
    } while (completionTime == null);
    if (completionTime == null)
    {
      fail("The task has not completed after 30 seconds.");
    }
    // Check that the task state is as expected.
    AttributeType taskStateType =
         DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
    String stateString =
         resultEntry.getAttributeValue(taskStateType,
                                       DirectoryStringSyntax.DECODER);
    TaskState taskState = TaskState.fromString(stateString);
    assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
                 "The task completed in an unexpected state");
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -26,12 +26,16 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.List;
@@ -42,7 +46,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -53,6 +60,10 @@
import org.opends.server.types.ByteStringFactory;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.AttributeType;
@@ -87,11 +98,6 @@
  protected Entry replServerEntry;
  /**
   * schema check flag
   */
  protected boolean schemaCheck;
  /**
   * The replication plugin entry
   */
  protected String synchroPluginStringDN =
@@ -108,7 +114,6 @@
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    schemaCheck = DirectoryServer.checkSchema();
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
@@ -152,7 +157,7 @@
        }
      }
      catch (Exception e)
      {
      {
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "ReplicationTestCase/openChangelogSession" + e.getMessage(), 1);
@@ -244,7 +249,7 @@
             logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "cleaning config entry " + dn, 1);
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
@@ -256,7 +261,7 @@
      // done
    }
  }
  /**
   * suppress all the real entries created by the tests in this class
   */
@@ -265,7 +270,7 @@
   logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "ReplicationTestCase/Cleaning entries" , 1);
    DeleteOperation op;
    // Delete entries
    try
@@ -298,8 +303,6 @@
  @AfterClass
  public void classCleanUp() throws Exception
  {
    DirectoryServer.setCheckSchema(schemaCheck);
    cleanConfigEntries();
    cleanRealEntries();
  }
@@ -323,7 +326,7 @@
    assertNotNull(DirectoryServer.getConfigEntry(DN
        .decode(synchroPluginStringDN)),
        "Unable to add the Multimaster replication plugin");
    // domains container entry.
    String domainsLdif = "dn: "
      + "cn=domains," + synchroPluginStringDN + "\n"
@@ -335,7 +338,7 @@
    assertNotNull(DirectoryServer.getConfigEntry(
      DN.decode(synchroPluginStringDN)),
      "Unable to add the Multimaster replication plugin");
    // Add the replication server
    DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
@@ -380,7 +383,7 @@
  protected boolean checkEntryHasAttribute(DN dn, String attrTypeStr,
      String valueString, int timeout, boolean hasAttribute) throws Exception
  {
    boolean found;
    boolean found = false;
    int count = timeout/100;
    if (count<1)
      count=1;
@@ -408,15 +411,15 @@
        newEntry = DirectoryServer.getEntry(dn);
        if (newEntry == null)
          fail("The entry " + dn +
          " has incorrectly been deleted from the database.");
        List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
        Attribute tmpAttr = tmpAttrList.get(0);
        if (newEntry != null)
        {
          List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
          Attribute tmpAttr = tmpAttrList.get(0);
        AttributeType attrType =
          DirectoryServer.getAttributeType(attrTypeStr, true);
        found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
          AttributeType attrType =
            DirectoryServer.getAttributeType(attrTypeStr, true);
          found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
        }
      }
      finally
@@ -478,4 +481,95 @@
    }
  }
  /**
   * Generate a new modification replace with the given information.
   *
   * @param attrName The attribute to replace.
   * @param attrValue The new value for the attribute
   *
   * @return The modification replace.
   */
  protected List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  /**
   * Utility method to create, run a task and check its result.
   */
  protected void task(String task) throws Exception
  {
    Entry taskEntry = TestCaseUtils.makeEntry(task);
    InternalClientConnection connection =
         InternalClientConnection.getRootConnection();
    // Add the task.
    AddOperation addOperation =
         connection.processAdd(taskEntry.getDN(),
                               taskEntry.getObjectClasses(),
                               taskEntry.getUserAttributes(),
                               taskEntry.getOperationalAttributes());
    assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
                 "Add of the task definition was not successful");
    // Wait until the task completes.
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
         ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    String completionTime = null;
    long startMillisecs = System.currentTimeMillis();
    do
    {
      InternalSearchOperation searchOperation =
           connection.processSearch(taskEntry.getDN(),
                                    SearchScope.BASE_OBJECT,
                                    filter);
      try
      {
        resultEntry = searchOperation.getSearchEntries().getFirst();
      } catch (Exception e)
      {
        continue;
      }
      completionTime =
           resultEntry.getAttributeValue(completionTimeType,
                                         DirectoryStringSyntax.DECODER);
      if (completionTime == null)
      {
        if (System.currentTimeMillis() - startMillisecs > 1000*30)
        {
          break;
        }
        Thread.sleep(10);
      }
    } while (completionTime == null);
    if (completionTime == null)
    {
      fail("The task has not completed after 30 seconds.");
    }
    // Check that the task state is as expected.
    AttributeType taskStateType =
         DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
    String stateString =
         resultEntry.getAttributeValue(taskStateType,
                                       DirectoryStringSyntax.DECODER);
    TaskState taskState = TaskState.fromString(stateString);
    assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
                 "The task completed in an unexpected state");
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -82,7 +82,6 @@
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    schemaCheck = DirectoryServer.checkSchema();
    // find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java
@@ -33,8 +33,6 @@
import static org.testng.Assert.fail;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -50,15 +48,12 @@
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
@@ -187,10 +182,6 @@
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create backend top level entries
    String[] topEntries = new String[2];
    topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
@@ -263,22 +254,6 @@
    configureReplication();
  }
  /**
   * @return
   */
  private List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  private class BrokerWriter extends Thread
  {
    int count;
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -100,10 +100,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
@@ -1006,19 +1002,6 @@
    }
  }
  private List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  /**
   *  Get the entryUUID for a given DN.
   *
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
New file
@@ -0,0 +1,183 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.SortedSet;
import org.opends.server.admin.ManagedObjectDefinition;
import org.opends.server.admin.PropertyProvider;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.client.MultimasterDomainCfgClient;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
import org.opends.server.types.DN;
/**
 * This class implement a configuration object for the MultimasterDomain
 * that can be used in unit tests to instantiate ReplicationDomain.
 */
public class DomainFakeCfg implements MultimasterDomainCfg
{
  private DN baseDn;
  private int serverId;
  private SortedSet<String> replicationServers;
  private long heartbeatInterval = 1000;
  /**
   * Creates a new Domain with the provided information
   */
  public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers)
  {
    this.baseDn = baseDn;
    this.serverId = serverId;
    this.replicationServers = replServers;
  }
  /**
   * {@inheritDoc}
   */
  public void addChangeListener(
      ConfigurationChangeListener<MultimasterDomainCfg> listener)
  {
  }
  /**
   * {@inheritDoc}
   */
  public ManagedObjectDefinition<? extends MultimasterDomainCfgClient,
      ? extends MultimasterDomainCfg> definition()
  {
    return null;
  }
  /**
   * {@inheritDoc}
   */
  public long getHeartbeatInterval()
  {
    return heartbeatInterval ;
  }
  /**
   * {@inheritDoc}
   */
  public long getMaxReceiveDelay()
  {
    return 0;
  }
  /**
   * {@inheritDoc}
   */
  public int getMaxReceiveQueue()
  {
    return 0;
  }
  /**
   * {@inheritDoc}
   */
  public long getMaxSendDelay()
  {
    return 0;
  }
  /**
   * {@inheritDoc}
   */
  public int getMaxSendQueue()
  {
    return 0;
  }
  /**
   * {@inheritDoc}
   */
  public DN getReplicationDN()
  {
    return baseDn;
  }
  /**
   * {@inheritDoc}
   */
  public SortedSet<String> getReplicationServer()
  {
    return replicationServers;
  }
  /**
   * {@inheritDoc}
   */
  public int getServerId()
  {
    return serverId;
  }
  /**
   * {@inheritDoc}
   */
  public int getWindowSize()
  {
    return 100;
  }
  /**
   * {@inheritDoc}
   */
  public void removeChangeListener(
      ConfigurationChangeListener<MultimasterDomainCfg> listener)
  {
  }
  /**
   * {@inheritDoc}
   */
  public DN dn()
  {
    return null;
  }
  /**
   * {@inheritDoc}
   */
  public PropertyProvider properties()
  {
    return null;
  }
  /**
   * Set the heartbeat interval.
   *
   * @param interval
   */
  public void setHeartbeatInterval(long interval)
  {
    heartbeatInterval = interval;
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -76,7 +76,7 @@
  /**
   * The port of the replicationServer.
   */
  private int changelogPort;
  private int replicationServerPort;
  private ChangeNumber firstChangeNumberServer1 = null;
  private ChangeNumber secondChangeNumberServer1 = null;
@@ -97,11 +97,11 @@
    //  find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    changelogPort = socket.getLocalPort();
    replicationServerPort = socket.getLocalPort();
    socket.close();
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null);
      new ReplServerFakeConfiguration(replicationServerPort, null, 0, 1, 0, 0, null);
    replicationServer = new ReplicationServer(conf);
  }
@@ -124,10 +124,10 @@
       * Open a sender session and a receiver session to the replicationServer
       */
      server1 = openReplicationSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
          DN.decode("dc=example,dc=com"), (short) 1, 100, replicationServerPort,
          1000, true);
      server2 = openReplicationSession(
          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort,
          DN.decode("dc=example,dc=com"), (short) 2, 100, replicationServerPort,
          1000, true);
      /*
@@ -240,7 +240,7 @@
    try {
      broker =
        openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, false);
                             100, replicationServerPort, 1000, false);
      ReplicationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
@@ -277,7 +277,7 @@
    try {
      broker =
        openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, state);
                             100, replicationServerPort, 1000, state);
      ReplicationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
@@ -425,7 +425,7 @@
       * Open a sender session
       */
      server = openReplicationSession(
          DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
          DN.decode("dc=example,dc=com"), (short) 5, 100, replicationServerPort,
          1000, 1000, 0, true);
      BrokerReader reader = new BrokerReader(server);
@@ -436,7 +436,7 @@
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        clientBroker[i] = openReplicationSession(
            DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort,
            DN.decode("dc=example,dc=com"), (short) (100+i), 100, replicationServerPort,
            1000, true);
        client[i] = new BrokerReader(clientBroker[i]);
      }
@@ -510,7 +510,7 @@
          new ChangeNumberGenerator(serverId , (long) 0);
        ReplicationBroker broker =
          openReplicationSession( DN.decode("dc=example,dc=com"), serverId,
            100, changelogPort, 1000, 1000, 0, true);
            100, replicationServerPort, 1000, 1000, 0, true);
        producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS);
        reader[i] = new BrokerReader(broker);