mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
11.19.2007 88e5620001d65afa8d0d8e07d1361fa44705743e
This code allows the replication code to replay operation in the correct
order when operation have dependencies (like adding child entry after parent)

The operations are first done without checking for dependency and only if
they failed are checked for dependencies and inserted in a list of dependent changes.

More information can be found in issue 612.

This code also set back the number of threads used by the replication to 10 (issue 1568)
3 files added
17 files modified
2385 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/ServerState.java 21 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 150 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PendingChange.java 71 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java 559 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java 428 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/AddMsg.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java 84 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java 9 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 1 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java 553 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java 26 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java 86 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 140 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java 1 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java 25 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java 17 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java 183 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -304,4 +304,25 @@
  {
    return list.keySet().iterator();
  }
  /**
   * Check that all the ChangeNumbers in the covered serverState are also in
   * this serverState.
   *
   * @param covered The ServerState that needs to be checked.
   * @return A boolean indicating if this ServerState covers the ServerState
   *         given in parameter.
   */
  public boolean cover(ServerState covered)
  {
    for (ChangeNumber coveredChange : covered.list.values())
    {
      ChangeNumber change = this.list.get(coveredChange.getServerId());
      if ((change == null) || (change.older(coveredChange)))
      {
        return false;
      }
    }
    return true;
  }
}
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -82,6 +82,68 @@
  /**
   * Finds the domain for a given DN.
   *
   * @param dn   The DN for which the domain must be returned.
   * @param op   An optional operation for which the check is done.
   *             Can be null is the request has no associated operation.
   * @return     The domain for this DN.
   */
  public static ReplicationDomain findDomain(DN dn, Operation op)
  {
    /*
     * Don't run the special replication code on Operation that are
     * specifically marked as don't synchronize.
     */
    if ((op != null) && op.dontSynchronize())
      return null;
    ReplicationDomain domain = null;
    DN temp = dn;
    do
    {
      domain = domains.get(temp);
      temp = temp.getParentDNInSuffix();
      if (temp == null)
      {
        break;
      }
    } while (domain == null);
    return domain;
  }
  /**
   * Creates a new domain from its configEntry, do the
   * necessary initialization and starts it so that it is
   * fully operational when this method returns.
   * @param configuration The entry whith the configuration of this domain.
   * @return The domain created.
   * @throws ConfigException When the configuration is not valid.
   */
  public static ReplicationDomain createNewDomain(
      MultimasterDomainCfg configuration)
      throws ConfigException
  {
    ReplicationDomain domain;
    domain = new ReplicationDomain(configuration);
    domains.put(domain.getBaseDN(), domain);
    domain.start();
    return domain;
  }
  /**
   * Deletes a domain.
   * @param dn : the base DN of the domain to delete.
   */
  public static void deleteDomain(DN dn)
  {
    ReplicationDomain domain = domains.remove(dn);
    if (domain != null)
      domain.shutdown();
  }
  /**
   * {@inheritDoc}
   */
  @Override
@@ -149,23 +211,6 @@
  }
  /**
   * Creates a new domain from its configEntry, do the
   * necessary initialization and starts it so that it is
   * fully operational when this method returns.
   * @param configuration The entry whith the configuration of this domain.
   * @throws ConfigException When the configuration is not valid.
   */
  private void createNewDomain(
      MultimasterDomainCfg configuration)
      throws ConfigException
  {
    ReplicationDomain domain;
    domain = new ReplicationDomain(configuration);
    domains.put(domain.getBaseDN(), domain);
    domain.start();
  }
  /**
   * {@inheritDoc}
   */
  @Override
@@ -354,55 +399,6 @@
  }
  /**
   * Finds the domain for a given DN.
   *
   * @param dn   The DN for which the domain must be returned.
   * @param op   An optional operation for which the check is done.
   *             Can be null is the request has no associated operation.
   * @return     The domain for this DN.
   */
  public static ReplicationDomain findDomain(DN dn, Operation op)
  {
    /*
     * Don't run the special replication code on Operation that are
     * specifically marked as don't synchronize.
     */
    if ((op != null) && op.dontSynchronize())
      return null;
    ReplicationDomain domain = null;
    DN temp = dn;
    do
    {
      domain = domains.get(temp);
      temp = temp.getParentDNInSuffix();
      if (temp == null)
      {
        break;
      }
    } while (domain == null);
    return domain;
  }
  /**
   * Generic code for all the postOperation entry point.
   *
   * @param operation The Operation for which the post-operation is called.
   * @param dn The Dn for which the post-operation is called.
   */
  private void genericPostOperation(Operation operation, DN dn)
  {
    ReplicationDomain domain = findDomain(dn, operation);
    if (domain == null)
      return;
    domain.synchronize(operation);
    return;
  }
  /**
   * This method is called whenever the server detects a modification
   * of the schema done by directly modifying the backing files
   * of the schema backend.
@@ -535,10 +531,7 @@
  public ConfigChangeResult applyConfigurationDelete(
      MultimasterDomainCfg configuration)
  {
    DN dn = configuration.getReplicationDN();
    ReplicationDomain domain = domains.remove(dn);
    if (domain != null)
      domain.shutdown();
    deleteDomain(configuration.getReplicationDN());
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
@@ -551,6 +544,23 @@
  {
    return true;
  }
  /**
   * Generic code for all the postOperation entry point.
   *
   * @param operation The Operation for which the post-operation is called.
   * @param dn The Dn for which the post-operation is called.
   */
  private void genericPostOperation(Operation operation, DN dn)
  {
    ReplicationDomain domain = findDomain(dn, operation);
    if (domain == null)
      return;
    domain.synchronize(operation);
    return;
  }
}
opends/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -27,19 +27,24 @@
package org.opends.server.replication.plugin;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Operation;
/**
 * This class is use to store the list of operations currently
 * This class is use to store an operation currently
 * in progress and not yet committed in the database.
 */
public class PendingChange
public class PendingChange implements Comparable<PendingChange>
{
  private ChangeNumber changeNumber;
  private boolean committed;
  private UpdateMessage msg;
  private Operation op;
  private ServerState dependencyState = null;
  private DN targetDN = null;
  /**
   * Construct a new PendingChange.
@@ -121,4 +126,66 @@
    this.op = op;
  }
  /**
   * Add the given ChangeNumber in the list of dependencies of this
   * PendingChange.
   *
   * @param changeNumber The ChangeNumber to add in the list of dependencies
   *                     of this PendingChange.
   */
  public void addDependency(ChangeNumber changeNumber)
  {
    if (dependencyState == null)
    {
      dependencyState = new ServerState();
    }
    dependencyState.update(changeNumber);
  }
  /**
   * Check if the given ServerState covers the dependencies of this
   * PendingChange.
   *
   * @param state The ServerState for which dependencies must be checked,
   *
   * @return A boolean indicating if the given ServerState covers the
   *         dependencies of this PendingChange.
   */
  public boolean dependenciesIsCovered(ServerState state)
  {
    return state.cover(dependencyState);
  }
  /**
   * Get the Target DN of this message.
   *
   * @return The target DN of this message.
   */
  public DN getTargetDN()
  {
    synchronized (this)
    {
      if (targetDN != null)
        return targetDN;
      else
      {
        try
        {
          targetDN = DN.decode(msg.getDn());
        }
        catch (DirectoryException e)
        {
        }
      }
      return targetDN;
    }
  }
  /**
   * {@inheritDoc}
   */
  public int compareTo(PendingChange o)
  {
    return this.getChangeNumber().compareTo(o.getChangeNumber());
  }
}
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
New file
@@ -0,0 +1,559 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.types.DN;
import org.opends.server.types.Operation;
/**
 *
 * This class is use to store the list of operations currently
 * in progress and not yet committed in the database.
 */
public class PendingChanges
{
  /**
   * A map used to store the pending changes.
   */
  private SortedMap<ChangeNumber, PendingChange> pendingChanges =
    new TreeMap<ChangeNumber, PendingChange>();
  /**
   * A sorted set containing the list of PendingChanges that have
   * not been replayed correctly because they are dependent on
   * another change to be completed.
   */
  private SortedSet<PendingChange> dependentChanges =
    new TreeSet<PendingChange>();
  /**
   * The ChangeNumberGenerator to use to create new unique ChangeNumbers
   * for each operation done on the replication domain.
   */
  private ChangeNumberGenerator changeNumberGenerator;
  /**
   * The Replicationbroker that will be used to send UpdateMessage.
   */
  private ReplicationBroker broker;
  /**
   * The ServerState that will be updated when UpdateMessage are committed.
   */
  private ServerState state;
  /**
   * Creates a new PendingChanges using the provided ChangeNumberGenerator.
   *
   * @param changeNumberGenerator The ChangeNumberGenerator to use to create
   *                               new unique ChangeNumbers.
   * @param broker  The Replicationbroker that will be used to send
   *                UpdateMessage.
   * @param state   The ServerState that will be updated when UpdateMessage
   *                are committed.
   */
  public PendingChanges(
      ChangeNumberGenerator changeNumberGenerator, ReplicationBroker broker,
      ServerState state)
  {
    this.changeNumberGenerator = changeNumberGenerator;
    this.broker = broker;
    this.state = state;
  }
  /**
   * Remove and return an update form the pending changes list.
   *
   * @param changeNumber The ChangeNumber of the update to remove.
   *
   * @return The UpdateMessage that was just removed.
   */
  public synchronized UpdateMessage remove(ChangeNumber changeNumber)
  {
    return pendingChanges.remove(changeNumber).getMsg();
  }
  /**
   * Returns the number of update currently in the list.
   *
   * @return The number of update currently in the list.
   */
  public synchronized int size()
  {
    return pendingChanges.size();
  }
  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   * @param op           The Operation associated of the update when the
   *                     update is a local update.
   * @param msg          The message associated to the update when the update
   *                     was received from a replication server.
   */
  public synchronized void commit(ChangeNumber changeNumber,
      Operation op, UpdateMessage msg)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    if (curChange == null)
    {
      throw new NoSuchElementException();
    }
    curChange.setCommitted(true);
    if (op.isSynchronizationOperation())
      curChange.setOp(op);
    else
      curChange.setMsg(msg);
  }
  /**
   * Mark an update message as committed.
   *
   * @param changeNumber The ChangeNumber of the update message that must be
   *                     set as committed.
   */
  public synchronized void commit(ChangeNumber changeNumber)
  {
    PendingChange curChange = pendingChanges.get(changeNumber);
    if (curChange == null)
    {
      throw new NoSuchElementException();
    }
    curChange.setCommitted(true);
  }
  /**
   * Add a new UpdateMessage to the pending list from the provided local
   * operation.
   *
   * @param operation The local operation for which an UpdateMessage mus
   *                  be added in the pending list.
   * @return The ChangeNumber now associated to the operation.
   */
  public synchronized ChangeNumber putLocalOperation(Operation operation)
  {
    ChangeNumber changeNumber;
    changeNumber = changeNumberGenerator.NewChangeNumber();
    PendingChange change = new PendingChange(changeNumber, operation, null);
    pendingChanges.put(changeNumber, change);
    return changeNumber;
  }
  /**
   * Add a new UpdateMessage that was received from the replication server
   * to the pendingList.
   *
   * @param update The UpdateMessage that was received from the replication
   *               server and that will be added to the pending list.
   */
  public synchronized void putRemoteUpdate(UpdateMessage update)
  {
    ChangeNumber changeNumber = update.getChangeNumber();
    changeNumberGenerator.adjust(changeNumber);
    pendingChanges.put(changeNumber, new PendingChange(changeNumber, null,
                                                        update));
  }
  /**
   * Push all committed local changes to the replicationServer service.
   *
   * @return The number of pushed updates.
   */
  public synchronized int pushCommittedChanges()
  {
    int numSentUpdates = 0;
    if (pendingChanges.isEmpty())
      return numSentUpdates;
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if ((firstChange.getOp() != null ) &&
          (firstChange.getOp().isSynchronizationOperation() == false))
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
      }
    }
    return numSentUpdates;
  }
  /**
   * Get the first update in the list that have some dependencies cleared.
   *
   * @return The UpdateMessage to be handled.
   */
  public synchronized UpdateMessage getNextUpdate()
  {
    /*
     * Parse the list of Update with dependencies and check if the dependencies
     * are now cleared until an Update withour dependencies is found.
     */
    for (PendingChange change : dependentChanges)
    {
      if (change.dependenciesIsCovered(state))
      {
        dependentChanges.remove(change);
        return change.getMsg();
      }
    }
    return null;
  }
  /**
   * Check if the given AddOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   * AddOperation depends on
   *
   * - DeleteOperation done on the same DN
   * - ModifyDnOperation with the same target DN as the ADD DN
   * - ModifyDnOperation with new DN equals to the ADD DN parent
   * - AddOperation done on the parent DN of the ADD DN
   *
   * @param op The AddOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(AddOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      {
        UpdateMessage pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            /*
             * Check is the operation to be run is a deleteOperation on the
             * same DN.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * parent of the current AddOperation.
             */
            if (pendingChange.getTargetDN().isAncestorOf(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            /*
             * Check if the operation to be run is ModifyDnOperation with
             * the same target DN as the ADD DN
             * or a ModifyDnOperation with new DN equals to the ADD DN parent
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            else
            {
              ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
              if (pendingModDn.newDNIsParent(targetDn))
              {
                hasDependencies = true;
                addDependency(change, pendingChange);
              }
            }
          }
        }
      }
    }
    return hasDependencies;
  }
  /**
   * Check if the given ModifyOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * ModifyOperation depends on
   * - AddOperation done on the same DN
   *
   * @param op The ModifyOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      {
        UpdateMessage pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof AddMsg)
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * same DN.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
        }
      }
    }
    return hasDependencies;
  }
  /**
   * Mark the first pendingChange as dependent on the second PendingChange.
   * @param dependentChange The PendingChange that depend on the second
   *                        PendingChange.
   * @param pendingChange   The PendingChange on which the first PendingChange
   *                        is dependent.
   */
  private void addDependency(
      PendingChange dependentChange, PendingChange pendingChange)
  {
    dependentChange.addDependency(pendingChange.getChangeNumber());
    dependentChanges.add(dependentChange);
  }
  /**
   * Check if the given ModifyDNMsg has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * Modify DN Operation depends on
   * - AddOperation done on the same DN as the target DN of the MODDN operation
   * - AddOperation done on the new parent of the MODDN  operation
   * - DeleteOperation done on the new DN of the MODDN operation
   * - ModifyDNOperation done from the new DN of the MODDN operation
   *
   * @param msg The ModifyDNMsg to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyDNMsg msg)
  {
    boolean hasDependencies = false;
    ChangeNumber changeNumber = msg.getChangeNumber();
    PendingChange change = pendingChanges.get(changeNumber);
    if (change == null)
      return false;
    DN targetDn = change.getTargetDN();
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      {
        UpdateMessage pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            // Check if the target of the Delete is the same
            // as the new DN of this ModifyDN
            if (msg.newDNIsEqual(pendingChange.getTargetDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            // Check if the Add Operation was done on the new parent of
            // the MODDN  operation
            if (msg.newParentIsEqual(pendingChange.getTargetDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
            // Check if the AddOperation was done on the same DN as the
            // target DN of the MODDN operation
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            // Check if the ModifyDNOperation was done from the new DN of
            // the MODDN operation
            if (msg.newDNIsEqual(pendingChange.getTargetDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
        }
      }
    }
    return hasDependencies;
  }
  /**
   * Check if the given DeleteOperation has some dependencies on any
   * currently running previous operation.
   * Update the dependency list in the associated PendingChange if
   * there are some dependencies.
   *
   * DeleteOperation depends on
   * - DeleteOperation done on children DN
   * - ModifyDnOperation with target DN that are children of the DEL DN
   * - AddOperation done on the same DN
   *
   *
   * @param op The DeleteOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(DeleteOperation op)
  {
    boolean hasDependencies = false;
    DN targetDn = op.getEntryDN();
    ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
    PendingChange change = pendingChanges.get(changeNumber);
    if (change == null)
      return false;
    for (PendingChange pendingChange : pendingChanges.values())
    {
      if (pendingChange.getChangeNumber().older(changeNumber))
      {
        UpdateMessage pendingMsg = pendingChange.getMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            /*
             * Check if the operation to be run is a deleteOperation on a
             * children of the current DeleteOperation.
             */
            if (pendingChange.getTargetDN().isDescendantOf(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof AddMsg)
          {
            /*
             * Check if the operation to be run is an addOperation on a
             * parent of the current DeleteOperation.
             */
            if (pendingChange.getTargetDN().equals(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
          {
            /*
             * Check if the operation to be run is an ModifyDNOperation
             * on a children of the current DeleteOperation
             */
            if (pendingChange.getTargetDN().isDescendantOf(targetDn))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
            }
          }
        }
      }
    }
    return hasDependencies;
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -49,6 +49,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -128,17 +129,14 @@
{
  private ReplicationMonitor monitor;
  private ChangeNumberGenerator changeNumberGenerator;
  private ReplicationBroker broker;
  private List<ListenerThread> synchroThreads =
    new ArrayList<ListenerThread>();
  private final SortedMap<ChangeNumber, PendingChange> pendingChanges =
    new TreeMap<ChangeNumber, PendingChange>();
  private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMessage>();
  private int numRcvdUpdates = 0;
  private int numSentUpdates = 0;
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
  private AtomicInteger numSentUpdates = new AtomicInteger(0);
  private AtomicInteger numProcessedUpdates = new AtomicInteger();
  private int debugCount = 0;
  private PersistentServerState state;
@@ -150,6 +148,19 @@
  private int maxSendDelay = 0;
  /**
   * This object is used to store the list of update currently being
   * done on the local database.
   * It contain both the update that are done directly on this server
   * and the updates that was done on another server, transmitted
   * by the replication server and that are currently replayed.
   * It is usefull to make sure that dependencies between operations
   * are correctly fullfilled, that the local operations are sent in a
   * correct order to the replication server and that the ServerState
   * is not updated too early.
   */
  private PendingChanges pendingChanges;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
@@ -245,7 +256,7 @@
  private ConfigEntry backendConfigEntry;
  private List<DN> branches = new ArrayList<DN>(0);
  private int listenerThreadNumber = 1;
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
  private Collection<String> replicationServers;
@@ -317,12 +328,6 @@
    DirectoryServer.registerMonitorProvider(monitor);
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     */
    changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
    /*
     * create the broker object used to publish and receive changes
     */
    try
@@ -348,6 +353,14 @@
      */
    }
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on the replication domain.
     */
    pendingChanges =
      new PendingChanges(new ChangeNumberGenerator(serverId, state),
                         broker, state);
    // listen for changes on the configuration
    configuration.addChangeListener(this);
  }
@@ -444,23 +457,37 @@
       * of the parent entry
       */
      // There is a potential of perfs improvement here
      // if we could avoid the following parent entry retrieval
      DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
      if (parentDnFromCtx != null)
      String parentUid = ctx.getParentUid();
      // root entry have no parent,
      // there is no need to check for it.
      if (parentUid != null)
      {
        DN entryDN = addOperation.getEntryDN();
        DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
        if ((parentDnFromEntryDn != null)
            && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
        // There is a potential of perfs improvement here
        // if we could avoid the following parent entry retrieval
        DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
        if (parentDnFromCtx == null)
        {
          // parentEntry has been renamed
          // replication name conflict resolution is expected to fix that
          // later in the flow
          // The parent does not exist with the specified unique id
          // stop the operation with NO_SUCH_OBJECT and let the
          // conflict resolution or the dependency resolution solve this.
          addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
          return new SynchronizationProviderResult(false);
        }
        else
        {
          DN entryDN = addOperation.getEntryDN();
          DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
          if ((parentDnFromEntryDn != null)
              && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
          {
            // parentEntry has been renamed
            // replication name conflict resolution is expected to fix that
            // later in the flow
            addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
            return new SynchronizationProviderResult(false);
          }
        }
      }
    }
    return new SynchronizationProviderResult(true);
@@ -633,89 +660,96 @@
   */
  public UpdateMessage receive()
  {
    synchronized (broker)
    UpdateMessage update = pendingChanges.getNextUpdate();
    if (update == null)
    {
      UpdateMessage update = null;
      while (update == null)
      synchronized (broker)
      {
        ReplicationMessage msg;
        try
        while (update == null)
        {
          msg = broker.receive();
          if (msg == null)
          ReplicationMessage msg;
          try
          {
            // The server is in the shutdown process
            return null;
          }
          log("Broker received message :" + msg);
          if (msg instanceof AckMessage)
          {
            AckMessage ack = (AckMessage) msg;
            receiveAck(ack);
          }
          else if (msg instanceof UpdateMessage)
          {
            update = (UpdateMessage) msg;
            receiveUpdate(update);
          }
          else if (msg instanceof InitializeRequestMessage)
          {
            // Another server requests us to provide entries
            // for a total update
            InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
            try
            msg = broker.receive();
            if (msg == null)
            {
              initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                  null);
              // The server is in the shutdown process
              return null;
            }
            catch(DirectoryException de)
            log("Broker received message :" + msg);
            if (msg instanceof AckMessage)
            {
              // Returns an error message to notify the sender
              int msgID = de.getMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              broker.publish(errorMsg);
              AckMessage ack = (AckMessage) msg;
              receiveAck(ack);
            }
          }
          else if (msg instanceof InitializeTargetMessage)
          {
            // Another server is exporting its entries to us
            InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            else if (msg instanceof InitializeRequestMessage)
            {
              // Another server requests us to provide entries
              // for a total update
              InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
              try
              {
                initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
                                 null);
              }
              catch(DirectoryException de)
              {
                // Returns an error message to notify the sender
                int msgID = de.getMessageID();
                ErrorMessage errorMsg =
                  new ErrorMessage(initMsg.getsenderID(),
                                   msgID, de.getMessage());
                broker.publish(errorMsg);
              }
            }
            else if (msg instanceof InitializeTargetMessage)
            {
              // Another server is exporting its entries to us
              InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
            try
            {
              importBackend(initMsg);
              try
              {
                importBackend(initMsg);
              }
              catch(DirectoryException de)
              {
                // Return an error message to notify the sender
                int msgID = de.getMessageID();
                ErrorMessage errorMsg =
                  new ErrorMessage(initMsg.getsenderID(),
                                   msgID, de.getMessage());
                log(getMessage(msgID,
                               backend.getBackendID()) + de.getMessage());
                broker.publish(errorMsg);
              }
            }
            catch(DirectoryException de)
            else if (msg instanceof ErrorMessage)
            {
              // Return an error message to notify the sender
              int msgID = de.getMessageID();
              ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
                  msgID, de.getMessage());
              log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
              broker.publish(errorMsg);
              if (ieContext != null)
              {
                // This is an error termination for the 2 following cases :
                // - either during an export
                // - or before an import really started
                //   For example, when we publish a request and the
                //  replicationServer did not find any import source.
                abandonImportExport((ErrorMessage)msg);
              }
            }
            else if (msg instanceof UpdateMessage)
            {
              update = (UpdateMessage) msg;
              receiveUpdate(update);
            }
          }
          else if (msg instanceof ErrorMessage)
          catch (SocketTimeoutException e)
          {
            if (ieContext != null)
            {
              // This is an error termination for the 2 following cases :
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  replicationServer did not find any import source.
              abandonImportExport((ErrorMessage)msg);
            }
            // just retry
          }
        } catch (SocketTimeoutException e)
        {
          // just retry
        }
      }
      return update;
    }
    return update;
  }
  /**
@@ -725,21 +759,8 @@
   */
  public void receiveUpdate(UpdateMessage update)
  {
    ChangeNumber changeNumber = update.getChangeNumber();
    synchronized (pendingChanges)
    {
      if (pendingChanges.containsKey(changeNumber))
      {
        /*
         * This should never happen,
         * TODO log error and throw exception
         */
      }
      pendingChanges.put(changeNumber,
          new PendingChange(changeNumber, null, update));
      numRcvdUpdates++;
    }
    pendingChanges.putRemoteUpdate(update);
    numRcvdUpdates.incrementAndGet();
  }
  /**
@@ -752,10 +773,9 @@
    UpdateMessage update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    synchronized (pendingChanges)
    synchronized (waitingAckMsgs)
    {
      update = waitingAckMsgs.get(changeNumber);
      waitingAckMsgs.remove(changeNumber);
      update = waitingAckMsgs.remove(changeNumber);
    }
    if (update != null)
    {
@@ -798,61 +818,55 @@
         * This is an operation type that we do not know about
         * It should never happen.
         */
        synchronized (pendingChanges)
        {
          pendingChanges.remove(curChangeNumber);
          int    msgID   = MSGID_UNKNOWN_TYPE;
          String message = getMessage(msgID, op.getOperationType().toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          return;
        }
        pendingChanges.remove(curChangeNumber);
        int    msgID   = MSGID_UNKNOWN_TYPE;
        String message = getMessage(msgID, op.getOperationType().toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        return;
      }
    }
    synchronized(pendingChanges)
    if (result == ResultCode.SUCCESS)
    {
      if (result == ResultCode.SUCCESS)
      try
      {
        PendingChange curChange = pendingChanges.get(curChangeNumber);
        if (curChange == null)
        {
          // This should never happen
          int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
          String message = getMessage(msgID, curChangeNumber.toString(),
              op.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          return;
        }
        curChange.setCommitted(true);
        pendingChanges.commit(curChangeNumber, op, msg);
      }
      catch  (NoSuchElementException e)
      {
        int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
        String message = getMessage(msgID, curChangeNumber.toString(),
                                    op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        return;
      }
        if (op.isSynchronizationOperation())
          curChange.setOp(op);
        else
          curChange.setMsg(msg);
        if (msg != null && isAssured)
      if (msg != null && isAssured)
      {
        synchronized (waitingAckMsgs)
        {
          // Add the assured message to the list of those whose acknowledgements
          // we are awaiting.
          // Add the assured message to the list of update that are
          // waiting acknowledgements
          waitingAckMsgs.put(curChangeNumber, msg);
        }
      }
      else if (!op.isSynchronizationOperation())
      {
        // Remove an unsuccessful non-replication operation from the pending
        // changes list.
        if (curChangeNumber != null)
        {
          pendingChanges.remove(curChangeNumber);
        }
      }
      pushCommittedChanges();
    }
    else if (!op.isSynchronizationOperation())
    {
      // Remove an unsuccessful non-replication operation from the pending
      // changes list.
      if (curChangeNumber != null)
      {
        pendingChanges.remove(curChangeNumber);
      }
    }
    int pushedChanges = pendingChanges.pushCommittedChanges();
    numSentUpdates.addAndGet(pushedChanges);
    // Wait for acknowledgement of an assured message.
    if (msg != null && isAssured)
@@ -880,7 +894,7 @@
   */
  public int getNumRcvdUpdates()
  {
    return numRcvdUpdates;
    return numRcvdUpdates.get();
  }
  /**
@@ -890,11 +904,11 @@
   */
  public int getNumSentUpdates()
  {
    return numSentUpdates;
    return numSentUpdates.get();
  }
  /**
   * get the number of updates in the pending list.
   * Get the number of updates in the pending list.
   *
   * @return The number of updates in the pending list
   */
@@ -1052,45 +1066,63 @@
  {
    Operation op = null;
    boolean done = false;
    boolean dependency = false;
    ChangeNumber changeNumber = null;
    int retryCount = 10;
    boolean firstTry = true;
    try
    {
      while (!done && retryCount-- > 0)
      while ((!dependency) && (!done) && (retryCount-- > 0))
      {
        op = msg.createOperation(conn);
        op.setInternalOperation(true);
        op.setSynchronizationOperation(true);
        changeNumber = OperationContext.getChangeNumber(op);
        if (changeNumber != null)
          changeNumberGenerator.adjust(changeNumber);
        op.run();
        ResultCode result = op.getResultCode();
        if (result != ResultCode.SUCCESS)
        {
          if (op instanceof ModifyOperation)
          {
            ModifyOperation newOp = (ModifyOperation) op;
            done = solveNamingConflict(newOp, msg);
            dependency = pendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof DeleteOperation)
          {
            DeleteOperation newOp = (DeleteOperation) op;
            done = solveNamingConflict(newOp, msg);
            dependency = pendingChanges.checkDependencies(newOp);
            if ((!dependency) && (!firstTry))
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof AddOperation)
          {
            AddOperation newOp = (AddOperation) op;
            done = solveNamingConflict(newOp, msg);
          } else if (op instanceof ModifyDNOperation)
            dependency = pendingChanges.checkDependencies(newOp);
            if (!dependency)
            {
              done = solveNamingConflict(newOp, msg);
            }
          }
          else if (op instanceof ModifyDNOperation)
          {
            ModifyDNOperation newOp = (ModifyDNOperation) op;
            done = solveNamingConflict(newOp, msg);
            ModifyDNMsg newMsg = (ModifyDNMsg) msg;
            dependency = pendingChanges.checkDependencies(newMsg);
            if (!dependency)
            {
              ModifyDNOperation newOp = (ModifyDNOperation) op;
              done = solveNamingConflict(newOp, msg);
            }
          }
          else
          {
@@ -1108,9 +1140,10 @@
        {
          done = true;
        }
        firstTry = false;
      }
      if (!done)
      if (!done && !dependency)
      {
        // Continue with the next change but the servers could now become
        // inconsistent.
@@ -1119,6 +1152,7 @@
        String message = getMessage(msgID, op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
        updateError(changeNumber);
      }
    }
@@ -1177,9 +1211,12 @@
    }
    finally
    {
      if (msg.isAssured())
        ack(msg.getChangeNumber());
      incProcessedUpdates();
      if (!dependency)
      {
        if (msg.isAssured())
          ack(msg.getChangeNumber());
        incProcessedUpdates();
      }
    }
  }
@@ -1193,12 +1230,9 @@
   */
  public void updateError(ChangeNumber changeNumber)
  {
    synchronized (pendingChanges)
    {
      PendingChange change = pendingChanges.get(changeNumber);
      change.setCommitted(true);
      pushCommittedChanges();
    }
    pendingChanges.commit(changeNumber);
    int pushedChanges = pendingChanges.pushCommittedChanges();
    numSentUpdates.addAndGet(pushedChanges);
  }
  /**
@@ -1210,15 +1244,7 @@
   */
  private ChangeNumber generateChangeNumber(Operation operation)
  {
    ChangeNumber changeNumber;
    changeNumber = changeNumberGenerator.NewChangeNumber();
    PendingChange change = new PendingChange(changeNumber, operation, null);
    synchronized(pendingChanges)
    {
      pendingChanges.put(changeNumber, change);
    }
    return changeNumber;
    return pendingChanges.putLocalOperation(operation);
  }
@@ -1604,42 +1630,6 @@
  }
  /**
   * Push all committed local changes to the replicationServer service.
   * PRECONDITION : The pendingChanges lock must be held before calling
   * this method.
   */
  private void pushCommittedChanges()
  {
    if (pendingChanges.isEmpty())
      return;
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if ((firstChange.getOp() != null ) &&
          (firstChange.getOp().isSynchronizationOperation() == false))
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
      }
    }
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.
opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -139,8 +139,9 @@
    for (Attribute a : userAttributes)
      elems.add(new LDAPAttribute(a).encode());
    for (Attribute a : operationalAttributes)
      elems.add(new LDAPAttribute(a).encode());
    if (operationalAttributes != null)
      for (Attribute a : operationalAttributes)
        elems.add(new LDAPAttribute(a).encode());
    encodedAttributes = ASN1Element.encodeValue(elems);
  }
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -35,6 +35,8 @@
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Operation;
/**
@@ -267,4 +269,86 @@
    this.newRDN = newRDN;
  }
  /**
   * Check if this MSG will change the DN of the target entry to be
   * the same as the dn given as a parameter.
   * @param targetDn the DN to use when checking if this MSG will change
   *                 the DN of the entry to a given DN.
   * @return A boolean indicating if the modify DN MSG will change the DN of
   *         the target entry to be the same as the dn given as a parameter.
   */
  public boolean newDNIsParent(DN targetDn)
  {
    try
    {
      String newStringDN = newRDN + "," + newSuperior;
      DN newDN = DN.decode(newStringDN);
      if (newDN.isAncestorOf(targetDn))
        return true;
      else
        return false;
    } catch (DirectoryException e)
    {
      // The DN was not a correct DN, and therefore does not a parent of the
      // DN given as a parameter.
      return false;
    }
  }
  /**
   * Check if the new dn of this ModifyDNMsg is the same as the targetDN
   * given in parameter.
   *
   * @param targetDN The targetDN to use to check for equality.
   *
   * @return A boolean indicating if the targetDN if the same as the new DN of
   *         the ModifyDNMsg.
   */
  public boolean newDNIsEqual(DN targetDN)
  {
    try
    {
      String newStringDN = newRDN + "," + newSuperior;
      DN newDN = DN.decode(newStringDN);
      if (newDN.equals(targetDN))
        return true;
      else
        return false;
    } catch (DirectoryException e)
    {
      // The DN was not a correct DN, and therefore does not match the
      // DN given as a parameter.
      return false;
    }
  }
  /**
   * Check if the new parent of the modifyDNMsg is the same as the targetDN
   * given in parameter.
   *
   * @param targetDN the targetDN to use when checking equality.
   *
   * @return A boolean indicating if the new parent of the modifyDNMsg is the
   *         same as the targetDN.
   */
  public boolean newParentIsEqual(DN targetDN)
  {
    try
    {
      DN newSuperiorDN = DN.decode(newSuperior);
      if (newSuperiorDN.equals(targetDN))
        return true;
      else
        return false;
    } catch (DirectoryException e)
    {
      // The newsuperior was not a correct DN, and therefore does not match the
      // DN given as a parameter.
      return false;
    }
  }
}
opends/src/server/org/opends/server/replication/protocol/UpdateMessage.java
@@ -90,7 +90,7 @@
   * @throws DataFormatException if the encoded byte array is not valid.
   * @throws UnsupportedEncodingException if UTF-8 is not supprted.
   */
  public UpdateMessage(byte[] in) throws DataFormatException,
  protected UpdateMessage(byte[] in) throws DataFormatException,
                                         UnsupportedEncodingException
  {
    /* read the changeNumber */
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -238,10 +238,13 @@
      {}
    }
    if ( (recentChangeNumber != null) &&
         (recentChangeNumber.getTimeSec() - changeNumber.getTimeSec() < 2))
    if ( (recentChangeNumber != null) && (changeNumber != null))
    {
      flush();
      if (((recentChangeNumber.getTimeSec() - changeNumber.getTimeSec()) < 2) ||
         ((recentChangeNumber.getSeqnum() - changeNumber.getSeqnum()) < 20))
      {
        flush();
      }
    }
    return new ReplicationIterator(serverId, db, changeNumber);
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -221,6 +221,7 @@
        newSocket =  listenSocket.accept();
        newSocket.setReceiveBufferSize(1000000);
        newSocket.setTcpNoDelay(true);
        newSocket.setKeepAlive(true);
        ServerHandler handler = new ServerHandler(
                                     new SocketSession(newSocket), queueSize);
        handler.start(null, serverId, serverURL, rcvWindow, this);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/DependencyTest.java
New file
@@ -0,0 +1,553 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication;
import static org.testng.Assert.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.util.LinkedList;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.DomainFakeCfg;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.util.TimeThread;
import org.testng.annotations.*;
/**
 * Test that the dependencies are computed correctly when replaying
 * sequences of operations that requires to follow a given order
 * such as : ADD an entry, ADD a children entry.
 */
public class DependencyTest extends ReplicationTestCase
{
  private static final String BASEDN_STRING = "dc=example,dc=com";
  /**
   * Check that a sequence of dependents adds and mods is correctly ordered:
   * Using a deep dit :
   * dc=example,dc=com
   *       |
   *    dc=dependency1
   *       |
   *    dc=dependency2
   *       |
   *    dc=dependency2
   *       |
   *
   *       |
   *    dc=dependencyN
   * This test sends a sequence of interleaved ADD operations and MODIFY
   * operations to build such a dit.
   *
   * Then test that the sequence of Delete necessary to remove
   * all those entries is also correctly ordered.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addModDelDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;
    cleanDB();
    try
    {
      /*
       * FIRST PART :
       * Check that a sequence of dependent ADD is correctly ordered.
       *
       * - Create replication server
       * - Send sequence of ADD messages to the replication server
       * - Configure replication server
       * - check that the last entry has been correctly added
       */
      String entryldif =
        "dn:" + BASEDN_STRING + "\n"
         + "objectClass: top\n"
         + "objectClass: domain\n"
         + "entryuuid: " + stringUID(1) + "\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      // find  a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();
      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "addModDeldependency",
                                        0, replServerId, 0,
                                        AddSequenceLength*5+100, null);
      replServer = new ReplicationServer(conf);
      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 1000, replServerPort, 1000,
                               false);
      TimeThread.sleep(2000);
      // send a sequence of add operation
      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + addDn;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(sequence),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        ModifyMsg modifyMsg =
          new ModifyMsg(gen.NewChangeNumber(), DN.decode(addDn),
                        generatemods("description", "test"),
                        stringUID(sequence+1));
        broker.publish(modifyMsg);
      }
      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      domainConf.setHeartbeatInterval(100000);
      domain = MultimasterReplication.createNewDomain(domainConf);
      // check that last entry in sequence got added.
      Entry lastEntry = getEntry(DN.decode(addDn), 30000, true);
      assertNotNull(lastEntry,
                    "The last entry of the ADD sequence was not added.");
      // Check that all the modify have been replayed
      // (all the entries should have a description).
      addDn = BASEDN_STRING;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=dependency" + sequence + "," + addDn;
        boolean found =
          checkEntryHasAttribute(DN.decode(addDn), "description", "test",
                                 10000, true);
        if (!found)
        {
          fail("The modification was not replayed on entry " + addDn);
        }
      }
      /*
       * SECOND PART
       *
       * Now check that the dependencies between delete are correctly
       * managed.
       *
       * disable the domain while we publish the delete message to
       * to replication server so that when we enable it it receives the
       * delete operation in bulk.
       */
      domain.disable();
      Thread.sleep(2000);  // necesary because disable does not wait
                           // for full termination of all threads. (issue 1571)
      DN deleteDN = DN.decode(addDn);
      while (sequence-->1)
      {
        DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1));
        broker.publish(delMsg);
        deleteDN = deleteDN.getParent();
      }
      domain.enable();
      // check that entry just below the base entry was deleted.
      // (we can't delete the base entry because some other tests might
      // have added other children)
      DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
      Entry baseEntry = getEntry(node1, 30000, false);
      assertNull(baseEntry,
                 "The last entry of the DEL sequence was not deleted.");
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();
      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);
      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }
  /**
   * Clean the database and replace with a single entry.
   *
   * @throws FileNotFoundException
   * @throws IOException
   * @throws Exception
   */
  private void cleanDB() throws FileNotFoundException, IOException, Exception
  {
    String baseentryldif =
      "dn:" + BASEDN_STRING + "\n"
       + "objectClass: top\n"
       + "objectClass: domain\n"
       + "dc: example\n"
       + "entryuuid: " + stringUID(1) + "\n";
      // Initialization :
      // Load the database with a single entry :
      String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
      String path = buildRoot + File.separator + "build" +
                    File.separator + "unit-tests" + File.separator +
                    "package"+ File.separator + "addModDelDependencyTest";
      OutputStream out = new FileOutputStream(new File(path));
      out.write(baseentryldif.getBytes());
      task("dn: ds-task-id=" + UUID.randomUUID()
          + ",cn=Scheduled Tasks,cn=Tasks\n"
          + "objectclass: top\n"
          + "objectclass: ds-task\n"
          + "objectclass: ds-task-import\n"
          + "ds-task-class-name: org.opends.server.tasks.ImportTask\n"
          + "ds-task-import-backend-id: userRoot\n"
          + "ds-task-import-ldif-file: " + path + "\n"
          + "ds-task-import-reject-file: " + path + "reject\n");
  }
  /**
   * Check that after a sequence of add/del/add done on the same DN
   * the second entry is in the database.
   * The unique id of the entry is used to check that the correct entry
   * has been added.
   * To increase the risks of failures a loop of add/del/add is done.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addDelAddDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;
    cleanDB();
    try
    {
      String entryldif = "dn:" + BASEDN_STRING + "\n"
      + "objectClass: top\n"
      + "objectClass: domain\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();
      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "addDelAdddependency", 0,
                                        replServerId,
                                        0, 5*AddSequenceLength+100, null);
      replServer = new ReplicationServer(conf);
      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
                               false);
      // send a sequence of add/del/add operations
      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        // add the entry a first time
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + addDn;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(sequence == 1 ? sequence : sequence +1000),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        // delete the entry
        DeleteMsg delMsg = new DeleteMsg(addDn, gen.NewChangeNumber(),
                                         stringUID(sequence+1));
        broker.publish(delMsg);
        // add again the entry with a new entryuuid.
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1001)),
                           new LinkedList<AttributeValue>());
        addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1001),
                     stringUID(sequence == 1 ? sequence : sequence +1000),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
      }
      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      domain = MultimasterReplication.createNewDomain(domainConf);
      // check that all entries have been deleted and added
      // again by checking that they do have the correct entryuuid
      addDn = BASEDN_STRING;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=dependency" + sequence + "," + addDn;
        boolean found =
          checkEntryHasAttribute(DN.decode(addDn), "entryuuid",
                                 stringUID(sequence+1001),
                                 30000, true);
        if (!found)
        {
          fail("The second add was not replayed on entry " + addDn);
        }
      }
      DN deleteDN = DN.decode(addDn);
      while (sequence-->1)
      {
        DeleteMsg delMsg = new DeleteMsg(deleteDN.toString(),
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1001));
        broker.publish(delMsg);
        deleteDN = deleteDN.getParent();
      }
      // check that the database was cleaned successfully
      DN node1 = DN.decode("dc=dependency1," + BASEDN_STRING);
      Entry baseEntry = getEntry(node1, 30000, false);
      assertNull(baseEntry,
        "The entry were not removed succesfully after test completion.");
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();
      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);
      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }
  /**
   * Check that the dependency of moddn operation are working by
   * issuing a set of Add operation followed by a modrdn of the added entry.
   */
  @SuppressWarnings("unchecked")
  @Test(groups="slow")
  public void addModdnDependencyTest() throws Exception
  {
    ReplicationServer replServer = null;
    ReplicationDomain domain = null;
    DN baseDn = DN.decode(BASEDN_STRING);
    SynchronizationProvider replicationPlugin = null;
    short brokerId = 2;
    short serverId = 1;
    short replServerId = 1;
    int AddSequenceLength = 30;
    cleanDB();
    try
    {
      String entryldif = "dn:" + BASEDN_STRING + "\n"
      + "objectClass: top\n"
      + "objectClass: domain\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryldif);
      AttributeType uidType =
        DirectoryServer.getSchema().getAttributeType("entryuuid");
      // find a free port for the replicationServer
      ServerSocket socket = TestCaseUtils.bindFreePort();
      int replServerPort = socket.getLocalPort();
      socket.close();
      replicationPlugin = new MultimasterReplication();
      DirectoryServer.registerSynchronizationProvider(replicationPlugin);
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(replServerPort, "addModdndependency", 0,
                                        replServerId,
                                        0, 5*AddSequenceLength+100, null);
      replServer = new ReplicationServer(conf);
      ReplicationBroker broker =
        openReplicationSession(baseDn, brokerId, 100, replServerPort, 1000,
                               false);
      String addDn = BASEDN_STRING;
      ChangeNumberGenerator gen = new ChangeNumberGenerator(brokerId, 0L);
      // send a sequence of add/modrdn operations
      int sequence;
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        // add the entry
        entry.removeAttribute(uidType);
        entry.addAttribute(new Attribute("entryuuid", stringUID(sequence+1)),
                           new LinkedList<AttributeValue>());
        addDn = "dc=dependency" + sequence + "," + BASEDN_STRING;
        AddMsg addMsg =
          new AddMsg(gen.NewChangeNumber(), addDn, stringUID(sequence+1),
                     stringUID(1),
                     entry.getObjectClassAttribute(),
                     entry.getAttributes(), null );
        broker.publish(addMsg);
        // rename the entry
        ModifyDNMsg moddnMsg =
          new ModifyDNMsg(addDn, gen.NewChangeNumber(), stringUID(sequence+1),
                          stringUID(1), true, null, "dc=new_dep" + sequence);
        broker.publish(moddnMsg);
      }
      // configure and start replication of dc=example,dc=com on the server
      SortedSet<String> replServers = new TreeSet<String>();
      replServers.add("localhost:"+replServerPort);
      DomainFakeCfg domainConf =
        new DomainFakeCfg(baseDn, serverId, replServers);
      domain = MultimasterReplication.createNewDomain(domainConf);
      // check that all entries have been renamed
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
        Entry baseEntry = getEntry(DN.decode(addDn), 30000, true);
        assertNotNull(baseEntry,
          "The rename was not applied correctly on :" + addDn);
      }
      // delete the entries to clean the database.
      for (sequence = 1; sequence<=AddSequenceLength; sequence ++)
      {
        addDn = "dc=new_dep" + sequence + "," + BASEDN_STRING;
        DeleteMsg delMsg = new DeleteMsg(addDn.toString(),
                                         gen.NewChangeNumber(),
                                         stringUID(sequence + 1001));
        broker.publish(delMsg);
      }
    }
    finally
    {
      if (replServer != null)
        replServer.shutdown();
      if (domain != null)
        MultimasterReplication.deleteDomain(baseDn);
      if (replicationPlugin != null)
        DirectoryServer.deregisterSynchronizationProvider(replicationPlugin);
    }
  }
  /**
   * Builds and return a uuid from an integer.
   * This methods assume that unique integers are used and does not make any
   * unicity checks. It is only responsible for generating a uid with a
   * correct syntax.
   */
  private String stringUID(int i)
  {
    return String.format("11111111-1111-1111-1111-%012x", i);
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -153,10 +153,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    baseDn = DN.decode("dc=example,dc=com");
    updatedEntries = newLDIFEntries();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -32,8 +32,6 @@
import java.net.ServerSocket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import org.opends.server.TestCaseUtils;
@@ -47,16 +45,12 @@
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.LDAPException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
@@ -242,10 +236,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
@@ -325,22 +315,6 @@
    configureReplication();
  }
  /**
   * @return
   */
  private List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  private void processModify(int count)
  {
    while (count>0)
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -26,9 +26,6 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.fail;
import java.io.File;
@@ -36,18 +33,12 @@
import java.util.UUID;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -113,9 +104,9 @@
    configureReplication();
    // Give some time to the replication to setup
    // Give some time to the replication to setup
    Thread.sleep(1000);
    // Create a dummy entry
    addEntry("dn: dc=dummy, dc=example,dc=com\n"
        + "objectClass: top\n" + "objectClass: domain\n");
@@ -209,7 +200,7 @@
    String path = buildRoot + File.separator + "build" +
                  File.separator + "unit-tests" + File.separator +
                  "package"+ File.separator + "ReSynchTest";
    task("dn: ds-task-id=" + UUID.randomUUID()
        + ",cn=Scheduled Tasks,cn=Tasks\n"
        + "objectclass: top\n"
@@ -237,73 +228,4 @@
  }
  /**
   * Utility method to create, run a task and check its result.
   */
  private void task(String task) throws Exception
  {
    Entry taskEntry = TestCaseUtils.makeEntry(task);
    InternalClientConnection connection =
         InternalClientConnection.getRootConnection();
    // Add the task.
    AddOperation addOperation =
         connection.processAdd(taskEntry.getDN(),
                               taskEntry.getObjectClasses(),
                               taskEntry.getUserAttributes(),
                               taskEntry.getOperationalAttributes());
    assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
                 "Add of the task definition was not successful");
    // Wait until the task completes.
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
         ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    String completionTime = null;
    long startMillisecs = System.currentTimeMillis();
    do
    {
      InternalSearchOperation searchOperation =
           connection.processSearch(taskEntry.getDN(),
                                    SearchScope.BASE_OBJECT,
                                    filter);
      try
      {
        resultEntry = searchOperation.getSearchEntries().getFirst();
      } catch (Exception e)
      {
        continue;
      }
      completionTime =
           resultEntry.getAttributeValue(completionTimeType,
                                         DirectoryStringSyntax.DECODER);
      if (completionTime == null)
      {
        if (System.currentTimeMillis() - startMillisecs > 1000*30)
        {
          break;
        }
        Thread.sleep(10);
      }
    } while (completionTime == null);
    if (completionTime == null)
    {
      fail("The task has not completed after 30 seconds.");
    }
    // Check that the task state is as expected.
    AttributeType taskStateType =
         DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
    String stateString =
         resultEntry.getAttributeValue(taskStateType,
                                       DirectoryStringSyntax.DECODER);
    TaskState taskState = TaskState.fromString(stateString);
    assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
                 "The task completed in an unexpected state");
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -26,12 +26,16 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.fail;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.List;
@@ -42,7 +46,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.protocols.internal.InternalClientConnection;
@@ -53,6 +60,10 @@
import org.opends.server.types.ByteStringFactory;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.AttributeType;
@@ -87,11 +98,6 @@
  protected Entry replServerEntry;
  /**
   * schema check flag
   */
  protected boolean schemaCheck;
  /**
   * The replication plugin entry
   */
  protected String synchroPluginStringDN =
@@ -108,7 +114,6 @@
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    schemaCheck = DirectoryServer.checkSchema();
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
@@ -152,7 +157,7 @@
        }
      }
      catch (Exception e)
      {
      {
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "ReplicationTestCase/openChangelogSession" + e.getMessage(), 1);
@@ -244,7 +249,7 @@
             logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.NOTICE,
            "cleaning config entry " + dn, 1);
        op = new DeleteOperation(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            dn);
@@ -256,7 +261,7 @@
      // done
    }
  }
  /**
   * suppress all the real entries created by the tests in this class
   */
@@ -265,7 +270,7 @@
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.NOTICE,
        "ReplicationTestCase/Cleaning entries" , 1);
    DeleteOperation op;
    // Delete entries
    try
@@ -298,8 +303,6 @@
  @AfterClass
  public void classCleanUp() throws Exception
  {
    DirectoryServer.setCheckSchema(schemaCheck);
    cleanConfigEntries();
    cleanRealEntries();
  }
@@ -323,7 +326,7 @@
    assertNotNull(DirectoryServer.getConfigEntry(DN
        .decode(synchroPluginStringDN)),
        "Unable to add the Multimaster replication plugin");
    // domains container entry.
    String domainsLdif = "dn: "
      + "cn=domains," + synchroPluginStringDN + "\n"
@@ -335,7 +338,7 @@
    assertNotNull(DirectoryServer.getConfigEntry(
      DN.decode(synchroPluginStringDN)),
      "Unable to add the Multimaster replication plugin");
    // Add the replication server
    DirectoryServer.getConfigHandler().addEntry(replServerEntry, null);
@@ -380,7 +383,7 @@
  protected boolean checkEntryHasAttribute(DN dn, String attrTypeStr,
      String valueString, int timeout, boolean hasAttribute) throws Exception
  {
    boolean found;
    boolean found = false;
    int count = timeout/100;
    if (count<1)
      count=1;
@@ -408,15 +411,15 @@
        newEntry = DirectoryServer.getEntry(dn);
        if (newEntry == null)
          fail("The entry " + dn +
          " has incorrectly been deleted from the database.");
        List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
        Attribute tmpAttr = tmpAttrList.get(0);
        if (newEntry != null)
        {
          List<Attribute> tmpAttrList = newEntry.getAttribute(attrTypeStr);
          Attribute tmpAttr = tmpAttrList.get(0);
        AttributeType attrType =
          DirectoryServer.getAttributeType(attrTypeStr, true);
        found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
          AttributeType attrType =
            DirectoryServer.getAttributeType(attrTypeStr, true);
          found = tmpAttr.hasValue(new AttributeValue(attrType, valueString));
        }
      }
      finally
@@ -478,4 +481,95 @@
    }
  }
  /**
   * Generate a new modification replace with the given information.
   *
   * @param attrName The attribute to replace.
   * @param attrValue The new value for the attribute
   *
   * @return The modification replace.
   */
  protected List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  /**
   * Utility method to create, run a task and check its result.
   */
  protected void task(String task) throws Exception
  {
    Entry taskEntry = TestCaseUtils.makeEntry(task);
    InternalClientConnection connection =
         InternalClientConnection.getRootConnection();
    // Add the task.
    AddOperation addOperation =
         connection.processAdd(taskEntry.getDN(),
                               taskEntry.getObjectClasses(),
                               taskEntry.getUserAttributes(),
                               taskEntry.getOperationalAttributes());
    assertEquals(addOperation.getResultCode(), ResultCode.SUCCESS,
                 "Add of the task definition was not successful");
    // Wait until the task completes.
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
         ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
         SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    String completionTime = null;
    long startMillisecs = System.currentTimeMillis();
    do
    {
      InternalSearchOperation searchOperation =
           connection.processSearch(taskEntry.getDN(),
                                    SearchScope.BASE_OBJECT,
                                    filter);
      try
      {
        resultEntry = searchOperation.getSearchEntries().getFirst();
      } catch (Exception e)
      {
        continue;
      }
      completionTime =
           resultEntry.getAttributeValue(completionTimeType,
                                         DirectoryStringSyntax.DECODER);
      if (completionTime == null)
      {
        if (System.currentTimeMillis() - startMillisecs > 1000*30)
        {
          break;
        }
        Thread.sleep(10);
      }
    } while (completionTime == null);
    if (completionTime == null)
    {
      fail("The task has not completed after 30 seconds.");
    }
    // Check that the task state is as expected.
    AttributeType taskStateType =
         DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
    String stateString =
         resultEntry.getAttributeValue(taskStateType,
                                       DirectoryStringSyntax.DECODER);
    TaskState taskState = TaskState.fromString(stateString);
    assertEquals(taskState, TaskState.COMPLETED_SUCCESSFULLY,
                 "The task completed in an unexpected state");
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -82,7 +82,6 @@
  {
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    schemaCheck = DirectoryServer.checkSchema();
    // find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java
@@ -33,8 +33,6 @@
import static org.testng.Assert.fail;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -50,15 +48,12 @@
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.InitializationException;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.OperationType;
import org.opends.server.types.ResultCode;
@@ -187,10 +182,6 @@
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create backend top level entries
    String[] topEntries = new String[2];
    topEntries[0] = "dn: dc=example,dc=com\n" + "objectClass: top\n"
@@ -263,22 +254,6 @@
    configureReplication();
  }
  /**
   * @return
   */
  private List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  private class BrokerWriter extends Thread
  {
    int count;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -100,10 +100,6 @@
    // This test suite depends on having the schema available.
    TestCaseUtils.startServer();
    // Disable schema check
    schemaCheck = DirectoryServer.checkSchema();
    DirectoryServer.setCheckSchema(false);
    // Create an internal connection
    connection = InternalClientConnection.getRootConnection();
@@ -1006,19 +1002,6 @@
    }
  }
  private List<Modification> generatemods(String attrName, String attrValue)
  {
    AttributeType attrType =
      DirectoryServer.getAttributeType(attrName.toLowerCase(), true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, attrValue));
    Attribute attr = new Attribute(attrType, attrName, values);
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
    return mods;
  }
  /**
   *  Get the entryUUID for a given DN.
   *
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
New file
@@ -0,0 +1,183 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.SortedSet;
import org.opends.server.admin.ManagedObjectDefinition;
import org.opends.server.admin.PropertyProvider;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.client.MultimasterDomainCfgClient;
import org.opends.server.admin.std.server.MultimasterDomainCfg;
import org.opends.server.types.DN;
/**
 * This class implement a configuration object for the MultimasterDomain
 * that can be used in unit tests to instantiate ReplicationDomain.
 */
public class DomainFakeCfg implements MultimasterDomainCfg
{
  private DN baseDn;
  private int serverId;
  private SortedSet<String> replicationServers;
  private long heartbeatInterval = 1000;
  /**
   * Creates a new Domain with the provided information
   */
  public DomainFakeCfg(DN baseDn, int serverId, SortedSet<String> replServers)
  {
    this.baseDn = baseDn;
    this.serverId = serverId;
    this.replicationServers = replServers;
  }
  /**
   * {@inheritDoc}
   */
  public void addChangeListener(
      ConfigurationChangeListener<MultimasterDomainCfg> listener)
  {
  }
  /**
   * {@inheritDoc}
   */
  public ManagedObjectDefinition<? extends MultimasterDomainCfgClient,
      ? extends MultimasterDomainCfg> definition()
  {
    return null;
  }
  /**
   * {@inheritDoc}
   */
  public long getHeartbeatInterval()
  {
    return heartbeatInterval ;
  }
  /**
   * {@inheritDoc}
   */
  public long getMaxReceiveDelay()
  {
    return 0;
  }
  /**
   * {@inheritDoc}
   */
  public int getMaxReceiveQueue()
  {
    return 0;
  }
  /**
   * {@inheritDoc}
   */
  public long getMaxSendDelay()
  {
    return 0;
  }
  /**
   * {@inheritDoc}
   */
  public int getMaxSendQueue()
  {
    return 0;
  }
  /**
   * {@inheritDoc}
   */
  public DN getReplicationDN()
  {
    return baseDn;
  }
  /**
   * {@inheritDoc}
   */
  public SortedSet<String> getReplicationServer()
  {
    return replicationServers;
  }
  /**
   * {@inheritDoc}
   */
  public int getServerId()
  {
    return serverId;
  }
  /**
   * {@inheritDoc}
   */
  public int getWindowSize()
  {
    return 100;
  }
  /**
   * {@inheritDoc}
   */
  public void removeChangeListener(
      ConfigurationChangeListener<MultimasterDomainCfg> listener)
  {
  }
  /**
   * {@inheritDoc}
   */
  public DN dn()
  {
    return null;
  }
  /**
   * {@inheritDoc}
   */
  public PropertyProvider properties()
  {
    return null;
  }
  /**
   * Set the heartbeat interval.
   *
   * @param interval
   */
  public void setHeartbeatInterval(long interval)
  {
    heartbeatInterval = interval;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -76,7 +76,7 @@
  /**
   * The port of the replicationServer.
   */
  private int changelogPort;
  private int replicationServerPort;
  private ChangeNumber firstChangeNumberServer1 = null;
  private ChangeNumber secondChangeNumberServer1 = null;
@@ -97,11 +97,11 @@
    //  find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    changelogPort = socket.getLocalPort();
    replicationServerPort = socket.getLocalPort();
    socket.close();
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null);
      new ReplServerFakeConfiguration(replicationServerPort, null, 0, 1, 0, 0, null);
    replicationServer = new ReplicationServer(conf);
  }
@@ -124,10 +124,10 @@
       * Open a sender session and a receiver session to the replicationServer
       */
      server1 = openReplicationSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
          DN.decode("dc=example,dc=com"), (short) 1, 100, replicationServerPort,
          1000, true);
      server2 = openReplicationSession(
          DN.decode("dc=example,dc=com"), (short) 2, 100, changelogPort,
          DN.decode("dc=example,dc=com"), (short) 2, 100, replicationServerPort,
          1000, true);
      /*
@@ -240,7 +240,7 @@
    try {
      broker =
        openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, false);
                             100, replicationServerPort, 1000, false);
      ReplicationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
@@ -277,7 +277,7 @@
    try {
      broker =
        openReplicationSession(DN.decode("dc=example,dc=com"), (short) 3,
                             100, changelogPort, 1000, state);
                             100, replicationServerPort, 1000, state);
      ReplicationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
@@ -425,7 +425,7 @@
       * Open a sender session
       */
      server = openReplicationSession(
          DN.decode("dc=example,dc=com"), (short) 5, 100, changelogPort,
          DN.decode("dc=example,dc=com"), (short) 5, 100, replicationServerPort,
          1000, 1000, 0, true);
      BrokerReader reader = new BrokerReader(server);
@@ -436,7 +436,7 @@
      for (int i =0; i< CLIENT_THREADS; i++)
      {
        clientBroker[i] = openReplicationSession(
            DN.decode("dc=example,dc=com"), (short) (100+i), 100, changelogPort,
            DN.decode("dc=example,dc=com"), (short) (100+i), 100, replicationServerPort,
            1000, true);
        client[i] = new BrokerReader(clientBroker[i]);
      }
@@ -510,7 +510,7 @@
          new ChangeNumberGenerator(serverId , (long) 0);
        ReplicationBroker broker =
          openReplicationSession( DN.decode("dc=example,dc=com"), serverId,
            100, changelogPort, 1000, 1000, 0, true);
            100, replicationServerPort, 1000, 1000, 0, true);
        producer[i] = new BrokerWriter(broker, gen, TOTAL_MSG/THREADS);
        reader[i] = new BrokerReader(broker);