mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
08.03.2008 7adb93986ace907531875e25be1f94d735fbb068
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -25,6 +25,7 @@
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
@@ -36,28 +37,31 @@
import static org.opends.server.util.StaticUtils.createEntry;
import static org.opends.server.util.StaticUtils.getFileForPath;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.replication.common.StatusMachine.*;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.service.ReplicationMonitor;
import java.util.Collection;
import org.opends.server.types.Attributes;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CheckedOutputStream;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.server.ConfigurationChangeListener;
@@ -68,7 +72,6 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.jeb.BackendImpl;
import org.opends.server.backends.task.Task;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
@@ -82,49 +85,31 @@
import org.opends.server.protocols.asn1.ASN1Exception;
import org.opends.server.protocols.asn1.ASN1OctetString;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPAttribute;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.DeleteContext;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.EntryMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.tasks.InitializeTargetTask;
import org.opends.server.tasks.InitializeTask;
import org.opends.server.tasks.TaskUtils;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.ExistingFileBehavior;
import org.opends.server.types.AbstractOperation;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ConfigChangeResult;
@@ -133,6 +118,7 @@
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.ExistingFileBehavior;
import org.opends.server.types.LDAPException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
@@ -144,6 +130,7 @@
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchResultReference;
import org.opends.server.types.SearchScope;
import org.opends.server.types.SynchronizationProviderResult;
import org.opends.server.types.operation.PluginOperation;
@@ -163,9 +150,9 @@
 *  handle conflict resolution,
 *  handle protocol messages from the replicationServer.
 */
public class ReplicationDomain extends DirectoryThread
public class LDAPReplicationDomain extends ReplicationDomain
       implements ConfigurationChangeListener<ReplicationDomainCfg>,
                  AlertGenerator
                  AlertGenerator, InternalSearchListener
{
  /**
   * The fully-qualified name of this class.
@@ -185,37 +172,20 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private ReplicationMonitor monitor;
  private ReplicationBroker broker;
  // Thread waiting for incoming update messages for this domain and pushing
  // them to the global incoming update message queue for later processing by
  // replay threads.
  private ListenerThread listenerThread;
  // The update to replay message queue where the listener thread is going to
  // push incoming update messages.
  private LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue;
  private SortedMap<ChangeNumber, UpdateMsg> waitingAckMsgs =
    new TreeMap<ChangeNumber, UpdateMsg>();
  private AtomicInteger numRcvdUpdates = new AtomicInteger(0);
  private AtomicInteger numSentUpdates = new AtomicInteger(0);
  private AtomicInteger numProcessedUpdates = new AtomicInteger();
  private AtomicInteger numResolvedNamingConflicts = new AtomicInteger();
  private AtomicInteger numResolvedModifyConflicts = new AtomicInteger();
  private AtomicInteger numUnresolvedNamingConflicts = new AtomicInteger();
  private int debugCount = 0;
  private PersistentServerState state;
  private final PersistentServerState state;
  private int numReplayedPostOpCalled = 0;
  private int maxReceiveQueue = 0;
  private int maxSendQueue = 0;
  private int maxReceiveDelay = 0;
  private int maxSendDelay = 0;
  private long generationId = -1;
  private boolean generationIdSavedStatus = false;
  ChangeNumberGenerator generator;
  private ChangeNumberGenerator generator;
  /**
   * This object is used to store the list of update currently being
@@ -235,19 +205,8 @@
   */
  private RemotePendingChanges remotePendingChanges;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  private short serverId;
  // The context related to an import or export being processed
  // Null when none is being processed.
  private IEContext ieContext = null;
  private Collection<String> replicationServers;
  private DN baseDn;
  private boolean shutdown = false;
@@ -260,37 +219,10 @@
  private boolean disabled = false;
  private boolean stateSavingDisabled = false;
  private int window = 100;
  /*
   * Assured mode properties
   */
  // Is assured mode enabled or not for this domain ?
  private boolean assured = false;
  // Assured sub mode (used when assured is true)
  private AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  // Safe Data level (used when assuredMode is SAFE_DATA)
  private byte assuredSdLevel = (byte)1;
  // Timeout (in milliseconds) when waiting for acknowledgments
  private long assuredTimeout = 1000;
  // Group id
  private byte groupId = (byte)1;
  // Referrals urls to be published to other servers of the topology
  // TODO: fill that with all currently opened urls if no urls configured
  private List<String> refUrls = new ArrayList<String>();
  // Current status for this replicated domain
  private ServerStatus status = ServerStatus.NOT_CONNECTED_STATUS;
  /*
   * Properties for the last topology info received from the network.
   */
  // Info for other DSs.
  // Warning: does not contain info for us (for our server id)
  private List<DSInfo> dsList = new ArrayList<DSInfo>();
  // Info for other RSs.
  private List<RSInfo> rsList = new ArrayList<RSInfo>();
  // This list is used to temporary store operations that needs
  // to be replayed at session establishment time.
  private TreeSet<FakeOperation> replayOperations  =
    new TreeSet<FakeOperation>(new FakeOperationComparator());;
  /**
   * The isolation policy that this domain is going to use.
@@ -312,131 +244,47 @@
   */
  private boolean done = true;
  private ServerStateFlush flushThread;
  /**
   * This class contain the context related to an import or export
   * launched on the domain.
   * The thread that periodically saves the ServerState of this
   * LDAPReplicationDomain in the database.
   */
  private class IEContext
  private class  ServerStateFlush extends DirectoryThread
  {
    // The task that initiated the operation.
    Task initializeTask;
    // The input stream for the import
    ReplLDIFInputStream ldifImportInputStream = null;
    // The target in the case of an export
    short exportTarget = RoutableMsg.UNKNOWN_SERVER;
    // The source in the case of an import
    short importSource = RoutableMsg.UNKNOWN_SERVER;
    // The total entry count expected to be processed
    long entryCount = 0;
    // The count for the entry not yet processed
    long entryLeftCount = 0;
    // The exception raised when any
    DirectoryException exception = null;
    /**
     * Initializes the import/export counters with the provider value.
     * @param total
     * @param left
     * @throws DirectoryException
     */
    public void setCounters(long total, long left)
      throws DirectoryException
    protected ServerStateFlush()
    {
      entryCount = total;
      entryLeftCount = left;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setTotal(entryCount);
          ((InitializeTask)initializeTask).setLeft(entryCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setTotal(entryCount);
          ((InitializeTargetTask)initializeTask).setLeft(entryCount);
        }
      }
    }
    /**
     * Update the counters of the task for each entry processed during
     * an import or export.
     * @throws DirectoryException
     */
    public void updateCounters()
      throws DirectoryException
    {
      entryLeftCount--;
      if (initializeTask != null)
      {
        if (initializeTask instanceof InitializeTask)
        {
          ((InitializeTask)initializeTask).setLeft(entryLeftCount);
        }
        else if (initializeTask instanceof InitializeTargetTask)
        {
          ((InitializeTargetTask)initializeTask).setLeft(entryLeftCount);
        }
      }
      super("Replication State Saver for server id " +
            serverId + " and domain " + baseDn.toString());
    }
    /**
     * {@inheritDoc}
     */
    public String toString()
    {
      return new String("[ Entry count=" + this.entryCount +
                        ", Entry left count=" + this.entryLeftCount + "]");
    }
  }
  /**
   * This thread is launched when we want to export data to another server that
   * has requested to be initialized with the data of our backend.
   */
  private class ExportThread extends DirectoryThread
  {
    // Id of server that will receive updates
    private short target;
    /**
     * Constructor for the ExportThread.
     *
     * @param target Id of server that will receive updates
     */
    public ExportThread(short target)
    {
      super("Export thread " + serverId);
      this.target = target;
    }
    /**
     * Run method for this class.
     */
    @Override
    public void run()
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread starting.");
      }
      done = false;
      try
      while (shutdown  == false)
      {
        initializeRemote(target, target, null);
      } catch (DirectoryException de)
      {
      // An error message has been sent to the peer
      // Nothing more to do locally
        try
        {
          synchronized (this)
          {
            this.wait(1000);
            if (!disabled && !stateSavingDisabled )
            {
              // save the ServerState
              state.save();
            }
          }
        } catch (InterruptedException e)
        { }
      }
      if (debugEnabled())
      {
        TRACER.debugInfo("Export thread stopping.");
      }
      state.save();
      done = true;
    }
  }
@@ -447,18 +295,24 @@
   * @param updateToReplayQueue The queue for update messages to replay.
   * @throws ConfigException In case of invalid configuration.
   */
  public ReplicationDomain(ReplicationDomainCfg configuration,
  public LDAPReplicationDomain(ReplicationDomainCfg configuration,
    LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue)
    throws ConfigException
  {
    super("Replication State Saver for server id " + configuration.getServerId()
      + " and domain " + configuration.getBaseDN());
    super(configuration.getBaseDN().toNormalizedString(),
        (short) configuration.getServerId());
    /**
     * The time in milliseconds between heartbeats from the replication
     * server.  Zero means heartbeats are off.
     */
    long heartbeatInterval = 0;
    // Read the configuration parameters.
    replicationServers = configuration.getReplicationServer();
    Set<String> replicationServers = configuration.getReplicationServer();
    serverId = (short) configuration.getServerId();
    baseDn = configuration.getBaseDN();
    window  = configuration.getWindowSize();
    int window  = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    isolationpolicy = configuration.getIsolationPolicy();
    configDn = configuration.dn();
@@ -471,28 +325,22 @@
    switch (assuredType)
    {
      case NOT_ASSURED:
        assured = false;
        setAssured(false);
        break;
      case SAFE_DATA:
        assured = true;
        this.assuredMode = AssuredMode.SAFE_DATA_MODE;
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_DATA_MODE);
        break;
      case SAFE_READ:
        assured = true;
        this.assuredMode = AssuredMode.SAFE_READ_MODE;
        setAssured(true);
        setAssuredMode(AssuredMode.SAFE_READ_MODE);
        break;
    }
    this.assuredSdLevel = (byte)configuration.getAssuredSdLevel();
    this.groupId = (byte)configuration.getGroupId();
    this.assuredTimeout = configuration.getAssuredTimeout();
    SortedSet<String> urls = configuration.getReferralsUrl();
    if (urls != null)
    {
      for (String url : urls)
      {
        this.refUrls.add(url);
      }
    }
    setAssuredSdLevel((byte)configuration.getAssuredSdLevel());
    setAssuredTimeout(configuration.getAssuredTimeout());
    setGroupId((byte)configuration.getGroupId());
    setURLs(configuration.getReferralsUrl());
    /*
     * Modify conflicts are solved for all suffixes but the schema suffix
@@ -510,19 +358,6 @@
      solveConflictFlag = true;
    }
    /*
     * Create a new Persistent Server State that will be used to store
     * the last ChangeNmber seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(baseDn, serverId);
    /*
     * Create a replication monitor object responsible for publishing
     * monitoring information below cn=monitor.
     */
    monitor = new ReplicationMonitor(this);
    DirectoryServer.registerMonitorProvider(monitor);
    Backend backend = retrievesBackend(baseDn);
    if (backend == null)
    {
@@ -541,14 +376,12 @@
    }
    /*
     * create the broker object used to publish and receive changes
     * Create a new Persistent Server State that will be used to store
     * the last ChangeNmber seen from all LDAP servers in the topology.
     */
    broker = new ReplicationBroker(this, state, baseDn, serverId,
        maxReceiveQueue, maxReceiveDelay, maxSendQueue, maxSendDelay, window,
        heartbeatInterval, generationId,
        new ReplSessionSecurity(configuration),getGroupId());
    state = new PersistentServerState(baseDn, serverId, getServerState());
    broker.start(replicationServers);
    startPublishService(replicationServers, window, heartbeatInterval);
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
@@ -557,14 +390,12 @@
     * The generator time is adjusted to the time of the last CN received from
     * remote other servers.
     */
    generator =
      new ChangeNumberGenerator(serverId, state);
    generator = getGenerator();
    pendingChanges =
      new PendingChanges(generator,
                         broker, state);
      new PendingChanges(generator, this);
    remotePendingChanges = new RemotePendingChanges(generator, state);
    remotePendingChanges = new RemotePendingChanges(getServerState());
    // listen for changes on the configuration
    configuration.addChangeListener(this);
@@ -573,7 +404,6 @@
    DirectoryServer.registerAlertGenerator(this);
  }
  /**
   * Returns the base DN of this ReplicationDomain.
   *
@@ -741,7 +571,7 @@
    {
      // this isolation policy specifies that the updates are denied
      // when the broker is not connected.
      return broker.isConnected();
      return isConnected();
    }
    // we should never get there as the only possible policies are
    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
@@ -931,322 +761,6 @@
  }
  /**
   * Receives an update message from the replicationServer.
   * also responsible for updating the list of pending changes
   * @return the received message - null if none
   */
  public UpdateMsg receive()
  {
    UpdateMsg update = null;
    while ( (update == null) && (!shutdown) )
    {
      InitializeRequestMsg initMsg = null;
      ReplicationMsg msg;
      try
      {
        msg = broker.receive();
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }
        if (debugEnabled())
          if (!(msg instanceof HeartbeatMsg))
            TRACER.debugVerbose("Message received <" + msg + ">");
        if (msg instanceof AckMsg)
        {
          AckMsg ack = (AckMsg) msg;
          receiveAck(ack);
        }
        else if (msg instanceof InitializeRequestMsg)
        {
          // Another server requests us to provide entries
          // for a total update
          initMsg = (InitializeRequestMsg)msg;
        }
        else if (msg instanceof InitializeTargetMsg)
        {
          // Another server is exporting its entries to us
          InitializeTargetMsg importMsg = (InitializeTargetMsg) msg;
          try
          {
            // This must be done while we are still holding the
            // broker lock because we are now going to receive a
            // bunch of entries from the remote server and we
            // want the import thread to catch them and
            // not the ListenerThread.
            initialize(importMsg);
          }
          catch(DirectoryException de)
          {
            // Returns an error message to notify the sender
            ErrorMsg errorMsg =
              new ErrorMsg(importMsg.getsenderID(),
                  de.getMessageObject());
            MessageBuilder mb = new MessageBuilder();
            mb.append(de.getMessageObject());
            TRACER.debugInfo(Message.toString(mb.toMessage()));
            broker.publish(errorMsg);
          }
        }
        else if (msg instanceof ErrorMsg)
        {
          if (ieContext != null)
          {
            // This is an error termination for the 2 following cases :
            // - either during an export
            // - or before an import really started
            //   For example, when we publish a request and the
            //  replicationServer did not find any import source.
            abandonImportExport((ErrorMsg)msg);
          }
          else
          {
            /*
             * Log error message
             */
            ErrorMsg errorMsg = (ErrorMsg)msg;
            logError(ERR_ERROR_MSG_RECEIVED.get(
                errorMsg.getDetails()));
          }
        }
        if (msg instanceof TopologyMsg)
        {
          TopologyMsg topoMsg = (TopologyMsg)msg;
          receiveTopo(topoMsg);
        }
        if (msg instanceof ChangeStatusMsg)
        {
          ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
          receiveChangeStatus(csMsg);
        }
        else if (msg instanceof UpdateMsg)
        {
          update = (UpdateMsg) msg;
          receiveUpdate(update);
        }
      }
      catch (SocketTimeoutException e)
      {
        // just retry
      }
      // Test if we have received and export request message and
      // if that's the case handle it now.
      // This must be done outside of the portion of code protected
      // by the broker lock so that we keep receiveing update
      // when we are doing and export and so that a possible
      // closure of the socket happening when we are publishing the
      // entries to the remote can be handled by the other
      // replay thread when they call this method and therefore the
      // broker.receive() method.
      if (initMsg != null)
      {
        // Do this work in a thread to allow replay thread continue working
        ExportThread exportThread = new ExportThread(initMsg.getsenderID());
        exportThread.start();
      }
    }
    return update;
  }
  /**
   * Processes an incoming TopologyMsg.
   * Updates the structures for the local view of the topology.
   *
   * @param topoMsg The topology information received from RS.
   */
  public void receiveTopo(TopologyMsg topoMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn
        + " received topology info update:\n" + topoMsg);
    // Store new lists
    synchronized(getDsList())
    {
      synchronized(getRsList())
      {
        dsList = topoMsg.getDsList();
        rsList = topoMsg.getRsList();
      }
    }
  }
  /**
   * Set the initial status of the domain, once he is connected to the topology.
   * @param initStatus The status to enter the state machine with
   */
  public void setInitialStatus(ServerStatus initStatus)
  {
    // Sanity check: is it a valid initial status?
    if (!isValidInitialStatus(initStatus))
    {
      Message msg = ERR_DS_INVALID_INIT_STATUS.get(initStatus.toString(),
        baseDn.toString(), Short.toString(serverId));
      logError(msg);
    } else
    {
      status = initStatus;
    }
  }
  /**
   * Processes an incoming ChangeStatusMsg. Compute new status according to
   * given order. Then update domain for being compliant with new status
   * definition.
   * @param csMsg The received status message
   */
  private void receiveChangeStatus(ChangeStatusMsg csMsg)
  {
    if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn +
        " received change status message:\n" + csMsg);
    ServerStatus reqStatus = csMsg.getRequestedStatus();
    // Translate requested status to a state machine event
    StatusMachineEvent event = StatusMachineEvent.statusToEvent(reqStatus);
    if (event == StatusMachineEvent.INVALID_EVENT)
    {
      Message msg = ERR_DS_INVALID_REQUESTED_STATUS.get(reqStatus.toString(),
        baseDn.toString(), Short.toString(serverId));
      logError(msg);
      return;
    }
    // Compute new status and do matching tasks
    // Use synchronized as admin task (thread) could order to go in admin status
    // for instance (concurrent with receive thread).
    synchronized (status)
    {
      ServerStatus newStatus =
        StatusMachine.computeNewStatus(status, event);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
          Short.toString(serverId), status.toString(), event.toString());
        logError(msg);
        return;
      }
      // Store new status
      status = newStatus;
      if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn +
        " new status is: " + status);
      // Perform whatever actions are needed to apply properties for being
      // compliant with new status
      updateDomainForNewStatus();
    }
  }
  /**
   * Called when first connection or disconnection detected.
   */
  public void toNotConnectedStatus()
  {
    // Go into not connected status
    // Use synchronized as somebody could ask another status change at the same
    // time
    synchronized (status)
    {
      StatusMachineEvent event =
        StatusMachineEvent.TO_NOT_CONNECTED_STATUS_EVENT;
      ServerStatus newStatus =
        StatusMachine.computeNewStatus(status, event);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        Message msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
          Short.toString(serverId), status.toString(), event.toString());
        logError(msg);
        return;
      }
      // Store new status
      status = newStatus;
      if (debugEnabled())
        TRACER.debugInfo("Replication domain " + baseDn +
          " new status is: " + status);
      // Perform whatever actions are needed to apply properties for being
      // compliant with new status
      updateDomainForNewStatus();
    }
  }
  /**
   * Perform whatever actions are needed to apply properties for being
   * compliant with new status. Must be called in synchronized section for
   * status. The new status is already set in status variable.
   */
  private void updateDomainForNewStatus()
  {
    switch (status)
    {
      case NOT_CONNECTED_STATUS:
        break;
      case NORMAL_STATUS:
        break;
      case DEGRADED_STATUS:
        break;
      case FULL_UPDATE_STATUS:
        // Signal RS we just entered the full update status
        broker.signalStatusChange(status);
        break;
      case BAD_GEN_ID_STATUS:
        break;
      default:
        if (debugEnabled())
          TRACER.debugInfo("updateDomainForNewStatus: unexpected status: " +
            status);
    }
  }
  /**
   * Do the necessary processing when an UpdateMsg was received.
   *
   * @param update The received UpdateMsg.
   */
  public void receiveUpdate(UpdateMsg update)
  {
    remotePendingChanges.putRemoteUpdate(update);
    numRcvdUpdates.incrementAndGet();
  }
  /**
   * Do the necessary processing when an AckMsg is received.
   *
   * @param ack The AckMsg that was received.
   */
  public void receiveAck(AckMsg ack)
  {
    UpdateMsg update;
    ChangeNumber changeNumber = ack.getChangeNumber();
    synchronized (waitingAckMsgs)
    {
      update = waitingAckMsgs.remove(changeNumber);
    }
    if (update != null)
    {
      synchronized (update)
      {
        update.notify();
      }
    }
  }
  /**
   * Check if an operation must be synchronized.
   * Also update the list of pending changes and the server RUV
   * @param op the operation
@@ -1258,19 +772,17 @@
    {
      numReplayedPostOpCalled++;
    }
    UpdateMsg msg = null;
    LDAPUpdateMsg msg = null;
    // Note that a failed non-replication operation might not have a change
    // number.
    ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
    boolean isAssured = isAssured(op);
    if ((result == ResultCode.SUCCESS) && (!op.isSynchronizationOperation()))
    {
      // Generate a replication message for a successful non-replication
      // operation.
      msg = UpdateMsg.generateMsg(op);
      msg = LDAPUpdateMsg.generateMsg(op);
      if (msg == null)
      {
@@ -1307,16 +819,6 @@
        return;
      }
      if (msg != null && isAssured)
      {
        synchronized (waitingAckMsgs)
        {
          // Add the assured message to the list of update that are
          // waiting acknowledgements
          waitingAckMsgs.put(curChangeNumber, msg);
        }
      }
      if (generationIdSavedStatus != true)
      {
        this.saveGenerationId(generationId);
@@ -1334,53 +836,8 @@
    if (!op.isSynchronizationOperation())
    {
      int pushedChanges = pendingChanges.pushCommittedChanges();
      numSentUpdates.addAndGet(pushedChanges);
      pendingChanges.pushCommittedChanges();
    }
    // Wait for acknowledgement of an assured message.
    if (msg != null && isAssured)
    {
      synchronized (msg)
      {
        while (waitingAckMsgs.containsKey(msg.getChangeNumber()))
        {
          // TODO : should have a configurable timeout to get
          // out of this loop
          try
          {
            msg.wait(1000);
          } catch (InterruptedException e)
          { }
        }
      }
    }
  }
  /**
   * get the number of updates received by the replication plugin.
   *
   * @return the number of updates received
   */
  public int getNumRcvdUpdates()
  {
    if (numRcvdUpdates != null)
      return numRcvdUpdates.get();
    else
      return 0;
  }
  /**
   * Get the number of updates sent by the replication plugin.
   *
   * @return the number of updates sent
   */
  public int getNumSentUpdates()
  {
    if (numSentUpdates != null)
      return numSentUpdates.get();
    else
      return 0;
  }
  /**
@@ -1397,27 +854,6 @@
  }
  /**
   * Increment the number of processed updates.
   */
  public void incProcessedUpdates()
  {
    numProcessedUpdates.incrementAndGet();
  }
  /**
   * get the number of updates replayed by the replication.
   *
   * @return The number of updates replayed by the replication
   */
  public int getNumProcessedUpdates()
  {
    if (numProcessedUpdates != null)
      return numProcessedUpdates.get();
    else
      return 0;
  }
  /**
   * get the number of updates replayed successfully by the replication.
   *
   * @return The number of updates replayed successfully
@@ -1428,16 +864,6 @@
  }
  /**
   * get the ServerState.
   *
   * @return the ServerState
   */
  public ServerState getServerState()
  {
    return state;
  }
  /**
   * Get the debugCount.
   *
   * @return Returns the debugCount.
@@ -1448,49 +874,6 @@
  }
  /**
   * Send an Ack message.
   *
   * @param changeNumber The ChangeNumber for which the ack must be sent.
   */
  public void ack(ChangeNumber changeNumber)
  {
    broker.publish(new AckMsg(changeNumber));
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public void run()
  {
    done = false;
    // Create the listener thread
    listenerThread = new ListenerThread(this, updateToReplayQueue);
    listenerThread.start();
    while (shutdown  == false)
    {
      try
      {
        synchronized (this)
        {
          this.wait(1000);
          if (!disabled && !stateSavingDisabled )
          {
            // save the RUV
            state.save();
          }
        }
      } catch (InterruptedException e)
      { }
    }
    state.save();
    done = true;
  }
  /**
   * Shutdown this ReplicationDomain.
   */
  public void shutdown()
@@ -1498,27 +881,19 @@
    // stop the flush thread
    shutdown = true;
    // Stop the listener thread
    if (listenerThread != null)
    // stop the thread in charge of flushing the ServerState.
    if (flushThread != null)
    {
      listenerThread.shutdown();
      synchronized (flushThread)
      {
        flushThread.notify();
      }
    }
    synchronized (this)
    {
      this.notify();
    }
    DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
    DirectoryServer.deregisterAlertGenerator(this);
    // stop the ReplicationBroker
    broker.stop();
    // Wait for the listener thread to stop
    if (listenerThread != null)
      listenerThread.waitForShutdown();
    // stop the ReplicationDomain
    stopDomain();
    // wait for completion of the persistentServerState thread.
    try
@@ -1534,26 +909,11 @@
  }
  /**
   * Get the name of the replicationServer to which this domain is currently
   * connected.
   *
   * @return the name of the replicationServer to which this domain
   *         is currently connected.
   */
  public String getReplicationServer()
  {
    if (broker != null)
      return broker.getReplicationServer();
    else
      return "Not connected";
  }
  /**
   * Create and replay a synchronized Operation from an UpdateMsg.
   *
   * @param msg The UpdateMsg to be replayed.
   */
  public void replay(UpdateMsg msg)
  public void replay(LDAPUpdateMsg msg)
  {
    Operation op = null;
    boolean done = false;
@@ -1565,6 +925,7 @@
    // whose dependency has been replayed until no more left.
    do
    {
      String replayErrorMsg = null;
      try
      {
        op = msg.createOperation(conn);
@@ -1572,12 +933,12 @@
        while ((!dependency) && (!done) && (retryCount-- > 0))
        {
          // Try replay the operation
          op.setInternalOperation(true);
          op.setSynchronizationOperation(true);
          changeNumber = OperationContext.getChangeNumber(op);
          ((AbstractOperation) op).run();
          // Try replay the operation
          ResultCode result = op.getResultCode();
          if (result != ResultCode.SUCCESS)
@@ -1638,7 +999,7 @@
            op.getErrorMessage().toString());
          logError(message);
          numUnresolvedNamingConflicts.incrementAndGet();
          replayErrorMsg = message.toString();
          updateError(changeNumber);
        }
      } catch (ASN1Exception e)
@@ -1646,16 +1007,19 @@
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
      } catch (LDAPException e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
      } catch (DataFormatException e)
      {
        Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
          String.valueOf(msg) + stackTraceToSingleLineString(e));
        logError(message);
        replayErrorMsg = message.toString();
      } catch (Exception e)
      {
        if (changeNumber != null)
@@ -1669,21 +1033,20 @@
          Message message = ERR_EXCEPTION_REPLAYING_OPERATION.get(
            stackTraceToSingleLineString(e), op.toString());
          logError(message);
          replayErrorMsg = message.toString();
          updateError(changeNumber);
        } else
        {
          Message message = ERR_EXCEPTION_DECODING_OPERATION.get(
            String.valueOf(msg) + stackTraceToSingleLineString(e));
          logError(message);
          replayErrorMsg = message.toString();
        }
      } finally
      {
        if (!dependency)
        {
          broker.updateWindowAfterReplay();
          if (msg.isAssured())
            ack(msg.getChangeNumber());
          incProcessedUpdates();
          processUpdateDone(msg, replayErrorMsg);
        }
      }
@@ -1913,7 +1276,7 @@
  * @return true if the process is completed, false if it must continue..
  */
 private boolean solveNamingConflict(DeleteOperation op,
     UpdateMsg msg)
     LDAPUpdateMsg msg)
 {
   ResultCode result = op.getResultCode();
   DeleteContext ctx = (DeleteContext) op.getAttachment(SYNCHROCONTEXT);
@@ -1983,7 +1346,7 @@
 * @throws Exception When the operation is not valid.
 */
private boolean solveNamingConflict(ModifyDNOperation op,
    UpdateMsg msg) throws Exception
    LDAPUpdateMsg msg) throws Exception
{
  ResultCode result = op.getResultCode();
  ModifyDnContext ctx = (ModifyDnContext) op.getAttachment(SYNCHROCONTEXT);
@@ -2391,83 +1754,6 @@
  }
  /**
   * Check if an operation must be processed as an assured operation.
   *
   * @param op the operation to be checked.
   * @return true if the operations must be processed as an assured operation.
   */
  private boolean isAssured(PostOperationOperation op)
  {
    // TODO : should have a filtering mechanism for checking
    // operation that are assured and operations that are not.
    return false;
  }
  /**
   * Get the maximum receive window size.
   *
   * @return The maximum receive window size.
   */
  public int getMaxRcvWindow()
  {
    if (broker != null)
      return broker.getMaxRcvWindow();
    else
      return 0;
  }
  /**
   * Get the current receive window size.
   *
   * @return The current receive window size.
   */
  public int getCurrentRcvWindow()
  {
    if (broker != null)
      return broker.getCurrentRcvWindow();
    else
      return 0;
  }
  /**
   * Get the maximum send window size.
   *
   * @return The maximum send window size.
   */
  public int getMaxSendWindow()
  {
    if (broker != null)
      return broker.getMaxSendWindow();
    else
      return 0;
  }
  /**
   * Get the current send window size.
   *
   * @return The current send window size.
   */
  public int getCurrentSendWindow()
  {
    if (broker != null)
      return broker.getCurrentSendWindow();
    else
      return 0;
  }
  /**
   * Get the number of times the replication connection was lost.
   * @return The number of times the replication connection was lost.
   */
  public int getNumLostConnections()
  {
    if (broker != null)
      return broker.getNumLostConnections();
    else
      return 0;
  }
  /**
   * Get the number of modify conflicts successfully resolved.
   * @return The number of modify conflicts successfully resolved.
   */
@@ -2477,7 +1763,7 @@
  }
  /**
   * Get the number of namign conflicts successfully resolved.
   * Get the number of naming conflicts successfully resolved.
   * @return The number of naming conflicts successfully resolved.
   */
  public int getNumResolvedNamingConflicts()
@@ -2495,18 +1781,9 @@
  }
  /**
   * Get the server ID.
   * @return The server ID.
   */
  public short getServerId()
  {
    return serverId;
  }
  /**
   * Check if the domain solve conflicts.
   *
   * @return a boolean indicating if the domain should sove conflicts.
   * @return a boolean indicating if the domain should solve conflicts.
   */
  public boolean solveConflict()
  {
@@ -2526,16 +1803,7 @@
    state.save();
    state.clearInMemory();
    disabled = true;
    // Stop the listener thread
    if (listenerThread != null)
      listenerThread.shutdown();
    broker.stop(); // This will cut the session and wake up the listener
    // Wait for the listener thread to stop
    if (listenerThread != null)
      listenerThread.waitForShutdown();
    disableService(); // This will cut the session and wake up the listener
  }
  /**
@@ -2578,16 +1846,7 @@
      return;
    }
    // After an on-line import, the value of the generationId is new
    // and it is necessary for the broker to send this new value as part
    // of the serverStart message.
    broker.setGenerationId(generationId);
    broker.start(replicationServers);
    // Create the listener thread
    listenerThread = new ListenerThread(this, updateToReplayQueue);
    listenerThread.start();
    enableService();
    disabled = false;
  }
@@ -2600,7 +1859,7 @@
   */
  public long computeGenerationId() throws DirectoryException
  {
    long genId = exportBackend(true);
    long genId = exportBackend(null, true);
    if (debugEnabled())
      TRACER.debugInfo("Computed generationId: generationId=" + genId);
@@ -2609,11 +1868,9 @@
  }
  /**
   * Returns the generationId set for this domain.
   *
   * @return The generationId.
   * {@inheritDoc}
   */
  public long getGenerationId()
  public long getGenerationID()
  {
    return generationId;
  }
@@ -2627,7 +1884,7 @@
  /**
   * Stores the value of the generationId.
   * @param generationId The value of the generationId.
   * @return a ResultCode indicating if the method was successfull.
   * @return a ResultCode indicating if the method was successful.
   */
  public ResultCode saveGenerationId(long generationId)
  {
@@ -2792,45 +2049,8 @@
  }
  /**
   * Reset the generationId of this domain in the whole topology.
   * A message is sent to the Replication Servers for them to reset
   * their change dbs.
   *
   * @param generationIdNewValue The new value of the generation Id.
   * @throws DirectoryException when an error occurs
   */
  public void resetGenerationId(Long generationIdNewValue)
  throws DirectoryException
  {
    if (debugEnabled())
      TRACER.debugInfo(
          this.getName() + "resetGenerationId" + generationIdNewValue);
    if (!isConnected())
    {
      ResultCode resultCode = ResultCode.OTHER;
      Message message = ERR_RESET_GENERATION_CONN_ERR_ID.get(
          baseDn.toNormalizedString());
      throw new DirectoryException(
         resultCode, message);
    }
    ResetGenerationIdMsg genIdMessage = null;
    if (generationIdNewValue == null)
    {
      genIdMessage = new ResetGenerationIdMsg(this.generationId);
    }
    else
    {
      genIdMessage = new ResetGenerationIdMsg(generationIdNewValue);
    }
    broker.publish(genIdMessage);
  }
  /**
   * Do whatever is needed when a backup is started.
   * We need to make sure that the serverState is correclty save.
   * We need to make sure that the serverState is correctly save.
   */
  public void backupStart()
  {
@@ -2849,103 +2069,7 @@
   * Total Update >>
   */
  /**
   * Receives bytes related to an entry in the context of an import to
   * initialize the domain (called by ReplLDIFInputStream).
   *
   * @return The bytes. Null when the Done or Err message has been received
   */
  public byte[] receiveEntryBytes()
  {
    ReplicationMsg msg;
    while (true)
    {
      try
      {
        msg = broker.receive();
        if (debugEnabled())
          TRACER.debugVerbose(
              " sid:" + this.serverId +
              " base DN:" + this.baseDn +
              " Import EntryBytes received " + msg);
        if (msg == null)
        {
          // The server is in the shutdown process
          return null;
        }
        if (msg instanceof EntryMsg)
        {
          EntryMsg entryMsg = (EntryMsg)msg;
          byte[] entryBytes = entryMsg.getEntryBytes();
          ieContext.updateCounters();
          return entryBytes;
        }
        else if (msg instanceof DoneMsg)
        {
          // This is the normal termination of the import
          // No error is stored and the import is ended
          // by returning null
          return null;
        }
        else if (msg instanceof ErrorMsg)
        {
          // This is an error termination during the import
          // The error is stored and the import is ended
          // by returning null
          ErrorMsg errorMsg = (ErrorMsg)msg;
          ieContext.exception = new DirectoryException(
                                      ResultCode.OTHER,
                                      errorMsg.getDetails());
          return null;
        }
        else
        {
          // Other messages received during an import are trashed
        }
      }
      catch(Exception e)
      {
        // TODO: i18n
        ieContext.exception = new DirectoryException(ResultCode.OTHER,
            Message.raw("received an unexpected message type" +
                e.getLocalizedMessage()));
      }
    }
  }
  /**
   * Processes an error message received while an import/export is
   * on going.
   * @param errorMsg The error message received.
   */
  protected void abandonImportExport(ErrorMsg errorMsg)
  {
    // FIXME TBD Treat the case where the error happens while entries
    // are being exported
    if (debugEnabled())
      TRACER.debugVerbose(
          " abandonImportExport:" + this.serverId +
          " base DN:" + this.baseDn +
          " Error Msg received " + errorMsg);
    if (ieContext != null)
    {
      ieContext.exception = new DirectoryException(ResultCode.OTHER,
          errorMsg.getDetails());
      if (ieContext.initializeTask instanceof InitializeTask)
      {
        // Update the task that initiated the import
        ((InitializeTask)ieContext.initializeTask).
        updateTaskCompletionState(ieContext.exception);
        releaseIEContext();
      }
    }
  }
  /**
   * Clears all the entries from the JE backend determined by the
@@ -3000,16 +2124,31 @@
  }
  /**
   * This method trigger an export of the replicated data.
   *
   * @param output               The OutputStream where the export should
   *                             be produced.
   * @throws DirectoryException  When needed.
   */
  protected void exportBackend(OutputStream output) throws DirectoryException
  {
    exportBackend(output, false);
  }
  /**
   * Export the entries from the backend and/or compute the generation ID.
   * The ieContext must have been set before calling.
   * @param checksumOutput true is the exportBackend is called to compute
   *                       the generationID
   *
   * @return The computed  generationID.
   * @param output              The OutputStream where the export should
   *                            be produced.
   * @param checksumOutput      A boolean indicating if this export is
   *                            invoked to perform a checksum only
   *
   * @return The computed       GenerationID.
   *
   * @throws DirectoryException when an error occurred
   */
  protected long exportBackend(boolean checksumOutput)
  protected long exportBackend(OutputStream output, boolean checksumOutput)
  throws DirectoryException
  {
    long genID = 0;
@@ -3042,7 +2181,7 @@
    }
    OutputStream os;
    ReplLDIFOutputStream ros;
    ReplLDIFOutputStream ros = null;
    if (checksumOutput)
    {
@@ -3060,8 +2199,7 @@
    }
    else
    {
      ros = new ReplLDIFOutputStream(this, (short)-1);
      os = ros;
      os = output;
    }
    LDIFExportConfig exportConfig = new LDIFExportConfig(os);
@@ -3096,7 +2234,7 @@
    }
    catch (DirectoryException de)
    {
      if ((checksumOutput) &&
      if ((ros != null) &&
          (ros.getNumExportedEntries() >= entryCount))
      {
        // This is the normal end when computing the generationId
@@ -3170,277 +2308,6 @@
  }
  /**
   * Get the internal broker to perform some operations on it.
   *
   * @return The broker for this domain.
   */
  ReplicationBroker getBroker()
  {
    return broker;
  }
  /**
   * Exports an entry in LDIF format.
   *
   * @param  lDIFEntry The entry to be exported..
   *
   * @throws IOException when an error occurred.
   */
  public void exportLDIFEntry(String lDIFEntry) throws IOException
  {
    // If an error was raised - like receiving an ErrorMsg
    // we just let down the export.
    if (ieContext.exception != null)
    {
      IOException ioe = new IOException(ieContext.exception.getMessage());
      ieContext = null;
      throw ioe;
    }
    EntryMsg entryMessage = new EntryMsg(
        serverId, ieContext.exportTarget, lDIFEntry.getBytes());
    broker.publish(entryMessage);
    try
    {
      ieContext.updateCounters();
    }
    catch (DirectoryException de)
    {
      throw new IOException(de.getMessage());
    }
  }
  /**
   * Initializes this domain from another source server.
   *
   * @param source The source from which to initialize
   * @param initTask The task that launched the initialization
   *                 and should be updated of its progress.
   * @throws DirectoryException when an error occurs
   */
  public void initializeFromRemote(short source, Task initTask)
  throws DirectoryException
  {
    if (debugEnabled())
      TRACER.debugInfo("Entering initializeFromRemote");
    acquireIEContext();
    ieContext.initializeTask = initTask;
    InitializeRequestMsg initializeMsg = new InitializeRequestMsg(
        baseDn, serverId, source);
    // Publish Init request msg
    broker.publish(initializeMsg);
    // .. we expect to receive entries or err after that
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param sourceString The string representing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeSource(String sourceString)
  throws DirectoryException
  {
    short  source = 0;
    Throwable cause = null;
    try
    {
      source = Integer.decode(sourceString).shortValue();
      if ((source >= -1) && (source != serverId))
      {
        // TODO Verifies serverID is in the domain
        // We shold check here that this is a server implied
        // in the current domain.
        return source;
      }
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    Message message = ERR_INVALID_IMPORT_SOURCE.get();
    if (cause != null)
    {
      throw new DirectoryException(
          resultCode, message, cause);
    }
    else
    {
      throw new DirectoryException(
          resultCode, message);
    }
  }
  /**
   * Verifies that the given string represents a valid source
   * from which this server can be initialized.
   * @param targetString The string representing the source
   * @return The source as a short value
   * @throws DirectoryException if the string is not valid
   */
  public short decodeTarget(String targetString)
  throws DirectoryException
  {
    short  target = 0;
    Throwable cause;
    if (targetString.equalsIgnoreCase("all"))
    {
      return RoutableMsg.ALL_SERVERS;
    }
    // So should be a serverID
    try
    {
      target = Integer.decode(targetString).shortValue();
      if (target >= 0)
      {
        // FIXME Could we check now that it is a know server in the domain ?
      }
      return target;
    }
    catch(Exception e)
    {
      cause = e;
    }
    ResultCode resultCode = ResultCode.OTHER;
    Message message = ERR_INVALID_EXPORT_TARGET.get();
    if (cause != null)
      throw new DirectoryException(
          resultCode, message, cause);
    else
      throw new DirectoryException(
          resultCode, message);
  }
  private synchronized void acquireIEContext()
  throws DirectoryException
  {
    if (ieContext != null)
    {
      // Rejects 2 simultaneous exports
      Message message = ERR_SIMULTANEOUS_IMPORT_EXPORT_REJECTED.get();
      throw new DirectoryException(ResultCode.OTHER,
          message);
    }
    ieContext = new IEContext();
  }
  private synchronized void releaseIEContext()
  {
    ieContext = null;
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument.
   * @param target The target that should be initialized
   * @param initTask The task that triggers this initialization and that should
   *                 be updated with its progress.
   *
   * @exception DirectoryException When an error occurs.
   */
  public void initializeRemote(short target, Task initTask)
  throws DirectoryException
  {
    initializeRemote(target, serverId, initTask);
  }
  /**
   * Process the initialization of some other server or servers in the topology
   * specified by the target argument when this initialization specifying the
   * server that requests the initialization.
   *
   * @param target The target that should be initialized.
   * @param requestorID The server that initiated the export.
   * @param initTask The task that triggers this initialization and that should
   *  be updated with its progress.
   *
   * @exception DirectoryException When an error occurs.
   */
  public void initializeRemote(short target, short requestorID, Task initTask)
  throws DirectoryException
  {
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_START.get(
      Short.toString(serverId),
      baseDn.toString(),
      Short.toString(requestorID));
    logError(msg);
    boolean contextAcquired=false;
    try
    {
      Backend backend = retrievesBackend(this.baseDn);
      if (!backend.supportsLDIFExport())
      {
        Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
                            backend.getBackendID().toString());
        logError(message);
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      acquireIEContext();
      contextAcquired = true;
      // The number of entries to be exported is the number of entries under
      // the base DN entry and the base entry itself.
      long entryCount = backend.numSubordinates(baseDn, true) + 1;
      ieContext.exportTarget = target;
      if (initTask != null)
      {
        ieContext.initializeTask = initTask;
      }
      ieContext.setCounters(entryCount, entryCount);
      // Send start message to the peer
      InitializeTargetMsg initializeMessage = new InitializeTargetMsg(
          baseDn, serverId, ieContext.exportTarget, requestorID, entryCount);
      broker.publish(initializeMessage);
      exportBackend(false);
      // Notify the peer of the success
      DoneMsg doneMsg = new DoneMsg(serverId,
          initializeMessage.getDestination());
      broker.publish(doneMsg);
      releaseIEContext();
    }
    catch(DirectoryException de)
    {
      // Notify the peer of the failure
      ErrorMsg errorMsg =
        new ErrorMsg(target,
                         de.getMessageObject());
      broker.publish(errorMsg);
      if (contextAcquired)
        releaseIEContext();
      throw(de);
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
      Short.toString(serverId),
      baseDn.toString(),
      Short.toString(requestorID));
    logError(msg);
  }
  /**
   * Process backend before import.
   * @param backend The backend.
   * @throws Exception
@@ -3468,51 +2335,16 @@
  }
  /**
   * Initializes the domain's backend with received entries.
   * @param initializeMessage The message that initiated the import.
   * @exception DirectoryException Thrown when an error occurs.
   * This method should trigger an import of the replicated data.
   *
   * @param input                The InputStream from which
   * @throws DirectoryException  When needed.
   */
  protected void initialize(InitializeTargetMsg initializeMessage)
  throws DirectoryException
  public void importBackend(InputStream input) throws DirectoryException
  {
    LDIFImportConfig importConfig = null;
    DirectoryException de = null;
    Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_START.get(
      Short.toString(serverId),
      baseDn.toString(),
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
    // Go into full update status
    // Use synchronized as somebody could ask another status change at the same
    // time
    synchronized (status)
    {
      StatusMachineEvent event = StatusMachineEvent.TO_FULL_UPDATE_STATUS_EVENT;
      ServerStatus newStatus =
        StatusMachine.computeNewStatus(status, event);
      if (newStatus == ServerStatus.INVALID_STATUS)
      {
        msg = ERR_DS_CANNOT_CHANGE_STATUS.get(baseDn.toString(),
          Short.toString(serverId), status.toString(), event.toString());
        logError(msg);
        return;
      }
      // Store new status
      status = newStatus;
      if (debugEnabled())
      TRACER.debugInfo("Replication domain " + baseDn +
        " new status is: " + status);
      // Perform whatever actions are needed to apply properties for being
      // compliant with new status
      updateDomainForNewStatus();
    }
    Backend backend = retrievesBackend(baseDn);
    try
@@ -3526,26 +2358,8 @@
      }
      else
      {
        if (initializeMessage.getRequestorID() == serverId)
        {
          // The import responds to a request we did so the IEContext
          // is already acquired
        }
        else
        {
          acquireIEContext();
        }
        ieContext.importSource = initializeMessage.getsenderID();
        ieContext.entryLeftCount = initializeMessage.getEntryCount();
        ieContext.setCounters(initializeMessage.getEntryCount(),
            initializeMessage.getEntryCount());
        preBackendImport(backend);
        ieContext.ldifImportInputStream = new ReplLDIFInputStream(this);
        importConfig =
          new LDIFImportConfig(ieContext.ldifImportInputStream);
          new LDIFImportConfig(input);
        List<DN> includeBranches = new ArrayList<DN>();
        includeBranches.add(this.baseDn);
        importConfig.setIncludeBranches(includeBranches);
@@ -3553,11 +2367,12 @@
        // TODO How to deal with rejected entries during the import
        importConfig.writeRejectedEntries(
          getFileForPath("logs" + File.separator +
              "replInitRejectedEntries").getAbsolutePath(),
          ExistingFileBehavior.OVERWRITE);
            getFileForPath("logs" + File.separator +
            "replInitRejectedEntries").getAbsolutePath(),
            ExistingFileBehavior.OVERWRITE);
        // Process import
        preBackendImport(backend);
        backend.importLDIF(importConfig);
        stateSavingDisabled = false;
@@ -3570,9 +2385,6 @@
    }
    finally
    {
      if ((ieContext != null)  && (ieContext.exception != null))
        de = ieContext.exception;
      // Cleanup
      if (importConfig != null)
      {
@@ -3592,7 +2404,6 @@
          TRACER.debugInfo(
              "After import, the replication plugin restarts connections" +
              " to all RSs to provide new generation ID=" + generationId);
        broker.setGenerationId(generationId);
      }
      catch (DirectoryException fe)
      {
@@ -3604,29 +2415,12 @@
        if (de == null)
          de = fe;
      }
      // Re-exchange generationID and state with RS
      broker.reStart();
      // Update the task that initiated the import
      if ((ieContext != null ) && (ieContext.initializeTask != null))
      {
        ((InitializeTask)ieContext.initializeTask).
        updateTaskCompletionState(de);
      }
      releaseIEContext();
    }
    // Sends up the root error.
    if (de != null)
    {
      throw de;
    }
    msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
      Short.toString(serverId),
      baseDn.toString(),
      Long.toString(initializeMessage.getRequestorID()));
    logError(msg);
  }
  /**
@@ -3660,10 +2454,10 @@
   * @throws DirectoryException When an error occurred or no domain
   * match the provided baseDn.
   */
  public static ReplicationDomain retrievesReplicationDomain(DN baseDn)
  public static LDAPReplicationDomain retrievesReplicationDomain(DN baseDn)
  throws DirectoryException
  {
    ReplicationDomain replicationDomain = null;
    LDAPReplicationDomain replicationDomain = null;
    // Retrieves the domain
    DirectoryServer.getSynchronizationProviders();
@@ -3678,7 +2472,7 @@
      }
      // From the domainDN retrieves the replication domain
      ReplicationDomain sdomain =
      LDAPReplicationDomain sdomain =
        MultimasterReplication.findDomain(baseDn, null);
      if (sdomain == null)
      {
@@ -3714,15 +2508,6 @@
    return retrievesBackend(baseDn);
  }
  /**
   * Returns a boolean indicating if an import or export is currently
   * processed.
   * @return The status
   */
  public boolean ieRunning()
  {
    return (ieContext != null);
  }
  /*
   * <<Total Update
   */
@@ -3769,7 +2554,7 @@
  {
    // Check that there is not already a domain with the same DN
    DN dn = configuration.getBaseDN();
    ReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
    LDAPReplicationDomain domain = MultimasterReplication.findDomain(dn, null);
    if ((domain != null) && (domain.baseDn.equals(dn)))
    {
      Message message = ERR_SYNC_INVALID_DN.get();
@@ -3793,37 +2578,12 @@
  public ConfigChangeResult applyConfigurationChange(
         ReplicationDomainCfg configuration)
  {
    // server id and base dn are readonly.
    // isolationPolicy can be set immediately and will apply
    // to the next updates.
    // The other parameters needs to be renegociated with the ReplicationServer
    // so that requires restarting the session with the ReplicationServer.
    Boolean needToRestartSession = false;
    Collection<String> newReplServers = configuration.getReplicationServer();
    // A new session is necessary only when information regarding
    // the connection is modified
    if ((!(replicationServers.size() == newReplServers.size()
        && replicationServers.containsAll(newReplServers))) ||
        window != configuration.getWindowSize() ||
        heartbeatInterval != configuration.getHeartbeatInterval())
      needToRestartSession = true;
    replicationServers = newReplServers;
    window = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay,
        maxSendQueue, maxSendDelay, window, heartbeatInterval);
    isolationpolicy = configuration.getIsolationPolicy();
    // To be able to stop and restart the broker properly just
    // disable and enable the domain. That way a new session
    // with the new configuration is available.
    if (needToRestartSession)
    {
      this.disable();
      this.enable();
    }
    changeConfig(
        configuration.getReplicationServer(),
        configuration.getWindowSize(),
        configuration.getHeartbeatInterval());
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
@@ -3867,101 +2627,265 @@
  }
  /**
   * Check if the domain is connected to a ReplicationServer.
   * Starts the Replication Domain.
   */
  public void start()
  {
    // Create the ServerStateFlush thread
    flushThread = new ServerStateFlush();
    flushThread.start();
    startListenService();
  }
  /**
   * {@inheritDoc}
   */
  public void sessionInitiated(
      ServerStatus initStatus,
      ServerState replicationServerState,
      ProtocolSession session)
  {
    super.sessionInitiated(initStatus, replicationServerState, session);
    try
    {
      /*
       * We must not publish changes to a replicationServer that has
       * not seen all our previous changes because this could cause
       * some other ldap servers to miss those changes.
       * Check that the ReplicationServer has seen all our previous
       * changes.
       */
      ChangeNumber replServerMaxChangeNumber =
        replicationServerState.getMaxChangeNumber(serverId);
      if (replServerMaxChangeNumber == null)
      {
        replServerMaxChangeNumber = new ChangeNumber(0, 0, serverId);
      }
      ChangeNumber ourMaxChangeNumber =
        state.getMaxChangeNumber(serverId);
      if ((ourMaxChangeNumber != null) &&
          (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
      {
        // Replication server is missing some of our changes: let's
        // send them to him.
        Message message = DEBUG_GOING_TO_SEARCH_FOR_CHANGES.get();
        logError(message);
        /*
         * Get all the changes that have not been seen by this
         * replication server and populate the replayOperations
         * list.
         */
        InternalSearchOperation op = searchForChangedEntries(
            baseDn, replServerMaxChangeNumber, this);
        if (op.getResultCode() != ResultCode.SUCCESS)
        {
          /*
           * An error happened trying to search for the updates
           * This server will start accepting again new updates but
           * some inconsistencies will stay between servers.
           * Log an error for the repair tool
           * that will need to re-synchronize the servers.
           */
          message = ERR_CANNOT_RECOVER_CHANGES.get(
              baseDn.toNormalizedString());
          logError(message);
        } else
        {
          for (FakeOperation replayOp : replayOperations)
          {
            ChangeNumber cn = replayOp.getChangeNumber();
            /*
             * Because the entry returned by the search operation
             * can contain old historical information, it is
             * possible that some of the FakeOperation are
             * actually older than the
             * Only send the Operation if it was newer than
             * the last ChangeNumber known by the Replication Server.
             */
            if (cn.newer(replServerMaxChangeNumber))
            {
              message =
                DEBUG_SENDING_CHANGE.get(
                    replayOp.getChangeNumber().toString());
              logError(message);
              session.publish(replayOp.generateMessage());
            }
          }
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
        }
        replayOperations.clear();
      }
    } catch (Exception e)
    {
      Message message = ERR_PUBLISHING_FAKE_OPS.get(
          baseDn.toNormalizedString(),
          e.getLocalizedMessage() + stackTraceToSingleLineString(e));
      logError(message);
    }
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * @return the internal search operation
   * @throws Exception when raised.
   */
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    Short serverId = fromChangeNumber.getServerId();
    String maxValueForId = "ffffffffffffffff" +
      String.format("%04x", serverId) + "ffffffff";
    LDAPFilter filter = LDAPFilter.decode(
       "(&(" + Historical.HISTORICALATTRIBUTENAME + ">=dummy:"
       + fromChangeNumber + ")(" + Historical.HISTORICALATTRIBUTENAME +
       "<=dummy:" + maxValueForId + "))");
    LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
    attrs.add(Historical.HISTORICALATTRIBUTENAME);
    attrs.add(Historical.ENTRYUIDNAME);
    attrs.add("*");
    return conn.processSearch(
      new ASN1OctetString(baseDn.toString()),
      SearchScope.WHOLE_SUBTREE,
      DereferencePolicy.NEVER_DEREF_ALIASES,
      0, 0, false, filter,
      attrs,
      resultListener);
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchEntry(
    InternalSearchOperation searchOperation,
    SearchResultEntry searchEntry)
  {
    /*
     * This call back is called at session establishment phase
     * for each entry that has been changed by this server and the changes
     * have not been sent to any Replication Server.
     * The role of this method is to build equivalent operation from
     * the historical information and add them in the replayOperations
     * table.
     */
    Iterable<FakeOperation> updates =
      Historical.generateFakeOperations(searchEntry);
    for (FakeOperation op : updates)
    {
      replayOperations.add(op);
    }
  }
  /**
   * {@inheritDoc}
   */
  public void handleInternalSearchReference(
    InternalSearchOperation searchOperation,
    SearchResultReference searchReference)
  {
    // TODO to be implemented
  }
  /**
   * This method should return the total number of objects in the
   * replicated domain.
   * This count will be used for reporting.
   *
   * @return true if the server is connected, false if not.
   * @throws DirectoryException when needed.
   *
   * @return The number of objects in the replication domain.
   */
  public boolean isConnected()
  public long countEntries() throws DirectoryException
  {
    if (broker != null)
      return broker.isConnected();
    else
    Backend backend = retrievesBackend(baseDn);
    if (!backend.supportsLDIFExport())
    {
      Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
                          backend.getBackendID().toString());
      logError(message);
      throw new DirectoryException(ResultCode.OTHER, message);
    }
    return backend.numSubordinates(baseDn, true) + 1;
  }
  /**
   * {@inheritDoc}
   */
  @Override
  public boolean processUpdate(UpdateMsg updateMsg)
  {
    if (updateMsg instanceof LDAPUpdateMsg)
    {
      LDAPUpdateMsg msg = (LDAPUpdateMsg) updateMsg;
      // put the UpdateMsg in the RemotePendingChanges list.
      remotePendingChanges.putRemoteUpdate(msg);
      // Put update message into the replay queue
      // (block until some place in the queue is available)
      UpdateToReplay updateToReplay = new UpdateToReplay(msg, this);
      updateToReplayQueue.offer(updateToReplay);
      return false;
    }
    // unknown message type, this should not happen, just ignore it.
    return true;
  }
  /**
   * Determine whether the connection to the replication server is encrypted.
   * @return true if the connection is encrypted, false otherwise.
   * Monitoring information for the LDAPReplicationDomain.
   *
   * @return Monitoring attributes specific to the LDAPReplicationDomain.
   */
  public boolean isSessionEncrypted()
  public Collection<Attribute> getAdditionalMonitoring()
  {
    if (broker != null)
      return broker.isSessionEncrypted();
    else
      return false;
  }
    ArrayList<Attribute> attributes = new ArrayList<Attribute>();
  /**
   * Gets the info for DSs in the topology (except us).
   * @return The info for DSs in the topology (except us)
   */
  public List<DSInfo> getDsList()
  {
    return dsList;
  }
    /* get number of changes in the pending list */
    ReplicationMonitor.addMonitorData(
        attributes, "pending-updates", getPendingUpdatesCount());
  /**
   * Gets the info for RSs in the topology (except the one we are connected
   * to).
   * @return The info for RSs in the topology (except the one we are connected
   * to)
   */
  public List<RSInfo> getRsList()
  {
    return rsList;
  }
    /* get number of changes successfully */
    ReplicationMonitor.addMonitorData(attributes, "replayed-updates-ok",
        getNumReplayedPostOpCalled());
  /**
   * Tells if assured replication is enabled for this domain.
   * @return True if assured replication is enabled for this domain.
   */
  public boolean isAssured()
  {
    return assured;
  }
    /* get number of modify conflicts */
    ReplicationMonitor.addMonitorData(attributes, "resolved-modify-conflicts",
        getNumResolvedModifyConflicts());
  /**
   * Gives the mode for the assured replication of the domain.
   * @return The mode for the assured replication of the domain.
   */
  public AssuredMode getAssuredMode()
  {
    return assuredMode;
  }
    /* get number of naming conflicts */
    ReplicationMonitor.addMonitorData(attributes, "resolved-naming-conflicts",
        getNumResolvedNamingConflicts());
  /**
   * Gives the assured level of the replication of the domain.
   * @return The assured level of the replication of the domain.
   */
  public byte getAssuredSdLevel()
  {
    return assuredSdLevel;
  }
    /* get number of unresolved naming conflicts */
    ReplicationMonitor.addMonitorData(attributes, "unresolved-naming-conflicts",
        getNumUnresolvedNamingConflicts());
  /**
   * Gets the group id for this domain.
   * @return The group id for this domain.
   */
  public byte getGroupId()
  {
    return groupId;
  }
  /**
   * Gets the referrals URLs this domain publishes.
   * @return The referrals URLs this domain publishes.
   */
  public List<String> getRefUrls()
  {
    return refUrls;
  }
  /**
   * Gets the status for this domain.
   * @return The status for this domain.
   */
  public ServerStatus getStatus()
  {
    return status;
    return attributes;
  }
}