From b5acb25ee2ad9bf8b166b9de1a34e6aab6ea23b7 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 01 Sep 2006 12:04:47 +0000
Subject: [PATCH] issue 604 : solve the naming conflict that might happen when several masters are used there are 3 main parts in this commit : - attach the replication context in an OperationContext - if operation replay fails then fix the problem - in the pre-op checks for conflict and cause failure if necessary most of the time there should be no conflict and the operation should be processed normally
---
opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java | 948 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 802 insertions(+), 146 deletions(-)
diff --git a/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java b/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
index a37fe14..428c528 100644
--- a/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
+++ b/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -26,15 +26,21 @@
*/
package org.opends.server.synchronization;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.TimeThread.getTime;
import static org.opends.server.synchronization.SynchMessages.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
+import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT;
+import static org.opends.server.synchronization.Historical.*;
import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
+import java.util.zip.DataFormatException;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
@@ -47,17 +53,27 @@
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.DirectoryException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
+import org.opends.server.protocols.asn1.ASN1Exception;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
+import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchFilter;
+import org.opends.server.types.SearchResultEntry;
+import org.opends.server.types.SearchScope;
import org.opends.server.types.SynchronizationProviderResult;
/**
@@ -88,9 +104,6 @@
private ServerState state;
private int numReplayedPostOpCalled = 0;
- private boolean assuredFlag = false;
-
-
private int maxReceiveQueue = 0;
private int maxSendQueue = 0;
private int maxReceiveDelay = 0;
@@ -114,6 +127,8 @@
private DN configDn;
+ private InternalClientConnection conn = new InternalClientConnection();
+
static String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
static String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
static String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
@@ -379,8 +394,7 @@
broker.restartReceive();
for (int i=0; i<listenerThreadNumber; i++)
{
- ListenerThread myThread = new ListenerThread(this,
- changeNumberGenerator);
+ ListenerThread myThread = new ListenerThread(this);
myThread.start();
synchroThreads.add(myThread);
}
@@ -416,6 +430,155 @@
}
/**
+ * Implement the handleConflictResolution phase of the deleteOperation.
+ *
+ * @param deleteOperation The deleteOperation.
+ * @return A SynchronizationProviderResult indicating if the operation
+ * can continue.
+ */
+ public SynchronizationProviderResult handleConflictResolution(
+ DeleteOperation deleteOperation)
+ {
+ DeleteContext ctx =
+ (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
+ Entry deletedEntry = deleteOperation.getEntryToDelete();
+
+ if (ctx != null)
+ {
+ /*
+ * This is a synchronization operation
+ * Check that the modified entry has the same entryuuid
+ * has was in the original message.
+ */
+ String operationEntryUUID = ctx.getEntryUid();
+ String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
+ if (!operationEntryUUID.equals(modifiedEntryUUID))
+ {
+ /*
+ * The changes entry is not the same entry as the one on
+ * the original change was performed.
+ * Probably the original entry was renamed and replaced with
+ * another entry.
+ * We must not let the change proceed, return a negative
+ * result and set the result code to NO_SUCH_OBJET.
+ * When the operation will return, the thread that started the
+ * operation will try to find the correct entry and restart a new
+ * operation.
+ */
+ deleteOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+ }
+ else
+ {
+ // There is no Synchronization context attached to the operation
+ // so this is not a synchronization operation.
+ ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
+ String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
+ ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
+ deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ }
+ return new SynchronizationProviderResult(true);
+ }
+
+ /**
+ * Implement the handleConflictResolution phase of the addOperation.
+ *
+ * @param addOperation The AddOperation.
+ * @return A SynchronizationProviderResult indicating if the operation
+ * can continue.
+ */
+ public SynchronizationProviderResult handleConflictResolution(
+ AddOperation addOperation)
+ {
+ if (addOperation.isSynchronizationOperation())
+ {
+ AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
+ /*
+ * If an entry with the same entry uniqueID already exist then
+ * this operation has already been replayed in the past.
+ */
+ String uuid = ctx.getEntryUid();
+ if (findEntryDN(uuid) != null)
+ {
+ addOperation.setResultCode(ResultCode.SUCCESS);
+ return new SynchronizationProviderResult(false);
+ }
+ }
+ return new SynchronizationProviderResult(true);
+ }
+
+ /**
+ * Implement the handleConflictResolution phase of the ModifyDNOperation.
+ *
+ * @param modifyDNOperation The ModifyDNOperation.
+ * @return A SynchronizationProviderResult indicating if the operation
+ * can continue.
+ */
+ public SynchronizationProviderResult handleConflictResolution(
+ ModifyDNOperation modifyDNOperation)
+ {
+ ModifyDnContext ctx =
+ (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
+ if (ctx != null)
+ {
+ /*
+ * This is a synchronization operation
+ * Check that the modified entry has the same entryuuid
+ * as was in the original message.
+ */
+ String modifiedEntryUUID =
+ Historical.getEntryUuid(modifyDNOperation.getOriginalEntry());
+ if (!modifiedEntryUUID.equals(ctx.getEntryUid()))
+ {
+ /*
+ * The modified entry is not the same entry as the one on
+ * the original change was performed.
+ * Probably the original entry was renamed and replaced with
+ * another entry.
+ * We must not let the change proceed, return a negative
+ * result and set the result code to NO_SUCH_OBJET.
+ * When the operation will return, the thread that started the
+ * operation will try to find the correct entry and restart a new
+ * operation.
+ */
+ modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+ if (modifyDNOperation.getNewSuperior() != null)
+ {
+ /*
+ * Also check that the current id of the
+ * parent is the same as when the operation was performed.
+ */
+ String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
+ if (!newParentId.equals(ctx.getNewParentId()))
+ {
+ modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+ }
+ }
+ else
+ {
+ // There is no Synchronization context attached to the operation
+ // so this is not a synchronization operation.
+ ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
+ String newParentId = null;
+ if (modifyDNOperation.getNewSuperior() != null)
+ {
+ newParentId = findEntryId(modifyDNOperation.getNewSuperior());
+ }
+
+ Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
+ String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
+ ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId);
+ modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ }
+ return new SynchronizationProviderResult(true);
+ }
+
+ /**
* Handle the conflict resolution.
* Called by the core server after locking the entry and before
* starting the actual modification.
@@ -425,25 +588,43 @@
public SynchronizationProviderResult handleConflictResolution(
ModifyOperation modifyOperation)
{
- // If operation do not yet have a change number, generate it
- ChangeNumber changeNumber =
- (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
- if (changeNumber == null)
- {
- synchronized(pendingChanges)
- {
- changeNumber = changeNumberGenerator.NewChangeNumber();
- pendingChanges.put(changeNumber, new PendingChange(changeNumber,
- modifyOperation,
- null));
- }
- modifyOperation.setAttachment(SYNCHRONIZATION, changeNumber);
- }
+ ModifyContext ctx =
+ (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
- // if Operation is a synchronization operation, solve conflicts
- if (modifyOperation.isSynchronizationOperation())
+ Entry modifiedEntry = modifyOperation.getModifiedEntry();
+ if (ctx == null)
{
- Entry modifiedEntry = modifyOperation.getModifiedEntry();
+ // There is no Synchronization context attached to the operation
+ // so this is not a synchronization operation.
+ ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
+ String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
+ ctx = new ModifyContext(changeNumber, modifiedEntryUUID);
+ modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ }
+ else
+ {
+ String modifiedEntryUUID = ctx.getEntryUid();
+ String currentEntryUUID = Historical.getEntryUuid(modifiedEntry);
+ if (!currentEntryUUID.equals(modifiedEntryUUID))
+ {
+ /*
+ * The current modified entry is not the same entry as the one on
+ * the original modification was performed.
+ * Probably the original entry was renamed and replaced with
+ * another entry.
+ * We must not let the modification proceed, return a negative
+ * result and set the result code to NO_SUCH_OBJET.
+ * When the operation will return, the thread that started the
+ * operation will try to find the correct entry and restart a new
+ * operation.
+ */
+ modifyOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+ return new SynchronizationProviderResult(false);
+ }
+
+ /*
+ * Solve the conflicts between modify operations
+ */
Historical historicalInformation = Historical.load(modifiedEntry);
modifyOperation.setAttachment(HISTORICAL, historicalInformation);
@@ -456,16 +637,29 @@
* stop the processing and send an OK result
*/
modifyOperation.setResultCode(ResultCode.SUCCESS);
- /*
- * TODO : check that post operation do get called and
- * that pendingChanges do get updated
- */
return new SynchronizationProviderResult(false);
}
}
return new SynchronizationProviderResult(true);
}
+ /**
+ * The preOperation phase for the add Operation.
+ * Its job is to generate the Synchronization context associated to the
+ * operation. It is necessary to do it in this phase because contrary to
+ * the other operations, the entry uid is not set when the handleConflict
+ * phase is called.
+ *
+ * @param addOperation The Add Operation.
+ */
+ public void doPreOperation(AddOperation addOperation)
+ {
+ AddContext ctx = new AddContext(generateChangeNumber(addOperation),
+ Historical.getEntryUuid(addOperation),
+ findEntryId(addOperation.getEntryDN().getParent()));
+
+ addOperation.setAttachment(SYNCHROCONTEXT, ctx);
+ }
/**
* Receive an update message from the changelog.
@@ -494,6 +688,7 @@
/**
* Do the necessary processing when an UpdateMessage was received.
+ *
* @param update The received UpdateMessage.
*/
public void receiveUpdate(UpdateMessage update)
@@ -548,49 +743,20 @@
{
numReplayedPostOpCalled++;
UpdateMessage msg = null;
- ChangeNumber curChangeNumber =
- (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
+ ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
- if (op.getResultCode() != ResultCode.SUCCESS)
- {
- if (curChangeNumber != null)
- {
- /*
- * This code can be executed by multiple threads
- * Since TreeMap is not synchronized, it is mandatory to synchronize
- * it now.
- */
- synchronized (pendingChanges)
- {
- pendingChanges.remove(curChangeNumber);
- }
- }
- return;
- }
+ ResultCode result = op.getResultCode();
+ boolean isAssured = isAssured(op);
- if (!op.isSynchronizationOperation())
+ if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
{
- switch (op.getOperationType())
+ msg = UpdateMessage.generateMsg(op, isAssured);
+
+ if (msg == null)
{
- case MODIFY :
- msg = new ModifyMsg((ModifyOperation) op);
- break;
- case ADD:
- msg = new AddMsg((AddOperation) op);
- break;
- case DELETE :
- msg = new DeleteMsg((DeleteOperation) op);
- break;
- case MODIFY_DN :
- msg = new ModifyDNMsg((ModifyDNOperation) op);
- break;
- default :
/*
* This is an operation type that we do not know about
- * It should never happen
- * This code can be executed by multiple threads
- * Since TreeMap is not synchronized, it is mandatory to synchronize
- * it now.
+ * It should never happen.
*/
synchronized (pendingChanges)
{
@@ -603,68 +769,44 @@
return;
}
}
- if (isAssured(op))
- {
- msg.setAssured();
- }
}
synchronized(pendingChanges)
{
- PendingChange curChange = pendingChanges.get(curChangeNumber);
- if (curChange == null)
+ if (result == ResultCode.SUCCESS)
{
- // This should never happen
- int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
- String message = getMessage(msgID, curChangeNumber.toString(),
- op.toString());
- logError(ErrorLogCategory.SYNCHRONIZATION,
- ErrorLogSeverity.SEVERE_ERROR,
- message, msgID);
- return;
- }
- curChange.setCommitted(true);
-
- if (op.isSynchronizationOperation())
- curChange.setOp(op);
- else
- curChange.setMsg(msg);
-
- ChangeNumber firstChangeNumber = pendingChanges.firstKey();
- PendingChange firstChange = pendingChanges.get(firstChangeNumber);
- ChangeNumber lastCommittedChangeNumber = null;
-
- if (!op.isSynchronizationOperation() && msg.isAssured())
- {
- waitingAckMsgs.put(curChangeNumber, msg);
- }
-
- while ((firstChange != null) && firstChange.isCommitted())
- {
- if (firstChange.getOp().isSynchronizationOperation() == false)
+ PendingChange curChange = pendingChanges.get(curChangeNumber);
+ if (curChange == null)
{
- numSentUpdates++;
- broker.publish(firstChange.getMsg());
+ // This should never happen
+ int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
+ String message = getMessage(msgID, curChangeNumber.toString(),
+ op.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ return;
}
+ curChange.setCommitted(true);
- lastCommittedChangeNumber = firstChange.getChangeNumber();
-
- pendingChanges.remove(lastCommittedChangeNumber);
- if (pendingChanges.isEmpty())
- {
- firstChange = null;
- }
+ if (op.isSynchronizationOperation())
+ curChange.setOp(op);
else
+ curChange.setMsg(msg);
+
+ if (!op.isSynchronizationOperation() && isAssured && (msg != null))
{
- firstChangeNumber = pendingChanges.firstKey();
- firstChange = pendingChanges.get(firstChangeNumber);
+ waitingAckMsgs.put(curChangeNumber, msg);
}
}
- if (lastCommittedChangeNumber != null)
- state.update(lastCommittedChangeNumber);
+ else if (!op.isSynchronizationOperation())
+ pendingChanges.remove(curChangeNumber);
+
+ pushCommittedChanges();
}
- if (!op.isSynchronizationOperation() && msg.isAssured())
+ if ((!op.isSynchronizationOperation()) && msg.isAssured() && (msg != null)
+ && (result == ResultCode.SUCCESS))
{
synchronized (msg)
{
@@ -683,19 +825,6 @@
}
/**
- * Check if an operation must be processed as an assured operation.
- *
- * @param op the operation to be checked.
- * @return true if the operations must be processed as an assured operation.
- */
- private boolean isAssured(Operation op)
- {
- // TODO : should have a filtering mechanism for checking
- // operation that are assured and operations that are not.
- return assuredFlag;
- }
-
- /**
* get the number of updates received by the synchronization plugin.
*
* @return the number of updates received
@@ -790,27 +919,6 @@
}
/**
- * Generate and set the ChangeNumber of a given Operation.
- *
- * @param operation The Operation for which the ChangeNumber must be set.
- */
- public void setChangeNumber(Operation operation)
- {
- ChangeNumber changeNumber =
- (ChangeNumber) operation.getAttachment(SYNCHRONIZATION);
- if (changeNumber == null)
- {
- synchronized(pendingChanges)
- {
- changeNumber = changeNumberGenerator.NewChangeNumber();
- pendingChanges.put(changeNumber, new PendingChange(changeNumber,
- operation, null));
- }
- operation.setAttachment(SYNCHRONIZATION, changeNumber);
- }
- }
-
- /**
* {@inheritDoc}
*/
@Override
@@ -826,7 +934,7 @@
synchroThreads = new ArrayList<ListenerThread>();
for (int i=0; i<10; i++)
{
- ListenerThread myThread = new ListenerThread(this, changeNumberGenerator);
+ ListenerThread myThread = new ListenerThread(this);
myThread.start();
synchroThreads.add(myThread);
}
@@ -913,6 +1021,126 @@
}
/**
+ * Create and replay a synchronized Operation from an UpdateMessage.
+ *
+ * @param msg The UpdateMessage to be replayed.
+ */
+ public void replay(UpdateMessage msg)
+ {
+ Operation op = null;
+ boolean done = false;
+ ChangeNumber changeNumber = null;
+
+ try
+ {
+ while (!done)
+ {
+ op = msg.createOperation(conn);
+
+ op.setInternalOperation(true);
+ op.setSynchronizationOperation(true);
+ changeNumber = OperationContext.getChangeNumber(op);
+ if (changeNumber != null)
+ changeNumberGenerator.adjust(changeNumber);
+
+ op.run();
+
+ ResultCode result = op.getResultCode();
+ if (result != ResultCode.SUCCESS)
+ {
+ if (op instanceof ModifyOperation)
+ {
+ ModifyOperation newOp = (ModifyOperation) op;
+ done = solveNamingConflict(newOp, msg);
+ }
+ else if (op instanceof DeleteOperation)
+ {
+ DeleteOperation newOp = (DeleteOperation) op;
+ done = solveNamingConflict(newOp, msg);
+ }
+ else if (op instanceof AddOperation)
+ {
+ AddOperation newOp = (AddOperation) op;
+ done = solveNamingConflict(newOp, msg);
+
+ } else if (op instanceof ModifyDNOperation)
+ {
+ ModifyDNOperation newOp = (ModifyDNOperation) op;
+ done = solveNamingConflict(newOp, msg);
+ }
+ else
+ {
+ done = true; // unknown type of operation ?!
+ }
+ }
+ else
+ {
+ done = true;
+ }
+ }
+ }
+ catch (ASN1Exception e)
+ {
+ int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+ String message = getMessage(msgID, msg) +
+ stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ catch (LDAPException e)
+ {
+ int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+ String message = getMessage(msgID, msg) +
+ stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ catch (DataFormatException e)
+ {
+ int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+ String message = getMessage(msgID, msg) +
+ stackTraceToSingleLineString(e);
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR,
+ message, msgID);
+ }
+ catch (Exception e)
+ {
+ if (changeNumber != null)
+ {
+ /*
+ * An Exception happened during the replay process.
+ * Continue with the next change but the servers will know start
+ * to be inconsistent.
+ * TODO : REPAIR : Should let the repair tool know about this
+ */
+ int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION;
+ String message = getMessage(msgID, stackTraceToSingleLineString(e),
+ op.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ updateError(changeNumber);
+ }
+ else
+ {
+ int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+ String message = getMessage(msgID, stackTraceToSingleLineString(e),
+ msg.toString());
+ logError(ErrorLogCategory.SYNCHRONIZATION,
+ ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+ }
+ }
+ finally
+ {
+ if (msg.isAssured())
+ ack(msg.getChangeNumber());
+ incProcessedUpdates();
+ }
+ }
+
+ /**
* This methods is called when an error happends while replaying
* and operation.
* It is necessary because the postOPeration does not always get
@@ -925,6 +1153,434 @@
synchronized (pendingChanges)
{
pendingChanges.remove(changeNumber);
+ pushCommittedChanges();
}
}
+
+ /**
+ * Generate a new change number and insert it in the pending list.
+ *
+ * @param operation The operation for which the change number must be
+ * generated.
+ * @return The new change number.
+ */
+ private ChangeNumber generateChangeNumber(Operation operation)
+ {
+ ChangeNumber changeNumber;
+ synchronized(pendingChanges)
+ {
+ changeNumber = changeNumberGenerator.NewChangeNumber();
+ pendingChanges.put(changeNumber,
+ new PendingChange(changeNumber, operation, null));
+ }
+ return changeNumber;
+ }
+
+
+ /**
+ * Find the Unique Id of the entry with the provided DN by doing a
+ * search of the entry and extracting its uniqueID from its attributes.
+ *
+ * @param dn The dn of the entry for which the unique Id is searched.
+ *
+ * @return The unique Id of the entry whith the provided DN.
+ */
+ private String findEntryId(DN dn)
+ {
+ if (dn == null)
+ return null;
+ try
+ {
+ LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+ attrs.add(ENTRYUIDNAME);
+ InternalSearchOperation search = conn.processSearch(dn,
+ SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, 0, false,
+ SearchFilter.createFilterFromString("objectclass=*"),
+ attrs);
+
+ if (search.getResultCode() == ResultCode.SUCCESS)
+ {
+ LinkedList<SearchResultEntry> result = search.getSearchEntries();
+ if (!result.isEmpty())
+ {
+ SearchResultEntry resultEntry = result.getFirst();
+ if (resultEntry != null)
+ {
+ return Historical.getEntryUuid(resultEntry);
+ }
+ }
+ }
+ } catch (DirectoryException e)
+ {
+ // never happens because the filter is always valid.
+ }
+ return null;
+ }
+
+ /**
+ * find the current dn of an entry from its entry uuid.
+ *
+ * @param uuid the Entry Unique ID.
+ * @return The curernt dn of the entry or null if there is no entry with
+ * the specified uuid.
+ */
+ private DN findEntryDN(String uuid)
+ {
+ try
+ {
+ InternalSearchOperation search = conn.processSearch(baseDN,
+ SearchScope.WHOLE_SUBTREE,
+ SearchFilter.createFilterFromString("entryuuid="+uuid));
+ if (search.getResultCode() == ResultCode.SUCCESS)
+ {
+ LinkedList<SearchResultEntry> result = search.getSearchEntries();
+ if (!result.isEmpty())
+ {
+ SearchResultEntry resultEntry = result.getFirst();
+ if (resultEntry != null)
+ {
+ return resultEntry.getDN();
+ }
+ }
+ }
+ } catch (DirectoryException e)
+ {
+ // never happens because the filter is always valid.
+ }
+ return null;
+ }
+
+ /**
+ * Solve a conflict detected when replaying a modify operation.
+ *
+ * @param op The operation that triggered the conflict detection.
+ * @param msg The operation that triggered the conflict detection.
+ * @return true if the process is completed, false if it must continue..
+ */
+ private boolean solveNamingConflict(ModifyOperation op,
+ UpdateMessage msg)
+ {
+ ResultCode result = op.getResultCode();
+ ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
+ String entryUid = ctx.getEntryUid();
+
+ if (result == ResultCode.NO_SUCH_OBJECT)
+ {
+ /*
+ * This error may happen the operation is a modification but
+ * the entry had been renamed on a different master in the same time.
+ * search if the entry has been renamed, and return the new dn
+ * of the entry.
+ */
+ msg.setDn(findEntryDN(entryUid).toString());
+ return false;
+ }
+ return true;
+ }
+
+ /** Solve a conflict detected when replaying a delete operation.
+ *
+ * @param op The operation that triggered the conflict detection.
+ * @param msg The operation that triggered the conflict detection.
+ * @return true if the process is completed, false if it must continue..
+ */
+ private boolean solveNamingConflict(DeleteOperation op,
+ UpdateMessage msg)
+ {
+ ResultCode result = op.getResultCode();
+ DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
+ String entryUid = ctx.getEntryUid();
+
+ if (result == ResultCode.NO_SUCH_OBJECT)
+ {
+ /*
+ * Find if the entry is still in the database.
+ */
+ DN currentDn = findEntryDN(entryUid);
+ if (currentDn == null)
+ {
+ /*
+ * The entry has already been deleted, either because this delete
+ * has already been replayed or because another concurrent delete
+ * has already done the job.
+ * In any case, there is is nothing more to do.
+ */
+ return true;
+ }
+ else
+ {
+ /*
+ * This entry has been renamed, replay the delete using its new DN.
+ */
+ msg.setDn(currentDn.toString());
+ return false;
+ }
+ }
+ else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
+ {
+ /*
+ * This may happen when we replay a DELETE done on a master
+ * but children of this entry have been added on another master.
+ */
+
+ /*
+ * TODO : either delete all the childs or rename the child below
+ * the top suffix by adding entryuuid in dn and delete this entry.
+ */
+ }
+ return true;
+ }
+
+ /**
+ * Solve a conflict detected when replaying a ADD operation.
+ *
+ * @param op The operation that triggered the conflict detection.
+ * @param msg The operation that triggered the conflict detection.
+ * @return true if the process is completed, false if it must continue.
+ * @throws Exception When the operation is not valid.
+ */
+ private boolean solveNamingConflict(AddOperation op,
+ UpdateMessage msg) throws Exception
+ {
+ ResultCode result = op.getResultCode();
+ AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
+ String entryUid = ctx.getEntryUid();
+ String parentUniqueId = ctx.getParentUid();
+
+ if (result == ResultCode.NO_SUCH_OBJECT)
+ {
+ /*
+ * This can happen if the parent has been renamed or deleted
+ * find the parent dn and calculate a new dn for the entry
+ */
+ if (parentUniqueId == null)
+ {
+ /*
+ * This entry is the base dn of the backend.
+ * It is quite weird that the operation result be NO_SUCH_OBJECT.
+ * There is notthing more we can do except TODO log a
+ * message for the repair tool to look at this problem.
+ */
+ return true;
+ }
+ DN parentDn = findEntryDN(parentUniqueId);
+ if (parentDn == null)
+ {
+ /*
+ * The parent has been deleted, so this entry should not
+ * exist don't do the ADD.
+ */
+ return true;
+ }
+ else
+ {
+ RDN entryRdn = op.getEntryDN().getRDN();
+ msg.setDn(parentDn + "," + entryRdn);
+ return false;
+ }
+ }
+ else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
+ {
+ /*
+ * This can happen if
+ * - two adds are done on different servers but with the
+ * same target DN.
+ * - the same ADD is being replayed for the second time on this server.
+ * if the nsunique ID already exist, assume this is a replay and
+ * don't do anything
+ * if the entry unique id do not exist, generate conflict.
+ */
+ if (findEntryDN(entryUid) != null)
+ {
+ // entry already exist : this is a replay
+ return true;
+ }
+ else
+ {
+ addConflict(op);
+ msg.setDn(generateConflictDn(entryUid, msg.getDn()));
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Solve a conflict detected when replaying a Modify DN operation.
+ *
+ * @param op The operation that triggered the conflict detection.
+ * @param msg The operation that triggered the conflict detection.
+ * @return true if the process is completed, false if it must continue.
+ * @throws Exception When the operation is not valid.
+ */
+ private boolean solveNamingConflict(ModifyDNOperation op,
+ UpdateMessage msg) throws Exception
+ {
+ ResultCode result = op.getResultCode();
+ ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
+ String entryUid = ctx.getEntryUid();
+ String newSuperiorID = ctx.getNewParentId();
+
+ if (result == ResultCode.NO_SUCH_OBJECT)
+ {
+ ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
+
+ /*
+ * four possible cases :
+ * - the modified entry has been renamed
+ * - the new parent has been renamed
+ * - the operation is replayed for the second time.
+ * - the entry has been deleted
+ * action :
+ * - change the target dn and the new parent dn and
+ * restart the operation,
+ * - don't do anything if the operation is replayed.
+ */
+
+ // Construct the new DN to use for the entry.
+ DN entryDN = op.getEntryDN();
+ DN newSuperior = findEntryDN(newSuperiorID);
+ RDN newRDN = op.getNewRDN();
+ DN parentDN;
+
+ if (newSuperior == null)
+ {
+ parentDN = entryDN.getParent();
+ }
+ else
+ {
+ parentDN = newSuperior;
+ }
+
+ if ((parentDN == null) || parentDN.isNullDN())
+ {
+ /* this should never happen
+ * can't solve any conflict in this case.
+ */
+ throw new Exception("operation parameters are invalid");
+ }
+
+ RDN[] parentComponents = parentDN.getRDNComponents();
+ RDN[] newComponents = new RDN[parentComponents.length+1];
+ System.arraycopy(parentComponents, 0, newComponents, 1,
+ parentComponents.length);
+ newComponents[0] = newRDN;
+
+ DN newDN = new DN(newComponents);
+
+ // get the current DN of this entry in the database.
+ DN currentDN = findEntryDN(entryUid);
+
+ // if the newDN and the current DN match then the operation
+ // is a no-op (this was probably a second replay)
+ // don't do anything.
+ if (newDN.equals(currentDN))
+ {
+ return true;
+ }
+
+ msg.setDn(currentDN.toString());
+ modifyDnMsg.setNewSuperior(newSuperior.toString());
+ return false;
+ }
+ else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
+ {
+ /*
+ * This may happen when two modifyDn operation
+ * are done on different servers but with the same target DN
+ * add the conflict object class to the entry
+ * and rename it using its entryuuid.
+ */
+ generateAddConflictOp(op);
+ msg.setDn(generateConflictDn(entryUid, msg.getDn()));
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Generate a modification to add the conflict ObjectClass to an entry
+ * whose Dn is now conflicting with another entry.
+ *
+ * @param op The operation causing the conflict.
+ */
+ private void generateAddConflictOp(ModifyDNOperation op)
+ {
+ // TODO
+ }
+
+ /**
+ * Add the conflict object class to an entry that could
+ * not be added because it is conflicting with another entry.
+ *
+ * @param addOp The conflicting Add Operation.
+ */
+ private void addConflict(AddOperation addOp)
+ {
+ /*
+ * TODO
+ */
+ }
+
+ /**
+ * Generate the Dn to use for a conflicting entry.
+ *
+ * @param op Operation that generated the conflict
+ * @param dn Original dn.
+ * @return The generated Dn for a conflicting entry.
+ */
+ private String generateConflictDn(String entryUid, String dn)
+ {
+ return dn + "entryuuid=" + entryUid;
+ }
+
+ /**
+ * Check if an operation must be processed as an assured operation.
+ *
+ * @param op the operation to be checked.
+ * @return true if the operations must be processed as an assured operation.
+ */
+ private boolean isAssured(Operation op)
+ {
+ // TODO : should have a filtering mechanism for checking
+ // operation that are assured and operations that are not.
+ return false;
+ }
+
+ /**
+ * Push all committed local changes to the changelog service.
+ * PRECONDITION : The pendingChanges lock must be held before calling
+ * this method.
+ */
+ private void pushCommittedChanges()
+ {
+ if (pendingChanges.isEmpty())
+ return;
+
+ ChangeNumber firstChangeNumber = pendingChanges.firstKey();
+ PendingChange firstChange = pendingChanges.get(firstChangeNumber);
+
+ while ((firstChange != null) && firstChange.isCommitted())
+ {
+ if (firstChange.getOp().isSynchronizationOperation() == false)
+ {
+ numSentUpdates++;
+ broker.publish(firstChange.getMsg());
+ }
+ state.update(firstChangeNumber);
+ pendingChanges.remove(firstChangeNumber);
+
+ if (pendingChanges.isEmpty())
+ {
+ firstChange = null;
+ }
+ else
+ {
+ firstChangeNumber = pendingChanges.firstKey();
+ firstChange = pendingChanges.get(firstChangeNumber);
+ }
+ }
+ }
+
}
--
Gitblit v1.10.0