From b5acb25ee2ad9bf8b166b9de1a34e6aab6ea23b7 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 01 Sep 2006 12:04:47 +0000
Subject: [PATCH] issue 604 : solve the naming conflict that might happen when several masters are used there are 3 main parts in this commit : - attach the replication context in an OperationContext - if operation replay fails then fix the problem  - in the pre-op checks for conflict and cause failure if necessary most of the time there should be no conflict and the operation should be processed normally

---
 opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java |  948 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 802 insertions(+), 146 deletions(-)

diff --git a/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java b/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
index a37fe14..428c528 100644
--- a/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
+++ b/opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -26,15 +26,21 @@
  */
 package org.opends.server.synchronization;
 
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 import static org.opends.server.util.TimeThread.getTime;
 import static org.opends.server.synchronization.SynchMessages.*;
 import static org.opends.server.loggers.Error.*;
 import static org.opends.server.messages.MessageHandler.*;
+import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT;
+import static org.opends.server.synchronization.Historical.*;
 
 import java.util.ArrayList;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.zip.DataFormatException;
 
 import org.opends.server.api.ConfigurableComponent;
 import org.opends.server.api.DirectoryThread;
@@ -47,17 +53,27 @@
 import org.opends.server.config.StringConfigAttribute;
 import org.opends.server.core.AddOperation;
 import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.DirectoryException;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.ModifyDNOperation;
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.Operation;
 import org.opends.server.messages.MessageHandler;
+import org.opends.server.protocols.asn1.ASN1Exception;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.protocols.ldap.LDAPException;
 import org.opends.server.types.ConfigChangeResult;
 import org.opends.server.types.DN;
+import org.opends.server.types.DereferencePolicy;
 import org.opends.server.types.Entry;
 import org.opends.server.types.ErrorLogCategory;
 import org.opends.server.types.ErrorLogSeverity;
+import org.opends.server.types.RDN;
 import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchFilter;
+import org.opends.server.types.SearchResultEntry;
+import org.opends.server.types.SearchScope;
 import org.opends.server.types.SynchronizationProviderResult;
 
 /**
@@ -88,9 +104,6 @@
   private ServerState state;
   private int numReplayedPostOpCalled = 0;
 
-  private boolean assuredFlag = false;
-
-
   private int maxReceiveQueue = 0;
   private int maxSendQueue = 0;
   private int maxReceiveDelay = 0;
@@ -114,6 +127,8 @@
 
   private DN configDn;
 
+  private InternalClientConnection conn = new InternalClientConnection();
+
   static String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
   static String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
   static String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
@@ -379,8 +394,7 @@
           broker.restartReceive();
           for (int i=0; i<listenerThreadNumber; i++)
           {
-            ListenerThread myThread = new ListenerThread(this,
-                                                         changeNumberGenerator);
+            ListenerThread myThread = new ListenerThread(this);
             myThread.start();
             synchroThreads.add(myThread);
           }
@@ -416,6 +430,155 @@
   }
 
   /**
+   * Implement the  handleConflictResolution phase of the deleteOperation.
+   *
+   * @param deleteOperation The deleteOperation.
+   * @return A SynchronizationProviderResult indicating if the operation
+   *         can continue.
+   */
+  public SynchronizationProviderResult handleConflictResolution(
+      DeleteOperation deleteOperation)
+  {
+    DeleteContext ctx =
+      (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
+    Entry deletedEntry = deleteOperation.getEntryToDelete();
+
+    if (ctx != null)
+    {
+      /*
+       * This is a synchronization operation
+       * Check that the modified entry has the same entryuuid
+       * has was in the original message.
+       */
+      String operationEntryUUID = ctx.getEntryUid();
+      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
+      if (!operationEntryUUID.equals(modifiedEntryUUID))
+      {
+        /*
+         * The changes entry is not the same entry as the one on
+         * the original change was performed.
+         * Probably the original entry was renamed and replaced with
+         * another entry.
+         * We must not let the change proceed, return a negative
+         * result and set the result code to NO_SUCH_OBJET.
+         * When the operation will return, the thread that started the
+         * operation will try to find the correct entry and restart a new
+         * operation.
+         */
+        deleteOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+        return new SynchronizationProviderResult(false);
+      }
+    }
+    else
+    {
+      // There is no Synchronization context attached to the operation
+      // so this is not a synchronization operation.
+      ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
+      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
+      ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
+      deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
+    }
+    return new SynchronizationProviderResult(true);
+  }
+
+  /**
+   * Implement the  handleConflictResolution phase of the addOperation.
+   *
+   * @param addOperation The AddOperation.
+   * @return A SynchronizationProviderResult indicating if the operation
+   *         can continue.
+   */
+  public SynchronizationProviderResult handleConflictResolution(
+      AddOperation addOperation)
+  {
+    if (addOperation.isSynchronizationOperation())
+    {
+      AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
+      /*
+       * If an entry with the same entry uniqueID already exist then
+       * this operation has already been replayed in the past.
+       */
+      String uuid = ctx.getEntryUid();
+      if (findEntryDN(uuid) != null)
+      {
+        addOperation.setResultCode(ResultCode.SUCCESS);
+        return new SynchronizationProviderResult(false);
+      }
+    }
+    return new SynchronizationProviderResult(true);
+  }
+
+  /**
+   * Implement the  handleConflictResolution phase of the ModifyDNOperation.
+   *
+   * @param modifyDNOperation The ModifyDNOperation.
+   * @return A SynchronizationProviderResult indicating if the operation
+   *         can continue.
+   */
+  public SynchronizationProviderResult handleConflictResolution(
+      ModifyDNOperation modifyDNOperation)
+  {
+    ModifyDnContext ctx =
+      (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
+    if (ctx != null)
+    {
+      /*
+       * This is a synchronization operation
+       * Check that the modified entry has the same entryuuid
+       * as was in the original message.
+       */
+      String modifiedEntryUUID =
+        Historical.getEntryUuid(modifyDNOperation.getOriginalEntry());
+      if (!modifiedEntryUUID.equals(ctx.getEntryUid()))
+      {
+        /*
+         * The modified entry is not the same entry as the one on
+         * the original change was performed.
+         * Probably the original entry was renamed and replaced with
+         * another entry.
+         * We must not let the change proceed, return a negative
+         * result and set the result code to NO_SUCH_OBJET.
+         * When the operation will return, the thread that started the
+         * operation will try to find the correct entry and restart a new
+         * operation.
+         */
+        modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+        return new SynchronizationProviderResult(false);
+      }
+      if (modifyDNOperation.getNewSuperior() != null)
+      {
+        /*
+         * Also check that the current id of the
+         * parent is the same as when the operation was performed.
+         */
+        String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
+        if (!newParentId.equals(ctx.getNewParentId()))
+        {
+          modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+          return new SynchronizationProviderResult(false);
+        }
+      }
+    }
+    else
+    {
+      // There is no Synchronization context attached to the operation
+      // so this is not a synchronization operation.
+      ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
+      String newParentId = null;
+      if (modifyDNOperation.getNewSuperior() != null)
+      {
+        newParentId = findEntryId(modifyDNOperation.getNewSuperior());
+      }
+
+      Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
+      String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
+      ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId);
+      modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
+    }
+    return new SynchronizationProviderResult(true);
+  }
+
+  /**
    * Handle the conflict resolution.
    * Called by the core server after locking the entry and before
    * starting the actual modification.
@@ -425,25 +588,43 @@
   public SynchronizationProviderResult handleConflictResolution(
                                                 ModifyOperation modifyOperation)
   {
-    //  If operation do not yet have a change number, generate it
-    ChangeNumber changeNumber =
-      (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
-    if (changeNumber == null)
-    {
-      synchronized(pendingChanges)
-      {
-        changeNumber = changeNumberGenerator.NewChangeNumber();
-        pendingChanges.put(changeNumber, new PendingChange(changeNumber,
-                                                           modifyOperation,
-                                                           null));
-      }
-      modifyOperation.setAttachment(SYNCHRONIZATION, changeNumber);
-    }
+    ModifyContext ctx =
+      (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
 
-    // if Operation is a synchronization operation, solve conflicts
-    if (modifyOperation.isSynchronizationOperation())
+    Entry modifiedEntry = modifyOperation.getModifiedEntry();
+    if (ctx == null)
     {
-      Entry modifiedEntry = modifyOperation.getModifiedEntry();
+      // There is no Synchronization context attached to the operation
+      // so this is not a synchronization operation.
+      ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
+      String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
+      ctx = new ModifyContext(changeNumber, modifiedEntryUUID);
+      modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
+    }
+    else
+    {
+      String modifiedEntryUUID = ctx.getEntryUid();
+      String currentEntryUUID = Historical.getEntryUuid(modifiedEntry);
+      if (!currentEntryUUID.equals(modifiedEntryUUID))
+      {
+        /*
+         * The current modified entry is not the same entry as the one on
+         * the original modification was performed.
+         * Probably the original entry was renamed and replaced with
+         * another entry.
+         * We must not let the modification proceed, return a negative
+         * result and set the result code to NO_SUCH_OBJET.
+         * When the operation will return, the thread that started the
+         * operation will try to find the correct entry and restart a new
+         * operation.
+         */
+        modifyOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
+        return new SynchronizationProviderResult(false);
+      }
+
+      /*
+       * Solve the conflicts between modify operations
+       */
       Historical historicalInformation = Historical.load(modifiedEntry);
       modifyOperation.setAttachment(HISTORICAL, historicalInformation);
 
@@ -456,16 +637,29 @@
          * stop the processing and send an OK result
          */
         modifyOperation.setResultCode(ResultCode.SUCCESS);
-        /*
-         * TODO : check that post operation do get called and
-         * that pendingChanges do get updated
-         */
         return new SynchronizationProviderResult(false);
       }
     }
     return new SynchronizationProviderResult(true);
   }
 
+  /**
+   * The preOperation phase for the add Operation.
+   * Its job is to generate the Synchronization context associated to the
+   * operation. It is necessary to do it in this phase because contrary to
+   * the other operations, the entry uid is not set when the handleConflict
+   * phase is called.
+   *
+   * @param addOperation The Add Operation.
+   */
+  public void doPreOperation(AddOperation addOperation)
+  {
+    AddContext ctx = new AddContext(generateChangeNumber(addOperation),
+        Historical.getEntryUuid(addOperation),
+        findEntryId(addOperation.getEntryDN().getParent()));
+
+    addOperation.setAttachment(SYNCHROCONTEXT, ctx);
+  }
 
   /**
    * Receive an update message from the changelog.
@@ -494,6 +688,7 @@
 
   /**
    * Do the necessary processing when an UpdateMessage was received.
+   *
    * @param update The received UpdateMessage.
    */
   public void receiveUpdate(UpdateMessage update)
@@ -548,49 +743,20 @@
   {
     numReplayedPostOpCalled++;
     UpdateMessage msg = null;
-    ChangeNumber curChangeNumber =
-      (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
+    ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
 
-    if (op.getResultCode() != ResultCode.SUCCESS)
-    {
-      if (curChangeNumber != null)
-      {
-        /*
-         * This code can be executed by multiple threads
-         * Since TreeMap is not synchronized, it is mandatory to synchronize
-         * it now.
-         */
-        synchronized (pendingChanges)
-        {
-          pendingChanges.remove(curChangeNumber);
-        }
-      }
-      return;
-    }
+    ResultCode result = op.getResultCode();
+    boolean isAssured = isAssured(op);
 
-    if (!op.isSynchronizationOperation())
+    if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
     {
-      switch (op.getOperationType())
+      msg = UpdateMessage.generateMsg(op, isAssured);
+
+      if (msg == null)
       {
-      case MODIFY :
-        msg = new ModifyMsg((ModifyOperation) op);
-        break;
-      case ADD:
-        msg = new AddMsg((AddOperation) op);
-        break;
-      case DELETE :
-        msg = new DeleteMsg((DeleteOperation) op);
-        break;
-      case MODIFY_DN :
-        msg = new ModifyDNMsg((ModifyDNOperation) op);
-        break;
-      default :
         /*
          * This is an operation type that we do not know about
-         * It should never happen
-         * This code can be executed by multiple threads
-         * Since TreeMap is not synchronized, it is mandatory to synchronize
-         * it now.
+         * It should never happen.
          */
         synchronized (pendingChanges)
         {
@@ -603,68 +769,44 @@
           return;
         }
       }
-      if (isAssured(op))
-      {
-        msg.setAssured();
-      }
     }
 
     synchronized(pendingChanges)
     {
-      PendingChange curChange = pendingChanges.get(curChangeNumber);
-      if (curChange == null)
+      if (result == ResultCode.SUCCESS)
       {
-        // This should never happen
-        int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
-        String message = getMessage(msgID, curChangeNumber.toString(),
-                                    op.toString());
-        logError(ErrorLogCategory.SYNCHRONIZATION,
-                 ErrorLogSeverity.SEVERE_ERROR,
-                 message, msgID);
-        return;
-      }
-      curChange.setCommitted(true);
-
-      if (op.isSynchronizationOperation())
-        curChange.setOp(op);
-      else
-        curChange.setMsg(msg);
-
-      ChangeNumber firstChangeNumber = pendingChanges.firstKey();
-      PendingChange firstChange = pendingChanges.get(firstChangeNumber);
-      ChangeNumber lastCommittedChangeNumber = null;
-
-      if (!op.isSynchronizationOperation() && msg.isAssured())
-      {
-        waitingAckMsgs.put(curChangeNumber, msg);
-      }
-
-      while ((firstChange != null) && firstChange.isCommitted())
-      {
-        if (firstChange.getOp().isSynchronizationOperation() == false)
+        PendingChange curChange = pendingChanges.get(curChangeNumber);
+        if (curChange == null)
         {
-          numSentUpdates++;
-          broker.publish(firstChange.getMsg());
+          // This should never happen
+          int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
+          String message = getMessage(msgID, curChangeNumber.toString(),
+              op.toString());
+          logError(ErrorLogCategory.SYNCHRONIZATION,
+              ErrorLogSeverity.SEVERE_ERROR,
+              message, msgID);
+          return;
         }
+        curChange.setCommitted(true);
 
-        lastCommittedChangeNumber = firstChange.getChangeNumber();
-
-        pendingChanges.remove(lastCommittedChangeNumber);
-        if (pendingChanges.isEmpty())
-        {
-          firstChange = null;
-        }
+        if (op.isSynchronizationOperation())
+          curChange.setOp(op);
         else
+          curChange.setMsg(msg);
+
+        if (!op.isSynchronizationOperation() && isAssured && (msg != null))
         {
-          firstChangeNumber = pendingChanges.firstKey();
-          firstChange = pendingChanges.get(firstChangeNumber);
+          waitingAckMsgs.put(curChangeNumber, msg);
         }
       }
-      if (lastCommittedChangeNumber != null)
-        state.update(lastCommittedChangeNumber);
+      else if (!op.isSynchronizationOperation())
+        pendingChanges.remove(curChangeNumber);
+
+      pushCommittedChanges();
     }
 
-    if (!op.isSynchronizationOperation() && msg.isAssured())
+    if ((!op.isSynchronizationOperation()) && msg.isAssured() && (msg != null)
+        && (result == ResultCode.SUCCESS))
     {
       synchronized (msg)
       {
@@ -683,19 +825,6 @@
   }
 
   /**
-   * Check if an operation must be processed as an assured operation.
-   *
-   * @param op the operation to be checked.
-   * @return true if the operations must be processed as an assured operation.
-   */
-  private boolean isAssured(Operation op)
-  {
-    // TODO : should have a filtering mechanism for checking
-    // operation that are assured and operations that are not.
-    return assuredFlag;
-  }
-
-  /**
    * get the number of updates received by the synchronization plugin.
    *
    * @return the number of updates received
@@ -790,27 +919,6 @@
   }
 
   /**
-   * Generate and set the ChangeNumber of a given Operation.
-   *
-   * @param operation The Operation for which the ChangeNumber must be set.
-   */
-  public void setChangeNumber(Operation operation)
-  {
-    ChangeNumber changeNumber =
-      (ChangeNumber) operation.getAttachment(SYNCHRONIZATION);
-    if (changeNumber == null)
-    {
-      synchronized(pendingChanges)
-      {
-        changeNumber = changeNumberGenerator.NewChangeNumber();
-        pendingChanges.put(changeNumber, new PendingChange(changeNumber,
-            operation, null));
-      }
-      operation.setAttachment(SYNCHRONIZATION, changeNumber);
-    }
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -826,7 +934,7 @@
     synchroThreads = new ArrayList<ListenerThread>();
     for (int i=0; i<10; i++)
     {
-      ListenerThread myThread = new ListenerThread(this, changeNumberGenerator);
+      ListenerThread myThread = new ListenerThread(this);
       myThread.start();
       synchroThreads.add(myThread);
     }
@@ -913,6 +1021,126 @@
   }
 
   /**
+   * Create and replay a synchronized Operation from an UpdateMessage.
+   *
+   * @param msg The UpdateMessage to be replayed.
+   */
+  public void replay(UpdateMessage msg)
+  {
+    Operation op = null;
+    boolean done = false;
+    ChangeNumber changeNumber = null;
+
+    try
+    {
+      while (!done)
+      {
+        op = msg.createOperation(conn);
+
+        op.setInternalOperation(true);
+        op.setSynchronizationOperation(true);
+        changeNumber = OperationContext.getChangeNumber(op);
+        if (changeNumber != null)
+          changeNumberGenerator.adjust(changeNumber);
+
+        op.run();
+
+        ResultCode result = op.getResultCode();
+        if (result != ResultCode.SUCCESS)
+        {
+          if (op instanceof ModifyOperation)
+          {
+            ModifyOperation newOp = (ModifyOperation) op;
+            done = solveNamingConflict(newOp, msg);
+          }
+          else if (op instanceof DeleteOperation)
+          {
+            DeleteOperation newOp = (DeleteOperation) op;
+            done = solveNamingConflict(newOp, msg);
+          }
+          else if (op instanceof AddOperation)
+          {
+            AddOperation newOp = (AddOperation) op;
+            done = solveNamingConflict(newOp, msg);
+
+          } else if (op instanceof ModifyDNOperation)
+          {
+            ModifyDNOperation newOp = (ModifyDNOperation) op;
+            done = solveNamingConflict(newOp, msg);
+          }
+          else
+          {
+            done = true;  // unknown type of operation ?!
+          }
+        }
+        else
+        {
+          done = true;
+        }
+      }
+    }
+    catch (ASN1Exception e)
+    {
+      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+      String message = getMessage(msgID, msg) +
+      stackTraceToSingleLineString(e);
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+          ErrorLogSeverity.SEVERE_ERROR,
+          message, msgID);
+    }
+    catch (LDAPException e)
+    {
+      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+      String message = getMessage(msgID, msg) +
+      stackTraceToSingleLineString(e);
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+          ErrorLogSeverity.SEVERE_ERROR,
+          message, msgID);
+    }
+    catch (DataFormatException e)
+    {
+      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+      String message = getMessage(msgID, msg) +
+      stackTraceToSingleLineString(e);
+      logError(ErrorLogCategory.SYNCHRONIZATION,
+          ErrorLogSeverity.SEVERE_ERROR,
+          message, msgID);
+    }
+    catch (Exception e)
+    {
+      if (changeNumber != null)
+      {
+        /*
+         * An Exception happened during the replay process.
+         * Continue with the next change but the servers will know start
+         * to be inconsistent.
+         * TODO : REPAIR : Should let the repair tool know about this
+         */
+        int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION;
+        String message = getMessage(msgID, stackTraceToSingleLineString(e),
+            op.toString());
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+        updateError(changeNumber);
+      }
+      else
+      {
+        int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
+        String message = getMessage(msgID, stackTraceToSingleLineString(e),
+            msg.toString());
+        logError(ErrorLogCategory.SYNCHRONIZATION,
+            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
+      }
+    }
+    finally
+    {
+      if (msg.isAssured())
+        ack(msg.getChangeNumber());
+      incProcessedUpdates();
+    }
+  }
+
+  /**
    * This methods is called when an error happends while replaying
    * and operation.
    * It is necessary because the postOPeration does not always get
@@ -925,6 +1153,434 @@
     synchronized (pendingChanges)
     {
       pendingChanges.remove(changeNumber);
+      pushCommittedChanges();
     }
   }
+
+  /**
+   * Generate a new change number and insert it in the pending list.
+   *
+   * @param operation The operation for which the change number must be
+   *                  generated.
+   * @return The new change number.
+   */
+  private ChangeNumber generateChangeNumber(Operation operation)
+  {
+    ChangeNumber changeNumber;
+    synchronized(pendingChanges)
+    {
+      changeNumber = changeNumberGenerator.NewChangeNumber();
+      pendingChanges.put(changeNumber,
+          new PendingChange(changeNumber, operation, null));
+    }
+    return changeNumber;
+  }
+
+
+  /**
+   * Find the Unique Id of the entry with the provided DN by doing a
+   * search of the entry and extracting its uniqueID from its attributes.
+   *
+   * @param dn The dn of the entry for which the unique Id is searched.
+   *
+   * @return The unique Id of the entry whith the provided DN.
+   */
+  private String findEntryId(DN dn)
+  {
+    if (dn == null)
+      return null;
+    try
+    {
+      LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
+      attrs.add(ENTRYUIDNAME);
+      InternalSearchOperation search = conn.processSearch(dn,
+            SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES,
+            0, 0, false,
+            SearchFilter.createFilterFromString("objectclass=*"),
+            attrs);
+
+      if (search.getResultCode() == ResultCode.SUCCESS)
+      {
+        LinkedList<SearchResultEntry> result = search.getSearchEntries();
+        if (!result.isEmpty())
+        {
+          SearchResultEntry resultEntry = result.getFirst();
+          if (resultEntry != null)
+          {
+            return Historical.getEntryUuid(resultEntry);
+          }
+        }
+      }
+    } catch (DirectoryException e)
+    {
+      // never happens because the filter is always valid.
+    }
+    return null;
+  }
+
+  /**
+   * find the current dn of an entry from its entry uuid.
+   *
+   * @param uuid the Entry Unique ID.
+   * @return The curernt dn of the entry or null if there is no entry with
+   *         the specified uuid.
+   */
+  private DN findEntryDN(String uuid)
+  {
+    try
+    {
+      InternalSearchOperation search = conn.processSearch(baseDN,
+            SearchScope.WHOLE_SUBTREE,
+            SearchFilter.createFilterFromString("entryuuid="+uuid));
+      if (search.getResultCode() == ResultCode.SUCCESS)
+      {
+        LinkedList<SearchResultEntry> result = search.getSearchEntries();
+        if (!result.isEmpty())
+        {
+          SearchResultEntry resultEntry = result.getFirst();
+          if (resultEntry != null)
+          {
+            return resultEntry.getDN();
+          }
+        }
+      }
+    } catch (DirectoryException e)
+    {
+      // never happens because the filter is always valid.
+    }
+    return null;
+  }
+
+  /**
+   * Solve a conflict detected when replaying a modify operation.
+   *
+   * @param op The operation that triggered the conflict detection.
+   * @param msg The operation that triggered the conflict detection.
+   * @return true if the process is completed, false if it must continue..
+   */
+  private boolean solveNamingConflict(ModifyOperation op,
+      UpdateMessage msg)
+  {
+    ResultCode result = op.getResultCode();
+    ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
+    String entryUid = ctx.getEntryUid();
+
+    if (result == ResultCode.NO_SUCH_OBJECT)
+    {
+      /*
+       * This error may happen the operation is a modification but
+       * the entry had been renamed on a different master in the same time.
+       * search if the entry has been renamed, and return the new dn
+       * of the entry.
+       */
+      msg.setDn(findEntryDN(entryUid).toString());
+      return false;
+    }
+    return true;
+  }
+
+ /** Solve a conflict detected when replaying a delete operation.
+  *
+  * @param op The operation that triggered the conflict detection.
+  * @param msg The operation that triggered the conflict detection.
+  * @return true if the process is completed, false if it must continue..
+  */
+ private boolean solveNamingConflict(DeleteOperation op,
+     UpdateMessage msg)
+ {
+   ResultCode result = op.getResultCode();
+   DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
+   String entryUid = ctx.getEntryUid();
+
+   if (result == ResultCode.NO_SUCH_OBJECT)
+   {
+     /*
+      * Find if the entry is still in the database.
+      */
+     DN currentDn = findEntryDN(entryUid);
+     if (currentDn == null)
+     {
+       /*
+        * The entry has already been deleted, either because this delete
+        * has already been replayed or because another concurrent delete
+        * has already done the job.
+        * In any case, there is is nothing more to do.
+        */
+       return true;
+     }
+     else
+     {
+       /*
+        * This entry has been renamed, replay the delete using its new DN.
+        */
+       msg.setDn(currentDn.toString());
+       return false;
+     }
+   }
+   else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
+   {
+     /*
+      * This may happen when we replay a DELETE done on a master
+      * but children of this entry have been added on another master.
+      */
+
+     /*
+      * TODO : either delete all the childs or rename the child below
+      * the top suffix by adding entryuuid in dn and delete this entry.
+      */
+   }
+   return true;
+ }
+
+  /**
+   * Solve a conflict detected when replaying a ADD operation.
+   *
+   * @param op The operation that triggered the conflict detection.
+   * @param msg The operation that triggered the conflict detection.
+   * @return true if the process is completed, false if it must continue.
+   * @throws Exception When the operation is not valid.
+   */
+  private boolean solveNamingConflict(AddOperation op,
+      UpdateMessage msg) throws Exception
+  {
+    ResultCode result = op.getResultCode();
+    AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
+    String entryUid = ctx.getEntryUid();
+    String parentUniqueId = ctx.getParentUid();
+
+    if (result == ResultCode.NO_SUCH_OBJECT)
+    {
+      /*
+       * This can happen if the parent has been renamed or deleted
+       * find the parent dn and calculate a new dn for the entry
+       */
+      if (parentUniqueId == null)
+      {
+        /*
+         * This entry is the base dn of the backend.
+         * It is quite weird that the operation result be NO_SUCH_OBJECT.
+         * There is notthing more we can do except TODO log a
+         * message for the repair tool to look at this problem.
+         */
+        return true;
+      }
+      DN parentDn = findEntryDN(parentUniqueId);
+      if (parentDn == null)
+      {
+        /*
+         * The parent has been deleted, so this entry should not
+         * exist don't do the ADD.
+         */
+        return true;
+      }
+      else
+      {
+        RDN entryRdn = op.getEntryDN().getRDN();
+        msg.setDn(parentDn + "," + entryRdn);
+        return false;
+      }
+    }
+    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
+    {
+      /*
+       * This can happen if
+       *  - two adds are done on different servers but with the
+       *    same target DN.
+       *  - the same ADD is being replayed for the second time on this server.
+       * if the nsunique ID already exist, assume this is a replay and
+       *        don't do anything
+       * if the entry unique id do not exist, generate conflict.
+       */
+      if (findEntryDN(entryUid) != null)
+      {
+        // entry already exist : this is a replay
+        return true;
+      }
+      else
+      {
+        addConflict(op);
+        msg.setDn(generateConflictDn(entryUid, msg.getDn()));
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Solve a conflict detected when replaying a Modify DN operation.
+   *
+   * @param op The operation that triggered the conflict detection.
+   * @param msg The operation that triggered the conflict detection.
+   * @return true if the process is completed, false if it must continue.
+   * @throws Exception When the operation is not valid.
+   */
+  private boolean solveNamingConflict(ModifyDNOperation op,
+      UpdateMessage msg) throws Exception
+  {
+    ResultCode result = op.getResultCode();
+    ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
+    String entryUid = ctx.getEntryUid();
+    String newSuperiorID = ctx.getNewParentId();
+
+    if (result == ResultCode.NO_SUCH_OBJECT)
+    {
+      ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
+
+      /*
+       * four possible cases :
+       * - the modified entry has been renamed
+       * - the new parent has been renamed
+       * - the operation is replayed for the second time.
+       * - the entry has been deleted
+       * action :
+       *  - change the target dn and the new parent dn and
+       *        restart the operation,
+       *  - don't do anything if the operation is replayed.
+       */
+
+      // Construct the new DN to use for the entry.
+      DN entryDN = op.getEntryDN();
+      DN newSuperior = findEntryDN(newSuperiorID);
+      RDN newRDN = op.getNewRDN();
+      DN parentDN;
+
+      if (newSuperior == null)
+      {
+        parentDN = entryDN.getParent();
+      }
+      else
+      {
+        parentDN = newSuperior;
+      }
+
+      if ((parentDN == null) || parentDN.isNullDN())
+      {
+        /* this should never happen
+         * can't solve any conflict in this case.
+         */
+        throw new Exception("operation parameters are invalid");
+      }
+
+      RDN[] parentComponents = parentDN.getRDNComponents();
+      RDN[] newComponents    = new RDN[parentComponents.length+1];
+      System.arraycopy(parentComponents, 0, newComponents, 1,
+          parentComponents.length);
+      newComponents[0] = newRDN;
+
+      DN newDN = new DN(newComponents);
+
+      // get the current DN of this entry in the database.
+      DN currentDN = findEntryDN(entryUid);
+
+      // if the newDN and the current DN match then the operation
+      // is a no-op (this was probably a second replay)
+      // don't do anything.
+      if (newDN.equals(currentDN))
+      {
+        return true;
+      }
+
+      msg.setDn(currentDN.toString());
+      modifyDnMsg.setNewSuperior(newSuperior.toString());
+      return false;
+    }
+    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
+    {
+      /*
+       * This may happen when two modifyDn operation
+       * are done on different servers but with the same target DN
+       * add the conflict object class to the entry
+       * and rename it using its entryuuid.
+       */
+      generateAddConflictOp(op);
+      msg.setDn(generateConflictDn(entryUid, msg.getDn()));
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Generate a modification to add the conflict ObjectClass to an entry
+   * whose Dn is now conflicting with another entry.
+   *
+   * @param op The operation causing the conflict.
+   */
+  private void generateAddConflictOp(ModifyDNOperation op)
+  {
+    // TODO
+  }
+
+  /**
+   * Add the conflict object class to an entry that could
+   * not be added because it is conflicting with another entry.
+   *
+   * @param addOp The conflicting Add Operation.
+   */
+  private void addConflict(AddOperation addOp)
+  {
+    /*
+     * TODO
+     */
+  }
+
+  /**
+   * Generate the Dn to use for a conflicting entry.
+   *
+   * @param op Operation that generated the conflict
+   * @param dn Original dn.
+   * @return The generated Dn for a conflicting entry.
+   */
+  private String generateConflictDn(String entryUid, String dn)
+  {
+    return dn + "entryuuid=" + entryUid;
+  }
+
+  /**
+   * Check if an operation must be processed as an assured operation.
+   *
+   * @param op the operation to be checked.
+   * @return true if the operations must be processed as an assured operation.
+   */
+  private boolean isAssured(Operation op)
+  {
+    // TODO : should have a filtering mechanism for checking
+    // operation that are assured and operations that are not.
+    return false;
+  }
+
+  /**
+   * Push all committed local changes to the changelog service.
+   * PRECONDITION : The pendingChanges lock must be held before calling
+   * this method.
+   */
+  private void pushCommittedChanges()
+  {
+    if (pendingChanges.isEmpty())
+      return;
+
+    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
+    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
+
+    while ((firstChange != null) && firstChange.isCommitted())
+    {
+      if (firstChange.getOp().isSynchronizationOperation() == false)
+      {
+        numSentUpdates++;
+        broker.publish(firstChange.getMsg());
+      }
+      state.update(firstChangeNumber);
+      pendingChanges.remove(firstChangeNumber);
+
+      if (pendingChanges.isEmpty())
+      {
+        firstChange = null;
+      }
+      else
+      {
+        firstChangeNumber = pendingChanges.firstKey();
+        firstChange = pendingChanges.get(firstChangeNumber);
+      }
+    }
+  }
+
 }

--
Gitblit v1.10.0