From 0d566a1779183a479c33447ac6b0224b705a33c9 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 18 Jun 2007 06:50:18 +0000
Subject: [PATCH] Fix for 1797 : dead locks during replication testing
---
opends/src/server/org/opends/server/replication/plugin/PendingChanges.java | 373 ------------------------
opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java | 469 +++++++++++++++++++++++++++++++
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 52 ++-
opends/src/server/org/opends/server/replication/common/ServerState.java | 6
4 files changed, 518 insertions(+), 382 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 1b5f36a..048891a 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -151,6 +151,7 @@
{
if (changeNumber == null)
return false;
+
synchronized(this)
{
Short id = changeNumber.getServerId();
@@ -181,9 +182,9 @@
public Set<String> toStringSet()
{
HashSet<String> set = new HashSet<String>();
+
synchronized (this)
{
-
for (Short key : list.keySet())
{
ChangeNumber change = list.get(key);
@@ -204,6 +205,7 @@
public ArrayList<ASN1OctetString> toASN1ArrayList()
{
ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
+
synchronized (this)
{
for (Short id : list.keySet())
@@ -215,7 +217,7 @@
return values;
}
/**
- * return the text representation of ServerState.
+ * Return the text representation of ServerState.
* @return the text representation of ServerState
*/
@Override
diff --git a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
index 02f24c2..7d181f0 100644
--- a/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opends/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -28,28 +28,23 @@
import java.util.NoSuchElementException;
import java.util.SortedMap;
-import java.util.SortedSet;
import java.util.TreeMap;
-import java.util.TreeSet;
-import org.opends.server.core.AddOperation;
-import org.opends.server.core.DeleteOperation;
-import org.opends.server.core.ModifyOperation;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.DeleteMsg;
-import org.opends.server.replication.protocol.ModifyDNMsg;
-import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.UpdateMessage;
-import org.opends.server.types.DN;
import org.opends.server.types.Operation;
/**
- *
- * This class is use to store the list of operations currently
+ * This class is use to store the list of local operations currently
* in progress and not yet committed in the database.
+ *
+ * It is used to make sure that operations are sent to the Replication
+ * Server in the order defined by their ChangeNumber.
+ * It is also used to update the ServerState at the appropriate time.
+ *
+ * On object of this class is instanciated for each ReplicationDomain.
*/
public class PendingChanges
{
@@ -60,14 +55,6 @@
new TreeMap<ChangeNumber, PendingChange>();
/**
- * A sorted set containing the list of PendingChanges that have
- * not been replayed correctly because they are dependent on
- * another change to be completed.
- */
- private SortedSet<PendingChange> dependentChanges =
- new TreeSet<PendingChange>();
-
- /**
* The ChangeNumberGenerator to use to create new unique ChangeNumbers
* for each operation done on the replication domain.
*/
@@ -129,13 +116,10 @@
*
* @param changeNumber The ChangeNumber of the update message that must be
* set as committed.
- * @param op The Operation associated of the update when the
- * update is a local update.
- * @param msg The message associated to the update when the update
- * was received from a replication server.
+ * @param msg The message associated to the update.
*/
public synchronized void commit(ChangeNumber changeNumber,
- Operation op, UpdateMessage msg)
+ UpdateMessage msg)
{
PendingChange curChange = pendingChanges.get(changeNumber);
if (curChange == null)
@@ -144,10 +128,7 @@
}
curChange.setCommitted(true);
- if (op.isSynchronizationOperation())
- curChange.setOp(op);
- else
- curChange.setMsg(msg);
+ curChange.setMsg(msg);
}
/**
@@ -186,21 +167,6 @@
}
/**
- * Add a new UpdateMessage that was received from the replication server
- * to the pendingList.
- *
- * @param update The UpdateMessage that was received from the replication
- * server and that will be added to the pending list.
- */
- public synchronized void putRemoteUpdate(UpdateMessage update)
- {
- ChangeNumber changeNumber = update.getChangeNumber();
- changeNumberGenerator.adjust(changeNumber);
- pendingChanges.put(changeNumber, new PendingChange(changeNumber, null,
- update));
- }
-
- /**
* Push all committed local changes to the replicationServer service.
*
* @return The number of pushed updates.
@@ -237,323 +203,4 @@
}
return numSentUpdates;
}
-
- /**
- * Get the first update in the list that have some dependencies cleared.
- *
- * @return The UpdateMessage to be handled.
- */
- public synchronized UpdateMessage getNextUpdate()
- {
- /*
- * Parse the list of Update with dependencies and check if the dependencies
- * are now cleared until an Update withour dependencies is found.
- */
- for (PendingChange change : dependentChanges)
- {
- if (change.dependenciesIsCovered(state))
- {
- dependentChanges.remove(change);
- return change.getMsg();
- }
- }
- return null;
- }
-
-
- /**
- * Check if the given AddOperation has some dependencies on any
- * currently running previous operation.
- * Update the dependency list in the associated PendingChange if
- * there are some dependencies.
- * AddOperation depends on
- *
- * - DeleteOperation done on the same DN
- * - ModifyDnOperation with the same target DN as the ADD DN
- * - ModifyDnOperation with new DN equals to the ADD DN parent
- * - AddOperation done on the parent DN of the ADD DN
- *
- * @param op The AddOperation to be checked.
- *
- * @return A boolean indicating if this operation has some dependencies.
- */
- public synchronized boolean checkDependencies(AddOperation op)
- {
- boolean hasDependencies = false;
- DN targetDn = op.getEntryDN();
- ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
- PendingChange change = pendingChanges.get(changeNumber);
- if (change == null)
- return false;
-
- for (PendingChange pendingChange : pendingChanges.values())
- {
- if (pendingChange.getChangeNumber().older(changeNumber))
- {
- UpdateMessage pendingMsg = pendingChange.getMsg();
- if (pendingMsg != null)
- {
- if (pendingMsg instanceof DeleteMsg)
- {
- /*
- * Check is the operation to be run is a deleteOperation on the
- * same DN.
- */
- if (pendingChange.getTargetDN().equals(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- /*
- * Check if the operation to be run is an addOperation on a
- * parent of the current AddOperation.
- */
- if (pendingChange.getTargetDN().isAncestorOf(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- /*
- * Check if the operation to be run is ModifyDnOperation with
- * the same target DN as the ADD DN
- * or a ModifyDnOperation with new DN equals to the ADD DN parent
- */
- if (pendingChange.getTargetDN().equals(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- else
- {
- ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
- if (pendingModDn.newDNIsParent(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- }
- }
- }
- }
- return hasDependencies;
- }
-
- /**
- * Check if the given ModifyOperation has some dependencies on any
- * currently running previous operation.
- * Update the dependency list in the associated PendingChange if
- * there are some dependencies.
- *
- * ModifyOperation depends on
- * - AddOperation done on the same DN
- *
- * @param op The ModifyOperation to be checked.
- *
- * @return A boolean indicating if this operation has some dependencies.
- */
- public synchronized boolean checkDependencies(ModifyOperation op)
- {
- boolean hasDependencies = false;
- DN targetDn = op.getEntryDN();
- ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
- PendingChange change = pendingChanges.get(changeNumber);
- if (change == null)
- return false;
-
- for (PendingChange pendingChange : pendingChanges.values())
- {
- if (pendingChange.getChangeNumber().older(changeNumber))
- {
- UpdateMessage pendingMsg = pendingChange.getMsg();
- if (pendingMsg != null)
- {
- if (pendingMsg instanceof AddMsg)
- {
- /*
- * Check if the operation to be run is an addOperation on a
- * same DN.
- */
- if (pendingChange.getTargetDN().equals(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- }
- }
- }
- return hasDependencies;
- }
-
- /**
- * Mark the first pendingChange as dependent on the second PendingChange.
- * @param dependentChange The PendingChange that depend on the second
- * PendingChange.
- * @param pendingChange The PendingChange on which the first PendingChange
- * is dependent.
- */
- private void addDependency(
- PendingChange dependentChange, PendingChange pendingChange)
- {
- dependentChange.addDependency(pendingChange.getChangeNumber());
- dependentChanges.add(dependentChange);
- }
-
- /**
- * Check if the given ModifyDNMsg has some dependencies on any
- * currently running previous operation.
- * Update the dependency list in the associated PendingChange if
- * there are some dependencies.
- *
- * Modify DN Operation depends on
- * - AddOperation done on the same DN as the target DN of the MODDN operation
- * - AddOperation done on the new parent of the MODDN operation
- * - DeleteOperation done on the new DN of the MODDN operation
- * - ModifyDNOperation done from the new DN of the MODDN operation
- *
- * @param msg The ModifyDNMsg to be checked.
- *
- * @return A boolean indicating if this operation has some dependencies.
- */
- public synchronized boolean checkDependencies(ModifyDNMsg msg)
- {
- boolean hasDependencies = false;
- ChangeNumber changeNumber = msg.getChangeNumber();
- PendingChange change = pendingChanges.get(changeNumber);
- if (change == null)
- return false;
-
- DN targetDn = change.getTargetDN();
-
-
- for (PendingChange pendingChange : pendingChanges.values())
- {
- if (pendingChange.getChangeNumber().older(changeNumber))
- {
- UpdateMessage pendingMsg = pendingChange.getMsg();
- if (pendingMsg != null)
- {
- if (pendingMsg instanceof DeleteMsg)
- {
- // Check if the target of the Delete is the same
- // as the new DN of this ModifyDN
- if (msg.newDNIsEqual(pendingChange.getTargetDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- // Check if the Add Operation was done on the new parent of
- // the MODDN operation
- if (msg.newParentIsEqual(pendingChange.getTargetDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- // Check if the AddOperation was done on the same DN as the
- // target DN of the MODDN operation
- if (pendingChange.getTargetDN().equals(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- // Check if the ModifyDNOperation was done from the new DN of
- // the MODDN operation
- if (msg.newDNIsEqual(pendingChange.getTargetDN()))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- }
- }
- }
- return hasDependencies;
- }
-
- /**
- * Check if the given DeleteOperation has some dependencies on any
- * currently running previous operation.
- * Update the dependency list in the associated PendingChange if
- * there are some dependencies.
- *
- * DeleteOperation depends on
- * - DeleteOperation done on children DN
- * - ModifyDnOperation with target DN that are children of the DEL DN
- * - AddOperation done on the same DN
- *
- *
- * @param op The DeleteOperation to be checked.
- *
- * @return A boolean indicating if this operation has some dependencies.
- */
- public synchronized boolean checkDependencies(DeleteOperation op)
- {
- boolean hasDependencies = false;
- DN targetDn = op.getEntryDN();
- ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
- PendingChange change = pendingChanges.get(changeNumber);
- if (change == null)
- return false;
-
- for (PendingChange pendingChange : pendingChanges.values())
- {
- if (pendingChange.getChangeNumber().older(changeNumber))
- {
- UpdateMessage pendingMsg = pendingChange.getMsg();
- if (pendingMsg != null)
- {
- if (pendingMsg instanceof DeleteMsg)
- {
- /*
- * Check if the operation to be run is a deleteOperation on a
- * children of the current DeleteOperation.
- */
- if (pendingChange.getTargetDN().isDescendantOf(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof AddMsg)
- {
- /*
- * Check if the operation to be run is an addOperation on a
- * parent of the current DeleteOperation.
- */
- if (pendingChange.getTargetDN().equals(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- else if (pendingMsg instanceof ModifyDNMsg)
- {
- /*
- * Check if the operation to be run is an ModifyDNOperation
- * on a children of the current DeleteOperation
- */
- if (pendingChange.getTargetDN().isDescendantOf(targetDn))
- {
- hasDependencies = true;
- addDependency(change, pendingChange);
- }
- }
- }
- }
- }
- return hasDependencies;
- }
}
diff --git a/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
new file mode 100644
index 0000000..9e871da
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -0,0 +1,469 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Portions Copyright 2007 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.plugin;
+
+import java.util.NoSuchElementException;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.ModifyOperation;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.OperationContext;
+import org.opends.server.replication.protocol.UpdateMessage;
+import org.opends.server.types.DN;
+
+/**
+ *
+ * This class is used to store the list of remote changes received
+ * from a replication server and taht are either currently being replayed
+ * or that are waiting for being replayed.
+ *
+ * It is used to know when the ServerState must be updated and to conpute
+ * the dependencies between operations.
+ *
+ * One of this object is instanciated for each ReplicationDomain.
+ *
+ */
+public class RemotePendingChanges
+{
+ /**
+ * A map used to store the pending changes.
+ */
+ private SortedMap<ChangeNumber, PendingChange> pendingChanges =
+ new TreeMap<ChangeNumber, PendingChange>();
+
+ /**
+ * A sorted set containing the list of PendingChanges that have
+ * not been replayed correctly because they are dependent on
+ * another change to be completed.
+ */
+ private SortedSet<PendingChange> dependentChanges =
+ new TreeSet<PendingChange>();
+
+ /**
+ * The ServerState that will be updated when UpdateMessage are fully replayed.
+ */
+ private ServerState state;
+
+ /**
+ * The ChangeNumberGenerator to must be adjusted when new changes
+ * are received from a remote server.
+ */
+ private ChangeNumberGenerator changeNumberGenerator;
+
+ /**
+ * Creates a new RemotePendingChanges using the provided ServerState.
+ *
+ * @param changeNumberGenerator The ChangeNumberGenerator that should
+ * be adjusted when changes are received.
+ * @param state The ServerState that will be updated when UpdateMessage
+ * have been fully replayed.
+ */
+ public RemotePendingChanges(ChangeNumberGenerator changeNumberGenerator,
+ ServerState state)
+ {
+ this.changeNumberGenerator = changeNumberGenerator;
+ this.state = state;
+ }
+
+ /**
+ * Add a new UpdateMessage that was received from the replication server
+ * to the pendingList.
+ *
+ * @param update The UpdateMessage that was received from the replication
+ * server and that will be added to the pending list.
+ */
+ public synchronized void putRemoteUpdate(UpdateMessage update)
+ {
+ ChangeNumber changeNumber = update.getChangeNumber();
+ changeNumberGenerator.adjust(changeNumber);
+ pendingChanges.put(changeNumber, new PendingChange(changeNumber, null,
+ update));
+ }
+
+ /**
+ * Mark an update message as committed.
+ *
+ * @param changeNumber The ChangeNumber of the update message that must be
+ * set as committed.
+ */
+ public synchronized void commit(ChangeNumber changeNumber)
+ {
+ PendingChange curChange = pendingChanges.get(changeNumber);
+ if (curChange == null)
+ {
+ throw new NoSuchElementException();
+ }
+ curChange.setCommitted(true);
+
+ ChangeNumber firstChangeNumber = pendingChanges.firstKey();
+ PendingChange firstChange = pendingChanges.get(firstChangeNumber);
+
+ while ((firstChange != null) && firstChange.isCommitted())
+ {
+ state.update(firstChangeNumber);
+ pendingChanges.remove(firstChangeNumber);
+
+ if (pendingChanges.isEmpty())
+ {
+ firstChange = null;
+ }
+ else
+ {
+ firstChangeNumber = pendingChanges.firstKey();
+ firstChange = pendingChanges.get(firstChangeNumber);
+ }
+ }
+ }
+
+ /**
+ * Get the first update in the list that have some dependencies cleared.
+ *
+ * @return The UpdateMessage to be handled.
+ */
+ public synchronized UpdateMessage getNextUpdate()
+ {
+ /*
+ * Parse the list of Update with dependencies and check if the dependencies
+ * are now cleared until an Update withour dependencies is found.
+ */
+ for (PendingChange change : dependentChanges)
+ {
+ if (change.dependenciesIsCovered(state))
+ {
+ dependentChanges.remove(change);
+ return change.getMsg();
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Mark the first pendingChange as dependent on the second PendingChange.
+ * @param dependentChange The PendingChange that depend on the second
+ * PendingChange.
+ * @param pendingChange The PendingChange on which the first PendingChange
+ * is dependent.
+ */
+ private void addDependency(
+ PendingChange dependentChange, PendingChange pendingChange)
+ {
+ dependentChange.addDependency(pendingChange.getChangeNumber());
+ dependentChanges.add(dependentChange);
+ }
+
+ /**
+ * Check if the given AddOperation has some dependencies on any
+ * currently running previous operation.
+ * Update the dependency list in the associated PendingChange if
+ * there are some dependencies.
+ * AddOperation depends on
+ *
+ * - DeleteOperation done on the same DN
+ * - ModifyDnOperation with the same target DN as the ADD DN
+ * - ModifyDnOperation with new DN equals to the ADD DN parent
+ * - AddOperation done on the parent DN of the ADD DN
+ *
+ * @param op The AddOperation to be checked.
+ *
+ * @return A boolean indicating if this operation has some dependencies.
+ */
+ public synchronized boolean checkDependencies(AddOperation op)
+ {
+ boolean hasDependencies = false;
+ DN targetDn = op.getEntryDN();
+ ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
+ PendingChange change = pendingChanges.get(changeNumber);
+ if (change == null)
+ return false;
+
+ for (PendingChange pendingChange : pendingChanges.values())
+ {
+ if (pendingChange.getChangeNumber().older(changeNumber))
+ {
+ UpdateMessage pendingMsg = pendingChange.getMsg();
+ if (pendingMsg != null)
+ {
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ /*
+ * Check is the operation to be run is a deleteOperation on the
+ * same DN.
+ */
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof AddMsg)
+ {
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * parent of the current AddOperation.
+ */
+ if (pendingChange.getTargetDN().isAncestorOf(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ /*
+ * Check if the operation to be run is ModifyDnOperation with
+ * the same target DN as the ADD DN
+ * or a ModifyDnOperation with new DN equals to the ADD DN parent
+ */
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ else
+ {
+ ModifyDNMsg pendingModDn = (ModifyDNMsg) pendingChange.getMsg();
+ if (pendingModDn.newDNIsParent(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ }
+ }
+ }
+ }
+ return hasDependencies;
+ }
+
+ /**
+ * Check if the given ModifyOperation has some dependencies on any
+ * currently running previous operation.
+ * Update the dependency list in the associated PendingChange if
+ * there are some dependencies.
+ *
+ * ModifyOperation depends on
+ * - AddOperation done on the same DN
+ *
+ * @param op The ModifyOperation to be checked.
+ *
+ * @return A boolean indicating if this operation has some dependencies.
+ */
+ public synchronized boolean checkDependencies(ModifyOperation op)
+ {
+ boolean hasDependencies = false;
+ DN targetDn = op.getEntryDN();
+ ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
+ PendingChange change = pendingChanges.get(changeNumber);
+ if (change == null)
+ return false;
+
+ for (PendingChange pendingChange : pendingChanges.values())
+ {
+ if (pendingChange.getChangeNumber().older(changeNumber))
+ {
+ UpdateMessage pendingMsg = pendingChange.getMsg();
+ if (pendingMsg != null)
+ {
+ if (pendingMsg instanceof AddMsg)
+ {
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * same DN.
+ */
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ }
+ }
+ }
+ return hasDependencies;
+ }
+
+ /**
+ * Check if the given ModifyDNMsg has some dependencies on any
+ * currently running previous operation.
+ * Update the dependency list in the associated PendingChange if
+ * there are some dependencies.
+ *
+ * Modify DN Operation depends on
+ * - AddOperation done on the same DN as the target DN of the MODDN operation
+ * - AddOperation done on the new parent of the MODDN operation
+ * - DeleteOperation done on the new DN of the MODDN operation
+ * - ModifyDNOperation done from the new DN of the MODDN operation
+ *
+ * @param msg The ModifyDNMsg to be checked.
+ *
+ * @return A boolean indicating if this operation has some dependencies.
+ */
+ public synchronized boolean checkDependencies(ModifyDNMsg msg)
+ {
+ boolean hasDependencies = false;
+ ChangeNumber changeNumber = msg.getChangeNumber();
+ PendingChange change = pendingChanges.get(changeNumber);
+ if (change == null)
+ return false;
+
+ DN targetDn = change.getTargetDN();
+
+
+ for (PendingChange pendingChange : pendingChanges.values())
+ {
+ if (pendingChange.getChangeNumber().older(changeNumber))
+ {
+ UpdateMessage pendingMsg = pendingChange.getMsg();
+ if (pendingMsg != null)
+ {
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ // Check if the target of the Delete is the same
+ // as the new DN of this ModifyDN
+ if (msg.newDNIsEqual(pendingChange.getTargetDN()))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof AddMsg)
+ {
+ // Check if the Add Operation was done on the new parent of
+ // the MODDN operation
+ if (msg.newParentIsEqual(pendingChange.getTargetDN()))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ // Check if the AddOperation was done on the same DN as the
+ // target DN of the MODDN operation
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ // Check if the ModifyDNOperation was done from the new DN of
+ // the MODDN operation
+ if (msg.newDNIsEqual(pendingChange.getTargetDN()))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ }
+ }
+ }
+ return hasDependencies;
+ }
+
+ /**
+ * Check if the given DeleteOperation has some dependencies on any
+ * currently running previous operation.
+ * Update the dependency list in the associated PendingChange if
+ * there are some dependencies.
+ *
+ * DeleteOperation depends on
+ * - DeleteOperation done on children DN
+ * - ModifyDnOperation with target DN that are children of the DEL DN
+ * - AddOperation done on the same DN
+ *
+ *
+ * @param op The DeleteOperation to be checked.
+ *
+ * @return A boolean indicating if this operation has some dependencies.
+ */
+ public synchronized boolean checkDependencies(DeleteOperation op)
+ {
+ boolean hasDependencies = false;
+ DN targetDn = op.getEntryDN();
+ ChangeNumber changeNumber = OperationContext.getChangeNumber(op);
+ PendingChange change = pendingChanges.get(changeNumber);
+ if (change == null)
+ return false;
+
+ for (PendingChange pendingChange : pendingChanges.values())
+ {
+ if (pendingChange.getChangeNumber().older(changeNumber))
+ {
+ UpdateMessage pendingMsg = pendingChange.getMsg();
+ if (pendingMsg != null)
+ {
+ if (pendingMsg instanceof DeleteMsg)
+ {
+ /*
+ * Check if the operation to be run is a deleteOperation on a
+ * children of the current DeleteOperation.
+ */
+ if (pendingChange.getTargetDN().isDescendantOf(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof AddMsg)
+ {
+ /*
+ * Check if the operation to be run is an addOperation on a
+ * parent of the current DeleteOperation.
+ */
+ if (pendingChange.getTargetDN().equals(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ else if (pendingMsg instanceof ModifyDNMsg)
+ {
+ /*
+ * Check if the operation to be run is an ModifyDNOperation
+ * on a children of the current DeleteOperation
+ */
+ if (pendingChange.getTargetDN().isDescendantOf(targetDn))
+ {
+ hasDependencies = true;
+ addDependency(change, pendingChange);
+ }
+ }
+ }
+ }
+ }
+ return hasDependencies;
+ }
+}
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index 3d76049..c7e9295 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -170,17 +170,22 @@
/**
* This object is used to store the list of update currently being
* done on the local database.
- * It contain both the update that are done directly on this server
- * and the updates that was done on another server, transmitted
- * by the replication server and that are currently replayed.
- * It is usefull to make sure that dependencies between operations
- * are correctly fullfilled, that the local operations are sent in a
+ * Is is usefull to make sure that the local operations are sent in a
* correct order to the replication server and that the ServerState
* is not updated too early.
*/
private PendingChanges pendingChanges;
/**
+ * It contain the updates that were done on other servers, transmitted
+ * by the replication server and that are currently replayed.
+ * It is usefull to make sure that dependencies between operations
+ * are correctly fullfilled and to to make sure that the ServerState is
+ * not updated too early.
+ */
+ private RemotePendingChanges remotePendingChanges;
+
+ /**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
@@ -373,10 +378,15 @@
* ChangeNumberGenerator is used to create new unique ChangeNumbers
* for each operation done on the replication domain.
*/
+ ChangeNumberGenerator generator =
+ new ChangeNumberGenerator(serverId, state);
+
pendingChanges =
new PendingChanges(new ChangeNumberGenerator(serverId, state),
broker, state);
+ remotePendingChanges = new RemotePendingChanges(generator, state);
+
// listen for changes on the configuration
configuration.addChangeListener(this);
}
@@ -679,7 +689,7 @@
*/
public UpdateMessage receive()
{
- UpdateMessage update = pendingChanges.getNextUpdate();
+ UpdateMessage update = remotePendingChanges.getNextUpdate();
if (update == null)
{
@@ -774,7 +784,7 @@
*/
public void receiveUpdate(UpdateMessage update)
{
- pendingChanges.putRemoteUpdate(update);
+ remotePendingChanges.putRemoteUpdate(update);
numRcvdUpdates.incrementAndGet();
}
@@ -847,7 +857,14 @@
{
try
{
- pendingChanges.commit(curChangeNumber, op, msg);
+ if (op.isSynchronizationOperation())
+ {
+ remotePendingChanges.commit(curChangeNumber);
+ }
+ else
+ {
+ pendingChanges.commit(curChangeNumber, msg);
+ }
}
catch (NoSuchElementException e)
{
@@ -880,8 +897,11 @@
}
}
- int pushedChanges = pendingChanges.pushCommittedChanges();
- numSentUpdates.addAndGet(pushedChanges);
+ if (!op.isSynchronizationOperation())
+ {
+ int pushedChanges = pendingChanges.pushCommittedChanges();
+ numSentUpdates.addAndGet(pushedChanges);
+ }
// Wait for acknowledgement of an assured message.
if (msg != null && isAssured)
@@ -1111,7 +1131,7 @@
if (op instanceof ModifyOperation)
{
ModifyOperation newOp = (ModifyOperation) op;
- dependency = pendingChanges.checkDependencies(newOp);
+ dependency = remotePendingChanges.checkDependencies(newOp);
if (!dependency)
{
done = solveNamingConflict(newOp, msg);
@@ -1120,7 +1140,7 @@
else if (op instanceof DeleteOperation)
{
DeleteOperation newOp = (DeleteOperation) op;
- dependency = pendingChanges.checkDependencies(newOp);
+ dependency = remotePendingChanges.checkDependencies(newOp);
if ((!dependency) && (!firstTry))
{
done = solveNamingConflict(newOp, msg);
@@ -1130,7 +1150,7 @@
{
AddOperation newOp = (AddOperation) op;
AddMsg addMsg = (AddMsg) msg;
- dependency = pendingChanges.checkDependencies(newOp);
+ dependency = remotePendingChanges.checkDependencies(newOp);
if (!dependency)
{
done = solveNamingConflict(newOp, addMsg);
@@ -1139,7 +1159,7 @@
else if (op instanceof ModifyDNOperation)
{
ModifyDNMsg newMsg = (ModifyDNMsg) msg;
- dependency = pendingChanges.checkDependencies(newMsg);
+ dependency = remotePendingChanges.checkDependencies(newMsg);
if (!dependency)
{
ModifyDNOperation newOp = (ModifyDNOperation) op;
@@ -1253,9 +1273,7 @@
*/
public void updateError(ChangeNumber changeNumber)
{
- pendingChanges.commit(changeNumber);
- int pushedChanges = pendingChanges.pushCommittedChanges();
- numSentUpdates.addAndGet(pushedChanges);
+ remotePendingChanges.commit(changeNumber);
}
/**
--
Gitblit v1.10.0