mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

jcduff
23.04.2008 b4f8838b15342670c31753a484abf0129e3c9653
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -36,6 +36,7 @@
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.replication.common.StatusMachine.*;
import java.io.File;
import java.io.IOException;
@@ -50,10 +51,10 @@
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
@@ -85,31 +86,42 @@
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.DeleteContext;
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
import org.opends.server.replication.protocol.ErrorMessage;
import org.opends.server.replication.protocol.HeartbeatMessage;
import org.opends.server.replication.protocol.InitializeRequestMessage;
import org.opends.server.replication.protocol.InitializeTargetMessage;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.protocol.ResetGenerationId;
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.ExistingFileBehavior;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.Attribute;
@@ -183,8 +195,8 @@
  // The update to replay message queue where the listener thread is going to
  // push incoming update messages.
  private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
  private SortedMap<ChangeNumber, UpdateMessage> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMessage>();
  private SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMsg>();
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
  private AtomicInteger numSentUpdates = new AtomicInteger(0);
  private AtomicInteger numProcessedUpdates = new AtomicInteger();
@@ -208,7 +220,7 @@
  /**
   * This object is used to store the list of update currently being
   * done on the local database.
   * Is is usefull to make sure that the local operations are sent in a
   * Is is useful to make sure that the local operations are sent in a
   * correct order to the replication server and that the ServerState
   * is not updated too early.
   */
@@ -217,8 +229,8 @@
  /**
   * It contain the updates that were done on other servers, transmitted
   * by the replication server and that are currently replayed.
   * It is usefull to make sure that dependencies between operations
   * are correctly fullfilled and to to make sure that the ServerState is
   * It is useful to make sure that dependencies between operations
   * are correctly fulfilled and to to make sure that the ServerState is
   * not updated too early.
   */
  private RemotePendingChanges remotePendingChanges;
@@ -228,7 +240,7 @@
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  short serverId;
  private short serverId;
  // The context related to an import or export being processed
  // Null when none is being processed.
@@ -236,7 +248,7 @@
  private Collection<String> replicationServers;
  private DN baseDN;
  private DN baseDn;
  private boolean shutdown = false;
@@ -250,12 +262,42 @@
  private int window = 100;
  /*
   * Assured mode properties
   */
  // Is assured mode enabled or not for this domain ?
  private boolean assured = false;
  // Assured sub mode (used when assured is true)
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  // Safe Data level (used when assuredMode is SAFE_DATA)
  private byte assuredSdLevel = (byte)1;
  // Timeout (in milliseconds) when waiting for acknowledgments
  private long assuredTimeout = 1000;
  // Group id
  private byte groupId = (byte)1;
  // Referrals urls to be published to other servers of the topology
  // TODO: fill that with all currently opened urls if no urls configured
  private List<String> refUrls = new ArrayList<String>();
  // Current status for this replicated domain
  private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
  /*
   * Properties for the last topology info received from the network.
   */
  // Info for other DSs.
  // Warning: does not contain info for us (for our server id)
  private List<DSInfo> dsList = new ArrayList<DSInfo>();
  // Info for other RSs.
  private List<RSInfo> rsList = new ArrayList<RSInfo>();
  /**
   * The isolation policy that this domain is going to use.
   * This field describes the behavior of the domain when an update is
   * attempted and the domain could not connect to any Replication Server.
   * Possible values are accept-updates or deny-updates, but other values
   * may be added in the futur.
   * may be added in the future.
   */
  private IsolationPolicy isolationpolicy;
@@ -281,9 +323,9 @@
    // The input stream for the import
    ReplLDIFInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMessage.UNKNOWN_SERVER;
    short exportTarget = RoutableMsg.UNKNOWN_SERVER;
    // The source in the case of an import
    short importSource = RoutableMessage.UNKNOWN_SERVER;
    short importSource = RoutableMsg.UNKNOWN_SERVER;
    // The total entry count expected to be processed
    long entryCount = 0;
@@ -295,7 +337,9 @@
    /**
     * Initializes the import/export counters with the provider value.
     * @param count The value with which to initialize the counters.
     * @param total
     * @param left
     * @throws DirectoryException
     */
    public void setCounters(long total, long left)
      throws DirectoryException
@@ -321,6 +365,7 @@
    /**
     * Update the counters of the task for each entry processed during
     * an import or export.
     * @throws DirectoryException
     */
    public void updateCounters()
      throws DirectoryException
@@ -366,7 +411,7 @@
     */
    public ExportThread(short target)
    {
      super("Export thread");
      super("Export thread " + serverId);
      this.target = target;
    }
@@ -406,12 +451,13 @@
    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
    throws ConfigException
  {
    super("replicationDomain_" + configuration.getBaseDN());
    super("Replication State Saver for server id " + configuration.getServerId()
      + " and domain " + configuration.getBaseDN());
    // Read the configuration parameters.
    replicationServers = configuration.getReplicationServer();
    serverId = (short) configuration.getServerId();
    baseDN = configuration.getBaseDN();
    baseDn = configuration.getBaseDN();
    window  = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    isolationpolicy = configuration.getIsolationPolicy();
@@ -419,13 +465,43 @@
    this.updateToReplayQueue = updateToReplayQueue;
    /*
     * Fill assured configuration properties
     */
    AssuredType assuredType = configuration.getAssuredType();
    switch (assuredType)
    {
      case NOT_ASSURED:
        assured = false;
        break;
      case SAFE_DATA:
        assured = true;
        this.assuredMode = AssuredMode.SAFE_DATA_MODE;
        break;
      case SAFE_READ:
        assured = true;
        this.assuredMode = AssuredMode.SAFE_READ_MODE;
        break;
    }
    this.assuredSdLevel = (byte)configuration.getAssuredSdLevel();
    this.groupId = (byte)configuration.getGroupId();
    this.assuredTimeout = configuration.getAssuredTimeout();
    SortedSet<String> urls = configuration.getReferralsUrl();
    if (urls != null)
    {
      for (String url : urls)
      {
        this.refUrls.add(url);
      }
    }
    /*
     * Modify conflicts are solved for all suffixes but the schema suffix
     * because we don't want to store extra information in the schema
     * ldif files.
     * This has no negative impact because the changes on schema should
     * not produce conflicts.
     */
    if (baseDN.compareTo(DirectoryServer.getSchemaDN()) == 0)
    if (baseDn.compareTo(DirectoryServer.getSchemaDN()) == 0)
    {
      solveConflictFlag = false;
    }
@@ -438,7 +514,7 @@
     * Create a new Persistent Server State that will be used to store
     * the last ChangeNmber seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(baseDN, serverId);
    state = new PersistentServerState(baseDn, serverId);
    /*
     * Create a replication monitor object responsible for publishing
@@ -447,11 +523,11 @@
    monitor = new ReplicationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    Backend backend = retrievesBackend(baseDN);
    Backend backend = retrievesBackend(baseDn);
    if (backend == null)
    {
      throw new ConfigException(ERR_SEARCHING_DOMAIN_BACKEND.get(
                                  baseDN.toNormalizedString()));
                                  baseDn.toNormalizedString()));
    }
    try
@@ -461,16 +537,16 @@
    catch (DirectoryException e)
    {
      logError(ERR_LOADING_GENERATION_ID.get(
          baseDN.toNormalizedString(), e.getLocalizedMessage()));
          baseDn.toNormalizedString(), e.getLocalizedMessage()));
    }
    /*
     * create the broker object used to publish and receive changes
     */
    broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
        maxReceiveDelay, maxSendQueue, maxSendDelay, window,
    broker = new ReplicationBroker(this, state, baseDn, serverId,
        maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window,
        heartbeatInterval, generationId,
        new ReplSessionSecurity(configuration));
        new ReplSessionSecurity(configuration),getGroupId());
    broker.start(replicationServers);
@@ -505,7 +581,7 @@
   */
  public DN getBaseDN()
  {
    return baseDN;
    return baseDn;
  }
  /**
@@ -521,7 +597,7 @@
    if ((!deleteOperation.isSynchronizationOperation())
        && (!brokerIsConnected(deleteOperation)))
    {
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
      return new SynchronizationProviderResult.StopProcessing(
          ResultCode.UNWILLING_TO_PERFORM, msg);
    }
@@ -535,7 +611,7 @@
      /*
       * This is a replication operation
       * Check that the modified entry has the same entryuuid
       * has was in the original message.
       * as it was in the original message.
       */
      String operationEntryUUID = ctx.getEntryUid();
      String modifiedEntryUUID = Historical.getEntryUuid(deletedEntry);
@@ -581,7 +657,7 @@
    if ((!addOperation.isSynchronizationOperation())
        && (!brokerIsConnected(addOperation)))
    {
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
      return new SynchronizationProviderResult.StopProcessing(
          ResultCode.UNWILLING_TO_PERFORM, msg);
    }
@@ -686,7 +762,7 @@
    if ((!modifyDNOperation.isSynchronizationOperation())
        && (!brokerIsConnected(modifyDNOperation)))
    {
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
      return new SynchronizationProviderResult.StopProcessing(
          ResultCode.UNWILLING_TO_PERFORM, msg);
    }
@@ -725,7 +801,7 @@
         * parent is the same as when the operation was performed.
         */
        String newParentId = findEntryId(modifyDNOperation.getNewSuperior());
        if ((newParentId != null) &&
        if ((newParentId != null) && (ctx.getNewParentId() != null) &&
            (!newParentId.equals(ctx.getNewParentId())))
        {
        return new SynchronizationProviderResult.StopProcessing(
@@ -765,7 +841,7 @@
    if ((!modifyOperation.isSynchronizationOperation())
        && (!brokerIsConnected(modifyOperation)))
    {
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDN.toString());
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
      return new SynchronizationProviderResult.StopProcessing(
          ResultCode.UNWILLING_TO_PERFORM, msg);
    }
@@ -851,6 +927,7 @@
        findEntryId(addOperation.getEntryDN().getParentDNInSuffix()));
    addOperation.setAttachment(SYNCHROCONTEXT, ctx);
    Historical.generateState(addOperation);
  }
  /**
@@ -858,14 +935,14 @@
   * also responsible for updating the list of pending changes
   * @return the received message - null if none
   */
  public UpdateMessage receive()
  public UpdateMsg receive()
  {
    UpdateMessage update = null;
    UpdateMsg update = null;
    while (update == null)
    while ( (update == null) && (!shutdown) )
    {
      InitializeRequestMessage initMsg = null;
      ReplicationMessage msg;
      InitializeRequestMsg initMsg = null;
      ReplicationMsg msg;
      try
      {
        msg = broker.receive();
@@ -876,24 +953,24 @@
        }
        if (debugEnabled())
          if (!(msg instanceof HeartbeatMessage))
          if (!(msg instanceof HeartbeatMsg))
            TRACER.debugVerbose("Message received <" + msg + ">");
        if (msg instanceof AckMessage)
        if (msg instanceof AckMsg)
        {
          AckMessage ack = (AckMessage) msg;
          AckMsg ack = (AckMsg) msg;
          receiveAck(ack);
        }
        else if (msg instanceof InitializeRequestMessage)
        else if (msg instanceof InitializeRequestMsg)
        {
          // Another server requests us to provide entries
          // for a total update
          initMsg = (InitializeRequestMessage)msg;
          initMsg = (InitializeRequestMsg)msg;
        }
        else if (msg instanceof InitializeTargetMessage)
        else if (msg instanceof InitializeTargetMsg)
        {
          // Another server is exporting its entries to us
          InitializeTargetMessage importMsg = (InitializeTargetMessage) msg;
          InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
          try
          {
@@ -907,8 +984,8 @@
          catch(DirectoryException de)
          {
            // Returns an error message to notify the sender
            ErrorMessage errorMsg =
              new ErrorMessage(importMsg.getsenderID(),
            ErrorMsg errorMsg =
              new ErrorMsg(importMsg.getsenderID(),
                  de.getMessageObject());
            MessageBuilder mb = new MessageBuilder();
            mb.append(de.getMessageObject());
@@ -916,7 +993,7 @@
            broker.publish(errorMsg);
          }
        }
        else if (msg instanceof ErrorMessage)
        else if (msg instanceof ErrorMsg)
        {
          if (ieContext != null)
          {
@@ -925,22 +1002,31 @@
            // - or before an import really started
            //   For example, when we publish a request and the
            //  replicationServer did not find any import source.
            abandonImportExport((ErrorMessage)msg);
            abandonImportExport((ErrorMsg)msg);
          }
          else
          {
            /* We can receive an error message from the replication server
             * in the following cases :
             * - we connected with an incorrect generation id
            /*
             * Log error message
             */
            ErrorMessage errorMsg = (ErrorMessage)msg;
            ErrorMsg errorMsg = (ErrorMsg)msg;
            logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
          }
        }
        else if (msg instanceof UpdateMessage)
        if (msg instanceof TopologyMsg)
        {
          update = (UpdateMessage) msg;
          TopologyMsg topoMsg = (TopologyMsg)msg;
          receiveTopo(topoMsg);
        }
        if (msg instanceof ChangeStatusMsg)
        {
          ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
          receiveChangeStatus(csMsg);
        }
        else if (msg instanceof UpdateMsg)
        {
          update = (UpdateMsg) msg;
          receiveUpdate(update);
        }
      }
@@ -968,24 +1054,183 @@
  }
  /**
   * Do the necessary processing when an UpdateMessage was received.
   * Processes an incoming TopologyMsg.
   * Updates the structures for the local view of the topology.
   *
   * @param update The received UpdateMessage.
   * @param topoMsg The topology information received from RS.
   */
  public void receiveUpdate(UpdateMessage update)
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn
        + " received topology info update:\n" + topoMsg);
    // Store new lists
    synchronized(getDsList())
    {
      synchronized(getRsList())
      {
        dsList = topoMsg.getDsList();
        rsList = topoMsg.getRsList();
      }
    }
  }
  /**
   * Set the initial status of the domain, once he is connected to the topology.
   * @param initStatus The status to enter the state machine with
   */
  public void setInitialStatus(ServerStatus initStatus)
  {
    // Sanity check: is it a valid initial status?
    if (!isValidInitialStatus(initStatus))
    {
      Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
        baseDn.toString(), Short.toString(serverId));
      logError(msg);
    } else
    {
      status = initStatus;
    }
  }
  /**
   * Processes an incoming ChangeStatusMsg. Compute new status according to
   * given order. Then update domain for being compliant with new status
   * definition.
   * @param csMsg The received status message
   */
  private void receiveChangeStatus(ChangeStatusMsg csMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn +
        " received change status message:\n" + csMsg);
    ServerStatus reqStatus = csMsg.getRequestedStatus();
    // Translate requested status to a state machine event
    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
    if (event == StatusMachineEvent.INVALID_EVENT)
    {
      Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
        baseDn.toString(), Short.toString(serverId));
      logError(msg);
      return;
    }
    // Compute new status and do matching tasks
    // Use synchronized as admin task (thread) could order to go in admin status
    // for instance (concurrent with receive thread).
    synchronized (status)
    {
      ServerStatus newStatus =
        StatusMachine.computeNewStatus(status, event);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
          Short.toString(serverId), status.toString(), event.toString());
        logError(msg);
        return;
      }
      // Store new status
      status = newStatus;
      if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn +
        " new status is: " + status);
      // Perform whatever actions are needed to apply properties for being
      // compliant with new status
      updateDomainForNewStatus();
    }
  }
  /**
   * Called when first connection or disconnection detected.
   */
  public void toNotConnectedStatus()
  {
    // Go into not connected status
    // Use synchronized as somebody could ask another status change at the same
    // time
    synchronized (status)
    {
      StatusMachineEvent event =
        StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT;
      ServerStatus newStatus =
        StatusMachine.computeNewStatus(status, event);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
          Short.toString(serverId), status.toString(), event.toString());
        logError(msg);
        return;
      }
      // Store new status
      status = newStatus;
      if (debugEnabled())
        TRACER.debugInfo("Replication domain " + baseDn +
          " new status is: " + status);
      // Perform whatever actions are needed to apply properties for being
      // compliant with new status
      updateDomainForNewStatus();
    }
  }
  /**
   * Perform whatever actions are needed to apply properties for being
   * compliant with new status. Must be called in synchronized section for
   * status. The new status is already set in status variable.
   */
  private void updateDomainForNewStatus()
  {
    switch (status)
    {
      case NOT_CONNECTED_STATUS:
        break;
      case NORMAL_STATUS:
        break;
      case DEGRADED_STATUS:
        break;
      case FULL_UPDATE_STATUS:
        // Signal RS we just entered the full update status
        broker.signalStatusChange(status);
        break;
      case BAD_GEN_ID_STATUS:
        break;
      default:
        if (debugEnabled())
          TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " +
            status);
    }
  }
  /**
   * Do the necessary processing when an UpdateMsg was received.
   *
   * @param update The received UpdateMsg.
   */
  public void receiveUpdate(UpdateMsg update)
  {
    remotePendingChanges.putRemoteUpdate(update);
    numRcvdUpdates.incrementAndGet();
  }
  /**
   * Do the necessary processing when an AckMessage is received.
   * Do the necessary processing when an AckMsg is received.
   *
   * @param ack The AckMessage that was received.
   * @param ack The AckMsg that was received.
   */
  public void receiveAck(AckMessage ack)
  public void receiveAck(AckMsg ack)
  {
    UpdateMessage update;
    UpdateMsg update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    synchronized (waitingAckMsgs)
@@ -1013,7 +1258,7 @@
    {
      numReplayedPostOpCalled++;
    }
    UpdateMessage msg = null;
    UpdateMsg msg = null;
    // Note that a failed non-replication operation might not have a change
    // number.
@@ -1025,7 +1270,7 @@
    {
      // Generate a replication message for a successful non-replication
      // operation.
      msg = UpdateMessage.generateMsg(op, isAssured);
      msg = UpdateMsg.generateMsg(op);
      if (msg == null)
      {
@@ -1209,7 +1454,7 @@
   */
  public void ack(ChangeNumber changeNumber)
  {
    broker.publish(new AckMessage(changeNumber));
    broker.publish(new AckMsg(changeNumber));
  }
  /**
@@ -1304,18 +1549,17 @@
  }
  /**
   * Create and replay a synchronized Operation from an UpdateMessage.
   * Create and replay a synchronized Operation from an UpdateMsg.
   *
   * @param msg The UpdateMessage to be replayed.
   * @param msg The UpdateMsg to be replayed.
   */
  public void replay(UpdateMessage msg)
  public void replay(UpdateMsg msg)
  {
    Operation op = null;
    boolean done = false;
    boolean dependency = false;
    ChangeNumber changeNumber = null;
    int retryCount = 10;
    boolean firstTry = true;
    // Try replay the operation, then flush (replaying) any pending operation
    // whose dependency has been replayed until no more left.
@@ -1323,10 +1567,11 @@
    {
      try
      {
        op = msg.createOperation(conn);
        dependency = remotePendingChanges.checkDependencies(op, msg);
        while ((!dependency) && (!done) && (retryCount-- > 0))
        {
          op = msg.createOperation(conn);
          op.setInternalOperation(true);
          op.setSynchronizationOperation(true);
          changeNumber = OperationContext.getChangeNumber(op);
@@ -1341,36 +1586,23 @@
            {
              ModifyOperation newOp = (ModifyOperation) op;
              dependency = remotePendingChanges.checkDependencies(newOp);
              if ((!dependency) && (!firstTry))
              {
                done = solveNamingConflict(newOp, msg);
              }
              ModifyMsg modifyMsg = (ModifyMsg) msg;
              done = solveNamingConflict(newOp, modifyMsg);
            } else if (op instanceof DeleteOperation)
            {
              DeleteOperation newOp = (DeleteOperation) op;
              dependency = remotePendingChanges.checkDependencies(newOp);
              if ((!dependency) && (!firstTry))
              {
                done = solveNamingConflict(newOp, msg);
              }
              done = solveNamingConflict(newOp, msg);
            } else if (op instanceof AddOperation)
            {
              AddOperation newOp = (AddOperation) op;
              AddMsg addMsg = (AddMsg) msg;
              dependency = remotePendingChanges.checkDependencies(newOp);
              if ((!dependency) && (!firstTry))
              {
                done = solveNamingConflict(newOp, addMsg);
              }
              done = solveNamingConflict(newOp, addMsg);
            } else if (op instanceof ModifyDNOperationBasis)
            {
              ModifyDNMsg newMsg = (ModifyDNMsg) msg;
              dependency = remotePendingChanges.checkDependencies(newMsg);
              if ((!dependency) && (!firstTry))
              {
                ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
                done = solveNamingConflict(newOp, msg);
              }
              ModifyDNOperationBasis newOp = (ModifyDNOperationBasis) op;
                  done = solveNamingConflict(newOp, msg);
            } else
            {
              done = true;  // unknown type of operation ?!
@@ -1382,11 +1614,19 @@
              // however we still need to push this change to the serverState
              updateError(changeNumber);
            }
          } else
            else
            {
              /*
               * Create a new operation as the ConflictResolution
               * different operation.
               */
              op = msg.createOperation(conn);
            }
          }
          else
          {
            done = true;
          }
          firstTry = false;
        }
        if (!done && !dependency)
@@ -1457,7 +1697,6 @@
      dependency = false;
      changeNumber = null;
      retryCount = 10;
      firstTry = true;
    } while (msg != null);
  }
@@ -1494,14 +1733,16 @@
   *
   * @param dn The dn of the entry for which the unique Id is searched.
   *
   * @return The unique Id of the entry whith the provided DN.
   * @return The unique Id of the entry with the provided DN.
   */
  private String findEntryId(DN dn)
  static String findEntryId(DN dn)
  {
    if (dn == null)
      return null;
    try
    {
      InternalClientConnection conn =
                InternalClientConnection.getRootConnection();
      LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
      attrs.add(ENTRYUIDNAME);
      InternalSearchOperation search = conn.processSearch(dn,
@@ -1530,17 +1771,17 @@
  }
  /**
   * find the current dn of an entry from its entry uuid.
   * find the current DN of an entry from its entry UUID.
   *
   * @param uuid the Entry Unique ID.
   * @return The curernt dn of the entry or null if there is no entry with
   *         the specified uuid.
   * @return The current DN of the entry or null if there is no entry with
   *         the specified UUID.
   */
  private DN findEntryDN(String uuid)
  {
    try
    {
      InternalSearchOperation search = conn.processSearch(baseDN,
      InternalSearchOperation search = conn.processSearch(baseDn,
            SearchScope.WHOLE_SUBTREE,
            SearchFilter.createFilterFromString("entryuuid="+uuid));
      if (search.getResultCode() == ResultCode.SUCCESS)
@@ -1570,7 +1811,7 @@
   * @return true if the process is completed, false if it must continue..
   */
  private boolean solveNamingConflict(ModifyOperation op,
      UpdateMessage msg)
      ModifyMsg msg)
  {
    ResultCode result = op.getResultCode();
    ModifyContext ctx = (ModifyContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1601,6 +1842,57 @@
        return true;
      }
    }
    else if (result == ResultCode.NOT_ALLOWED_ON_RDN)
    {
      DN currentDN = findEntryDN(entryUid);
      RDN currentRDN = null;
      if (currentDN != null)
      {
        currentRDN = currentDN.getRDN();
      }
      else
      {
        // The entry does not exist anymore.
        numResolvedNamingConflicts.incrementAndGet();
        return true;
      }
      // The modify operation is trying to delete the value that is
      // currently used in the RDN. We need to alter the modify so that it does
      // not remove the current RDN value(s).
      List<Modification> mods = op.getModifications();
      for (Modification mod : mods)
      {
        AttributeType modAttrType = mod.getAttribute().getAttributeType();
        if ((mod.getModificationType() == ModificationType.DELETE) ||
            (mod.getModificationType() == ModificationType.REPLACE))
        {
          if (currentRDN.hasAttributeType(modAttrType))
          {
            // the attribute can't be deleted because it is used
            // in the RDN, turn this operation is a replace with the
            // current RDN value(s);
            mod.setModificationType(ModificationType.REPLACE);
            Attribute newAttribute = mod.getAttribute();
            AttributeBuilder attrBuilder;
            if (newAttribute == null)
            {
              attrBuilder = new AttributeBuilder(modAttrType);
            }
            else
            {
              attrBuilder = new AttributeBuilder(newAttribute);
            }
            attrBuilder.add(currentRDN.getAttributeValue(modAttrType));
            mod.setAttribute(attrBuilder.toAttribute());
          }
        }
      }
      msg.setMods(mods);
      numResolvedNamingConflicts.incrementAndGet();
      return false;
    }
    else
    {
      // The other type of errors can not be caused by naming conflicts.
@@ -1621,7 +1913,7 @@
  * @return true if the process is completed, false if it must continue..
  */
 private boolean solveNamingConflict(DeleteOperation op,
     UpdateMessage msg)
     UpdateMsg msg)
 {
   ResultCode result = op.getResultCode();
   DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1691,7 +1983,7 @@
 * @throws Exception When the operation is not valid.
 */
private boolean solveNamingConflict(ModifyDNOperation op,
    UpdateMessage msg) throws Exception
    UpdateMsg msg) throws Exception
{
  ResultCode result = op.getResultCode();
  ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1710,40 +2002,43 @@
   *  - don't do anything if the operation is replayed.
   */
  // Construct the new DN to use for the entry.
  DN entryDN = op.getEntryDN();
  DN newSuperior = findEntryDN(newSuperiorID);
  RDN newRDN = op.getNewRDN();
  DN parentDN;
  if (newSuperior == null)
  {
    parentDN = entryDN.getParent();
  }
  else
  {
    parentDN = newSuperior;
  }
  if ((parentDN == null) || parentDN.isNullDN())
  {
    /* this should never happen
     * can't solve any conflict in this case.
     */
    throw new Exception("operation parameters are invalid");
  }
  DN newDN = parentDN.concat(newRDN);
  // get the current DN of this entry in the database.
  DN currentDN = findEntryDN(entryUid);
  // Construct the new DN to use for the entry.
  DN entryDN = op.getEntryDN();
  DN newSuperior = null;
  RDN newRDN = op.getNewRDN();
  if (newSuperiorID != null)
  {
    newSuperior = findEntryDN(newSuperiorID);
  }
  else
  {
    newSuperior = entryDN.getParent();
  }
  //If we could not find the new parent entry, we missed this entry
  // earlier or it has disappeared from the database
  // Log this information for the repair tool and mark the entry
  // as conflicting.
  // stop the processing.
  if (newSuperior == null)
  {
    markConflictEntry(op, currentDN, currentDN.getParent().concat(newRDN));
    numUnresolvedNamingConflicts.incrementAndGet();
    return true;
  }
  DN newDN = newSuperior.concat(newRDN);
  if (currentDN == null)
  {
    // The entry targetted by the Modify DN is not in the database
    // The entry targeted by the Modify DN is not in the database
    // anymore.
    // This is a conflict between a delete and this modify DN.
    // The entry has been deleted anymore so we can safely assume
    // The entry has been deleted, we can safely assume
    // that the operation is completed.
    numResolvedNamingConflicts.incrementAndGet();
    return true;
@@ -1758,19 +2053,8 @@
    return true;
  }
  // If we could not find the new parent entry, we missed this entry
  // earlier or it has disappeared from the database
  // Log this information for the repair tool and mark the entry
  // as conflicting.
  // stop the processing.
  if (newSuperior == null)
  {
    markConflictEntry(op, currentDN, newDN);
    numUnresolvedNamingConflicts.incrementAndGet();
    return true;
  }
  if ((result == ResultCode.NO_SUCH_OBJECT) ||
      (result == ResultCode.UNWILLING_TO_PERFORM) ||
      (result == ResultCode.OBJECTCLASS_VIOLATION))
  {
    /*
@@ -1858,7 +2142,7 @@
        msg.setDn(generateConflictRDN(entryUid,
                    op.getEntryDN().getRDN().toString()) + ","
                    + baseDN);
                    + baseDn);
        // reset the parent uid so that the check done is the handleConflict
        // phase does not fail.
        msg.setParentUid(null);
@@ -1964,7 +2248,7 @@
      }
    } catch (DirectoryException e)
    {
      // log errror and information for the REPAIR tool.
      // log error and information for the REPAIR tool.
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_EXCEPTION_RENAME_CONFLICT_ENTRY.get());
      mb.append(String.valueOf(entryDN));
@@ -1979,7 +2263,7 @@
  /**
   * Rename an entry that was conflicting so that it stays below the
   * baseDN of the replicationDomain.
   * baseDn of the replicationDomain.
   *
   * @param conflictOp The Operation that caused the conflict.
   * @param dn         The DN of the entry to be renamed.
@@ -1991,7 +2275,7 @@
      InternalClientConnection.getRootConnection();
    ModifyDNOperation newOp = conn.processModifyDN(
        dn, generateDeleteConflictDn(uid, dn),false, baseDN);
        dn, generateDeleteConflictDn(uid, dn),false, baseDn);
    if (newOp.getResultCode() != ResultCode.SUCCESS)
    {
@@ -2022,11 +2306,10 @@
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    AttributeType attrType =
      DirectoryServer.getAttributeType(DS_SYNC_CONFLICT, true);
    LinkedHashSet<AttributeValue> values = new LinkedHashSet<AttributeValue>();
    values.add(new AttributeValue(attrType, conflictDN.toString()));
    Attribute attr = new Attribute(attrType, DS_SYNC_CONFLICT, values);
    AttributeType attrType = DirectoryServer.getAttributeType(DS_SYNC_CONFLICT,
        true);
    Attribute attr = Attributes.create(attrType, new AttributeValue(
        attrType, conflictDN.toString()));
    List<Modification> mods = new ArrayList<Modification>();
    Modification mod = new Modification(ModificationType.REPLACE, attr);
    mods.add(mod);
@@ -2042,7 +2325,7 @@
      logError(mb.toMessage());
    }
    // Generate an alert to let the administratot know that some
    // Generate an alert to let the administration know that some
    // conflict could not be solved.
    Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(conflictDN.toString());
    DirectoryServer.sendAlertNotification(this,
@@ -2055,12 +2338,12 @@
   *
   * @param msg            The conflicting Add Operation.
   *
   * @throws ASN1Exception When an encoding error happenned manipulating the
   * @throws ASN1Exception When an encoding error happened manipulating the
   *                       msg.
   */
  private void addConflict(AddMsg msg) throws ASN1Exception
  {
    // Generate an alert to let the administratot know that some
    // Generate an alert to let the administrator know that some
    // conflict could not be solved.
    Message alertMessage = NOTE_UNRESOLVED_CONFLICT.get(msg.getDn());
    DirectoryServer.sendAlertNotification(this,
@@ -2215,7 +2498,7 @@
   * Get the server ID.
   * @return The server ID.
   */
  public int getServerId()
  public short getServerId()
  {
    return serverId;
  }
@@ -2273,7 +2556,7 @@
  /**
   * Enable back the domain after a previous disable.
   * The domain will connect back to a replication Server and
   * will recreate threads to listen for messages from the Sycnhronization
   * will recreate threads to listen for messages from the Synchronization
   * server.
   * The generationId will be retrieved or computed if necessary.
   * The ServerState will also be read again from the local database.
@@ -2291,7 +2574,7 @@
       * should we stop the modifications ?
       */
      logError(ERR_LOADING_GENERATION_ID.get(
          baseDN.toNormalizedString(), e.getLocalizedMessage()));
          baseDn.toNormalizedString(), e.getLocalizedMessage()));
      return;
    }
@@ -2349,7 +2632,7 @@
  public ResultCode saveGenerationId(long generationId)
  {
    // The generationId is stored in the root entry of the domain.
    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDn.toString());
    ArrayList<ASN1OctetString> values = new ArrayList<ASN1OctetString>();
    ASN1OctetString value = new ASN1OctetString(Long.toString(generationId));
@@ -2383,7 +2666,7 @@
        Message message = ERR_UPDATING_GENERATION_ID.get(
            op.getResultCode().getResultCodeName() + " " +
            op.getErrorMessage(),
            baseDN.toString());
            baseDn.toString());
        logError(message);
      }
    }
@@ -2410,9 +2693,9 @@
    if (debugEnabled())
      TRACER.debugInfo(
          "Attempt to read generation ID from DB " + baseDN.toString());
          "Attempt to read generation ID from DB " + baseDn.toString());
    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDN.toString());
    ASN1OctetString asn1BaseDn = new ASN1OctetString(baseDn.toString());
    boolean found = false;
    LDAPFilter filter;
    try
@@ -2442,7 +2725,7 @@
      Message message = ERR_SEARCHING_GENERATION_ID.get(
          search.getResultCode().getResultCodeName() + " " +
          search.getErrorMessage(),
          baseDN.toString());
          baseDn.toString());
      logError(message);
    }
@@ -2460,27 +2743,26 @@
        if (attrs != null)
        {
          Attribute attr = attrs.get(0);
          LinkedHashSet<AttributeValue> values = attr.getValues();
          if (values.size()>1)
          if (attr.size()>1)
          {
            Message message = ERR_LOADING_GENERATION_ID.get(
                baseDN.toString(), "#Values=" + values.size() +
                baseDn.toString(), "#Values=" + attr.size() +
                " Must be exactly 1 in entry " +
                resultEntry.toLDIFString());
            logError(message);
          }
          else if (values.size() == 1)
          else if (attr.size() == 1)
          {
            found=true;
            try
            {
              generationId = Long.decode(values.iterator().next().
              generationId = Long.decode(attr.iterator().next().
                  getStringValue());
            }
            catch(Exception e)
            {
              Message message = ERR_LOADING_GENERATION_ID.get(
                baseDN.toString(), e.getLocalizedMessage());
                baseDn.toString(), e.getLocalizedMessage());
              logError(message);
            }
          }
@@ -2495,7 +2777,7 @@
      if (debugEnabled())
        TRACER.debugInfo("Generation ID created for domain base DN=" +
            baseDN.toString() +
            baseDn.toString() +
            " generationId=" + generationId);
    }
    else
@@ -2503,7 +2785,7 @@
      generationIdSavedStatus = true;
      if (debugEnabled())
        TRACER.debugInfo(
            "Generation ID successfully read from domain base DN=" + baseDN +
            "Generation ID successfully read from domain base DN=" + baseDn +
            " generationId=" + generationId);
    }
    return generationId;
@@ -2528,19 +2810,20 @@
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
          baseDN.toNormalizedString());
          baseDn.toNormalizedString());
      throw new DirectoryException(
         resultCode, message);
    }
    ResetGenerationId genIdMessage = null;
    ResetGenerationIdMsg genIdMessage = null;
    if (generationIdNewValue == null)
    {
      genIdMessage = new ResetGenerationId(this.generationId);
      genIdMessage = new ResetGenerationIdMsg(this.generationId);
    }
    else
    {
      genIdMessage = new ResetGenerationId(generationIdNewValue);
      genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
    }
    broker.publish(genIdMessage);
  }
@@ -2574,7 +2857,7 @@
   */
  public byte[] receiveEntryBytes()
  {
    ReplicationMessage msg;
    ReplicationMsg msg;
    while (true)
    {
      try
@@ -2584,7 +2867,7 @@
        if (debugEnabled())
          TRACER.debugVerbose(
              " sid:" + this.serverId +
              " base DN:" + this.baseDN +
              " base DN:" + this.baseDn +
              " Import EntryBytes received " + msg);
        if (msg == null)
        {
@@ -2592,26 +2875,26 @@
          return null;
        }
        if (msg instanceof EntryMessage)
        if (msg instanceof EntryMsg)
        {
          EntryMessage entryMsg = (EntryMessage)msg;
          EntryMsg entryMsg = (EntryMsg)msg;
          byte[] entryBytes = entryMsg.getEntryBytes();
          ieContext.updateCounters();
          return entryBytes;
        }
        else if (msg instanceof DoneMessage)
        else if (msg instanceof DoneMsg)
        {
          // This is the normal termination of the import
          // No error is stored and the import is ended
          // by returning null
          return null;
        }
        else if (msg instanceof ErrorMessage)
        else if (msg instanceof ErrorMsg)
        {
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          ErrorMessage errorMsg = (ErrorMessage)msg;
          ErrorMsg errorMsg = (ErrorMsg)msg;
          ieContext.exception = new DirectoryException(
                                      ResultCode.OTHER,
                                      errorMsg.getDetails());
@@ -2637,7 +2920,7 @@
   * on going.
   * @param errorMsg The error message received.
   */
  protected void abandonImportExport(ErrorMessage errorMsg)
  protected void abandonImportExport(ErrorMsg errorMsg)
  {
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
@@ -2645,7 +2928,7 @@
    if (debugEnabled())
      TRACER.debugVerbose(
          " abandonImportExport:" + this.serverId +
          " base DN:" + this.baseDN +
          " base DN:" + this.baseDn +
          " Error Msg received " + errorMsg);
    if (ieContext != null)
@@ -2730,8 +3013,8 @@
  throws DirectoryException
  {
    long genID = 0;
    Backend backend = retrievesBackend(this.baseDN);
    long bec = backend.numSubordinates(baseDN, true) + 1;
    Backend backend = retrievesBackend(this.baseDn);
    long bec = backend.numSubordinates(baseDn, true) + 1;
    long entryCount = (bec<1000?bec:1000);
    //  Acquire a shared lock for the backend.
@@ -2764,10 +3047,10 @@
    if (checksumOutput)
    {
      ros = new ReplLDIFOutputStream(this, entryCount);
      os = new CheckedOutputStream(ros, new Adler32());
      os = new CheckedOutputStream(ros, new GenerationIdChecksum());
      try
      {
        os.write((Long.toString(backend.numSubordinates(baseDN, true) + 1)).
        os.write((Long.toString(backend.numSubordinates(baseDn, true) + 1)).
            getBytes());
      }
      catch(Exception e)
@@ -2782,9 +3065,9 @@
    }
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
    // baseDN branch is the only one included in the export
    // baseDn branch is the only one included in the export
    List<DN> includeBranches = new ArrayList<DN>(1);
    includeBranches.add(this.baseDN);
    includeBranches.add(this.baseDn);
    exportConfig.setIncludeBranches(includeBranches);
    // For the checksum computing mode, only consider the 'stable' attributes
@@ -2838,19 +3121,15 @@
    }
    finally
    {
      // Clean up after the export by closing the export config.
      // Will also flush the export and export the remaining entries.
      exportConfig.close();
      if (checksumOutput)
      {
        genID =
         ((CheckedOutputStream)os).getChecksum().getValue();
      }
      else
      {
        // Clean up after the export by closing the export config.
        // Will also flush the export and export the remaining entries.
        // This is a real export where writer has been initialized.
        exportConfig.close();
      }
      //  Release the shared lock on the backend.
      try
@@ -2882,12 +3161,12 @@
   * Retrieves the backend related to the domain.
   *
   * @return The backend of that domain.
   * @param baseDN The baseDN to retrieve the backend
   * @param baseDn The baseDn to retrieve the backend
   */
  protected static Backend retrievesBackend(DN baseDN)
  protected static Backend retrievesBackend(DN baseDn)
  {
    // Retrieves the backend related to this domain
    return DirectoryServer.getBackend(baseDN);
    return DirectoryServer.getBackend(baseDn);
  }
  /**
@@ -2909,7 +3188,7 @@
   */
  public void exportLDIFEntry(String lDIFEntry) throws IOException
  {
    // If an error was raised - like receiving an ErrorMessage
    // If an error was raised - like receiving an ErrorMsg
    // we just let down the export.
    if (ieContext.exception != null)
    {
@@ -2918,7 +3197,7 @@
      throw ioe;
    }
    EntryMessage entryMessage = new EntryMessage(
    EntryMsg entryMessage = new EntryMsg(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
    broker.publish(entryMessage);
@@ -2949,8 +3228,8 @@
    acquireIEContext();
    ieContext.initializeTask = initTask;
    InitializeRequestMessage initializeMsg = new InitializeRequestMessage(
        baseDN, serverId, source);
    InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
        baseDn, serverId, source);
    // Publish Init request msg
    broker.publish(initializeMsg);
@@ -3014,7 +3293,7 @@
    Throwable cause;
    if (targetString.equalsIgnoreCase("all"))
    {
      return RoutableMessage.ALL_SERVERS;
      return RoutableMsg.ALL_SERVERS;
    }
    // So should be a serverID
@@ -3092,10 +3371,17 @@
  public void initializeRemote(short target, short requestorID, Task initTask)
  throws DirectoryException
  {
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
      Short.toString(serverId),
      baseDn.toString(),
      Short.toString(requestorID));
    logError(msg);
    boolean contextAcquired=false;
    try
    {
      Backend backend = retrievesBackend(this.baseDN);
      Backend backend = retrievesBackend(this.baseDn);
      if (!backend.supportsLDIFExport())
      {
@@ -3110,7 +3396,7 @@
      // The number of entries to be exported is the number of entries under
      // the base DN entry and the base entry itself.
      long entryCount = backend.numSubordinates(baseDN, true) + 1;
      long entryCount = backend.numSubordinates(baseDn, true) + 1;
      ieContext.exportTarget = target;
      if (initTask != null)
      {
@@ -3119,15 +3405,15 @@
      ieContext.setCounters(entryCount, entryCount);
      // Send start message to the peer
      InitializeTargetMessage initializeMessage = new InitializeTargetMessage(
          baseDN, serverId, ieContext.exportTarget, requestorID, entryCount);
      InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
          baseDn, serverId, ieContext.exportTarget, requestorID, entryCount);
      broker.publish(initializeMessage);
      exportBackend(false);
      // Notify the peer of the success
      DoneMessage doneMsg = new DoneMessage(serverId,
      DoneMsg doneMsg = new DoneMsg(serverId,
          initializeMessage.getDestination());
      broker.publish(doneMsg);
@@ -3136,8 +3422,8 @@
    catch(DirectoryException de)
    {
      // Notify the peer of the failure
      ErrorMessage errorMsg =
        new ErrorMessage(target,
      ErrorMsg errorMsg =
        new ErrorMsg(target,
                         de.getMessageObject());
      broker.publish(errorMsg);
@@ -3146,6 +3432,12 @@
      throw(de);
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
      Short.toString(serverId),
      baseDn.toString(),
      Short.toString(requestorID));
    logError(msg);
  }
  /**
@@ -3180,13 +3472,48 @@
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   */
  protected void initialize(InitializeTargetMessage initializeMessage)
  protected void initialize(InitializeTargetMsg initializeMessage)
  throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    DirectoryException de = null;
    Backend backend = retrievesBackend(baseDN);
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
      Short.toString(serverId),
      baseDn.toString(),
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
    // Go into full update status
    // Use synchronized as somebody could ask another status change at the same
    // time
    synchronized (status)
    {
      StatusMachineEvent event = StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT;
      ServerStatus newStatus =
        StatusMachine.computeNewStatus(status, event);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
          Short.toString(serverId), status.toString(), event.toString());
        logError(msg);
        return;
      }
      // Store new status
      status = newStatus;
      if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn +
        " new status is: " + status);
      // Perform whatever actions are needed to apply properties for being
      // compliant with new status
      updateDomainForNewStatus();
    }
    Backend backend = retrievesBackend(baseDn);
    try
    {
@@ -3220,7 +3547,7 @@
        importConfig =
          new LDIFImportConfig(ieContext.ldifImportInputStream);
        List<DN> includeBranches = new ArrayList<DN>();
        includeBranches.add(this.baseDN);
        includeBranches.add(this.baseDn);
        importConfig.setIncludeBranches(includeBranches);
        importConfig.setAppendToExistingData(false);
@@ -3254,7 +3581,7 @@
        // Re-enable backend
        closeBackendImport(backend);
        backend = retrievesBackend(baseDN);
        backend = retrievesBackend(baseDn);
      }
      try
@@ -3294,6 +3621,12 @@
    {
      throw de;
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
      Short.toString(serverId),
      baseDn.toString(),
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
  }
  /**
@@ -3320,21 +3653,21 @@
  }
  /**
   * Retrieves a replication domain based on the baseDN.
   * Retrieves a replication domain based on the baseDn.
   *
   * @param baseDN The baseDN of the domain to retrieve
   * @param baseDn The baseDn of the domain to retrieve
   * @return The domain retrieved
   * @throws DirectoryException When an error occurred or no domain
   * match the provided baseDN.
   * match the provided baseDn.
   */
  public static ReplicationDomain retrievesReplicationDomain(DN baseDN)
  public static ReplicationDomain retrievesReplicationDomain(DN baseDn)
  throws DirectoryException
  {
    ReplicationDomain replicationDomain = null;
    // Retrieves the domain
    DirectoryServer.getSynchronizationProviders();
    for (SynchronizationProvider provider :
    for (SynchronizationProvider<?> provider :
      DirectoryServer.getSynchronizationProviders())
    {
      if (!( provider instanceof MultimasterReplication))
@@ -3346,7 +3679,7 @@
      // From the domainDN retrieves the replication domain
      ReplicationDomain sdomain =
        MultimasterReplication.findDomain(baseDN, null);
        MultimasterReplication.findDomain(baseDn, null);
      if (sdomain == null)
      {
        break;
@@ -3365,7 +3698,7 @@
    {
      MessageBuilder mb = new MessageBuilder(ERR_NO_MATCHING_DOMAIN.get());
      mb.append(" ");
      mb.append(String.valueOf(baseDN));
      mb.append(String.valueOf(baseDn));
      throw new DirectoryException(ResultCode.OTHER,
         mb.toMessage());
    }
@@ -3378,11 +3711,11 @@
   */
  public Backend getBackend()
  {
    return retrievesBackend(baseDN);
    return retrievesBackend(baseDn);
  }
  /**
   * Returns a boolean indiciating if an import or export is currently
   * Returns a boolean indicating if an import or export is currently
   * processed.
   * @return The status
   */
@@ -3396,7 +3729,7 @@
  /**
   * Push the modifications contain the in given parameter has
   * Push the modifications contained in the given parameter as
   * a modification that would happen on a local server.
   * The modifications are not applied to the local database,
   * historical information is not updated but a ChangeNumber
@@ -3427,7 +3760,7 @@
   * @param configuration The configuration to check.
   * @param unacceptableReasons When the configuration is not acceptable, this
   *                            table is use to return the reasons why this
   *                            configuration is not acceptbale.
   *                            configuration is not acceptable.
   *
   * @return true if the configuration is acceptable, false other wise.
   */
@@ -3437,7 +3770,7 @@
    // Check that there is not already a domain with the same DN
    DN dn = configuration.getBaseDN();
    ReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
    if ((domain != null) && (domain.baseDN.equals(dn)))
    if ((domain != null) && (domain.baseDn.equals(dn)))
    {
      Message message = ERR_SYNC_INVALID_DN.get();
      unacceptableReasons.add(message);
@@ -3557,4 +3890,78 @@
    else
      return false;
  }
  /**
   * Gets the info for DSs in the topology (except us).
   * @return The info for DSs in the topology (except us)
   */
  public List<DSInfo> getDsList()
  {
    return dsList;
  }
  /**
   * Gets the info for RSs in the topology (except the one we are connected
   * to).
   * @return The info for RSs in the topology (except the one we are connected
   * to)
   */
  public List<RSInfo> getRsList()
  {
    return rsList;
  }
  /**
   * Tells if assured replication is enabled for this domain.
   * @return True if assured replication is enabled for this domain.
   */
  public boolean isAssured()
  {
    return assured;
  }
  /**
   * Gives the mode for the assured replication of the domain.
   * @return The mode for the assured replication of the domain.
   */
  public AssuredMode getAssuredMode()
  {
    return assuredMode;
  }
  /**
   * Gives the assured level of the replication of the domain.
   * @return The assured level of the replication of the domain.
   */
  public byte getAssuredSdLevel()
  {
    return assuredSdLevel;
  }
  /**
   * Gets the group id for this domain.
   * @return The group id for this domain.
   */
  public byte getGroupId()
  {
    return groupId;
  }
  /**
   * Gets the referrals URLs this domain publishes.
   * @return The referrals URLs this domain publishes.
   */
  public List<String> getRefUrls()
  {
    return refUrls;
  }
  /**
   * Gets the status for this domain.
   * @return The status for this domain.
   */
  public ServerStatus getStatus()
  {
    return status;
  }
}