From 3c140af6c756b30b325ce3c6ed080e8898e2b7ec Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 24 Feb 2016 13:52:15 +0000
Subject: [PATCH] OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput
---
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 43 +++++++++++++++++++++++++------------------
1 files changed, 25 insertions(+), 18 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 1be7e3c..70fc1a8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2016 ForgeRock AS
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -2266,6 +2266,15 @@
}
/**
+ * Marks the specified message as the one currently processed by a replay thread.
+ * @param msg the message being processed
+ */
+ void markInProgress(LDAPUpdateMsg msg)
+ {
+ remotePendingChanges.markInProgress(msg);
+ }
+
+ /**
* Create and replay a synchronized Operation from an UpdateMsg.
*
* @param msg
@@ -2292,7 +2301,6 @@
// error handling paths.
Operation nextOp = op = msg.createOperation(conn);
dependency = remotePendingChanges.checkDependencies(op, msg);
-
boolean replayDone = false;
int retryCount = 10;
while (!dependency && !replayDone && retryCount-- > 0)
@@ -2350,25 +2358,27 @@
ModifyOperation castOp = (ModifyOperation) op;
dependency = remotePendingChanges.checkDependencies(castOp);
ModifyMsg modifyMsg = (ModifyMsg) msg;
- replayDone = solveNamingConflict(castOp, modifyMsg);
+ replayDone = !dependency && solveNamingConflict(castOp, modifyMsg);
}
else if (op instanceof DeleteOperation)
{
DeleteOperation castOp = (DeleteOperation) op;
dependency = remotePendingChanges.checkDependencies(castOp);
- replayDone = solveNamingConflict(castOp, msg);
+ replayDone = !dependency && solveNamingConflict(castOp, msg);
}
else if (op instanceof AddOperation)
{
AddOperation castOp = (AddOperation) op;
AddMsg addMsg = (AddMsg) msg;
dependency = remotePendingChanges.checkDependencies(castOp);
- replayDone = solveNamingConflict(castOp, addMsg);
+ replayDone = !dependency && solveNamingConflict(castOp, addMsg);
}
else if (op instanceof ModifyDNOperation)
{
ModifyDNOperation castOp = (ModifyDNOperation) op;
- replayDone = solveNamingConflict(castOp, msg);
+ ModifyDNMsg modifyDNMsg = (ModifyDNMsg) msg;
+ dependency = remotePendingChanges.checkDependencies(modifyDNMsg);
+ replayDone = !dependency && solveNamingConflict(castOp, modifyDNMsg);
}
else
{
@@ -2385,8 +2395,8 @@
else
{
/*
- * Create a new operation reflecting the new state of the
- * UpdateMsg after conflict resolution modified it.
+ * Create a new operation reflecting the new state of the UpdateMsg after conflict resolution
+ * modified it and try replaying it again. Dependencies might have been replayed by now.
* Note: When msg is a DeleteMsg, the DeleteOperation is properly
* created with subtreeDelete request control when needed.
*/
@@ -4293,16 +4303,13 @@
// number of updates in the pending list
addMonitorData(attributes, "pending-updates", pendingChanges.size());
- addMonitorData(attributes, "replayed-updates-ok",
- numReplayedPostOpCalled.get());
- addMonitorData(attributes, "resolved-modify-conflicts",
- numResolvedModifyConflicts.get());
- addMonitorData(attributes, "resolved-naming-conflicts",
- numResolvedNamingConflicts.get());
- addMonitorData(attributes, "unresolved-naming-conflicts",
- numUnresolvedNamingConflicts.get());
- addMonitorData(attributes, "remote-pending-changes-size",
- remotePendingChanges.getQueueSize());
+ addMonitorData(attributes, "replayed-updates-ok", numReplayedPostOpCalled.get());
+ addMonitorData(attributes, "resolved-modify-conflicts", numResolvedModifyConflicts.get());
+ addMonitorData(attributes, "resolved-naming-conflicts", numResolvedNamingConflicts.get());
+ addMonitorData(attributes, "unresolved-naming-conflicts", numUnresolvedNamingConflicts.get());
+ addMonitorData(attributes, "remote-pending-changes-size", remotePendingChanges.getQueueSize());
+ addMonitorData(attributes, "dependent-changes-size", remotePendingChanges.getDependentChangesSize());
+ addMonitorData(attributes, "changes-in-progress-size", remotePendingChanges.changesInProgressSize());
return attributes;
}
--
Gitblit v1.10.0