From 3c140af6c756b30b325ce3c6ed080e8898e2b7ec Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 24 Feb 2016 13:52:15 +0000
Subject: [PATCH] OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java |  545 +++++++++++++++++++++++++++++++-----------------------
 1 files changed, 310 insertions(+), 235 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
index 79edb18..21fcad9 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -27,7 +27,11 @@
 package org.opends.server.replication.plugin;
 
 import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import net.jcip.annotations.GuardedBy;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
 import org.opends.server.core.ModifyDNOperationBasis;
@@ -51,6 +55,7 @@
 final class RemotePendingChanges
 {
   /** A map used to store the pending changes. */
+  @GuardedBy("pendingChangesLock")
   private final SortedMap<CSN, PendingChange> pendingChanges = new TreeMap<>();
 
   /**
@@ -58,7 +63,18 @@
    * not been replayed correctly because they are dependent on
    * another change to be completed.
    */
+  @GuardedBy("dependentChangesLock")
   private final SortedSet<PendingChange> dependentChanges = new TreeSet<>();
+  /**
+   * {@code activeAndDependentChanges} also contains changes discovered to be dependent
+   * on currently in progress changes.
+   */
+  private final ConcurrentSkipListSet<PendingChange> activeAndDependentChanges = new ConcurrentSkipListSet<>();
+
+  private ReentrantReadWriteLock pendingChangesLock = new ReentrantReadWriteLock(true);
+  private ReentrantReadWriteLock.ReadLock pendingChangesReadLock = pendingChangesLock.readLock();
+  private ReentrantReadWriteLock.WriteLock pendingChangesWriteLock = pendingChangesLock.writeLock();
+  private ReentrantLock dependentChangesLock = new ReentrantLock();
 
   /** The ServerState that will be updated when LDAPUpdateMsg are fully replayed. */
   private final ServerState state;
@@ -75,13 +91,49 @@
   }
 
   /**
-   * Returns the number of changes currently in this list.
+   * Returns the number of changes waiting to be replayed.
    *
-   * @return The number of changes currently in this list.
+   * @return The number of changes waiting to be replayed
    */
-  public synchronized int getQueueSize()
+  public int getQueueSize()
   {
-    return pendingChanges.size();
+    pendingChangesReadLock.lock();
+    try
+    {
+      return pendingChanges.size();
+    }
+    finally
+    {
+      pendingChangesReadLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the number of changes actively being replayed.
+   *
+   * @return the number of changes actively being replayed.
+   */
+  public int changesInProgressSize()
+  {
+    return activeAndDependentChanges.size();
+  }
+
+  /**
+   * Returns the number of changes depending on other changes.
+   *
+   * @return the number of changes depending on other changes.
+   */
+  public int getDependentChangesSize()
+  {
+    dependentChangesLock.lock();
+    try
+    {
+      return dependentChanges.size();
+    }
+    finally
+    {
+      dependentChangesLock.unlock();
+    }
   }
 
   /**
@@ -93,11 +145,18 @@
    * @return {@code false} if the update was already registered in the pending
    *         changes.
    */
-  public synchronized boolean putRemoteUpdate(LDAPUpdateMsg update)
+  public boolean putRemoteUpdate(LDAPUpdateMsg update)
   {
-    CSN csn = update.getCSN();
-    return pendingChanges.put(csn,
-        new PendingChange(csn, null, update)) == null;
+    pendingChangesWriteLock.lock();
+    try
+    {
+      CSN csn = update.getCSN();
+      return pendingChanges.put(csn, new PendingChange(csn, null, update)) == null;
+    }
+    finally
+    {
+      pendingChangesWriteLock.unlock();
+    }
   }
 
   /**
@@ -106,72 +165,111 @@
    * @param csn
    *          The CSN of the update message that must be set as committed.
    */
-  public synchronized void commit(CSN csn)
+  public void commit(CSN csn)
   {
-    PendingChange curChange = pendingChanges.get(csn);
-    if (curChange == null)
+    pendingChangesWriteLock.lock();
+    try
     {
-      throw new NoSuchElementException();
+      PendingChange curChange = pendingChanges.get(csn);
+      if (curChange == null)
+      {
+        throw new NoSuchElementException();
+      }
+      curChange.setCommitted(true);
+      activeAndDependentChanges.remove(curChange);
+
+      Iterator<Map.Entry<CSN, PendingChange>> it = pendingChanges.entrySet().iterator();
+      while (it.hasNext())
+      {
+        Map.Entry<CSN, PendingChange> change = it.next();
+        PendingChange pendingChange = change.getValue();
+        if (!pendingChange.isCommitted())
+        {
+          break;
+        }
+        if (pendingChange.getMsg().contributesToDomainState())
+        {
+          state.update(change.getKey());
+        }
+        it.remove();
+      }
     }
-    curChange.setCommitted(true);
-
-    CSN firstCSN = pendingChanges.firstKey();
-    PendingChange firstChange = pendingChanges.get(firstCSN);
-
-    while (firstChange != null && firstChange.isCommitted())
+    finally
     {
-      if (firstChange.getMsg().contributesToDomainState())
-      {
-        state.update(firstCSN);
-      }
-      pendingChanges.remove(firstCSN);
-
-      if (pendingChanges.isEmpty())
-      {
-        firstChange = null;
-      }
-      else
-      {
-        firstCSN = pendingChanges.firstKey();
-        firstChange = pendingChanges.get(firstCSN);
-      }
+      pendingChangesWriteLock.unlock();
     }
   }
 
+  public void markInProgress(LDAPUpdateMsg msg)
+  {
+    pendingChangesReadLock.lock();
+    try
+    {
+      activeAndDependentChanges.add(pendingChanges.get(msg.getCSN()));
+    }
+    finally
+    {
+      pendingChangesReadLock.unlock();
+    }
+  }
   /**
    * Get the first update in the list that have some dependencies cleared.
    *
    * @return The LDAPUpdateMsg to be handled.
    */
-  public synchronized LDAPUpdateMsg getNextUpdate()
+  public LDAPUpdateMsg getNextUpdate()
   {
-    /*
-     * Parse the list of Update with dependencies and check if the dependencies
-     * are now cleared until an Update without dependencies is found.
-     */
-    for (PendingChange change : dependentChanges)
+    pendingChangesReadLock.lock();
+    dependentChangesLock.lock();
+    try
     {
-      if (change.dependenciesIsCovered(state))
+      if (!dependentChanges.isEmpty())
       {
-        dependentChanges.remove(change);
-        return change.getLDAPUpdateMsg();
+        PendingChange firstDependentChange = dependentChanges.first();
+        if (pendingChanges.firstKey().isNewerThanOrEqualTo(firstDependentChange.getCSN()))
+        {
+          dependentChanges.remove(firstDependentChange);
+          return firstDependentChange.getLDAPUpdateMsg();
+        }
       }
+      return null;
     }
-    return null;
+    finally
+    {
+      dependentChangesLock.unlock();
+      pendingChangesReadLock.unlock();
+    }
   }
 
   /**
    * Mark the first pendingChange as dependent on the second PendingChange.
    * @param dependentChange The PendingChange that depend on the second
    *                        PendingChange.
-   * @param pendingChange   The PendingChange on which the first PendingChange
-   *                        is dependent.
    */
-  private void addDependency(
-      PendingChange dependentChange, PendingChange pendingChange)
+  private void addDependency(PendingChange dependentChange)
   {
-    dependentChange.addDependency(pendingChange.getCSN());
-    dependentChanges.add(dependentChange);
+    dependentChangesLock.lock();
+    try
+    {
+      dependentChanges.add(dependentChange);
+    }
+    finally
+    {
+      dependentChangesLock.unlock();
+    }
+  }
+
+  private PendingChange getPendingChange(CSN csn)
+  {
+    pendingChangesReadLock.lock();
+    try
+    {
+      return pendingChanges.get(csn);
+    }
+    finally
+    {
+      pendingChangesReadLock.unlock();
+    }
   }
 
   /**
@@ -190,80 +288,70 @@
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  public synchronized boolean checkDependencies(AddOperation op)
+  public boolean checkDependencies(AddOperation op)
   {
     boolean hasDependencies = false;
     final DN targetDN = op.getEntryDN();
     final CSN csn = OperationContext.getCSN(op);
-    final PendingChange change = pendingChanges.get(csn);
+    final PendingChange change = getPendingChange(csn);
+
     if (change == null)
     {
       return false;
     }
 
-    for (PendingChange pendingChange : pendingChanges.values())
+    for (PendingChange pendingChange : activeAndDependentChanges)
     {
-      if (pendingChange.getCSN().isOlderThan(csn))
+      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
-        if (pendingMsg != null)
+        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+        break;
+      }
+      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+      if (pendingMsg instanceof DeleteMsg)
+      {
+          /*
+           * Check if the operation to be run is a deleteOperation on the same DN.
+           */
+        if (pendingMsg.getDN().equals(targetDN))
         {
-          if (pendingMsg instanceof DeleteMsg)
-          {
-            /*
-             * Check is the operation to be run is a deleteOperation on the
-             * same DN.
-             */
-            if (pendingMsg.getDN().equals(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof AddMsg)
-          {
-            /*
-             * Check if the operation to be run is an addOperation on a
-             * parent of the current AddOperation.
-             */
-            if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof ModifyDNMsg)
-          {
-            /*
-             * Check if the operation to be run is ModifyDnOperation with
-             * the same target DN as the ADD DN
-             * or a ModifyDnOperation with new DN equals to the ADD DN parent
-             */
-            if (pendingMsg.getDN().equals(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-            else
-            {
-              final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
-              if (pendingModDn.newDNIsParent(targetDN))
-              {
-                hasDependencies = true;
-                addDependency(change, pendingChange);
-              }
-            }
-          }
+          hasDependencies = true;
+          addDependency(change);
         }
       }
-      else
+      else if (pendingMsg instanceof AddMsg)
       {
-        // We reached an operation that is newer than the operation
-        // for which we are doing the dependency check so it is
-        // not possible to find another operation with some dependency.
-        // break the loop to avoid going through the potentially large
-        // list of pending changes.
-        break;
+          /*
+           * Check if the operation to be run is an addOperation on a
+           * parent of the current AddOperation.
+           */
+        if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+      }
+      else if (pendingMsg instanceof ModifyDNMsg)
+      {
+          /*
+           * Check if the operation to be run is ModifyDnOperation with
+           * the same target DN as the ADD DN
+           * or a ModifyDnOperation with new DN equals to the ADD DN parent
+           */
+        if (pendingMsg.getDN().equals(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+        else
+        {
+          final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+          if (pendingModDn.newDNIsParent(targetDN))
+          {
+            hasDependencies = true;
+            addDependency(change);
+          }
+        }
       }
     }
     return hasDependencies;
@@ -277,45 +365,48 @@
    *
    * ModifyOperation depends on
    * - AddOperation done on the same DN
+   * - ModifyDNOperation having newDN the same as targetDN
    *
    * @param op The ModifyOperation to be checked.
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  public synchronized boolean checkDependencies(ModifyOperation op)
+  public boolean checkDependencies(ModifyOperation op)
   {
     boolean hasDependencies = false;
-    final DN targetDN = op.getEntryDN();
     final CSN csn = OperationContext.getCSN(op);
-    final PendingChange change = pendingChanges.get(csn);
+    final PendingChange change = getPendingChange(csn);
+
     if (change == null)
     {
-      return false;
+        return false;
     }
 
-    for (PendingChange pendingChange : pendingChanges.values())
+    final DN targetDN = change.getLDAPUpdateMsg().getDN();
+    for (PendingChange pendingChange : activeAndDependentChanges)
     {
-      if (pendingChange.getCSN().isOlderThan(csn))
+      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
-        if (pendingMsg instanceof AddMsg)
+        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+        break;
+      }
+      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+      if (pendingMsg instanceof AddMsg)
+      {
+        // Check if the operation to be run is an addOperation on a same DN.
+        if (pendingMsg.getDN().equals(targetDN))
         {
-          // Check if the operation to be run is an addOperation on a same DN.
-          if (pendingMsg.getDN().equals(targetDN))
-          {
-            hasDependencies = true;
-            addDependency(change, pendingChange);
-          }
+          hasDependencies = true;
+          addDependency(change);
         }
       }
-      else
+      else if (pendingMsg instanceof ModifyDNMsg)
       {
-        // We reached an operation that is newer than the operation
-        // for which we are doing the dependency check so it is
-        // not possible to find another operation with some dependency.
-        // break the loop to avoid going through the potentially large
-        // list of pending changes.
-        break;
+        if (((ModifyDNMsg) pendingMsg).newDNIsEqual(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
       }
     }
     return hasDependencies;
@@ -333,76 +424,68 @@
    * - DeleteOperation done on the new DN of the MODDN operation
    * - ModifyDNOperation done from the new DN of the MODDN operation
    *
+   * TODO: Consider cases where there is a rename A -> B then rename B -> C. Second change depends on first
+   *
    * @param msg The ModifyDNMsg to be checked.
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  private synchronized boolean checkDependencies(ModifyDNMsg msg)
+  public boolean checkDependencies(ModifyDNMsg msg)
   {
     boolean hasDependencies = false;
     final CSN csn = msg.getCSN();
-    final PendingChange change = pendingChanges.get(csn);
+    final PendingChange change = getPendingChange(csn);
+
     if (change == null)
     {
       return false;
     }
 
     final DN targetDN = change.getLDAPUpdateMsg().getDN();
-
-    for (PendingChange pendingChange : pendingChanges.values())
+    for (PendingChange pendingChange : activeAndDependentChanges)
     {
-      if (pendingChange.getCSN().isOlderThan(csn))
+      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
-        if (pendingMsg != null)
+        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+        break;
+      }
+      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+      if (pendingMsg instanceof DeleteMsg)
+      {
+        // Check if the target of the Delete is the same
+        // as the new DN of this ModifyDN
+        if (msg.newDNIsEqual(pendingMsg.getDN()))
         {
-          if (pendingMsg instanceof DeleteMsg)
-          {
-            // Check if the target of the Delete is the same
-            // as the new DN of this ModifyDN
-            if (msg.newDNIsEqual(pendingMsg.getDN()))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof AddMsg)
-          {
-            // Check if the Add Operation was done on the new parent of
-            // the MODDN  operation
-            if (msg.newParentIsEqual(pendingMsg.getDN()))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-            // Check if the AddOperation was done on the same DN as the
-            // target DN of the MODDN operation
-            if (pendingMsg.getDN().equals(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof ModifyDNMsg)
-          {
-            // Check if the ModifyDNOperation was done from the new DN of
-            // the MODDN operation
-            if (msg.newDNIsEqual(pendingMsg.getDN()))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
+          hasDependencies = true;
+          addDependency(change);
         }
       }
-      else
+      else if (pendingMsg instanceof AddMsg)
       {
-        // We reached an operation that is newer than the operation
-        // for which we are doing the dependency check so it is
-        // not possible to find another operation with some dependency.
-        // break the loop to avoid going through the potentially large
-        // list of pending changes.
-        break;
+        // Check if the Add Operation was done on the new parent of
+        // the MODDN  operation
+        if (msg.newParentIsEqual(pendingMsg.getDN()))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+        // Check if the AddOperation was done on the same DN as the
+        // target DN of the MODDN operation
+        if (pendingMsg.getDN().equals(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+      }
+      else if (pendingMsg instanceof ModifyDNMsg)
+      {
+        // Check if the ModifyDNOperation was done from the new DN of
+        // the MODDN operation
+        if (msg.newDNIsEqual(pendingMsg.getDN()))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
       }
     }
     return hasDependencies;
@@ -424,72 +507,62 @@
    *
    * @return A boolean indicating if this operation has some dependencies.
    */
-  public synchronized boolean checkDependencies(DeleteOperation op)
+  public boolean checkDependencies(DeleteOperation op)
   {
     boolean hasDependencies = false;
     final DN targetDN = op.getEntryDN();
     final CSN csn = OperationContext.getCSN(op);
-    final PendingChange change = pendingChanges.get(csn);
+    final PendingChange change = getPendingChange(csn);
+
     if (change == null)
     {
       return false;
     }
 
-    for (PendingChange pendingChange : pendingChanges.values())
+    for (PendingChange pendingChange : activeAndDependentChanges)
     {
-      if (pendingChange.getCSN().isOlderThan(csn))
+      if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
-        if (pendingMsg != null)
+        // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+        break;
+      }
+      final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+      if (pendingMsg instanceof DeleteMsg)
+      {
+          /*
+           * Check if the operation to be run is a deleteOperation on a
+           * children of the current DeleteOperation.
+           */
+        if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
         {
-          if (pendingMsg instanceof DeleteMsg)
-          {
-            /*
-             * Check if the operation to be run is a deleteOperation on a
-             * children of the current DeleteOperation.
-             */
-            if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof AddMsg)
-          {
-            /*
-             * Check if the operation to be run is an addOperation on a
-             * parent of the current DeleteOperation.
-             */
-            if (pendingMsg.getDN().equals(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
-          else if (pendingMsg instanceof ModifyDNMsg)
-          {
-            final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
-            /*
-             * Check if the operation to be run is an ModifyDNOperation
-             * on a children of the current DeleteOperation
-             */
-            if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN)
-                || pendingModDn.newDNIsParent(targetDN))
-            {
-              hasDependencies = true;
-              addDependency(change, pendingChange);
-            }
-          }
+          hasDependencies = true;
+          addDependency(change);
         }
       }
-      else
+      else if (pendingMsg instanceof AddMsg)
       {
-        // We reached an operation that is newer than the operation
-        // for which we are doing the dependency check so it is
-        // not possible to find another operation with some dependency.
-        // break the loop to avoid going through the potentially large
-        // list of pending changes.
-        break;
+          /*
+           * Check if the operation to be run is an addOperation on a
+           * parent of the current DeleteOperation.
+           */
+        if (pendingMsg.getDN().equals(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
+      }
+      else if (pendingMsg instanceof ModifyDNMsg)
+      {
+        final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+          /*
+           * Check if the operation to be run is an ModifyDNOperation
+           * on a children of the current DeleteOperation
+           */
+        if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN) || pendingModDn.newDNIsParent(targetDN))
+        {
+          hasDependencies = true;
+          addDependency(change);
+        }
       }
     }
     return hasDependencies;
@@ -514,16 +587,18 @@
     {
       DeleteOperation newOp = (DeleteOperation) op;
       return checkDependencies(newOp);
-
-    } else if (op instanceof AddOperation)
+    }
+    else if (op instanceof AddOperation)
     {
       AddOperation newOp = (AddOperation) op;
       return checkDependencies(newOp);
-    } else if (op instanceof ModifyDNOperationBasis)
+    }
+    else if (op instanceof ModifyDNOperationBasis)
     {
       ModifyDNMsg newMsg = (ModifyDNMsg) msg;
       return checkDependencies(newMsg);
-    } else
+    }
+    else
     {
       return true;  // unknown type of operation ?!
     }

--
Gitblit v1.10.0