mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Fabio Pistolesi
11.39.2016 3c140af6c756b30b325ce3c6ed080e8898e2b7ec
OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput

The main bottleneck was the slow traversal of the pending changes to verify dependencies.
Rewrote all checks to consider only the changes currently being processed by the replay threads and the recent changes marked as dependent.
Switched from synchronized methods to fine-grained locks and concurrent collections.
Simplified server state management and pending changes.
6 files modified
451 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java 43 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java 6 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java 37 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java 315 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java 43 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java 7 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2016 ForgeRock AS
 *      Portions Copyright 2011-2016 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
@@ -2266,6 +2266,15 @@
  }
  /**
   * Marks the specified message as the one currently processed by a replay thread.
   * <p>
   * This is a thin delegate: the bookkeeping itself is done by
   * {@code remotePendingChanges.markInProgress(msg)}.
   *
   * @param msg the message being processed
   */
  void markInProgress(LDAPUpdateMsg msg)
  {
    remotePendingChanges.markInProgress(msg);
  }
  /**
   * Create and replay a synchronized Operation from an UpdateMsg.
   *
   * @param msg
@@ -2292,7 +2301,6 @@
        // error handling paths.
        Operation nextOp = op = msg.createOperation(conn);
        dependency = remotePendingChanges.checkDependencies(op, msg);
        boolean replayDone = false;
        int retryCount = 10;
        while (!dependency && !replayDone && retryCount-- > 0)
@@ -2350,25 +2358,27 @@
              ModifyOperation castOp = (ModifyOperation) op;
              dependency = remotePendingChanges.checkDependencies(castOp);
              ModifyMsg modifyMsg = (ModifyMsg) msg;
              replayDone = solveNamingConflict(castOp, modifyMsg);
              replayDone = !dependency && solveNamingConflict(castOp, modifyMsg);
            }
            else if (op instanceof DeleteOperation)
            {
              DeleteOperation castOp = (DeleteOperation) op;
              dependency = remotePendingChanges.checkDependencies(castOp);
              replayDone = solveNamingConflict(castOp, msg);
              replayDone = !dependency && solveNamingConflict(castOp, msg);
            }
            else if (op instanceof AddOperation)
            {
              AddOperation castOp = (AddOperation) op;
              AddMsg addMsg = (AddMsg) msg;
              dependency = remotePendingChanges.checkDependencies(castOp);
              replayDone = solveNamingConflict(castOp, addMsg);
              replayDone = !dependency && solveNamingConflict(castOp, addMsg);
            }
            else if (op instanceof ModifyDNOperation)
            {
              ModifyDNOperation castOp = (ModifyDNOperation) op;
              replayDone = solveNamingConflict(castOp, msg);
              ModifyDNMsg modifyDNMsg = (ModifyDNMsg) msg;
              dependency = remotePendingChanges.checkDependencies(modifyDNMsg);
              replayDone = !dependency && solveNamingConflict(castOp, modifyDNMsg);
            }
            else
            {
@@ -2385,8 +2395,8 @@
            else
            {
              /*
               * Create a new operation reflecting the new state of the
               * UpdateMsg after conflict resolution modified it.
               * Create a new operation reflecting the new state of the UpdateMsg after conflict resolution
               * modified it and try replaying it again. Dependencies might have been replayed by now.
               *  Note: When msg is a DeleteMsg, the DeleteOperation is properly
               *  created with subtreeDelete request control when needed.
               */
@@ -4293,16 +4303,13 @@
    // number of updates in the pending list
    addMonitorData(attributes, "pending-updates", pendingChanges.size());
    addMonitorData(attributes, "replayed-updates-ok",
        numReplayedPostOpCalled.get());
    addMonitorData(attributes, "resolved-modify-conflicts",
        numResolvedModifyConflicts.get());
    addMonitorData(attributes, "resolved-naming-conflicts",
        numResolvedNamingConflicts.get());
    addMonitorData(attributes, "unresolved-naming-conflicts",
        numUnresolvedNamingConflicts.get());
    addMonitorData(attributes, "remote-pending-changes-size",
        remotePendingChanges.getQueueSize());
    addMonitorData(attributes, "replayed-updates-ok", numReplayedPostOpCalled.get());
    addMonitorData(attributes, "resolved-modify-conflicts", numResolvedModifyConflicts.get());
    addMonitorData(attributes, "resolved-naming-conflicts", numResolvedNamingConflicts.get());
    addMonitorData(attributes, "unresolved-naming-conflicts", numUnresolvedNamingConflicts.get());
    addMonitorData(attributes, "remote-pending-changes-size", remotePendingChanges.getQueueSize());
    addMonitorData(attributes, "dependent-changes-size", remotePendingChanges.getDependentChangesSize());
    addMonitorData(attributes, "changes-in-progress-size", remotePendingChanges.changesInProgressSize());
    return attributes;
  }
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2016 ForgeRock AS
 *      Portions Copyright 2011-2016 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
@@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -316,9 +317,10 @@
  {
    replayThreads.clear();
    ReentrantLock switchQueueLock = new ReentrantLock();
    for (int i = 0; i < replayThreadNumber; i++)
    {
      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
      ReplayThread replayThread = new ReplayThread(updateToReplayQueue, switchQueueLock);
      replayThread.start();
      replayThreads.add(replayThread);
    }
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java
@@ -22,12 +22,11 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2014 ForgeRock AS
 *      Portions copyright 2014-2016 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.operation.PluginOperation;
@@ -42,7 +41,6 @@
  private boolean committed;
  private UpdateMsg msg;
  private final PluginOperation op;
  private ServerState dependencyState;
  /**
   * Construct a new PendingChange.
@@ -128,35 +126,6 @@
    return this.op;
  }
  /**
   * Add the given CSN to the list of dependencies of this PendingChange.
   * <p>
   * The backing {@code dependencyState} is allocated on first use.
   *
   * @param csn
   *          The CSN to add to the list of dependencies of this PendingChange.
   */
  public void addDependency(CSN csn)
  {
    // Allocate the dependency state the first time a dependency is recorded.
    if (dependencyState == null)
    {
      dependencyState = new ServerState();
    }
    dependencyState.update(csn);
  }
  /**
   * Check if the given ServerState covers the dependencies of this
   * PendingChange.
   * <p>
   * NOTE(review): in the visible code {@code dependencyState} is only allocated by
   * {@code addDependency}, so it may still be {@code null} when this method is called;
   * how {@code ServerState.cover} treats a null argument is not visible here - confirm.
   *
   * @param state The ServerState for which dependencies must be checked.
   *
   * @return A boolean indicating if the given ServerState covers the
   *         dependencies of this PendingChange.
   */
  public boolean dependenciesIsCovered(ServerState state)
  {
    return state.cover(dependencyState);
  }
  /** {@inheritDoc} */
  @Override
  public int compareTo(PendingChange o)
@@ -173,8 +142,6 @@
        + ", csn=" + csn.toStringUI()
        + ", msg=[" + msg
        + "], isOperationSynchronized="
        + (op != null ? op.isSynchronizationOperation() : "false")
        + ", dependencyState="
        + (dependencyState != null ? dependencyState : "");
        + (op != null ? op.isSynchronizationOperation() : "false");
  }
}
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -27,7 +27,11 @@
package org.opends.server.replication.plugin;
import java.util.*;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import net.jcip.annotations.GuardedBy;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyDNOperationBasis;
@@ -51,6 +55,7 @@
final class RemotePendingChanges
{
  /** A map used to store the pending changes. */
  @GuardedBy("pendingChangesLock")
  private final SortedMap<CSN, PendingChange> pendingChanges = new TreeMap<>();
  /**
@@ -58,7 +63,18 @@
   * not been replayed correctly because they are dependent on
   * another change to be completed.
   */
  @GuardedBy("dependentChangesLock")
  private final SortedSet<PendingChange> dependentChanges = new TreeSet<>();
  /**
   * {@code activeAndDependentChanges} also contains changes discovered to be dependent
   * on currently in progress changes.
   */
  private final ConcurrentSkipListSet<PendingChange> activeAndDependentChanges = new ConcurrentSkipListSet<>();
  private ReentrantReadWriteLock pendingChangesLock = new ReentrantReadWriteLock(true);
  private ReentrantReadWriteLock.ReadLock pendingChangesReadLock = pendingChangesLock.readLock();
  private ReentrantReadWriteLock.WriteLock pendingChangesWriteLock = pendingChangesLock.writeLock();
  private ReentrantLock dependentChangesLock = new ReentrantLock();
  /** The ServerState that will be updated when LDAPUpdateMsg are fully replayed. */
  private final ServerState state;
@@ -75,14 +91,50 @@
  }
  /**
   * Returns the number of changes currently in this list.
   * Returns the number of changes waiting to be replayed.
   *
   * @return The number of changes currently in this list.
   * @return The number of changes waiting to be replayed
   */
  public synchronized int getQueueSize()
  public int getQueueSize()
  {
    pendingChangesReadLock.lock();
    try
  {
    return pendingChanges.size();
  }
    finally
    {
      pendingChangesReadLock.unlock();
    }
  }
  /**
   * Returns the number of changes actively being replayed.
   * <p>
   * The value is the current size of {@code activeAndDependentChanges}; no lock is taken here.
   * NOTE(review): {@code ConcurrentSkipListSet.size()} traverses the set and may report an
   * inaccurate result while other threads modify it, so treat the value as an estimate.
   *
   * @return the number of changes actively being replayed.
   */
  public int changesInProgressSize()
  {
    return activeAndDependentChanges.size();
  }
  /**
   * Returns the number of changes depending on other changes.
   * <p>
   * Reads the size of {@code dependentChanges} while holding {@code dependentChangesLock}.
   *
   * @return the number of changes depending on other changes.
   */
  public int getDependentChangesSize()
  {
    dependentChangesLock.lock();
    try
    {
      return dependentChanges.size();
    }
    finally
    {
      // Released in finally so the lock is freed on every exit path.
      dependentChangesLock.unlock();
    }
  }
  /**
   * Add a new LDAPUpdateMsg that was received from the replication server
@@ -93,11 +145,18 @@
   * @return {@code false} if the update was already registered in the pending
   *         changes.
   */
  public synchronized boolean putRemoteUpdate(LDAPUpdateMsg update)
  public boolean putRemoteUpdate(LDAPUpdateMsg update)
  {
    pendingChangesWriteLock.lock();
    try
  {
    CSN csn = update.getCSN();
    return pendingChanges.put(csn,
        new PendingChange(csn, null, update)) == null;
      return pendingChanges.put(csn, new PendingChange(csn, null, update)) == null;
    }
    finally
    {
      pendingChangesWriteLock.unlock();
    }
  }
  /**
@@ -106,7 +165,10 @@
   * @param csn
   *          The CSN of the update message that must be set as committed.
   */
  public synchronized void commit(CSN csn)
  public void commit(CSN csn)
  {
    pendingChangesWriteLock.lock();
    try
  {
    PendingChange curChange = pendingChanges.get(csn);
    if (curChange == null)
@@ -114,65 +176,101 @@
      throw new NoSuchElementException();
    }
    curChange.setCommitted(true);
      activeAndDependentChanges.remove(curChange);
    CSN firstCSN = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstCSN);
    while (firstChange != null && firstChange.isCommitted())
      Iterator<Map.Entry<CSN, PendingChange>> it = pendingChanges.entrySet().iterator();
      while (it.hasNext())
    {
      if (firstChange.getMsg().contributesToDomainState())
        Map.Entry<CSN, PendingChange> change = it.next();
        PendingChange pendingChange = change.getValue();
        if (!pendingChange.isCommitted())
      {
        state.update(firstCSN);
          break;
      }
      pendingChanges.remove(firstCSN);
      if (pendingChanges.isEmpty())
        if (pendingChange.getMsg().contributesToDomainState())
      {
        firstChange = null;
          state.update(change.getKey());
      }
      else
        it.remove();
      }
    }
    finally
      {
        firstCSN = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstCSN);
      }
      pendingChangesWriteLock.unlock();
    }
  }
  /**
   * Records the pending change matching the given message in {@code activeAndDependentChanges}.
   * <p>
   * The change is looked up in {@code pendingChanges} by the CSN of the message, while holding
   * the pending changes read lock.
   * <p>
   * NOTE(review): if no pending change is registered for this CSN the lookup yields {@code null},
   * and adding {@code null} to a {@code ConcurrentSkipListSet} throws a NullPointerException -
   * confirm that callers always register the update before marking it in progress.
   *
   * @param msg the update message whose pending change is to be recorded
   */
  public void markInProgress(LDAPUpdateMsg msg)
  {
    pendingChangesReadLock.lock();
    try
    {
      activeAndDependentChanges.add(pendingChanges.get(msg.getCSN()));
    }
    finally
    {
      // Released in finally so the read lock is freed even if the add fails.
      pendingChangesReadLock.unlock();
    }
  }
  /**
   * Get the first update in the list that have some dependencies cleared.
   *
   * @return The LDAPUpdateMsg to be handled.
   */
  public synchronized LDAPUpdateMsg getNextUpdate()
  public LDAPUpdateMsg getNextUpdate()
  {
    /*
     * Parse the list of Update with dependencies and check if the dependencies
     * are now cleared until an Update without dependencies is found.
     */
    for (PendingChange change : dependentChanges)
    pendingChangesReadLock.lock();
    dependentChangesLock.lock();
    try
    {
      if (change.dependenciesIsCovered(state))
      if (!dependentChanges.isEmpty())
      {
        dependentChanges.remove(change);
        return change.getLDAPUpdateMsg();
        PendingChange firstDependentChange = dependentChanges.first();
        if (pendingChanges.firstKey().isNewerThanOrEqualTo(firstDependentChange.getCSN()))
        {
          dependentChanges.remove(firstDependentChange);
          return firstDependentChange.getLDAPUpdateMsg();
      }
    }
    return null;
  }
    finally
    {
      dependentChangesLock.unlock();
      pendingChangesReadLock.unlock();
    }
  }
  /**
   * Mark the first pendingChange as dependent on the second PendingChange.
   * @param dependentChange The PendingChange that depend on the second
   *                        PendingChange.
   * @param pendingChange   The PendingChange on which the first PendingChange
   *                        is dependent.
   */
  private void addDependency(
      PendingChange dependentChange, PendingChange pendingChange)
  private void addDependency(PendingChange dependentChange)
  {
    dependentChange.addDependency(pendingChange.getCSN());
    dependentChangesLock.lock();
    try
    {
    dependentChanges.add(dependentChange);
  }
    finally
    {
      dependentChangesLock.unlock();
    }
  }
  /**
   * Looks up the pending change registered for the given CSN.
   * <p>
   * The lookup is done while holding the pending changes read lock.
   *
   * @param csn the CSN of the change to look up
   * @return the value mapped to {@code csn} in {@code pendingChanges},
   *         or {@code null} if there is no such mapping
   */
  private PendingChange getPendingChange(CSN csn)
  {
    pendingChangesReadLock.lock();
    try
    {
      return pendingChanges.get(csn);
    }
    finally
    {
      pendingChangesReadLock.unlock();
    }
  }
  /**
   * Check if the given AddOperation has some dependencies on any
@@ -190,34 +288,35 @@
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(AddOperation op)
  public boolean checkDependencies(AddOperation op)
  {
    boolean hasDependencies = false;
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    final PendingChange change = getPendingChange(csn);
    if (change == null)
    {
      return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    for (PendingChange pendingChange : activeAndDependentChanges)
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
      {
        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
        break;
      }
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            /*
             * Check is the operation to be run is a deleteOperation on the
             * same DN.
           * Check if the operation to be run is a deleteOperation on the same DN.
             */
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
          }
          else if (pendingMsg instanceof AddMsg)
@@ -229,7 +328,7 @@
            if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
@@ -242,7 +341,7 @@
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
            else
            {
@@ -250,22 +349,11 @@
              if (pendingModDn.newDNIsParent(targetDN))
              {
                hasDependencies = true;
                addDependency(change, pendingChange);
            addDependency(change);
              }
            }
          }
        }
      }
      else
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
      }
    }
    return hasDependencies;
  }
@@ -277,26 +365,31 @@
   *
   * ModifyOperation depends on
   * - AddOperation done on the same DN
   * - ModifyDNOperation having newDN the same as targetDN
   *
   * @param op The ModifyOperation to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(ModifyOperation op)
  public boolean checkDependencies(ModifyOperation op)
  {
    boolean hasDependencies = false;
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    final PendingChange change = getPendingChange(csn);
    if (change == null)
    {
      return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    final DN targetDN = change.getLDAPUpdateMsg().getDN();
    for (PendingChange pendingChange : activeAndDependentChanges)
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
      {
        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
        break;
      }
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg instanceof AddMsg)
        {
@@ -304,18 +397,16 @@
          if (pendingMsg.getDN().equals(targetDN))
          {
            hasDependencies = true;
            addDependency(change, pendingChange);
          addDependency(change);
          }
        }
      }
      else
      else if (pendingMsg instanceof ModifyDNMsg)
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
        if (((ModifyDNMsg) pendingMsg).newDNIsEqual(targetDN))
        {
          hasDependencies = true;
          addDependency(change);
        }
      }
    }
    return hasDependencies;
@@ -333,29 +424,32 @@
   * - DeleteOperation done on the new DN of the MODDN operation
   * - ModifyDNOperation done from the new DN of the MODDN operation
   *
   * TODO: Consider cases where there is a rename A -> B then rename B -> C. Second change depends on first
   *
   * @param msg The ModifyDNMsg to be checked.
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  private synchronized boolean checkDependencies(ModifyDNMsg msg)
  public boolean checkDependencies(ModifyDNMsg msg)
  {
    boolean hasDependencies = false;
    final CSN csn = msg.getCSN();
    final PendingChange change = pendingChanges.get(csn);
    final PendingChange change = getPendingChange(csn);
    if (change == null)
    {
      return false;
    }
    final DN targetDN = change.getLDAPUpdateMsg().getDN();
    for (PendingChange pendingChange : pendingChanges.values())
    for (PendingChange pendingChange : activeAndDependentChanges)
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
      {
        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
        break;
      }
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            // Check if the target of the Delete is the same
@@ -363,7 +457,7 @@
            if (msg.newDNIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
          }
          else if (pendingMsg instanceof AddMsg)
@@ -373,14 +467,14 @@
            if (msg.newParentIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
            // Check if the AddOperation was done on the same DN as the
            // target DN of the MODDN operation
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
@@ -390,21 +484,10 @@
            if (msg.newDNIsEqual(pendingMsg.getDN()))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
          }
        }
      }
      else
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
      }
    }
    return hasDependencies;
  }
@@ -424,24 +507,26 @@
   *
   * @return A boolean indicating if this operation has some dependencies.
   */
  public synchronized boolean checkDependencies(DeleteOperation op)
  public boolean checkDependencies(DeleteOperation op)
  {
    boolean hasDependencies = false;
    final DN targetDN = op.getEntryDN();
    final CSN csn = OperationContext.getCSN(op);
    final PendingChange change = pendingChanges.get(csn);
    final PendingChange change = getPendingChange(csn);
    if (change == null)
    {
      return false;
    }
    for (PendingChange pendingChange : pendingChanges.values())
    for (PendingChange pendingChange : activeAndDependentChanges)
    {
      if (pendingChange.getCSN().isOlderThan(csn))
      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
      {
        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
        break;
      }
        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
        if (pendingMsg != null)
        {
          if (pendingMsg instanceof DeleteMsg)
          {
            /*
@@ -451,7 +536,7 @@
            if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
          }
          else if (pendingMsg instanceof AddMsg)
@@ -463,7 +548,7 @@
            if (pendingMsg.getDN().equals(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
          }
          else if (pendingMsg instanceof ModifyDNMsg)
@@ -473,25 +558,13 @@
             * Check if the operation to be run is an ModifyDNOperation
             * on a children of the current DeleteOperation
             */
            if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN)
                || pendingModDn.newDNIsParent(targetDN))
        if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN) || pendingModDn.newDNIsParent(targetDN))
            {
              hasDependencies = true;
              addDependency(change, pendingChange);
          addDependency(change);
            }
          }
        }
      }
      else
      {
        // We reached an operation that is newer than the operation
        // for which we are doing the dependency check so it is
        // not possible to find another operation with some dependency.
        // break the loop to avoid going through the potentially large
        // list of pending changes.
        break;
      }
    }
    return hasDependencies;
  }
@@ -514,16 +587,18 @@
    {
      DeleteOperation newOp = (DeleteOperation) op;
      return checkDependencies(newOp);
    } else if (op instanceof AddOperation)
    }
    else if (op instanceof AddOperation)
    {
      AddOperation newOp = (AddOperation) op;
      return checkDependencies(newOp);
    } else if (op instanceof ModifyDNOperationBasis)
    }
    else if (op instanceof ModifyDNOperationBasis)
    {
      ModifyDNMsg newMsg = (ModifyDNMsg) msg;
      return checkDependencies(newMsg);
    } else
    }
    else
    {
      return true;  // unknown type of operation ?!
    }
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2015 ForgeRock AS
 *      Portions Copyright 2011-2016 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
@@ -32,6 +32,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -49,6 +50,7 @@
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
  private final ReentrantLock switchQueueLock;
  private AtomicBoolean shutdown = new AtomicBoolean(false);
  private static int count;
@@ -56,11 +58,13 @@
   * Constructor for the ReplayThread.
   *
   * @param updateToReplayQueue The queue of update messages we have to replay
   * @param switchQueueLock lock to ensure moving updates from one queue to another is atomic
   */
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue, ReentrantLock switchQueueLock)
  {
     super("Replica replay thread " + count++);
     this.updateToReplayQueue = updateToReplayQueue;
    this.switchQueueLock = switchQueueLock;
  }
  /**
@@ -86,19 +90,34 @@
    {
      try
      {
        UpdateToReplay updateToreplay;
        // Loop getting an updateToReplayQueue from the update message queue and
        // replaying matching changes
        while (!shutdown.get() &&
          ((updateToreplay = updateToReplayQueue.poll(1L,
          TimeUnit.SECONDS)) != null))
        if (switchQueueLock.tryLock(1L, TimeUnit.SECONDS))
        {
          // Find replication domain for that update message
          LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
          LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
          LDAPReplicationDomain domain;
          LDAPUpdateMsg updateMsg;
          try
          {
            if (shutdown.get())
            {
              break;
            }
            UpdateToReplay updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS);
            if (updateToreplay == null)
            {
              continue;
            }
            // Find replication domain for that update message and mark it as "in progress"
            updateMsg = updateToreplay.getUpdateMessage();
            domain = updateToreplay.getReplicationDomain();
            domain.markInProgress(updateMsg);
          }
          finally
          {
            switchQueueLock.unlock();
          }
          domain.replay(updateMsg, shutdown);
        }
      } catch (Exception e)
      }
      catch (Exception e)
      {
        /*
         * catch all exceptions happening so that the thread never dies even
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013-2015 ForgeRock AS
 *      Portions Copyright 2013-2016 ForgeRock AS.
 */
package org.opends.server.replication.plugin;
@@ -40,6 +40,7 @@
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
@@ -374,6 +375,8 @@
  private void replayMsg(UpdateMsg updateMsg) throws InterruptedException
  {
    domain.processUpdate(updateMsg);
    domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
    LDAPUpdateMsg ldapUpdate = queue.take().getUpdateMessage();
    domain.markInProgress(ldapUpdate);
    domain.replay(ldapUpdate, SHUTDOWN);
  }
}