From 3c140af6c756b30b325ce3c6ed080e8898e2b7ec Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 24 Feb 2016 13:52:15 +0000
Subject: [PATCH] OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java   |  545 +++++++++++++++++++++++++-------------------
 opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java     |    7 
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java          |   37 --
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java           |   47 ++-
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java  |   43 ++-
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java |    6 
 6 files changed, 379 insertions(+), 306 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 1be7e3c..70fc1a8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2016 ForgeRock AS
+ *      Portions Copyright 2011-2016 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
 
@@ -2266,6 +2266,15 @@
   }
 
   /**
+   * Marks the specified message as the one currently processed by a replay thread.
+   * @param msg the message being processed
+   */
+  void markInProgress(LDAPUpdateMsg msg)
+  {
+    remotePendingChanges.markInProgress(msg);
+  }
+
+  /**
    * Create and replay a synchronized Operation from an UpdateMsg.
    *
    * @param msg
@@ -2292,7 +2301,6 @@
         // error handling paths.
         Operation nextOp = op = msg.createOperation(conn);
         dependency = remotePendingChanges.checkDependencies(op, msg);
-
         boolean replayDone = false;
         int retryCount = 10;
         while (!dependency && !replayDone && retryCount-- > 0)
@@ -2350,25 +2358,27 @@
               ModifyOperation castOp = (ModifyOperation) op;
               dependency = remotePendingChanges.checkDependencies(castOp);
               ModifyMsg modifyMsg = (ModifyMsg) msg;
-              replayDone = solveNamingConflict(castOp, modifyMsg);
+              replayDone = !dependency && solveNamingConflict(castOp, modifyMsg);
             }
             else if (op instanceof DeleteOperation)
             {
               DeleteOperation castOp = (DeleteOperation) op;
               dependency = remotePendingChanges.checkDependencies(castOp);
-              replayDone = solveNamingConflict(castOp, msg);
+              replayDone = !dependency && solveNamingConflict(castOp, msg);
             }
             else if (op instanceof AddOperation)
             {
               AddOperation castOp = (AddOperation) op;
               AddMsg addMsg = (AddMsg) msg;
               dependency = remotePendingChanges.checkDependencies(castOp);
-              replayDone = solveNamingConflict(castOp, addMsg);
+              replayDone = !dependency && solveNamingConflict(castOp, addMsg);
             }
             else if (op instanceof ModifyDNOperation)
             {
               ModifyDNOperation castOp = (ModifyDNOperation) op;
-              replayDone = solveNamingConflict(castOp, msg);
+              ModifyDNMsg modifyDNMsg = (ModifyDNMsg) msg;
+              dependency = remotePendingChanges.checkDependencies(modifyDNMsg);
+              replayDone = !dependency && solveNamingConflict(castOp, modifyDNMsg);
             }
             else
             {
@@ -2385,8 +2395,8 @@
             else
             {
               /*
-               * Create a new operation reflecting the new state of the
-               * UpdateMsg after conflict resolution modified it.
+               * Create a new operation reflecting the new state of the UpdateMsg after conflict resolution
+               * modified it and try replaying it again. Dependencies might have been replayed by now.
                *  Note: When msg is a DeleteMsg, the DeleteOperation is properly
                *  created with subtreeDelete request control when needed.
                */
@@ -4293,16 +4303,13 @@
     // number of updates in the pending list
     addMonitorData(attributes, "pending-updates", pendingChanges.size());
 
-    addMonitorData(attributes, "replayed-updates-ok",
-        numReplayedPostOpCalled.get());
-    addMonitorData(attributes, "resolved-modify-conflicts",
-        numResolvedModifyConflicts.get());
-    addMonitorData(attributes, "resolved-naming-conflicts",
-        numResolvedNamingConflicts.get());
-    addMonitorData(attributes, "unresolved-naming-conflicts",
-        numUnresolvedNamingConflicts.get());
-    addMonitorData(attributes, "remote-pending-changes-size",
-        remotePendingChanges.getQueueSize());
+    addMonitorData(attributes, "replayed-updates-ok", numReplayedPostOpCalled.get());
+    addMonitorData(attributes, "resolved-modify-conflicts", numResolvedModifyConflicts.get());
+    addMonitorData(attributes, "resolved-naming-conflicts", numResolvedNamingConflicts.get());
+    addMonitorData(attributes, "unresolved-naming-conflicts", numUnresolvedNamingConflicts.get());
+    addMonitorData(attributes, "remote-pending-changes-size", remotePendingChanges.getQueueSize());
+    addMonitorData(attributes, "dependent-changes-size", remotePendingChanges.getDependentChangesSize());
+    addMonitorData(attributes, "changes-in-progress-size", remotePendingChanges.changesInProgressSize());
 
     return attributes;
   }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
index 8527c20..1d87753 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2016 ForgeRock AS
+ *      Portions Copyright 2011-2016 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
 
@@ -41,6 +41,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -316,9 +317,10 @@
   {
     replayThreads.clear();
 
+    ReentrantLock switchQueueLock = new ReentrantLock();
     for (int i = 0; i < replayThreadNumber; i++)
     {
-      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
+      ReplayThread replayThread = new ReplayThread(updateToReplayQueue, switchQueueLock);
       replayThread.start();
       replayThreads.add(replayThread);
     }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java
index 9e7309f..2104731 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java
@@ -22,12 +22,11 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
- *      Portions copyright 2014 ForgeRock AS
+ *      Portions copyright 2014-2016 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
 
 import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.LDAPUpdateMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.operation.PluginOperation;
@@ -42,7 +41,6 @@
   private boolean committed;
   private UpdateMsg msg;
   private final PluginOperation op;
-  private ServerState dependencyState;
 
   /**
    * Construct a new PendingChange.
@@ -128,35 +126,6 @@
     return this.op;
   }
 
-  /**
-   * Add the given CSN to the list of dependencies of this PendingChange.
-   *
-   * @param csn
-   *          The CSN to add to the list of dependencies of this PendingChange.
-   */
-  public void addDependency(CSN csn)
-  {
-    if (dependencyState == null)
-    {
-      dependencyState = new ServerState();
-    }
-    dependencyState.update(csn);
-  }
-
-  /**
-   * Check if the given ServerState covers the dependencies of this
-   * PendingChange.
-   *
-   * @param state The ServerState for which dependencies must be checked,
-   *
-   * @return A boolean indicating if the given ServerState covers the
-   *         dependencies of this PendingChange.
-   */
-  public boolean dependenciesIsCovered(ServerState state)
-  {
-    return state.cover(dependencyState);
-  }
-
   /** {@inheritDoc} */
   @Override
   public int compareTo(PendingChange o)
@@ -173,8 +142,6 @@
         + ", csn=" + csn.toStringUI()
         + ", msg=[" + msg
         + "], isOperationSynchronized="
-        + (op != null ? op.isSynchronizationOperation() : "false")
-        + ", dependencyState="
-        + (dependencyState != null ? dependencyState : "");
+        + (op != null ? op.isSynchronizationOperation() : "false");
   }
 }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
index 79edb18..21fcad9 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -27,7 +27,11 @@
 package org.opends.server.replication.plugin;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import net.jcip.annotations.GuardedBy;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.ModifyDNOperationBasis;
@@ -51,6 +55,7 @@
 final class RemotePendingChanges
 {
   /** A map used to store the pending changes. */
+  @GuardedBy("pendingChangesLock")
   private final SortedMap<CSN, PendingChange> pendingChanges = new TreeMap<>();
 
   /**
@@ -58,7 +63,18 @@
    * not been replayed correctly because they are dependent on
    * another change to be completed.
    */
+  @GuardedBy("dependentChangesLock")
   private final SortedSet<PendingChange> dependentChanges = new TreeSet<>();
+  /**
+   * {@code activeAndDependentChanges} also contains changes discovered to be dependent
+   * on currently in progress changes.
+   */
+  private final ConcurrentSkipListSet<PendingChange> activeAndDependentChanges = new ConcurrentSkipListSet<>();
+
+  private ReentrantReadWriteLock pendingChangesLock = new ReentrantReadWriteLock(true);
+  private ReentrantReadWriteLock.ReadLock pendingChangesReadLock = pendingChangesLock.readLock();
+  private ReentrantReadWriteLock.WriteLock pendingChangesWriteLock = pendingChangesLock.writeLock();
+  private ReentrantLock dependentChangesLock = new ReentrantLock();
 
   /** The ServerState that will be updated when LDAPUpdateMsg are fully replayed. */
   private final ServerState state;
@@ -75,13 +91,49 @@
   }
 
   /**
-   * Returns the number of changes currently in this list.
+   * Returns the number of changes waiting to be replayed.
    *
-   * @return The number of changes currently in this list.
+   * @return The number of changes waiting to be replayed
    */
-  public synchronized int getQueueSize()
+  public int getQueueSize()
   {
-    return pendingChanges.size();
+    pendingChangesReadLock.lock();
+    try
+    {
+      return pendingChanges.size();
+    }
+    finally
+    {
+      pendingChangesReadLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the number of changes actively being replayed.
+   *
+   * @return the number of changes actively being replayed.
+   */
+  public int changesInProgressSize()
+  {
+    return activeAndDependentChanges.size();
+  }
+
+  /**
+   * Returns the number of changes depending on other changes.
+   *
+   * @return the number of changes depending on other changes.
+   */
+  public int getDependentChangesSize()
+  {
+    dependentChangesLock.lock();
+    try
+    {
+      return dependentChanges.size();
+    }
+    finally
+    {
+      dependentChangesLock.unlock();
+    }
   }
 
   /**
@@ -93,11 +145,18 @@
    * @return {@code false} if the update was already registered in the pending
    *         changes.
    */
-  public synchronized boolean putRemoteUpdate(LDAPUpdateMsg update)
+  public boolean putRemoteUpdate(LDAPUpdateMsg update)
   {
-    CSN csn = update.getCSN();
-    return pendingChanges.put(csn,
-        new PendingChange(csn, null, update)) == null;
+    pendingChangesWriteLock.lock();
+    try
+    {
+      CSN csn = update.getCSN();
+      return pendingChanges.put(csn, new PendingChange(csn, null, update)) == null;
+    }
+    finally
+    {
+      pendingChangesWriteLock.unlock();
+    }
   }
 
   /**
@@ -106,72 +165,111 @@
    * @param csn
    *          The CSN of the update message that must be set as committed.
    */
-  public synchronized void commit(CSN csn)
+  public void commit(CSN csn)
   {
-    PendingChange curChange = pendingChanges.get(csn);
-    if (curChange == null)
+    pendingChangesWriteLock.lock();
+    try
     {
-      throw new NoSuchElementException();
+      PendingChange curChange = pendingChanges.get(csn);
+      if (curChange == null)
+      {
+        throw new NoSuchElementException();
+      }
+      curChange.setCommitted(true);
+      activeAndDependentChanges.remove(curChange);
+
+      Iterator<Map.Entry<CSN, PendingChange>> it = pendingChanges.entrySet().iterator();
+      while (it.hasNext())
+      {
+        Map.Entry<CSN, PendingChange> change = it.next();
+        PendingChange pendingChange = change.getValue();
+        if (!pendingChange.isCommitted())
+        {
+          break;
+        }
+        if (pendingChange.getMsg().contributesToDomainState())
+        {
+          state.update(change.getKey());
+        }
+        it.remove();
+      }
     }
-    curChange.setCommitted(true);
-
-    CSN firstCSN = pendingChanges.firstKey();
-    PendingChange firstChange = pendingChanges.get(firstCSN);
-
-    while (firstChange != null && firstChange.isCommitted())
+    finally
     {
-      if (firstChange.getMsg().contributesToDomainState())
-      {
-        state.update(firstCSN);
-      }
-      pendingChanges.remove(firstCSN);
-
-      if (pendingChanges.isEmpty())
-      {
-        firstChange = null;
-      }
-      else
-      {
-        firstCSN = pendingChanges.firstKey();
-        firstChange = pendingChanges.get(firstCSN);
-      }
+      pendingChangesWriteLock.unlock();
     }
   }
 
+  public void markInProgress(LDAPUpdateMsg msg)
+  {
+    pendingChangesReadLock.lock();
+    try
+    {
+      activeAndDependentChanges.add(pendingChanges.get(msg.getCSN()));
+    }
+    finally
+    {
+      pendingChangesReadLock.unlock();
+    }
+  }
   /**
    * Get the first update in the list that have some dependencies cleared.
    *
    * @return The LDAPUpdateMsg to be handled.
    */
-  public synchronized LDAPUpdateMsg getNextUpdate()
+  public LDAPUpdateMsg getNextUpdate()
   {
-    /*
-     * Parse the list of Update with dependencies and check if the dependencies
-     * are now cleared until an Update without dependencies is found.
-     */
-    for (PendingChange change : dependentChanges)
+    pendingChangesReadLock.lock();
+    dependentChangesLock.lock();
+    try
     {
-      if (change.dependenciesIsCovered(state))
+      if (!dependentChanges.isEmpty())
       {
-        dependentChanges.remove(change);
-        return change.getLDAPUpdateMsg();
+        PendingChange firstDependentChange = dependentChanges.first();
+        if (pendingChanges.firstKey().isNewerThanOrEqualTo(firstDependentChange.getCSN()))
+        {
+          dependentChanges.remove(firstDependentChange);
+          return firstDependentChange.getLDAPUpdateMsg();
+        }
       }
+      return null;
     }
-    return null;
+    finally
+    {
+      dependentChangesLock.unlock();
+      pendingChangesReadLock.unlock();
+    }
   }
 
   /**
    * Mark the first pendingChange as dependent on the second PendingChange.
    * @param dependentChange The PendingChange that depend on the second
    *                        PendingChange.
-   * @param pendingChange   The PendingChange on which the first PendingChange
-   *                        is dependent.
    */
-  private void addDependency(
-      PendingChange dependentChange, PendingChange pendingChange)
+  private void addDependency(PendingChange dependentChange)
   {
-    dependentChange.addDependency(pendingChange.getCSN());
-    dependentChanges.add(dependentChange);
+    dependentChangesLock.lock();
+    try
+    {
+      dependentChanges.add(dependentChange);
+    }
+    finally
+    {
+      dependentChangesLock.unlock();
+    }
+  }
+
+  private PendingChange getPendingChange(CSN csn)
+  {
+    pendingChangesReadLock.lock();
+    try
+    {
+      return pendingChanges.get(csn);
+    }
+    finally
+    {
+      pendingChangesReadLock.unlock();
+    }
   }
 
   /**
@@ -190,80 +288,70 @@
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  public synchronized boolean checkDependencies(AddOperation op)
+  public boolean checkDependencies(AddOperation op)
   {
     boolean hasDependencies = false;
     final DN targetDN = op.getEntryDN();
     final CSN csn = OperationContext.getCSN(op);
-    final PendingChange change = pendingChanges.get(csn);
+    final PendingChange change = getPendingChange(csn);
+
     if (change == null)
     {
       return false;
     }
 
-    for (PendingChange pendingChange : pendingChanges.values())
+    for (PendingChange pendingChange : activeAndDependentChanges)
     {
-      if (pendingChange.getCSN().isOlderThan(csn))
+      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
-        if (pendingMsg != null)
+        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+        break;
+      }
+      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+      if (pendingMsg instanceof DeleteMsg)
+      {
+          /*
+           * Check if the operation to be run is a deleteOperation on the same DN.
+           */
+        if (pendingMsg.getDN().equals(targetDN))
         {
-          if (pendingMsg instanceof DeleteMsg)
-          {
-            /*
-             * Check is the operation to be run is a deleteOperation on the
-             * same DN.
-             */
-            if (pendingMsg.getDN().equals(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof AddMsg)
-          {
-            /*
-             * Check if the operation to be run is an addOperation on a
-             * parent of the current AddOperation.
-             */
-            if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof ModifyDNMsg)
-          {
-            /*
-             * Check if the operation to be run is ModifyDnOperation with
-             * the same target DN as the ADD DN
-             * or a ModifyDnOperation with new DN equals to the ADD DN parent
-             */
-            if (pendingMsg.getDN().equals(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-            else
-            {
-              final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
-              if (pendingModDn.newDNIsParent(targetDN))
-              {
-                hasDependencies = true;
-                addDependency(change, pendingChange);
-              }
-            }
-          }
+          hasDependencies = true;
+          addDependency(change);
         }
       }
-      else
+      else if (pendingMsg instanceof AddMsg)
       {
-        // We reached an operation that is newer than the operation
-        // for which we are doing the dependency check so it is
-        // not possible to find another operation with some dependency.
-        // break the loop to avoid going through the potentially large
-        // list of pending changes.
-        break;
+          /*
+           * Check if the operation to be run is an addOperation on a
+           * parent of the current AddOperation.
+           */
+        if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+      }
+      else if (pendingMsg instanceof ModifyDNMsg)
+      {
+          /*
+           * Check if the operation to be run is ModifyDnOperation with
+           * the same target DN as the ADD DN
+           * or a ModifyDnOperation with new DN equals to the ADD DN parent
+           */
+        if (pendingMsg.getDN().equals(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+        else
+        {
+          final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+          if (pendingModDn.newDNIsParent(targetDN))
+          {
+            hasDependencies = true;
+            addDependency(change);
+          }
+        }
       }
     }
     return hasDependencies;
@@ -277,45 +365,48 @@
    *
    * ModifyOperation depends on
    * - AddOperation done on the same DN
+   * - ModifyDNOperation having newDN the same as targetDN
    *
    * @param op The ModifyOperation to be checked.
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  public synchronized boolean checkDependencies(ModifyOperation op)
+  public boolean checkDependencies(ModifyOperation op)
   {
     boolean hasDependencies = false;
-    final DN targetDN = op.getEntryDN();
     final CSN csn = OperationContext.getCSN(op);
-    final PendingChange change = pendingChanges.get(csn);
+    final PendingChange change = getPendingChange(csn);
+
     if (change == null)
     {
-      return false;
+        return false;
     }
 
-    for (PendingChange pendingChange : pendingChanges.values())
+    final DN targetDN = change.getLDAPUpdateMsg().getDN();
+    for (PendingChange pendingChange : activeAndDependentChanges)
     {
-      if (pendingChange.getCSN().isOlderThan(csn))
+      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
-        if (pendingMsg instanceof AddMsg)
+        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+        break;
+      }
+      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+      if (pendingMsg instanceof AddMsg)
+      {
+        // Check if the operation to be run is an addOperation on a same DN.
+        if (pendingMsg.getDN().equals(targetDN))
         {
-          // Check if the operation to be run is an addOperation on a same DN.
-          if (pendingMsg.getDN().equals(targetDN))
-          {
-            hasDependencies = true;
-            addDependency(change, pendingChange);
-          }
+          hasDependencies = true;
+          addDependency(change);
         }
       }
-      else
+      else if (pendingMsg instanceof ModifyDNMsg)
       {
-        // We reached an operation that is newer than the operation
-        // for which we are doing the dependency check so it is
-        // not possible to find another operation with some dependency.
-        // break the loop to avoid going through the potentially large
-        // list of pending changes.
-        break;
+        if (((ModifyDNMsg) pendingMsg).newDNIsEqual(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
       }
     }
     return hasDependencies;
@@ -333,76 +424,68 @@
    * - DeleteOperation done on the new DN of the MODDN operation
    * - ModifyDNOperation done from the new DN of the MODDN operation
    *
+   * TODO: Consider cases where there is a rename A -> B then rename B -> C. Second change depends on first
+   *
    * @param msg The ModifyDNMsg to be checked.
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  private synchronized boolean checkDependencies(ModifyDNMsg msg)
+  public boolean checkDependencies(ModifyDNMsg msg)
   {
     boolean hasDependencies = false;
     final CSN csn = msg.getCSN();
-    final PendingChange change = pendingChanges.get(csn);
+    final PendingChange change = getPendingChange(csn);
+
     if (change == null)
     {
       return false;
     }
 
     final DN targetDN = change.getLDAPUpdateMsg().getDN();
-
-    for (PendingChange pendingChange : pendingChanges.values())
+    for (PendingChange pendingChange : activeAndDependentChanges)
     {
-      if (pendingChange.getCSN().isOlderThan(csn))
+      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
-        if (pendingMsg != null)
+        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+        break;
+      }
+      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+      if (pendingMsg instanceof DeleteMsg)
+      {
+        // Check if the target of the Delete is the same
+        // as the new DN of this ModifyDN
+        if (msg.newDNIsEqual(pendingMsg.getDN()))
         {
-          if (pendingMsg instanceof DeleteMsg)
-          {
-            // Check if the target of the Delete is the same
-            // as the new DN of this ModifyDN
-            if (msg.newDNIsEqual(pendingMsg.getDN()))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof AddMsg)
-          {
-            // Check if the Add Operation was done on the new parent of
-            // the MODDN  operation
-            if (msg.newParentIsEqual(pendingMsg.getDN()))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-            // Check if the AddOperation was done on the same DN as the
-            // target DN of the MODDN operation
-            if (pendingMsg.getDN().equals(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof ModifyDNMsg)
-          {
-            // Check if the ModifyDNOperation was done from the new DN of
-            // the MODDN operation
-            if (msg.newDNIsEqual(pendingMsg.getDN()))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
+          hasDependencies = true;
+          addDependency(change);
         }
       }
-      else
+      else if (pendingMsg instanceof AddMsg)
       {
-        // We reached an operation that is newer than the operation
-        // for which we are doing the dependency check so it is
-        // not possible to find another operation with some dependency.
-        // break the loop to avoid going through the potentially large
-        // list of pending changes.
-        break;
+        // Check if the Add Operation was done on the new parent of
+        // the MODDN  operation
+        if (msg.newParentIsEqual(pendingMsg.getDN()))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+        // Check if the AddOperation was done on the same DN as the
+        // target DN of the MODDN operation
+        if (pendingMsg.getDN().equals(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+      }
+      else if (pendingMsg instanceof ModifyDNMsg)
+      {
+        // Check if the ModifyDNOperation was done from the new DN of
+        // the MODDN operation
+        if (msg.newDNIsEqual(pendingMsg.getDN()))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
       }
     }
     return hasDependencies;
@@ -424,72 +507,62 @@
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  public synchronized boolean checkDependencies(DeleteOperation op)
+  public boolean checkDependencies(DeleteOperation op)
   {
     boolean hasDependencies = false;
     final DN targetDN = op.getEntryDN();
     final CSN csn = OperationContext.getCSN(op);
-    final PendingChange change = pendingChanges.get(csn);
+    final PendingChange change = getPendingChange(csn);
+
     if (change == null)
     {
       return false;
     }
 
-    for (PendingChange pendingChange : pendingChanges.values())
+    for (PendingChange pendingChange : activeAndDependentChanges)
     {
-      if (pendingChange.getCSN().isOlderThan(csn))
+      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
-        if (pendingMsg != null)
+        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+        break;
+      }
+      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+      if (pendingMsg instanceof DeleteMsg)
+      {
+          /*
+           * Check if the operation to be run is a deleteOperation on a
+           * children of the current DeleteOperation.
+           */
+        if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
         {
-          if (pendingMsg instanceof DeleteMsg)
-          {
-            /*
-             * Check if the operation to be run is a deleteOperation on a
-             * children of the current DeleteOperation.
-             */
-            if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof AddMsg)
-          {
-            /*
-             * Check if the operation to be run is an addOperation on a
-             * parent of the current DeleteOperation.
-             */
-            if (pendingMsg.getDN().equals(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof ModifyDNMsg)
-          {
-            final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
-            /*
-             * Check if the operation to be run is an ModifyDNOperation
-             * on a children of the current DeleteOperation
-             */
-            if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN)
-                || pendingModDn.newDNIsParent(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
+          hasDependencies = true;
+          addDependency(change);
         }
       }
-      else
+      else if (pendingMsg instanceof AddMsg)
       {
-        // We reached an operation that is newer than the operation
-        // for which we are doing the dependency check so it is
-        // not possible to find another operation with some dependency.
-        // break the loop to avoid going through the potentially large
-        // list of pending changes.
-        break;
+          /*
+           * Check if the operation to be run is an addOperation on a
+           * parent of the current DeleteOperation.
+           */
+        if (pendingMsg.getDN().equals(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+      }
+      else if (pendingMsg instanceof ModifyDNMsg)
+      {
+        final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+          /*
+           * Check if the operation to be run is an ModifyDNOperation
+           * on a children of the current DeleteOperation
+           */
+        if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN) || pendingModDn.newDNIsParent(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
       }
     }
     return hasDependencies;
@@ -514,16 +587,18 @@
     {
       DeleteOperation newOp = (DeleteOperation) op;
       return checkDependencies(newOp);
-
-    } else if (op instanceof AddOperation)
+    }
+    else if (op instanceof AddOperation)
     {
       AddOperation newOp = (AddOperation) op;
       return checkDependencies(newOp);
-    } else if (op instanceof ModifyDNOperationBasis)
+    }
+    else if (op instanceof ModifyDNOperationBasis)
     {
       ModifyDNMsg newMsg = (ModifyDNMsg) msg;
       return checkDependencies(newMsg);
-    } else
+    }
+    else
     {
       return true;  // unknown type of operation ?!
     }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
index 8c19fef..4d1f54d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2015 ForgeRock AS
+ *      Portions Copyright 2011-2016 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
 
@@ -32,6 +32,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.opends.server.api.DirectoryThread;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -49,6 +50,7 @@
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
   private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
+  private final ReentrantLock switchQueueLock;
   private AtomicBoolean shutdown = new AtomicBoolean(false);
   private static int count;
 
@@ -56,11 +58,13 @@
    * Constructor for the ReplayThread.
    *
    * @param updateToReplayQueue The queue of update messages we have to replay
+   * @param switchQueueLock lock to ensure moving updates from one queue to another is atomic
    */
-  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
+  public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue, ReentrantLock switchQueueLock)
   {
-     super("Replica replay thread " + count++);
-     this.updateToReplayQueue = updateToReplayQueue;
+    super("Replica replay thread " + count++);
+    this.updateToReplayQueue = updateToReplayQueue;
+    this.switchQueueLock = switchQueueLock;
   }
 
   /**
@@ -86,19 +90,34 @@
     {
       try
       {
-        UpdateToReplay updateToreplay;
-        // Loop getting an updateToReplayQueue from the update message queue and
-        // replaying matching changes
-        while (!shutdown.get() &&
-          ((updateToreplay = updateToReplayQueue.poll(1L,
-          TimeUnit.SECONDS)) != null))
+        if (switchQueueLock.tryLock(1L, TimeUnit.SECONDS))
         {
-          // Find replication domain for that update message
-          LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
-          LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
+          LDAPReplicationDomain domain;
+          LDAPUpdateMsg updateMsg;
+          try
+          {
+            if (shutdown.get())
+            {
+              break;
+            }
+            UpdateToReplay updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS);
+            if (updateToreplay == null)
+            {
+              continue;
+            }
+            // Find replication domain for that update message and mark it as "in progress"
+            updateMsg = updateToreplay.getUpdateMessage();
+            domain = updateToreplay.getReplicationDomain();
+            domain.markInProgress(updateMsg);
+          }
+          finally
+          {
+            switchQueueLock.unlock();
+          }
           domain.replay(updateMsg, shutdown);
         }
-      } catch (Exception e)
+      }
+      catch (Exception e)
       {
         /*
          * catch all exceptions happening so that the thread never dies even
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java
index d7f4e82..2c8f796 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2013-2015 ForgeRock AS
+ *      Portions Copyright 2013-2016 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
 
@@ -40,6 +40,7 @@
 import org.opends.server.replication.common.CSNGenerator;
 import org.opends.server.replication.protocol.AddMsg;
 import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
 import org.opends.server.replication.protocol.ModifyDNMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.Attribute;
@@ -374,6 +375,8 @@
   private void replayMsg(UpdateMsg updateMsg) throws InterruptedException
   {
     domain.processUpdate(updateMsg);
-    domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
+    LDAPUpdateMsg ldapUpdate = queue.take().getUpdateMessage();
+    domain.markInProgress(ldapUpdate);
+    domain.replay(ldapUpdate, SHUTDOWN);
   }
 }

--
Gitblit v1.10.0