From 88e5620001d65afa8d0d8e07d1361fa44705743e Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 11 May 2007 13:19:28 +0000
Subject: [PATCH] This code allows the replication code to replay operations in the correct order when operations have dependencies (like adding a child entry after its parent)
---
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java | 428 ++++++++++++++++++++++++++---------------------------
1 files changed, 209 insertions(+), 219 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
index e0a9cb7..93d7ba1 100644
--- a/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -49,6 +49,7 @@
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
+import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -128,17 +129,14 @@
{
private ReplicationMonitor monitor;
- private ChangeNumberGenerator changeNumberGenerator;
private ReplicationBroker broker;
private List<ListenerThread> synchroThreads =
new ArrayList<ListenerThread>();
- private final SortedMap<ChangeNumber, PendingChange> pendingChanges =
- new TreeMap<ChangeNumber, PendingChange>();
private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
new TreeMap<ChangeNumber, UpdateMessage>();
- private int numRcvdUpdates = 0;
- private int numSentUpdates = 0;
+ private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
+ private AtomicInteger numSentUpdates = new AtomicInteger(0);
private AtomicInteger numProcessedUpdates = new AtomicInteger();
private int debugCount = 0;
private PersistentServerState state;
@@ -150,6 +148,19 @@
private int maxSendDelay = 0;
/**
+ * This object is used to store the list of updates currently being
+ * done on the local database.
+ * It contains both the updates that are done directly on this server
+ * and the updates that were done on another server, transmitted
+ * by the replication server and that are currently being replayed.
+ * It is useful to make sure that dependencies between operations
+ * are correctly fulfilled, that the local operations are sent in the
+ * correct order to the replication server and that the ServerState
+ * is not updated too early.
+ */
+ private PendingChanges pendingChanges;
+
+ /**
* The time in milliseconds between heartbeats from the replication
* server. Zero means heartbeats are off.
*/
@@ -245,7 +256,7 @@
private ConfigEntry backendConfigEntry;
private List<DN> branches = new ArrayList<DN>(0);
- private int listenerThreadNumber = 1;
+ private int listenerThreadNumber = 10;
private boolean receiveStatus = true;
private Collection<String> replicationServers;
@@ -317,12 +328,6 @@
DirectoryServer.registerMonitorProvider(monitor);
/*
- * ChangeNumberGenerator is used to create new unique ChangeNumbers
- * for each operation done on the replication domain.
- */
- changeNumberGenerator = new ChangeNumberGenerator(serverId, state);
-
- /*
* create the broker object used to publish and receive changes
*/
try
@@ -348,6 +353,14 @@
*/
}
+ /*
+ * ChangeNumberGenerator is used to create new unique ChangeNumbers
+ * for each operation done on the replication domain.
+ */
+ pendingChanges =
+ new PendingChanges(new ChangeNumberGenerator(serverId, state),
+ broker, state);
+
// listen for changes on the configuration
configuration.addChangeListener(this);
}
@@ -444,23 +457,37 @@
* of the parent entry
*/
- // There is a potential of perfs improvement here
- // if we could avoid the following parent entry retrieval
- DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
-
- if (parentDnFromCtx != null)
+ String parentUid = ctx.getParentUid();
+ // The root entry has no parent,
+ // so there is no need to check for it.
+ if (parentUid != null)
{
- DN entryDN = addOperation.getEntryDN();
- DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
- if ((parentDnFromEntryDn != null)
- && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
+ // There is a potential of perfs improvement here
+ // if we could avoid the following parent entry retrieval
+ DN parentDnFromCtx = findEntryDN(ctx.getParentUid());
+
+ if (parentDnFromCtx == null)
{
- // parentEntry has been renamed
- // replication name conflict resolution is expected to fix that
- // later in the flow
+ // No parent exists with the specified unique id:
+ // stop the operation with NO_SUCH_OBJECT and let the
+ // conflict resolution or the dependency resolution solve this.
addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
return new SynchronizationProviderResult(false);
}
+ else
+ {
+ DN entryDN = addOperation.getEntryDN();
+ DN parentDnFromEntryDn = entryDN.getParentDNInSuffix();
+ if ((parentDnFromEntryDn != null)
+ && (!parentDnFromCtx.equals(parentDnFromEntryDn)))
+ {
+ // parentEntry has been renamed
+ // replication name conflict resolution is expected to fix that
+ // later in the flow
+ addOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+ }
}
}
return new SynchronizationProviderResult(true);
@@ -633,89 +660,96 @@
*/
public UpdateMessage receive()
{
- synchronized (broker)
+ UpdateMessage update = pendingChanges.getNextUpdate();
+
+ if (update == null)
{
- UpdateMessage update = null;
- while (update == null)
+ synchronized (broker)
{
- ReplicationMessage msg;
- try
+ while (update == null)
{
- msg = broker.receive();
- if (msg == null)
+ ReplicationMessage msg;
+ try
{
- // The server is in the shutdown process
- return null;
- }
- log("Broker received message :" + msg);
- if (msg instanceof AckMessage)
- {
- AckMessage ack = (AckMessage) msg;
- receiveAck(ack);
- }
- else if (msg instanceof UpdateMessage)
- {
- update = (UpdateMessage) msg;
- receiveUpdate(update);
- }
- else if (msg instanceof InitializeRequestMessage)
- {
- // Another server requests us to provide entries
- // for a total update
- InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
- try
+ msg = broker.receive();
+ if (msg == null)
{
- initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
- null);
+ // The server is in the shutdown process
+ return null;
}
- catch(DirectoryException de)
+ log("Broker received message :" + msg);
+ if (msg instanceof AckMessage)
{
- // Returns an error message to notify the sender
- int msgID = de.getMessageID();
- ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
- msgID, de.getMessage());
- broker.publish(errorMsg);
+ AckMessage ack = (AckMessage) msg;
+ receiveAck(ack);
}
- }
- else if (msg instanceof InitializeTargetMessage)
- {
- // Another server is exporting its entries to us
- InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
+ else if (msg instanceof InitializeRequestMessage)
+ {
+ // Another server requests us to provide entries
+ // for a total update
+ InitializeRequestMessage initMsg = (InitializeRequestMessage) msg;
+ try
+ {
+ initializeTarget(initMsg.getsenderID(), initMsg.getsenderID(),
+ null);
+ }
+ catch(DirectoryException de)
+ {
+ // Returns an error message to notify the sender
+ int msgID = de.getMessageID();
+ ErrorMessage errorMsg =
+ new ErrorMessage(initMsg.getsenderID(),
+ msgID, de.getMessage());
+ broker.publish(errorMsg);
+ }
+ }
+ else if (msg instanceof InitializeTargetMessage)
+ {
+ // Another server is exporting its entries to us
+ InitializeTargetMessage initMsg = (InitializeTargetMessage) msg;
- try
- {
- importBackend(initMsg);
+ try
+ {
+ importBackend(initMsg);
+ }
+ catch(DirectoryException de)
+ {
+ // Return an error message to notify the sender
+ int msgID = de.getMessageID();
+ ErrorMessage errorMsg =
+ new ErrorMessage(initMsg.getsenderID(),
+ msgID, de.getMessage());
+ log(getMessage(msgID,
+ backend.getBackendID()) + de.getMessage());
+ broker.publish(errorMsg);
+ }
}
- catch(DirectoryException de)
+ else if (msg instanceof ErrorMessage)
{
- // Return an error message to notify the sender
- int msgID = de.getMessageID();
- ErrorMessage errorMsg = new ErrorMessage(initMsg.getsenderID(),
- msgID, de.getMessage());
- log(getMessage(msgID, backend.getBackendID()) + de.getMessage());
- broker.publish(errorMsg);
+ if (ieContext != null)
+ {
+ // This is an error termination for the 2 following cases :
+ // - either during an export
+ // - or before an import really started
+ // For example, when we publish a request and the
+ // replicationServer did not find any import source.
+ abandonImportExport((ErrorMessage)msg);
+ }
+ }
+ else if (msg instanceof UpdateMessage)
+ {
+ update = (UpdateMessage) msg;
+ receiveUpdate(update);
}
}
- else if (msg instanceof ErrorMessage)
+ catch (SocketTimeoutException e)
{
- if (ieContext != null)
- {
- // This is an error termination for the 2 following cases :
- // - either during an export
- // - or before an import really started
- // For example, when we publish a request and the
- // replicationServer did not find any import source.
- abandonImportExport((ErrorMessage)msg);
- }
+ // just retry
}
- } catch (SocketTimeoutException e)
- {
- // just retry
}
-
}
- return update;
}
+ return update;
}
/**
@@ -725,21 +759,8 @@
*/
public void receiveUpdate(UpdateMessage update)
{
- ChangeNumber changeNumber = update.getChangeNumber();
-
- synchronized (pendingChanges)
- {
- if (pendingChanges.containsKey(changeNumber))
- {
- /*
- * This should never happen,
- * TODO log error and throw exception
- */
- }
- pendingChanges.put(changeNumber,
- new PendingChange(changeNumber, null, update));
- numRcvdUpdates++;
- }
+ pendingChanges.putRemoteUpdate(update);
+ numRcvdUpdates.incrementAndGet();
}
/**
@@ -752,10 +773,9 @@
UpdateMessage update;
ChangeNumber changeNumber = ack.getChangeNumber();
- synchronized (pendingChanges)
+ synchronized (waitingAckMsgs)
{
- update = waitingAckMsgs.get(changeNumber);
- waitingAckMsgs.remove(changeNumber);
+ update = waitingAckMsgs.remove(changeNumber);
}
if (update != null)
{
@@ -798,61 +818,55 @@
* This is an operation type that we do not know about
* It should never happen.
*/
- synchronized (pendingChanges)
- {
- pendingChanges.remove(curChangeNumber);
- int msgID = MSGID_UNKNOWN_TYPE;
- String message = getMessage(msgID, op.getOperationType().toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- return;
- }
+ pendingChanges.remove(curChangeNumber);
+ int msgID = MSGID_UNKNOWN_TYPE;
+ String message = getMessage(msgID, op.getOperationType().toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ return;
}
}
- synchronized(pendingChanges)
+ if (result == ResultCode.SUCCESS)
{
- if (result == ResultCode.SUCCESS)
+ try
{
- PendingChange curChange = pendingChanges.get(curChangeNumber);
- if (curChange == null)
- {
- // This should never happen
- int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
- String message = getMessage(msgID, curChangeNumber.toString(),
- op.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- return;
- }
- curChange.setCommitted(true);
+ pendingChanges.commit(curChangeNumber, op, msg);
+ }
+ catch (NoSuchElementException e)
+ {
+ int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
+ String message = getMessage(msgID, curChangeNumber.toString(),
+ op.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ return;
+ }
- if (op.isSynchronizationOperation())
- curChange.setOp(op);
- else
- curChange.setMsg(msg);
-
- if (msg != null && isAssured)
+ if (msg != null && isAssured)
+ {
+ synchronized (waitingAckMsgs)
{
- // Add the assured message to the list of those whose acknowledgements
- // we are awaiting.
+ // Add the assured message to the list of updates that are
+ // awaiting acknowledgement
waitingAckMsgs.put(curChangeNumber, msg);
}
}
- else if (!op.isSynchronizationOperation())
- {
- // Remove an unsuccessful non-replication operation from the pending
- // changes list.
- if (curChangeNumber != null)
- {
- pendingChanges.remove(curChangeNumber);
- }
- }
-
- pushCommittedChanges();
}
+ else if (!op.isSynchronizationOperation())
+ {
+ // Remove an unsuccessful non-replication operation from the pending
+ // changes list.
+ if (curChangeNumber != null)
+ {
+ pendingChanges.remove(curChangeNumber);
+ }
+ }
+
+ int pushedChanges = pendingChanges.pushCommittedChanges();
+ numSentUpdates.addAndGet(pushedChanges);
// Wait for acknowledgement of an assured message.
if (msg != null && isAssured)
@@ -880,7 +894,7 @@
*/
public int getNumRcvdUpdates()
{
- return numRcvdUpdates;
+ return numRcvdUpdates.get();
}
/**
@@ -890,11 +904,11 @@
*/
public int getNumSentUpdates()
{
- return numSentUpdates;
+ return numSentUpdates.get();
}
/**
- * get the number of updates in the pending list.
+ * Get the number of updates in the pending list.
*
* @return The number of updates in the pending list
*/
@@ -1052,45 +1066,63 @@
{
Operation op = null;
boolean done = false;
+ boolean dependency = false;
ChangeNumber changeNumber = null;
int retryCount = 10;
+ boolean firstTry = true;
try
{
- while (!done && retryCount-- > 0)
+ while ((!dependency) && (!done) && (retryCount-- > 0))
{
op = msg.createOperation(conn);
op.setInternalOperation(true);
op.setSynchronizationOperation(true);
changeNumber = OperationContext.getChangeNumber(op);
- if (changeNumber != null)
- changeNumberGenerator.adjust(changeNumber);
op.run();
ResultCode result = op.getResultCode();
+
if (result != ResultCode.SUCCESS)
{
if (op instanceof ModifyOperation)
{
ModifyOperation newOp = (ModifyOperation) op;
- done = solveNamingConflict(newOp, msg);
+ dependency = pendingChanges.checkDependencies(newOp);
+ if (!dependency)
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
}
else if (op instanceof DeleteOperation)
{
DeleteOperation newOp = (DeleteOperation) op;
- done = solveNamingConflict(newOp, msg);
+ dependency = pendingChanges.checkDependencies(newOp);
+ if ((!dependency) && (!firstTry))
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
}
else if (op instanceof AddOperation)
{
AddOperation newOp = (AddOperation) op;
- done = solveNamingConflict(newOp, msg);
-
- } else if (op instanceof ModifyDNOperation)
+ dependency = pendingChanges.checkDependencies(newOp);
+ if (!dependency)
+ {
+ done = solveNamingConflict(newOp, msg);
+ }
+ }
+ else if (op instanceof ModifyDNOperation)
{
- ModifyDNOperation newOp = (ModifyDNOperation) op;
- done = solveNamingConflict(newOp, msg);
+ ModifyDNMsg newMsg = (ModifyDNMsg) msg;
+ dependency = pendingChanges.checkDependencies(newMsg);
+ if (!dependency)
+ {
+ ModifyDNOperation newOp = (ModifyDNOperation) op;
+ done = solveNamingConflict(newOp, msg);
+ }
}
else
{
@@ -1108,9 +1140,10 @@
{
done = true;
}
+ firstTry = false;
}
- if (!done)
+ if (!done && !dependency)
{
// Continue with the next change but the servers could now become
// inconsistent.
@@ -1119,6 +1152,7 @@
String message = getMessage(msgID, op.toString());
logError(ErrorLogCategory.SYNCHRONIZATION,
ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+
updateError(changeNumber);
}
}
@@ -1177,9 +1211,12 @@
}
finally
{
- if (msg.isAssured())
- ack(msg.getChangeNumber());
- incProcessedUpdates();
+ if (!dependency)
+ {
+ if (msg.isAssured())
+ ack(msg.getChangeNumber());
+ incProcessedUpdates();
+ }
}
}
@@ -1193,12 +1230,9 @@
*/
public void updateError(ChangeNumber changeNumber)
{
- synchronized (pendingChanges)
- {
- PendingChange change = pendingChanges.get(changeNumber);
- change.setCommitted(true);
- pushCommittedChanges();
- }
+ pendingChanges.commit(changeNumber);
+ int pushedChanges = pendingChanges.pushCommittedChanges();
+ numSentUpdates.addAndGet(pushedChanges);
}
/**
@@ -1210,15 +1244,7 @@
*/
private ChangeNumber generateChangeNumber(Operation operation)
{
- ChangeNumber changeNumber;
-
- changeNumber = changeNumberGenerator.NewChangeNumber();
- PendingChange change = new PendingChange(changeNumber, operation, null);
- synchronized(pendingChanges)
- {
- pendingChanges.put(changeNumber, change);
- }
- return changeNumber;
+ return pendingChanges.putLocalOperation(operation);
}
@@ -1604,42 +1630,6 @@
}
/**
- * Push all committed local changes to the replicationServer service.
- * PRECONDITION : The pendingChanges lock must be held before calling
- * this method.
- */
- private void pushCommittedChanges()
- {
- if (pendingChanges.isEmpty())
- return;
-
- ChangeNumber firstChangeNumber = pendingChanges.firstKey();
- PendingChange firstChange = pendingChanges.get(firstChangeNumber);
-
- while ((firstChange != null) && firstChange.isCommitted())
- {
- if ((firstChange.getOp() != null ) &&
- (firstChange.getOp().isSynchronizationOperation() == false))
- {
- numSentUpdates++;
- broker.publish(firstChange.getMsg());
- }
- state.update(firstChangeNumber);
- pendingChanges.remove(firstChangeNumber);
-
- if (pendingChanges.isEmpty())
- {
- firstChange = null;
- }
- else
- {
- firstChangeNumber = pendingChanges.firstKey();
- firstChange = pendingChanges.get(firstChangeNumber);
- }
- }
- }
-
- /**
* Get the maximum receive window size.
*
* @return The maximum receive window size.
--
Gitblit v1.10.0