From 3c140af6c756b30b325ce3c6ed080e8898e2b7ec Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 24 Feb 2016 13:52:15 +0000
Subject: [PATCH] OPENDJ-2190 Replicas cannot always keep up with sustained high write throughput
---
opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java | 545 +++++++++++++++++++++++++++++++-----------------------
1 files changed, 310 insertions(+), 235 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
index 79edb18..21fcad9 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -27,7 +27,11 @@
package org.opends.server.replication.plugin;
import java.util.*;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import net.jcip.annotations.GuardedBy;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyDNOperationBasis;
@@ -51,6 +55,7 @@
final class RemotePendingChanges
{
/** A map used to store the pending changes. */
+ @GuardedBy("pendingChangesLock")
private final SortedMap<CSN, PendingChange> pendingChanges = new TreeMap<>();
/**
@@ -58,7 +63,18 @@
* not been replayed correctly because they are dependent on
* another change to be completed.
*/
+ @GuardedBy("dependentChangesLock")
private final SortedSet<PendingChange> dependentChanges = new TreeSet<>();
+ /**
+ * {@code activeAndDependentChanges} also contains changes discovered to be dependent
+ * on currently in progress changes.
+ */
+ private final ConcurrentSkipListSet<PendingChange> activeAndDependentChanges = new ConcurrentSkipListSet<>();
+
+ private ReentrantReadWriteLock pendingChangesLock = new ReentrantReadWriteLock(true);
+ private ReentrantReadWriteLock.ReadLock pendingChangesReadLock = pendingChangesLock.readLock();
+ private ReentrantReadWriteLock.WriteLock pendingChangesWriteLock = pendingChangesLock.writeLock();
+ private ReentrantLock dependentChangesLock = new ReentrantLock();
/** The ServerState that will be updated when LDAPUpdateMsg are fully replayed. */
private final ServerState state;
@@ -75,13 +91,49 @@
}
/**
- * Returns the number of changes currently in this list.
+ * Returns the number of changes waiting to be replayed.
*
- * @return The number of changes currently in this list.
+ * @return The number of changes waiting to be replayed
*/
- public synchronized int getQueueSize()
+ public int getQueueSize()
{
- return pendingChanges.size();
+ pendingChangesReadLock.lock();
+ try
+ {
+ return pendingChanges.size();
+ }
+ finally
+ {
+ pendingChangesReadLock.unlock();
+ }
+ }
+
+ /**
+ * Returns the number of changes actively being replayed.
+ *
+ * @return the number of changes actively being replayed.
+ */
+ public int changesInProgressSize()
+ {
+ return activeAndDependentChanges.size();
+ }
+
+ /**
+ * Returns the number of changes depending on other changes.
+ *
+ * @return the number of changes depending on other changes.
+ */
+ public int getDependentChangesSize()
+ {
+ dependentChangesLock.lock();
+ try
+ {
+ return dependentChanges.size();
+ }
+ finally
+ {
+ dependentChangesLock.unlock();
+ }
}
/**
@@ -93,11 +145,18 @@
* @return {@code false} if the update was already registered in the pending
* changes.
*/
- public synchronized boolean putRemoteUpdate(LDAPUpdateMsg update)
+ public boolean putRemoteUpdate(LDAPUpdateMsg update)
{
- CSN csn = update.getCSN();
- return pendingChanges.put(csn,
- new PendingChange(csn, null, update)) == null;
+ pendingChangesWriteLock.lock();
+ try
+ {
+ CSN csn = update.getCSN();
+ return pendingChanges.put(csn, new PendingChange(csn, null, update)) == null;
+ }
+ finally
+ {
+ pendingChangesWriteLock.unlock();
+ }
}
/**
@@ -106,72 +165,111 @@
* @param csn
* The CSN of the update message that must be set as committed.
*/
- public synchronized void commit(CSN csn)
+ public void commit(CSN csn)
{
- PendingChange curChange = pendingChanges.get(csn);
- if (curChange == null)
+ pendingChangesWriteLock.lock();
+ try
{
- throw new NoSuchElementException();
+ PendingChange curChange = pendingChanges.get(csn);
+ if (curChange == null)
+ {
+ throw new NoSuchElementException();
+ }
+ curChange.setCommitted(true);
+ activeAndDependentChanges.remove(curChange);
+
+ Iterator<Map.Entry<CSN, PendingChange>> it = pendingChanges.entrySet().iterator();
+ while (it.hasNext())
+ {
+ Map.Entry<CSN, PendingChange> change = it.next();
+ PendingChange pendingChange = change.getValue();
+ if (!pendingChange.isCommitted())
+ {
+ break;
+ }
+ if (pendingChange.getMsg().contributesToDomainState())
+ {
+ state.update(change.getKey());
+ }
+ it.remove();
+ }
}
- curChange.setCommitted(true);
-
- CSN firstCSN = pendingChanges.firstKey();
- PendingChange firstChange = pendingChanges.get(firstCSN);
-
- while (firstChange != null && firstChange.isCommitted())
+ finally
{
- if (firstChange.getMsg().contributesToDomainState())
- {
- state.update(firstCSN);
- }
- pendingChanges.remove(firstCSN);
-
- if (pendingChanges.isEmpty())
- {
- firstChange = null;
- }
- else
- {
- firstCSN = pendingChanges.firstKey();
- firstChange = pendingChanges.get(firstCSN);
- }
+ pendingChangesWriteLock.unlock();
}
}
+ public void markInProgress(LDAPUpdateMsg msg)
+ {
+ pendingChangesReadLock.lock();
+ try
+ {
+ activeAndDependentChanges.add(pendingChanges.get(msg.getCSN()));
+ }
+ finally
+ {
+ pendingChangesReadLock.unlock();
+ }
+ }
/**
* Get the first update in the list that have some dependencies cleared.
*
* @return The LDAPUpdateMsg to be handled.
*/
- public synchronized LDAPUpdateMsg getNextUpdate()
+ public LDAPUpdateMsg getNextUpdate()
{
- /*
- * Parse the list of Update with dependencies and check if the dependencies
- * are now cleared until an Update without dependencies is found.
- */
- for (PendingChange change : dependentChanges)
+ pendingChangesReadLock.lock();
+ dependentChangesLock.lock();
+ try
{
- if (change.dependenciesIsCovered(state))
+ if (!dependentChanges.isEmpty())
{
- dependentChanges.remove(change);
- return change.getLDAPUpdateMsg();
+ PendingChange firstDependentChange = dependentChanges.first();
+ if (pendingChanges.firstKey().isNewerThanOrEqualTo(firstDependentChange.getCSN()))
+ {
+ dependentChanges.remove(firstDependentChange);
+ return firstDependentChange.getLDAPUpdateMsg();
+ }
}
+ return null;
}
- return null;
+ finally
+ {
+ dependentChangesLock.unlock();
+ pendingChangesReadLock.unlock();
+ }
}
/**
* Mark the first pendingChange as dependent on the second PendingChange.
* @param dependentChange The PendingChange that depend on the second
* PendingChange.
- * @param pendingChange The PendingChange on which the first PendingChange
- * is dependent.
*/
- private void addDependency(
- PendingChange dependentChange, PendingChange pendingChange)
+ private void addDependency(PendingChange dependentChange)
{
- dependentChange.addDependency(pendingChange.getCSN());
- dependentChanges.add(dependentChange);
+ dependentChangesLock.lock();
+ try
+ {
+ dependentChanges.add(dependentChange);
+ }
+ finally
+ {
+ dependentChangesLock.unlock();
+ }
+ }
+
+ private PendingChange getPendingChange(CSN csn)
+ {
+ pendingChangesReadLock.lock();
+ try
+ {
+ return pendingChanges.get(csn);
+ }
+ finally
+ {
+ pendingChangesReadLock.unlock();
+ }
}
/**
@@ -190,80 +288,70 @@
*
* @return A boolean indicating if this operation has some dependencies.
*/
- public synchronized boolean checkDependencies(AddOperation op)
+ public boolean checkDependencies(AddOperation op)
{
boolean hasDependencies = false;
final DN targetDN = op.getEntryDN();
final CSN csn = OperationContext.getCSN(op);
- final PendingChange change = pendingChanges.get(csn);
+ final PendingChange change = getPendingChange(csn);
+
if (change == null)
{
return false;
}
- for (PendingChange pendingChange : pendingChanges.values())
+ for (PendingChange pendingChange : activeAndDependentChanges)
{
- if (pendingChange.getCSN().isOlderThan(csn))
+ if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
- if (pendingMsg != null)
+ // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+ break;
+ }
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ /*
+ * Check if the operation to be run is a deleteOperation on the same DN.
+ */
+ if (pendingMsg.getDN().equals(targetDN))
{
- if (pendingMsg instanceof DeleteMsg)
- {
- /*
- * Check is the operation to be run is a deleteOperation on the
- * same DN.
- */
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- /*
- * Check if the operation to be run is an addOperation on a
- * parent of the current AddOperation.
- */
- if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- /*
- * Check if the operation to be run is ModifyDnOperation with
- * the same target DN as the ADD DN
- * or a ModifyDnOperation with new DN equals to the ADD DN parent
- */
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- else
- {
- final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
- if (pendingModDn.newDNIsParent(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- }
+ hasDependencies = true;
+ addDependency(change);
}
}
- else
+ else if (pendingMsg instanceof AddMsg)
{
- // We reached an operation that is newer than the operation
- // for which we are doing the dependency check so it is
- // not possible to find another operation with some dependency.
- // break the loop to avoid going through the potentially large
- // list of pending changes.
- break;
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * parent of the current AddOperation.
+ */
+ if (pendingMsg.getDN().isSuperiorOrEqualTo(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ /*
+ * Check if the operation to be run is ModifyDnOperation with
+ * the same target DN as the ADD DN
+ * or a ModifyDnOperation with new DN equals to the ADD DN parent
+ */
+ if (pendingMsg.getDN().equals(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ else
+ {
+ final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+ if (pendingModDn.newDNIsParent(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ }
}
}
return hasDependencies;
@@ -277,45 +365,48 @@
*
* ModifyOperation depends on
* - AddOperation done on the same DN
+ * - ModifyDNOperation having newDN the same as targetDN
*
* @param op The ModifyOperation to be checked.
*
* @return A boolean indicating if this operation has some dependencies.
*/
- public synchronized boolean checkDependencies(ModifyOperation op)
+ public boolean checkDependencies(ModifyOperation op)
{
boolean hasDependencies = false;
- final DN targetDN = op.getEntryDN();
final CSN csn = OperationContext.getCSN(op);
- final PendingChange change = pendingChanges.get(csn);
+ final PendingChange change = getPendingChange(csn);
+
if (change == null)
{
- return false;
+ return false;
}
- for (PendingChange pendingChange : pendingChanges.values())
+ final DN targetDN = change.getLDAPUpdateMsg().getDN();
+ for (PendingChange pendingChange : activeAndDependentChanges)
{
- if (pendingChange.getCSN().isOlderThan(csn))
+ if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
- if (pendingMsg instanceof AddMsg)
+ // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+ break;
+ }
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+ if (pendingMsg instanceof AddMsg)
+ {
+ // Check if the operation to be run is an addOperation on a same DN.
+ if (pendingMsg.getDN().equals(targetDN))
{
- // Check if the operation to be run is an addOperation on a same DN.
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
+ hasDependencies = true;
+ addDependency(change);
}
}
- else
+ else if (pendingMsg instanceof ModifyDNMsg)
{
- // We reached an operation that is newer than the operation
- // for which we are doing the dependency check so it is
- // not possible to find another operation with some dependency.
- // break the loop to avoid going through the potentially large
- // list of pending changes.
- break;
+ if (((ModifyDNMsg) pendingMsg).newDNIsEqual(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
}
}
return hasDependencies;
@@ -333,76 +424,68 @@
* - DeleteOperation done on the new DN of the MODDN operation
* - ModifyDNOperation done from the new DN of the MODDN operation
*
+ * TODO: Consider cases where there is a rename A -> B then rename B -> C. Second change depends on first
+ *
* @param msg The ModifyDNMsg to be checked.
*
* @return A boolean indicating if this operation has some dependencies.
*/
- private synchronized boolean checkDependencies(ModifyDNMsg msg)
+ public boolean checkDependencies(ModifyDNMsg msg)
{
boolean hasDependencies = false;
final CSN csn = msg.getCSN();
- final PendingChange change = pendingChanges.get(csn);
+ final PendingChange change = getPendingChange(csn);
+
if (change == null)
{
return false;
}
final DN targetDN = change.getLDAPUpdateMsg().getDN();
-
- for (PendingChange pendingChange : pendingChanges.values())
+ for (PendingChange pendingChange : activeAndDependentChanges)
{
- if (pendingChange.getCSN().isOlderThan(csn))
+ if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
- if (pendingMsg != null)
+ // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+ break;
+ }
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ // Check if the target of the Delete is the same
+ // as the new DN of this ModifyDN
+ if (msg.newDNIsEqual(pendingMsg.getDN()))
{
- if (pendingMsg instanceof DeleteMsg)
- {
- // Check if the target of the Delete is the same
- // as the new DN of this ModifyDN
- if (msg.newDNIsEqual(pendingMsg.getDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- // Check if the Add Operation was done on the new parent of
- // the MODDN operation
- if (msg.newParentIsEqual(pendingMsg.getDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- // Check if the AddOperation was done on the same DN as the
- // target DN of the MODDN operation
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- // Check if the ModifyDNOperation was done from the new DN of
- // the MODDN operation
- if (msg.newDNIsEqual(pendingMsg.getDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
+ hasDependencies = true;
+ addDependency(change);
}
}
- else
+ else if (pendingMsg instanceof AddMsg)
{
- // We reached an operation that is newer than the operation
- // for which we are doing the dependency check so it is
- // not possible to find another operation with some dependency.
- // break the loop to avoid going through the potentially large
- // list of pending changes.
- break;
+ // Check if the Add Operation was done on the new parent of
+ // the MODDN operation
+ if (msg.newParentIsEqual(pendingMsg.getDN()))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ // Check if the AddOperation was done on the same DN as the
+ // target DN of the MODDN operation
+ if (pendingMsg.getDN().equals(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ // Check if the ModifyDNOperation was done from the new DN of
+ // the MODDN operation
+ if (msg.newDNIsEqual(pendingMsg.getDN()))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
}
}
return hasDependencies;
@@ -424,72 +507,62 @@
*
* @return A boolean indicating if this operation has some dependencies.
*/
- public synchronized boolean checkDependencies(DeleteOperation op)
+ public boolean checkDependencies(DeleteOperation op)
{
boolean hasDependencies = false;
final DN targetDN = op.getEntryDN();
final CSN csn = OperationContext.getCSN(op);
- final PendingChange change = pendingChanges.get(csn);
+ final PendingChange change = getPendingChange(csn);
+
if (change == null)
{
return false;
}
- for (PendingChange pendingChange : pendingChanges.values())
+ for (PendingChange pendingChange : activeAndDependentChanges)
{
- if (pendingChange.getCSN().isOlderThan(csn))
+ if (pendingChange.getCSN().isNewerThanOrEqualTo(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
- if (pendingMsg != null)
+ // From now on, the dependency should be for newer changes to be dependent on this one, so we can stop for now.
+ break;
+ }
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ /*
+ * Check if the operation to be run is a deleteOperation on a
+ * children of the current DeleteOperation.
+ */
+ if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
{
- if (pendingMsg instanceof DeleteMsg)
- {
- /*
- * Check if the operation to be run is a deleteOperation on a
- * children of the current DeleteOperation.
- */
- if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- /*
- * Check if the operation to be run is an addOperation on a
- * parent of the current DeleteOperation.
- */
- if (pendingMsg.getDN().equals(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
- /*
- * Check if the operation to be run is an ModifyDNOperation
- * on a children of the current DeleteOperation
- */
- if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN)
- || pendingModDn.newDNIsParent(targetDN))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
+ hasDependencies = true;
+ addDependency(change);
}
}
- else
+ else if (pendingMsg instanceof AddMsg)
{
- // We reached an operation that is newer than the operation
- // for which we are doing the dependency check so it is
- // not possible to find another operation with some dependency.
- // break the loop to avoid going through the potentially large
- // list of pending changes.
- break;
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * parent of the current DeleteOperation.
+ */
+ if (pendingMsg.getDN().equals(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ final ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingMsg;
+ /*
+ * Check if the operation to be run is an ModifyDNOperation
+ * on a children of the current DeleteOperation
+ */
+ if (pendingMsg.getDN().isSubordinateOrEqualTo(targetDN) || pendingModDn.newDNIsParent(targetDN))
+ {
+ hasDependencies = true;
+ addDependency(change);
+ }
}
}
return hasDependencies;
@@ -514,16 +587,18 @@
{
DeleteOperation newOp = (DeleteOperation) op;
return checkDependencies(newOp);
-
- } else if (op instanceof AddOperation)
+ }
+ else if (op instanceof AddOperation)
{
AddOperation newOp = (AddOperation) op;
return checkDependencies(newOp);
- } else if (op instanceof ModifyDNOperationBasis)
+ }
+ else if (op instanceof ModifyDNOperationBasis)
{
ModifyDNMsg newMsg = (ModifyDNMsg) msg;
return checkDependencies(newMsg);
- } else
+ }
+ else
{
return true; // unknown type of operation ?!
}
--
Gitblit v1.10.0