From 3c140af6c756b30b325ce3c6ed080e8898e2b7ec Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 24 Feb 2016 13:52:15 +0000
Subject: [PATCH] OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput
---
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java | 545 +++++++++++++++++++++++++-------------------
opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java | 7
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java | 37 --
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java | 47 ++-
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 43 ++-
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java | 6
6 files changed, 379 insertions(+), 306 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 1be7e3c..70fc1a8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2016 ForgeRock AS
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -2266,6 +2266,15 @@
}
/**
+ * Marks the specified message as the one currently processed by a replay thread.
+ * @param msg the message being processed
+ */
+ void markInProgress(LDAPUpdateMsg msg)
+ {
+ remotePendingChanges.markInProgress(msg);
+ }
+
+ /**
* Create and replay a synchronized Operation from an UpdateMsg.
*
* @param msg
@@ -2292,7 +2301,6 @@
// error handling paths.
Operation nextOp = op = msg.createOperation(conn);
dependency = remotePendingChanges.checkDependencies(op, msg);
-
boolean replayDone = false;
int retryCount = 10;
while (!dependency && !replayDone && retryCount-- > 0)
@@ -2350,25 +2358,27 @@
ModifyOperation castOp = (ModifyOperation) op;
dependency = remotePendingChanges.checkDependencies(castOp);
ModifyMsg modifyMsg = (ModifyMsg) msg;
- replayDone = solveNamingConflict(castOp, modifyMsg);
+ replayDone = !dependency && solveNamingConflict(castOp, modifyMsg);
}
else if (op instanceof DeleteOperation)
{
DeleteOperation castOp = (DeleteOperation) op;
dependency = remotePendingChanges.checkDependencies(castOp);
- replayDone = solveNamingConflict(castOp, msg);
+ replayDone = !dependency && solveNamingConflict(castOp, msg);
}
else if (op instanceof AddOperation)
{
AddOperation castOp = (AddOperation) op;
AddMsg addMsg = (AddMsg) msg;
dependency = remotePendingChanges.checkDependencies(castOp);
- replayDone = solveNamingConflict(castOp, addMsg);
+ replayDone = !dependency && solveNamingConflict(castOp, addMsg);
}
else if (op instanceof ModifyDNOperation)
{
ModifyDNOperation castOp = (ModifyDNOperation) op;
- replayDone = solveNamingConflict(castOp, msg);
+ ModifyDNMsg modifyDNMsg = (ModifyDNMsg) msg;
+ dependency = remotePendingChanges.checkDependencies(modifyDNMsg);
+ replayDone = !dependency && solveNamingConflict(castOp, modifyDNMsg);
}
else
{
@@ -2385,8 +2395,8 @@
else
{
/*
- * Create a new operation reflecting the new state of the
- * UpdateMsg after conflict resolution modified it.
+ * Create a new operation reflecting the new state of the UpdateMsg after conflict resolution
+ * modified it and try replaying it again. Dependencies might have been replayed by now.
* Note: When msg is a DeleteMsg, the DeleteOperation is properly
* created with subtreeDelete request control when needed.
*/
@@ -4293,16 +4303,13 @@
// number of updates in the pending list
addMonitorData(attributes, "pending-updates", pendingChanges.size());
- addMonitorData(attributes, "replayed-updates-ok",
- numReplayedPostOpCalled.get());
- addMonitorData(attributes, "resolved-modify-conflicts",
- numResolvedModifyConflicts.get());
- addMonitorData(attributes, "resolved-naming-conflicts",
- numResolvedNamingConflicts.get());
- addMonitorData(attributes, "unresolved-naming-conflicts",
- numUnresolvedNamingConflicts.get());
- addMonitorData(attributes, "remote-pending-changes-size",
- remotePendingChanges.getQueueSize());
+ addMonitorData(attributes, "replayed-updates-ok", numReplayedPostOpCalled.get());
+ addMonitorData(attributes, "resolved-modify-conflicts", numResolvedModifyConflicts.get());
+ addMonitorData(attributes, "resolved-naming-conflicts", numResolvedNamingConflicts.get());
+ addMonitorData(attributes, "unresolved-naming-conflicts", numUnresolvedNamingConflicts.get());
+ addMonitorData(attributes, "remote-pending-changes-size", remotePendingChanges.getQueueSize());
+ addMonitorData(attributes, "dependent-changes-size", remotePendingChanges.getDependentChangesSize());
+ addMonitorData(attributes, "changes-in-progress-size", remotePendingChanges.changesInProgressSize());
return attributes;
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
index 8527c20..1d87753 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2016 ForgeRock AS
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -41,6 +41,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -316,9 +317,10 @@
{
replayThreads.clear();
+ ReentrantLock switchQueueLock = new ReentrantLock();
for (int i = 0; i < replayThreadNumber; i++)
{
- ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
+ ReplayThread replayThread = new ReplayThread(updateToReplayQueue, switchQueueLock);
replayThread.start();
replayThreads.add(replayThread);
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java
index 9e7309f..2104731 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/PendingChange.java
@@ -22,12 +22,11 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions copyright 2014 ForgeRock AS
+ * Portions copyright 2014-2016 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.operation.PluginOperation;
@@ -42,7 +41,6 @@
private boolean committed;
private UpdateMsg msg;
private final PluginOperation op;
- private ServerState dependencyState;
/**
* Construct a new PendingChange.
@@ -128,35 +126,6 @@
return this.op;
}
- /**
- * Add the given CSN to the list of dependencies of this PendingChange.
- *
- * @param csn
- * The CSN to add to the list of dependencies of this PendingChange.
- */
- public void addDependency(CSN csn)
- {
- if (dependencyState == null)
- {
- dependencyState = new ServerState();
- }
- dependencyState.update(csn);
- }
-
- /**
- * Check if the given ServerState covers the dependencies of this
- * PendingChange.
- *
- * @param state The ServerState for which dependencies must be checked,
- *
- * @return A boolean indicating if the given ServerState covers the
- * dependencies of this PendingChange.
- */
- public boolean dependenciesIsCovered(ServerState state)
- {
- return state.cover(dependencyState);
- }
-
/** {@inheritDoc} */
@Override
public int compareTo(PendingChange o)
@@ -173,8 +142,6 @@
+ ", csn=" + csn.toStringUI()
+ ", msg=[" + msg
+ "], isOperationSynchronized="
- + (op != null ? op.isSynchronizationOperation() : "false")
- + ", dependencyState="
- + (dependencyState != null ? dependencyState : "");
+ + (op != null ? op.isSynchronizationOperation() : "false");
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
index 79edb18..21fcad9 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -27,7 +27,11 @@
package org.opends.server.replication.plugin;
import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import net.jcip.annotations.GuardedBy;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyDNOperationBasis;
@@ -51,6 +55,7 @@
final class RemotePendingChanges
{
/** A map used to store the pending changes. */
+ @GuardedBy("pendingChangesLock")
private final SortedMap<CSN, PendingChange> pendingChanges = new TreeMap<>();
/**
@@ -58,7 +63,18 @@
* not been replayed correctly because they are dependent on
* another change to be completed.
*/
+ @GuardedBy("dependentChangesLock")
private final SortedSet<PendingChange> dependentChanges = new TreeSet<>();
+ /**
+ * {@code activeAndDependentChanges} also contains changes discovered to be dependent
+ * on currently in progress changes.
+ */
+ private final ConcurrentSkipListSet<PendingChange> activeAndDependentChanges = new ConcurrentSkipListSet<>();
+
+ private ReentrantReadWriteLock pendingChangesLock = new ReentrantReadWriteLock(true);
+ private ReentrantReadWriteLock.ReadLock pendingChangesReadLock = pendingChangesLock.readLock();
+ private ReentrantReadWriteLock.WriteLock pendingChangesWriteLock = pendingChangesLock.writeLock();
+ private ReentrantLock dependentChangesLock = new ReentrantLock();
/** The ServerState that will be updated when LDAPUpdateMsg are fully replayed. */
private final ServerState state;
@@ -75,13 +91,49 @@
}
/**
- * Returns the number of changes currently in this list.
+ * Returns the number of changes waiting to be replayed.
*
- * @return The number of changes currently in this list.
+ * @return The number of changes waiting to be replayed
*/
- public synchronized int getQueueSize()
+ public int getQueueSize()
{
- return pendingChanges.size();
+ pendingChangesReadLock.lock();
+ try
+ {
+ return pendingChanges.size();
+ }
+ finally
+ {
+ pendingChangesReadLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the number of changes actively being replayed.
+ *
+ * @return the number of changes actively being replayed.
+ */
+ public int changesInProgressSize()
+ {
+ return activeAndDependentChanges.size();
+ }
+
+ /**
+ * Returns the number of changes depending on other changes.
+ *
+ * @return the number of changes depending on other changes.
+ */
+ public int getDependentChangesSize()
+ {
+ dependentChangesLock.lock();
+ try
+ {
+ return dependentChanges.size();
+ }
+ finally
+ {
+ dependentChangesLock.unlock();
+ }
}
/**
@@ -93,11 +145,18 @@
* @return {@code false} if the update was already registered in the pending
* changes.
*/
- public synchronized boolean putRemoteUpdate(LDAPUpdateMsg update)
+ public boolean putRemoteUpdate(LDAPUpdateMsg update)
{
- CSN csn = update.getCSN();
- return pendingChanges.put(csn,
- new PendingChange(csn, null, update)) == null;
+ pendingChangesWriteLock.lock();
+ try
+ {
+ CSN csn = update.getCSN();
+ return pendingChanges.put(csn, new PendingChange(csn, null, update)) == null;
+ }
+ finally
+ {
+ pendingChangesWriteLock.unlock();
+ }
}
/**
@@ -106,72 +165,111 @@
* @param csn
* The CSN of the update message that must be set as committed.
*/
- public synchronized void commit(CSN csn)
+ public void commit(CSN csn)
{
- PendingChange curChange = pendingChanges.get(csn);
- if (curChange == null)
+ pendingChangesWriteLock.lock();
+ try
{
- throw new NoSuchElementException();
+ PendingChange curChange = pendingChanges.get(csn);
+ if (curChange == null)
+ {
+ throw new NoSuchElementException();
+ }
+ curChange.setCommitted(true);
+ activeAndDependentChanges.remove(curChange);
+
+ Iterator<Map.Entry<CSN, PendingChange>> it = pendingChanges.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry<CSN, PendingChange> change = it.next();
+ PendingChange pendingChange = change.getValue();
+ if (!pendingChange.isCommitted())
+ {
+ break;
+ }
+ if (pendingChange.getMsg().contributesToDomainState())
+ {
+ state.update(change.getKey());
+ }
+ it.remove();
+ }
}
- curChange.setCommitted(true);
-
- CSN firstCSN = pendingChanges.firstKey();
- PendingChange firstChange = pendingChanges.get(firstCSN);
-
- while (firstChange != null && firstChange.isCommitted())
+ finally
{
- if (firstChange.getMsg().contributesToDomainState())
- {
- state.update(firstCSN);
- }
- pendingChanges.remove(firstCSN);
-
- if (pendingChanges.isEmpty())
- {
- firstChange = null;
- }
- else
- {
- firstCSN = pendingChanges.firstKey();
- firstChange = pendingChanges.get(firstCSN);
- }
+ pendingChangesWriteLock.unlock();
}
}
+ public void markInProgress(LDAPUpdateMsg msg)
+ {
+ pendingChangesReadLock.lock();
+ try
+ {
+ activeAndDependentChanges.add(pendingChanges.get(msg.getCSN()));
+ }
+ finally
+ {
+ pendingChangesReadLock.unlock();
+ }
+ }
/**
* Get the first update in the list that have some dependencies cleared.
*
* @return The LDAPUpdateMsg to be handled.
*/
- public synchronized LDAPUpdateMsg getNextUpdate()
+ public LDAPUpdateMsg getNextUpdate()
{
- /*
- * Parse the list of Update with dependencies and check if the dependencies
- * are now cleared until an Update without dependencies is found.
- */
- for (PendingChange change : dependentChanges)
+ pendingChangesReadLock.lock();
+ dependentChangesLock.lock();
+ try
{
- if (change.dependenciesIsCovered(state))
+ if (!dependentChanges.isEmpty())
{
- dependentChanges.remove(change);
- return change.getLDAPUpdateMsg();
+ PendingChange firstDependentChange = dependentChanges.first();
+ if (pendingChanges.firstKey().isNewerThanOrEqualTo(firstDependentChange.getCSN()))
+ {
+ dependentChanges.remove(firstDependentChange);
+ return firstDependentChange.getLDAPUpdateMsg();
+ }
}
+ return null;
}
- return null;
+ finally
+ {
+ dependentChangesLock.unlock();
+ pendingChangesReadLock.unlock();
+ }
}
/**
* Mark the first pendingChange as dependent on the second PendingChange.
* @param dependentChange The PendingChange that depend on the second
* PendingChange.
- * @param pendingChange The PendingChange on which the first PendingChange
- * is dependent.
*/
- private void addDependency(
- PendingChange dependentChange, PendingChange pendingChange)
+ private void addDependency(PendingChange dependentChange)
{
- dependentChange.addDependency(pendingChange.getCSN());
- dependentChanges.add(dependentChange);
+ dependentChangesLock.lock();
+ try
+ {
+ dependentChanges.add(dependentChange);
+ }
+ finally
+ {
+ dependentChangesLock.unlock();
+ }
+ }
+
+ private PendingChange getPendingChange(CSN csn)
+ {
+ pendingChangesReadLock.lock();
+ try
+ {
+ return pendingChanges.get(csn);
+ }
+ finally
+ {
+ pendingChangesReadLock.unlock();
+ }
}
/**
@@ -190,80 +288,70 @@
*
* @return A boolean indicating if this operation has some dependencies.
*/
- public synchronized boolean checkDependencies(AddOperation op)
+ public boolean checkDependencies(AddOperation op)
{
boolean hasDependencies = false;
final DN targetDN = op.getEntryDN();
final CSN csn = OperationContext.getCSN(op);
- final PendingChange change = pendingChanges.get(csn);
+ final PendingChange change = getPendingChange(csn);
+
if (change == null)
{
return false;
}
- for (PendingChange pendingChange : pendingChanges.values())
+ for (PendingChange pendingChange : activeAndDependentChanges)
{
- if (pendingChange.getCSN().isOlderThan(csn))
+ if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
- if (pendingMsg != null)
+ // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+ break;
+ }
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ /*
+ * Check if the operation to be run is a deleteOperation on the same DN.
+ */
+ if (pendingMsg.getDN().equals(targetDN))
{
- if (pendingMsg instanceof DeleteMsg)
- {
- /*
- * Check is the operation to be run is a deleteOperation on the
- * same DN.
- */
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- /*
- * Check if the operation to be run is an addOperation on a
- * parent of the current AddOperation.
- */
- if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- /*
- * Check if the operation to be run is ModifyDnOperation with
- * the same target DN as the ADD DN
- * or a ModifyDnOperation with new DN equals to the ADD DN parent
- */
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- else
- {
- final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
- if (pendingModDn.newDNIsParent(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- }
+ hasDependencies = true;
+ addDependency(change);
}
}
- else
+ else if (pendingMsg instanceof AddMsg)
{
- // We reached an operation that is newer than the operation
- // for which we are doing the dependency check so it is
- // not possible to find another operation with some dependency.
- // break the loop to avoid going through the potentially large
- // list of pending changes.
- break;
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * parent of the current AddOperation.
+ */
+ if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ /*
+ * Check if the operation to be run is ModifyDnOperation with
+ * the same target DN as the ADD DN
+ * or a ModifyDnOperation with new DN equals to the ADD DN parent
+ */
+ if (pendingMsg.getDN().equals(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ else
+ {
+ final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+ if (pendingModDn.newDNIsParent(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ }
}
}
return hasDependencies;
@@ -277,45 +365,48 @@
*
* ModifyOperation depends on
* - AddOperation done on the same DN
+ * - ModifyDNOperation having newDN the same as targetDN
*
* @param op The ModifyOperation to be checked.
*
* @return A boolean indicating if this operation has some dependencies.
*/
- public synchronized boolean checkDependencies(ModifyOperation op)
+ public boolean checkDependencies(ModifyOperation op)
{
boolean hasDependencies = false;
- final DN targetDN = op.getEntryDN();
final CSN csn = OperationContext.getCSN(op);
- final PendingChange change = pendingChanges.get(csn);
+ final PendingChange change = getPendingChange(csn);
+
if (change == null)
{
- return false;
+ return false;
}
- for (PendingChange pendingChange : pendingChanges.values())
+ final DN targetDN = change.getLDAPUpdateMsg().getDN();
+ for (PendingChange pendingChange : activeAndDependentChanges)
{
- if (pendingChange.getCSN().isOlderThan(csn))
+ if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
- if (pendingMsg instanceof AddMsg)
+ // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+ break;
+ }
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+ if (pendingMsg instanceof AddMsg)
+ {
+ // Check if the operation to be run is an addOperation on a same DN.
+ if (pendingMsg.getDN().equals(targetDN))
{
- // Check if the operation to be run is an addOperation on a same DN.
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
+ hasDependencies = true;
+ addDependency(change);
}
}
- else
+ else if (pendingMsg instanceof ModifyDNMsg)
{
- // We reached an operation that is newer than the operation
- // for which we are doing the dependency check so it is
- // not possible to find another operation with some dependency.
- // break the loop to avoid going through the potentially large
- // list of pending changes.
- break;
+ if (((ModifyDNMsg) pendingMsg).newDNIsEqual(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
}
}
return hasDependencies;
@@ -333,76 +424,68 @@
* - DeleteOperation done on the new DN of the MODDN operation
* - ModifyDNOperation done from the new DN of the MODDN operation
*
+ * TODO: Consider cases where there is a rename A -> B then rename B -> C. Second change depends on first
+ *
* @param msg The ModifyDNMsg to be checked.
*
* @return A boolean indicating if this operation has some dependencies.
*/
- private synchronized boolean checkDependencies(ModifyDNMsg msg)
+ public boolean checkDependencies(ModifyDNMsg msg)
{
boolean hasDependencies = false;
final CSN csn = msg.getCSN();
- final PendingChange change = pendingChanges.get(csn);
+ final PendingChange change = getPendingChange(csn);
+
if (change == null)
{
return false;
}
final DN targetDN = change.getLDAPUpdateMsg().getDN();
-
- for (PendingChange pendingChange : pendingChanges.values())
+ for (PendingChange pendingChange : activeAndDependentChanges)
{
- if (pendingChange.getCSN().isOlderThan(csn))
+ if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
- if (pendingMsg != null)
+ // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+ break;
+ }
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ // Check if the target of the Delete is the same
+ // as the new DN of this ModifyDN
+ if (msg.newDNIsEqual(pendingMsg.getDN()))
{
- if (pendingMsg instanceof DeleteMsg)
- {
- // Check if the target of the Delete is the same
- // as the new DN of this ModifyDN
- if (msg.newDNIsEqual(pendingMsg.getDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- // Check if the Add Operation was done on the new parent of
- // the MODDN operation
- if (msg.newParentIsEqual(pendingMsg.getDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- // Check if the AddOperation was done on the same DN as the
- // target DN of the MODDN operation
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- // Check if the ModifyDNOperation was done from the new DN of
- // the MODDN operation
- if (msg.newDNIsEqual(pendingMsg.getDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
+ hasDependencies = true;
+ addDependency(change);
}
}
- else
+ else if (pendingMsg instanceof AddMsg)
{
- // We reached an operation that is newer than the operation
- // for which we are doing the dependency check so it is
- // not possible to find another operation with some dependency.
- // break the loop to avoid going through the potentially large
- // list of pending changes.
- break;
+ // Check if the Add Operation was done on the new parent of
+ // the MODDN operation
+ if (msg.newParentIsEqual(pendingMsg.getDN()))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ // Check if the AddOperation was done on the same DN as the
+ // target DN of the MODDN operation
+ if (pendingMsg.getDN().equals(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ // Check if the ModifyDNOperation was done from the new DN of
+ // the MODDN operation
+ if (msg.newDNIsEqual(pendingMsg.getDN()))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
}
}
return hasDependencies;
@@ -424,72 +507,62 @@
*
* @return A boolean indicating if this operation has some dependencies.
*/
- public synchronized boolean checkDependencies(DeleteOperation op)
+ public boolean checkDependencies(DeleteOperation op)
{
boolean hasDependencies = false;
final DN targetDN = op.getEntryDN();
final CSN csn = OperationContext.getCSN(op);
- final PendingChange change = pendingChanges.get(csn);
+ final PendingChange change = getPendingChange(csn);
+
if (change == null)
{
return false;
}
- for (PendingChange pendingChange : pendingChanges.values())
+ for (PendingChange pendingChange : activeAndDependentChanges)
{
- if (pendingChange.getCSN().isOlderThan(csn))
+ if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
- if (pendingMsg != null)
+ // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+ break;
+ }
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ /*
+ * Check if the operation to be run is a deleteOperation on a
+ * children of the current DeleteOperation.
+ */
+ if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
{
- if (pendingMsg instanceof DeleteMsg)
- {
- /*
- * Check if the operation to be run is a deleteOperation on a
- * children of the current DeleteOperation.
- */
- if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- /*
- * Check if the operation to be run is an addOperation on a
- * parent of the current DeleteOperation.
- */
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
- /*
- * Check if the operation to be run is an ModifyDNOperation
- * on a children of the current DeleteOperation
- */
- if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN)
- || pendingModDn.newDNIsParent(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
+ hasDependencies = true;
+ addDependency(change);
}
}
- else
+ else if (pendingMsg instanceof AddMsg)
{
- // We reached an operation that is newer than the operation
- // for which we are doing the dependency check so it is
- // not possible to find another operation with some dependency.
- // break the loop to avoid going through the potentially large
- // list of pending changes.
- break;
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * parent of the current DeleteOperation.
+ */
+ if (pendingMsg.getDN().equals(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+ /*
+ * Check if the operation to be run is an ModifyDNOperation
+ * on a children of the current DeleteOperation
+ */
+ if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN) || pendingModDn.newDNIsParent(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
}
}
return hasDependencies;
@@ -514,16 +587,18 @@
{
DeleteOperation newOp = (DeleteOperation) op;
return checkDependencies(newOp);
-
- } else if (op instanceof AddOperation)
+ }
+ else if (op instanceof AddOperation)
{
AddOperation newOp = (AddOperation) op;
return checkDependencies(newOp);
- } else if (op instanceof ModifyDNOperationBasis)
+ }
+ else if (op instanceof ModifyDNOperationBasis)
{
ModifyDNMsg newMsg = (ModifyDNMsg) msg;
return checkDependencies(newMsg);
- } else
+ }
+ else
{
return true; // unknown type of operation ?!
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
index 8c19fef..4d1f54d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/ReplayThread.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2008 Sun Microsystems, Inc.
- * Portions Copyright 2011-2015 ForgeRock AS
+ * Portions Copyright 2011-2016 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -32,6 +32,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
import org.opends.server.api.DirectoryThread;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -49,6 +50,7 @@
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
private final BlockingQueue<UpdateToReplay> updateToReplayQueue;
+ private final ReentrantLock switchQueueLock;
private AtomicBoolean shutdown = new AtomicBoolean(false);
private static int count;
@@ -56,11 +58,13 @@
* Constructor for the ReplayThread.
*
* @param updateToReplayQueue The queue of update messages we have to replay
+ * @param switchQueueLock lock to ensure moving updates from one queue to another is atomic
*/
- public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue)
+ public ReplayThread(BlockingQueue<UpdateToReplay> updateToReplayQueue, ReentrantLock switchQueueLock)
{
- super("Replica replay thread " + count++);
- this.updateToReplayQueue = updateToReplayQueue;
+ super("Replica replay thread " + count++);
+ this.updateToReplayQueue = updateToReplayQueue;
+ this.switchQueueLock = switchQueueLock;
}
/**
@@ -86,19 +90,34 @@
{
try
{
- UpdateToReplay updateToreplay;
- // Loop getting an updateToReplayQueue from the update message queue and
- // replaying matching changes
- while (!shutdown.get() &&
- ((updateToreplay = updateToReplayQueue.poll(1L,
- TimeUnit.SECONDS)) != null))
+ if (switchQueueLock.tryLock(1L, TimeUnit.SECONDS))
{
- // Find replication domain for that update message
- LDAPUpdateMsg updateMsg = updateToreplay.getUpdateMessage();
- LDAPReplicationDomain domain = updateToreplay.getReplicationDomain();
+ LDAPReplicationDomain domain;
+ LDAPUpdateMsg updateMsg;
+ try
+ {
+ if (shutdown.get())
+ {
+ break;
+ }
+ UpdateToReplay updateToreplay = updateToReplayQueue.poll(1L, TimeUnit.SECONDS);
+ if (updateToreplay == null)
+ {
+ continue;
+ }
+ // Find replication domain for that update message and mark it as "in progress"
+ updateMsg = updateToreplay.getUpdateMessage();
+ domain = updateToreplay.getReplicationDomain();
+ domain.markInProgress(updateMsg);
+ }
+ finally
+ {
+ switchQueueLock.unlock();
+ }
domain.replay(updateMsg, shutdown);
}
- } catch (Exception e)
+ }
+ catch (Exception e)
{
/*
* catch all exceptions happening so that the thread never dies even
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java
index d7f4e82..2c8f796 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/plugin/NamingConflictTest.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2013-2015 ForgeRock AS
+ * Portions Copyright 2013-2016 ForgeRock AS.
*/
package org.opends.server.replication.plugin;
@@ -40,6 +40,7 @@
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
@@ -374,6 +375,8 @@
private void replayMsg(UpdateMsg updateMsg) throws InterruptedException
{
domain.processUpdate(updateMsg);
- domain.replay(queue.take().getUpdateMessage(), SHUTDOWN);
+ LDAPUpdateMsg ldapUpdate = queue.take().getUpdateMessage();
+ domain.markInProgress(ldapUpdate);
+ domain.replay(ldapUpdate, SHUTDOWN);
}
}
--
Gitblit v1.10.0