mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
01.04.2006 b5acb25ee2ad9bf8b166b9de1a34e6aab6ea23b7
opends/src/server/org/opends/server/synchronization/SynchronizationDomain.java
@@ -26,15 +26,21 @@
 */
package org.opends.server.synchronization;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.util.TimeThread.getTime;
import static org.opends.server.synchronization.SynchMessages.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
import static org.opends.server.synchronization.OperationContext.SYNCHROCONTEXT;
import static org.opends.server.synchronization.Historical.*;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.zip.DataFormatException;
import org.opends.server.api.ConfigurableComponent;
import org.opends.server.api.DirectoryThread;
@@ -47,17 +53,27 @@
import org.opends.server.config.StringConfigAttribute;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.Operation;
import org.opends.server.messages.MessageHandler;
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPException;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.Entry;
import org.opends.server.types.ErrorLogCategory;
import org.opends.server.types.ErrorLogSeverity;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SynchronizationProviderResult;
/**
@@ -88,9 +104,6 @@
  private ServerState state;
  private int numReplayedPostOpCalled = 0;
  private boolean assuredFlag = false;
  private int maxReceiveQueue = 0;
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
@@ -114,6 +127,8 @@
  private DN configDn;
  private InternalClientConnection conn = new InternalClientConnection();
  static String CHANGELOG_SERVER_ATTR = "ds-cfg-changelog-server";
  static String BASE_DN_ATTR = "ds-cfg-synchronization-dn";
  static String SERVER_ID_ATTR = "ds-cfg-directory-server-id";
@@ -379,8 +394,7 @@
          broker.restartReceive();
          for (int i=0; i<listenerThreadNumber; i++)
          {
            ListenerThread myThread = new ListenerThread(this,
                                                         changeNumberGenerator);
            ListenerThread myThread = new ListenerThread(this);
            myThread.start();
            synchroThreads.add(myThread);
          }
@@ -416,6 +430,155 @@
  }
  /**
   * Implement the  handleConflictResolution phase of the deleteOperation.
   *
   * @param deleteOperation The deleteOperation.
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
      DeleteOperation deleteOperation)
  {
    DeleteContext ctx =
      (DeleteContext) deleteOperation.getAttachment(SYNCHROCONTEXT);
    Entry deletedEntry = deleteOperation.getEntryToDelete();
    if (ctx != null)
    {
      /*
       * This is a synchronization operation
       * Check that the modified entry has the same entryuuid
       * has was in the original message.
       */
      String operationEntryUUID = ctx.getEntryUid();
      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
      if (!operationEntryUUID.equals(modifiedEntryUUID))
      {
        /*
         * The changes entry is not the same entry as the one on
         * the original change was performed.
         * Probably the original entry was renamed and replaced with
         * another entry.
         * We must not let the change proceed, return a negative
         * result and set the result code to NO_SUCH_OBJET.
         * When the operation will return, the thread that started the
         * operation will try to find the correct entry and restart a new
         * operation.
         */
        deleteOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
        return new SynchronizationProviderResult(false);
      }
    }
    else
    {
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
      ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
      deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
    }
    return new SynchronizationProviderResult(true);
  }
  /**
   * Implement the  handleConflictResolution phase of the addOperation.
   *
   * @param addOperation The AddOperation.
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
      AddOperation addOperation)
  {
    if (addOperation.isSynchronizationOperation())
    {
      AddContext ctx = (AddContext) addOperation.getAttachment(SYNCHROCONTEXT);
      /*
       * If an entry with the same entry uniqueID already exist then
       * this operation has already been replayed in the past.
       */
      String uuid = ctx.getEntryUid();
      if (findEntryDN(uuid) != null)
      {
        addOperation.setResultCode(ResultCode.SUCCESS);
        return new SynchronizationProviderResult(false);
      }
    }
    return new SynchronizationProviderResult(true);
  }
  /**
   * Implement the  handleConflictResolution phase of the ModifyDNOperation.
   *
   * @param modifyDNOperation The ModifyDNOperation.
   * @return A SynchronizationProviderResult indicating if the operation
   *         can continue.
   */
  public SynchronizationProviderResult handleConflictResolution(
      ModifyDNOperation modifyDNOperation)
  {
    ModifyDnContext ctx =
      (ModifyDnContext) modifyDNOperation.getAttachment(SYNCHROCONTEXT);
    if (ctx != null)
    {
      /*
       * This is a synchronization operation
       * Check that the modified entry has the same entryuuid
       * as was in the original message.
       */
      String modifiedEntryUUID =
        Historical.getEntryUuid(modifyDNOperation.getOriginalEntry());
      if (!modifiedEntryUUID.equals(ctx.getEntryUid()))
      {
        /*
         * The modified entry is not the same entry as the one on
         * the original change was performed.
         * Probably the original entry was renamed and replaced with
         * another entry.
         * We must not let the change proceed, return a negative
         * result and set the result code to NO_SUCH_OBJET.
         * When the operation will return, the thread that started the
         * operation will try to find the correct entry and restart a new
         * operation.
         */
        modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
        return new SynchronizationProviderResult(false);
      }
      if (modifyDNOperation.getNewSuperior() != null)
      {
        /*
         * Also check that the current id of the
         * parent is the same as when the operation was performed.
         */
        String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
        if (!newParentId.equals(ctx.getNewParentId()))
        {
          modifyDNOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
          return new SynchronizationProviderResult(false);
        }
      }
    }
    else
    {
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
      String newParentId = null;
      if (modifyDNOperation.getNewSuperior() != null)
      {
        newParentId = findEntryId(modifyDNOperation.getNewSuperior());
      }
      Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
      String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
      ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId);
      modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
    }
    return new SynchronizationProviderResult(true);
  }
  /**
   * Handle the conflict resolution.
   * Called by the core server after locking the entry and before
   * starting the actual modification.
@@ -425,25 +588,43 @@
  public SynchronizationProviderResult handleConflictResolution(
                                                ModifyOperation modifyOperation)
  {
    //  If operation do not yet have a change number, generate it
    ChangeNumber changeNumber =
      (ChangeNumber) modifyOperation.getAttachment(SYNCHRONIZATION);
    if (changeNumber == null)
    {
      synchronized(pendingChanges)
      {
        changeNumber = changeNumberGenerator.NewChangeNumber();
        pendingChanges.put(changeNumber, new PendingChange(changeNumber,
                                                           modifyOperation,
                                                           null));
      }
      modifyOperation.setAttachment(SYNCHRONIZATION, changeNumber);
    }
    ModifyContext ctx =
      (ModifyContext) modifyOperation.getAttachment(SYNCHROCONTEXT);
    // if Operation is a synchronization operation, solve conflicts
    if (modifyOperation.isSynchronizationOperation())
    Entry modifiedEntry = modifyOperation.getModifiedEntry();
    if (ctx == null)
    {
      Entry modifiedEntry = modifyOperation.getModifiedEntry();
      // There is no Synchronization context attached to the operation
      // so this is not a synchronization operation.
      ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
      String modifiedEntryUUID = Historical.getEntryUuid(modifiedEntry);
      ctx = new ModifyContext(changeNumber, modifiedEntryUUID);
      modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
    }
    else
    {
      String modifiedEntryUUID = ctx.getEntryUid();
      String currentEntryUUID = Historical.getEntryUuid(modifiedEntry);
      if (!currentEntryUUID.equals(modifiedEntryUUID))
      {
        /*
         * The current modified entry is not the same entry as the one on
         * the original modification was performed.
         * Probably the original entry was renamed and replaced with
         * another entry.
         * We must not let the modification proceed, return a negative
         * result and set the result code to NO_SUCH_OBJET.
         * When the operation will return, the thread that started the
         * operation will try to find the correct entry and restart a new
         * operation.
         */
        modifyOperation.setResultCode(ResultCode.NO_SUCH_OBJECT);
        return new SynchronizationProviderResult(false);
      }
      /*
       * Solve the conflicts between modify operations
       */
      Historical historicalInformation = Historical.load(modifiedEntry);
      modifyOperation.setAttachment(HISTORICAL, historicalInformation);
@@ -456,16 +637,29 @@
         * stop the processing and send an OK result
         */
        modifyOperation.setResultCode(ResultCode.SUCCESS);
        /*
         * TODO : check that post operation do get called and
         * that pendingChanges do get updated
         */
        return new SynchronizationProviderResult(false);
      }
    }
    return new SynchronizationProviderResult(true);
  }
  /**
   * The preOperation phase for the add Operation.
   * Its job is to generate the Synchronization context associated to the
   * operation. It is necessary to do it in this phase because contrary to
   * the other operations, the entry uid is not set when the handleConflict
   * phase is called.
   *
   * @param addOperation The Add Operation.
   */
  public void doPreOperation(AddOperation addOperation)
  {
    AddContext ctx = new AddContext(generateChangeNumber(addOperation),
        Historical.getEntryUuid(addOperation),
        findEntryId(addOperation.getEntryDN().getParent()));
    addOperation.setAttachment(SYNCHROCONTEXT, ctx);
  }
  /**
   * Receive an update message from the changelog.
@@ -494,6 +688,7 @@
  /**
   * Do the necessary processing when an UpdateMessage was received.
   *
   * @param update The received UpdateMessage.
   */
  public void receiveUpdate(UpdateMessage update)
@@ -548,49 +743,20 @@
  {
    numReplayedPostOpCalled++;
    UpdateMessage msg = null;
    ChangeNumber curChangeNumber =
      (ChangeNumber) op.getAttachment(SYNCHRONIZATION);
    ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
    if (op.getResultCode() != ResultCode.SUCCESS)
    {
      if (curChangeNumber != null)
      {
        /*
         * This code can be executed by multiple threads
         * Since TreeMap is not synchronized, it is mandatory to synchronize
         * it now.
         */
        synchronized (pendingChanges)
        {
          pendingChanges.remove(curChangeNumber);
        }
      }
      return;
    }
    ResultCode result = op.getResultCode();
    boolean isAssured = isAssured(op);
    if (!op.isSynchronizationOperation())
    if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
    {
      switch (op.getOperationType())
      msg = UpdateMessage.generateMsg(op, isAssured);
      if (msg == null)
      {
      case MODIFY :
        msg = new ModifyMsg((ModifyOperation) op);
        break;
      case ADD:
        msg = new AddMsg((AddOperation) op);
        break;
      case DELETE :
        msg = new DeleteMsg((DeleteOperation) op);
        break;
      case MODIFY_DN :
        msg = new ModifyDNMsg((ModifyDNOperation) op);
        break;
      default :
        /*
         * This is an operation type that we do not know about
         * It should never happen
         * This code can be executed by multiple threads
         * Since TreeMap is not synchronized, it is mandatory to synchronize
         * it now.
         * It should never happen.
         */
        synchronized (pendingChanges)
        {
@@ -603,68 +769,44 @@
          return;
        }
      }
      if (isAssured(op))
      {
        msg.setAssured();
      }
    }
    synchronized(pendingChanges)
    {
      PendingChange curChange = pendingChanges.get(curChangeNumber);
      if (curChange == null)
      if (result == ResultCode.SUCCESS)
      {
        // This should never happen
        int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
        String message = getMessage(msgID, curChangeNumber.toString(),
                                    op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        return;
      }
      curChange.setCommitted(true);
      if (op.isSynchronizationOperation())
        curChange.setOp(op);
      else
        curChange.setMsg(msg);
      ChangeNumber firstChangeNumber = pendingChanges.firstKey();
      PendingChange firstChange = pendingChanges.get(firstChangeNumber);
      ChangeNumber lastCommittedChangeNumber = null;
      if (!op.isSynchronizationOperation() && msg.isAssured())
      {
        waitingAckMsgs.put(curChangeNumber, msg);
      }
      while ((firstChange != null) && firstChange.isCommitted())
      {
        if (firstChange.getOp().isSynchronizationOperation() == false)
        PendingChange curChange = pendingChanges.get(curChangeNumber);
        if (curChange == null)
        {
          numSentUpdates++;
          broker.publish(firstChange.getMsg());
          // This should never happen
          int msgID = MSGID_OPERATION_NOT_FOUND_IN_PENDING;
          String message = getMessage(msgID, curChangeNumber.toString(),
              op.toString());
          logError(ErrorLogCategory.SYNCHRONIZATION,
              ErrorLogSeverity.SEVERE_ERROR,
              message, msgID);
          return;
        }
        curChange.setCommitted(true);
        lastCommittedChangeNumber = firstChange.getChangeNumber();
        pendingChanges.remove(lastCommittedChangeNumber);
        if (pendingChanges.isEmpty())
        {
          firstChange = null;
        }
        if (op.isSynchronizationOperation())
          curChange.setOp(op);
        else
          curChange.setMsg(msg);
        if (!op.isSynchronizationOperation() && isAssured && (msg != null))
        {
          firstChangeNumber = pendingChanges.firstKey();
          firstChange = pendingChanges.get(firstChangeNumber);
          waitingAckMsgs.put(curChangeNumber, msg);
        }
      }
      if (lastCommittedChangeNumber != null)
        state.update(lastCommittedChangeNumber);
      else if (!op.isSynchronizationOperation())
        pendingChanges.remove(curChangeNumber);
      pushCommittedChanges();
    }
    if (!op.isSynchronizationOperation() && msg.isAssured())
    if ((!op.isSynchronizationOperation()) && msg.isAssured() && (msg != null)
        && (result == ResultCode.SUCCESS))
    {
      synchronized (msg)
      {
@@ -683,19 +825,6 @@
  }
  /**
   * Check if an operation must be processed as an assured operation.
   *
   * @param op the operation to be checked.
   * @return true if the operations must be processed as an assured operation.
   */
  private boolean isAssured(Operation op)
  {
    // TODO : should have a filtering mechanism for checking
    // operation that are assured and operations that are not.
    return assuredFlag;
  }
  /**
   * get the number of updates received by the synchronization plugin.
   *
   * @return the number of updates received
@@ -790,27 +919,6 @@
  }
  /**
   * Generate and set the ChangeNumber of a given Operation.
   *
   * @param operation The Operation for which the ChangeNumber must be set.
   */
  public void setChangeNumber(Operation operation)
  {
    ChangeNumber changeNumber =
      (ChangeNumber) operation.getAttachment(SYNCHRONIZATION);
    if (changeNumber == null)
    {
      synchronized(pendingChanges)
      {
        changeNumber = changeNumberGenerator.NewChangeNumber();
        pendingChanges.put(changeNumber, new PendingChange(changeNumber,
            operation, null));
      }
      operation.setAttachment(SYNCHRONIZATION, changeNumber);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
@@ -826,7 +934,7 @@
    synchroThreads = new ArrayList<ListenerThread>();
    for (int i=0; i<10; i++)
    {
      ListenerThread myThread = new ListenerThread(this, changeNumberGenerator);
      ListenerThread myThread = new ListenerThread(this);
      myThread.start();
      synchroThreads.add(myThread);
    }
@@ -913,6 +1021,126 @@
  }
  /**
   * Create and replay a synchronized Operation from an UpdateMessage.
   *
   * @param msg The UpdateMessage to be replayed.
   */
  public void replay(UpdateMessage msg)
  {
    Operation op = null;
    boolean done = false;
    ChangeNumber changeNumber = null;
    try
    {
      while (!done)
      {
        op = msg.createOperation(conn);
        op.setInternalOperation(true);
        op.setSynchronizationOperation(true);
        changeNumber = OperationContext.getChangeNumber(op);
        if (changeNumber != null)
          changeNumberGenerator.adjust(changeNumber);
        op.run();
        ResultCode result = op.getResultCode();
        if (result != ResultCode.SUCCESS)
        {
          if (op instanceof ModifyOperation)
          {
            ModifyOperation newOp = (ModifyOperation) op;
            done = solveNamingConflict(newOp, msg);
          }
          else if (op instanceof DeleteOperation)
          {
            DeleteOperation newOp = (DeleteOperation) op;
            done = solveNamingConflict(newOp, msg);
          }
          else if (op instanceof AddOperation)
          {
            AddOperation newOp = (AddOperation) op;
            done = solveNamingConflict(newOp, msg);
          } else if (op instanceof ModifyDNOperation)
          {
            ModifyDNOperation newOp = (ModifyDNOperation) op;
            done = solveNamingConflict(newOp, msg);
          }
          else
          {
            done = true;  // unknown type of operation ?!
          }
        }
        else
        {
          done = true;
        }
      }
    }
    catch (ASN1Exception e)
    {
      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
      String message = getMessage(msgID, msg) +
      stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    catch (LDAPException e)
    {
      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
      String message = getMessage(msgID, msg) +
      stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    catch (DataFormatException e)
    {
      int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
      String message = getMessage(msgID, msg) +
      stackTraceToSingleLineString(e);
      logError(ErrorLogCategory.SYNCHRONIZATION,
          ErrorLogSeverity.SEVERE_ERROR,
          message, msgID);
    }
    catch (Exception e)
    {
      if (changeNumber != null)
      {
        /*
         * An Exception happened during the replay process.
         * Continue with the next change but the servers will know start
         * to be inconsistent.
         * TODO : REPAIR : Should let the repair tool know about this
         */
        int msgID = MSGID_EXCEPTION_REPLAYING_OPERATION;
        String message = getMessage(msgID, stackTraceToSingleLineString(e),
            op.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
        updateError(changeNumber);
      }
      else
      {
        int msgID = MSGID_EXCEPTION_DECODING_OPERATION;
        String message = getMessage(msgID, stackTraceToSingleLineString(e),
            msg.toString());
        logError(ErrorLogCategory.SYNCHRONIZATION,
            ErrorLogSeverity.SEVERE_ERROR, message, msgID);
      }
    }
    finally
    {
      if (msg.isAssured())
        ack(msg.getChangeNumber());
      incProcessedUpdates();
    }
  }
  /**
   * This methods is called when an error happends while replaying
   * and operation.
   * It is necessary because the postOPeration does not always get
@@ -925,6 +1153,434 @@
    synchronized (pendingChanges)
    {
      pendingChanges.remove(changeNumber);
      pushCommittedChanges();
    }
  }
  /**
   * Generate a new change number and insert it in the pending list.
   *
   * @param operation The operation for which the change number must be
   *                  generated.
   * @return The new change number.
   */
  private ChangeNumber generateChangeNumber(Operation operation)
  {
    ChangeNumber changeNumber;
    synchronized(pendingChanges)
    {
      changeNumber = changeNumberGenerator.NewChangeNumber();
      pendingChanges.put(changeNumber,
          new PendingChange(changeNumber, operation, null));
    }
    return changeNumber;
  }
  /**
   * Find the Unique Id of the entry with the provided DN by doing a
   * search of the entry and extracting its uniqueID from its attributes.
   *
   * @param dn The dn of the entry for which the unique Id is searched.
   *
   * @return The unique Id of the entry whith the provided DN.
   */
  private String findEntryId(DN dn)
  {
    if (dn == null)
      return null;
    try
    {
      LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
      attrs.add(ENTRYUIDNAME);
      InternalSearchOperation search = conn.processSearch(dn,
            SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES,
            0, 0, false,
            SearchFilter.createFilterFromString("objectclass=*"),
            attrs);
      if (search.getResultCode() == ResultCode.SUCCESS)
      {
        LinkedList<SearchResultEntry> result = search.getSearchEntries();
        if (!result.isEmpty())
        {
          SearchResultEntry resultEntry = result.getFirst();
          if (resultEntry != null)
          {
            return Historical.getEntryUuid(resultEntry);
          }
        }
      }
    } catch (DirectoryException e)
    {
      // never happens because the filter is always valid.
    }
    return null;
  }
  /**
   * find the current dn of an entry from its entry uuid.
   *
   * @param uuid the Entry Unique ID.
   * @return The curernt dn of the entry or null if there is no entry with
   *         the specified uuid.
   */
  private DN findEntryDN(String uuid)
  {
    try
    {
      InternalSearchOperation search = conn.processSearch(baseDN,
            SearchScope.WHOLE_SUBTREE,
            SearchFilter.createFilterFromString("entryuuid="+uuid));
      if (search.getResultCode() == ResultCode.SUCCESS)
      {
        LinkedList<SearchResultEntry> result = search.getSearchEntries();
        if (!result.isEmpty())
        {
          SearchResultEntry resultEntry = result.getFirst();
          if (resultEntry != null)
          {
            return resultEntry.getDN();
          }
        }
      }
    } catch (DirectoryException e)
    {
      // never happens because the filter is always valid.
    }
    return null;
  }
  /**
   * Solve a conflict detected when replaying a modify operation.
   *
   * @param op The operation that triggered the conflict detection.
   * @param msg The operation that triggered the conflict detection.
   * @return true if the process is completed, false if it must continue..
   */
  private boolean solveNamingConflict(ModifyOperation op,
      UpdateMessage msg)
  {
    ResultCode result = op.getResultCode();
    ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
    String entryUid = ctx.getEntryUid();
    if (result == ResultCode.NO_SUCH_OBJECT)
    {
      /*
       * This error may happen the operation is a modification but
       * the entry had been renamed on a different master in the same time.
       * search if the entry has been renamed, and return the new dn
       * of the entry.
       */
      msg.setDn(findEntryDN(entryUid).toString());
      return false;
    }
    return true;
  }
 /** Solve a conflict detected when replaying a delete operation.
  *
  * @param op The operation that triggered the conflict detection.
  * @param msg The operation that triggered the conflict detection.
  * @return true if the process is completed, false if it must continue..
  */
 private boolean solveNamingConflict(DeleteOperation op,
     UpdateMessage msg)
 {
   ResultCode result = op.getResultCode();
   DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
   String entryUid = ctx.getEntryUid();
   if (result == ResultCode.NO_SUCH_OBJECT)
   {
     /*
      * Find if the entry is still in the database.
      */
     DN currentDn = findEntryDN(entryUid);
     if (currentDn == null)
     {
       /*
        * The entry has already been deleted, either because this delete
        * has already been replayed or because another concurrent delete
        * has already done the job.
        * In any case, there is is nothing more to do.
        */
       return true;
     }
     else
     {
       /*
        * This entry has been renamed, replay the delete using its new DN.
        */
       msg.setDn(currentDn.toString());
       return false;
     }
   }
   else if (result == ResultCode.NOT_ALLOWED_ON_NONLEAF)
   {
     /*
      * This may happen when we replay a DELETE done on a master
      * but children of this entry have been added on another master.
      */
     /*
      * TODO : either delete all the childs or rename the child below
      * the top suffix by adding entryuuid in dn and delete this entry.
      */
   }
   return true;
 }
  /**
   * Solve a conflict detected when replaying a ADD operation.
   *
   * @param op The operation that triggered the conflict detection.
   * @param msg The operation that triggered the conflict detection.
   * @return true if the process is completed, false if it must continue.
   * @throws Exception When the operation is not valid.
   */
  private boolean solveNamingConflict(AddOperation op,
      UpdateMessage msg) throws Exception
  {
    ResultCode result = op.getResultCode();
    AddContext ctx = (AddContext) op.getAttachment(SYNCHROCONTEXT);
    String entryUid = ctx.getEntryUid();
    String parentUniqueId = ctx.getParentUid();
    if (result == ResultCode.NO_SUCH_OBJECT)
    {
      /*
       * This can happen if the parent has been renamed or deleted
       * find the parent dn and calculate a new dn for the entry
       */
      if (parentUniqueId == null)
      {
        /*
         * This entry is the base dn of the backend.
         * It is quite weird that the operation result be NO_SUCH_OBJECT.
         * There is notthing more we can do except TODO log a
         * message for the repair tool to look at this problem.
         */
        return true;
      }
      DN parentDn = findEntryDN(parentUniqueId);
      if (parentDn == null)
      {
        /*
         * The parent has been deleted, so this entry should not
         * exist don't do the ADD.
         */
        return true;
      }
      else
      {
        RDN entryRdn = op.getEntryDN().getRDN();
        msg.setDn(parentDn + "," + entryRdn);
        return false;
      }
    }
    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
    {
      /*
       * This can happen if
       *  - two adds are done on different servers but with the
       *    same target DN.
       *  - the same ADD is being replayed for the second time on this server.
       * if the nsunique ID already exist, assume this is a replay and
       *        don't do anything
       * if the entry unique id do not exist, generate conflict.
       */
      if (findEntryDN(entryUid) != null)
      {
        // entry already exist : this is a replay
        return true;
      }
      else
      {
        addConflict(op);
        msg.setDn(generateConflictDn(entryUid, msg.getDn()));
        return false;
      }
    }
    return true;
  }
  /**
   * Solve a conflict detected when replaying a Modify DN operation.
   *
   * @param op The operation that triggered the conflict detection.
   * @param msg The operation that triggered the conflict detection.
   * @return true if the process is completed, false if it must continue.
   * @throws Exception When the operation is not valid.
   */
  private boolean solveNamingConflict(ModifyDNOperation op,
      UpdateMessage msg) throws Exception
  {
    ResultCode result = op.getResultCode();
    ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
    String entryUid = ctx.getEntryUid();
    String newSuperiorID = ctx.getNewParentId();
    if (result == ResultCode.NO_SUCH_OBJECT)
    {
      ModifyDNMsg modifyDnMsg = (ModifyDNMsg) msg;
      /*
       * four possible cases :
       * - the modified entry has been renamed
       * - the new parent has been renamed
       * - the operation is replayed for the second time.
       * - the entry has been deleted
       * action :
       *  - change the target dn and the new parent dn and
       *        restart the operation,
       *  - don't do anything if the operation is replayed.
       */
      // Construct the new DN to use for the entry.
      DN entryDN = op.getEntryDN();
      DN newSuperior = findEntryDN(newSuperiorID);
      RDN newRDN = op.getNewRDN();
      DN parentDN;
      if (newSuperior == null)
      {
        parentDN = entryDN.getParent();
      }
      else
      {
        parentDN = newSuperior;
      }
      if ((parentDN == null) || parentDN.isNullDN())
      {
        /* this should never happen
         * can't solve any conflict in this case.
         */
        throw new Exception("operation parameters are invalid");
      }
      RDN[] parentComponents = parentDN.getRDNComponents();
      RDN[] newComponents    = new RDN[parentComponents.length+1];
      System.arraycopy(parentComponents, 0, newComponents, 1,
          parentComponents.length);
      newComponents[0] = newRDN;
      DN newDN = new DN(newComponents);
      // get the current DN of this entry in the database.
      DN currentDN = findEntryDN(entryUid);
      // if the newDN and the current DN match then the operation
      // is a no-op (this was probably a second replay)
      // don't do anything.
      if (newDN.equals(currentDN))
      {
        return true;
      }
      msg.setDn(currentDN.toString());
      modifyDnMsg.setNewSuperior(newSuperior.toString());
      return false;
    }
    else if (result == ResultCode.ENTRY_ALREADY_EXISTS)
    {
      /*
       * This may happen when two modifyDn operation
       * are done on different servers but with the same target DN
       * add the conflict object class to the entry
       * and rename it using its entryuuid.
       */
      generateAddConflictOp(op);
      msg.setDn(generateConflictDn(entryUid, msg.getDn()));
      return false;
    }
    return true;
  }
  /**
   * Generate a modification to add the conflict ObjectClass to an entry
   * whose Dn is now conflicting with another entry.
   *
   * @param op The operation causing the conflict.
   */
  private void generateAddConflictOp(ModifyDNOperation op)
  {
    // TODO
  }
  /**
   * Add the conflict object class to an entry that could
   * not be added because it is conflicting with another entry.
   *
   * @param addOp The conflicting Add Operation.
   */
  private void addConflict(AddOperation addOp)
  {
    /*
     * TODO
     */
  }
  /**
   * Generate the Dn to use for a conflicting entry.
   *
   * @param op Operation that generated the conflict
   * @param dn Original dn.
   * @return The generated Dn for a conflicting entry.
   */
  private String generateConflictDn(String entryUid, String dn)
  {
    return dn + "entryuuid=" + entryUid;
  }
  /**
   * Check if an operation must be processed as an assured operation.
   *
   * @param op the operation to be checked.
   * @return true if the operations must be processed as an assured operation.
   */
  private boolean isAssured(Operation op)
  {
    // TODO : should have a filtering mechanism for checking
    // operation that are assured and operations that are not.
    return false;
  }
  /**
   * Push all committed local changes to the changelog service.
   * PRECONDITION : The pendingChanges lock must be held before calling
   * this method.
   */
  private void pushCommittedChanges()
  {
    if (pendingChanges.isEmpty())
      return;
    ChangeNumber firstChangeNumber = pendingChanges.firstKey();
    PendingChange firstChange = pendingChanges.get(firstChangeNumber);
    while ((firstChange != null) && firstChange.isCommitted())
    {
      if (firstChange.getOp().isSynchronizationOperation() == false)
      {
        numSentUpdates++;
        broker.publish(firstChange.getMsg());
      }
      state.update(firstChangeNumber);
      pendingChanges.remove(firstChangeNumber);
      if (pendingChanges.isEmpty())
      {
        firstChange = null;
      }
      else
      {
        firstChangeNumber = pendingChanges.firstKey();
        firstChange = pendingChanges.get(firstChangeNumber);
      }
    }
  }
}