From 3c140af6c756b30b325ce3c6ed080e8898e2b7ec Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 24 Feb 2016 13:52:15 +0000
Subject: [PATCH] OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java |   43 +++++++++++++++++++++++++------------------
 1 files changed, 25 insertions(+), 18 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 1be7e3c..70fc1a8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2016 ForgeRock AS
+ *      Portions Copyright 2011-2016 ForgeRock AS.
  */
 package org.opends.server.replication.plugin;
 
@@ -2266,6 +2266,15 @@
   }
 
   /**
+   * Marks the specified message as the one currently processed by a replay thread.
+   * @param msg the message being processed
+   */
+  void markInProgress(LDAPUpdateMsg msg)
+  {
+    remotePendingChanges.markInProgress(msg);
+  }
+
+  /**
    * Create and replay a synchronized Operation from an UpdateMsg.
    *
    * @param msg
@@ -2292,7 +2301,6 @@
         // error handling paths.
         Operation nextOp = op = msg.createOperation(conn);
         dependency = remotePendingChanges.checkDependencies(op, msg);
-
         boolean replayDone = false;
         int retryCount = 10;
         while (!dependency && !replayDone && retryCount-- > 0)
@@ -2350,25 +2358,27 @@
               ModifyOperation castOp = (ModifyOperation) op;
               dependency = remotePendingChanges.checkDependencies(castOp);
               ModifyMsg modifyMsg = (ModifyMsg) msg;
-              replayDone = solveNamingConflict(castOp, modifyMsg);
+              replayDone = !dependency && solveNamingConflict(castOp, modifyMsg);
             }
             else if (op instanceof DeleteOperation)
             {
               DeleteOperation castOp = (DeleteOperation) op;
               dependency = remotePendingChanges.checkDependencies(castOp);
-              replayDone = solveNamingConflict(castOp, msg);
+              replayDone = !dependency && solveNamingConflict(castOp, msg);
             }
             else if (op instanceof AddOperation)
             {
               AddOperation castOp = (AddOperation) op;
               AddMsg addMsg = (AddMsg) msg;
               dependency = remotePendingChanges.checkDependencies(castOp);
-              replayDone = solveNamingConflict(castOp, addMsg);
+              replayDone = !dependency && solveNamingConflict(castOp, addMsg);
             }
             else if (op instanceof ModifyDNOperation)
             {
               ModifyDNOperation castOp = (ModifyDNOperation) op;
-              replayDone = solveNamingConflict(castOp, msg);
+              ModifyDNMsg modifyDNMsg = (ModifyDNMsg) msg;
+              dependency = remotePendingChanges.checkDependencies(modifyDNMsg);
+              replayDone = !dependency && solveNamingConflict(castOp, modifyDNMsg);
             }
             else
             {
@@ -2385,8 +2395,8 @@
             else
             {
               /*
-               * Create a new operation reflecting the new state of the
-               * UpdateMsg after conflict resolution modified it.
+               * Create a new operation reflecting the new state of the UpdateMsg after conflict resolution
+               * modified it and try replaying it again. Dependencies might have been replayed by now.
                *  Note: When msg is a DeleteMsg, the DeleteOperation is properly
                *  created with subtreeDelete request control when needed.
                */
@@ -4293,16 +4303,13 @@
     // number of updates in the pending list
     addMonitorData(attributes, "pending-updates", pendingChanges.size());
 
-    addMonitorData(attributes, "replayed-updates-ok",
-        numReplayedPostOpCalled.get());
-    addMonitorData(attributes, "resolved-modify-conflicts",
-        numResolvedModifyConflicts.get());
-    addMonitorData(attributes, "resolved-naming-conflicts",
-        numResolvedNamingConflicts.get());
-    addMonitorData(attributes, "unresolved-naming-conflicts",
-        numUnresolvedNamingConflicts.get());
-    addMonitorData(attributes, "remote-pending-changes-size",
-        remotePendingChanges.getQueueSize());
+    addMonitorData(attributes, "replayed-updates-ok", numReplayedPostOpCalled.get());
+    addMonitorData(attributes, "resolved-modify-conflicts", numResolvedModifyConflicts.get());
+    addMonitorData(attributes, "resolved-naming-conflicts", numResolvedNamingConflicts.get());
+    addMonitorData(attributes, "unresolved-naming-conflicts", numUnresolvedNamingConflicts.get());
+    addMonitorData(attributes, "remote-pending-changes-size", remotePendingChanges.getQueueSize());
+    addMonitorData(attributes, "dependent-changes-size", remotePendingChanges.getDependentChangesSize());
+    addMonitorData(attributes, "changes-in-progress-size", remotePendingChanges.changesInProgressSize());
 
     return attributes;
   }

--
Gitblit v1.10.0