mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
26.31.2007 71ebb3724c79a7d1218c36f080acd6ee162b9cd2
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -43,7 +43,8 @@
 * ServerState class.
 * This object is used to store the last update seem on this server
 * from each server.
 * It is exchanged with the changelog servers at connection establishment time.
 * It is exchanged with the replication servers at connection establishment
 * time.
 */
public class ServerState implements Iterable<Short>
{
opends/src/server/org/opends/server/replication/plugin/ListenerThread.java
@@ -37,7 +37,7 @@
import org.opends.server.types.ErrorLogSeverity;
/**
 * Thread that is used to get messages from the Changelog servers
 * Thread that is used to get messages from the Replication servers
 * and replay them in the current server.
 */
public class ListenerThread extends DirectoryThread
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -76,12 +76,7 @@
                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                  ExportTaskListener
{
  static String CHANGELOG_DN = "cn=Changelog Server," +
    "cn=Multimaster Synchronization, cn=Synchronization Providers, cn=config";
  static String SYNCHRONIZATION_CLASS =
    "ds-cfg-synchronization-provider-config";
  private ChangelogListener changelog = null;
  private ReplicationServerListener replicationServer = null;
  private static Map<DN, ReplicationDomain> domains =
    new HashMap<DN, ReplicationDomain>() ;
@@ -94,7 +89,7 @@
      MultimasterSynchronizationProviderCfg configuration)
  throws ConfigException
  {
    changelog = new ChangelogListener(configuration);
    replicationServer = new ReplicationServerListener(configuration);
    // Register as an add and delete listener with the root configuration so we
    // can be notified if Multimaster domain entries are added or removed.
@@ -348,9 +343,9 @@
      domain.shutdown();
    }
    // shutdown the Changelog Service if necessary
    if (changelog != null)
      changelog.shutdown();
    // shutdown the ReplicationServer Service if necessary
    if (replicationServer != null)
      replicationServer.shutdown();
    DirectoryServer.deregisterBackupTaskListener(this);
    DirectoryServer.deregisterRestoreTaskListener(this);
opends/src/server/org/opends/server/replication/plugin/ReplicationBroker.java
File was renamed from opends/src/server/org/opends/server/replication/plugin/ChangelogBroker.java
@@ -52,7 +52,7 @@
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ChangelogStartMessage;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ServerStartMessage;
import org.opends.server.replication.protocol.SocketSession;
@@ -72,13 +72,13 @@
/**
 * The broker for Multi-master Replication.
 */
public class ChangelogBroker implements InternalSearchListener
public class ReplicationBroker implements InternalSearchListener
{
  private boolean shutdown = false;
  private Collection<String> servers;
  private boolean connected = false;
  private final Object lock = new Object();
  private String changelogServer = "Not connected";
  private String replicationServer = "Not connected";
  private TreeSet<FakeOperation> replayOperations;
  private ProtocolSession session = null;
  private final ServerState state;
@@ -114,26 +114,26 @@
  /**
   * Creates a new Changelog Broker for a particular ReplicationDomain.
   * Creates a new ReplicationServer Broker for a particular ReplicationDomain.
   *
   * @param state The ServerState that should be used by this broker
   *              when negociating the session with the changelog servers.
   *              when negociating the session with the replicationServer.
   * @param baseDn The base DN that should be used by this broker
   *              when negociating the session with the changelog servers.
   *              when negociating the session with the replicationServer.
   * @param serverID The server ID that should be used by this broker
   *              when negociating the session with the changelog servers.
   *              when negociating the session with the replicationServer.
   * @param maxReceiveQueue The maximum size of the receive queue to use on
   *                         the changelog server.
   *                         the replicationServer.
   * @param maxReceiveDelay The maximum replication delay to use on the
   *                        changelog server.
   *                        replicationServer.
   * @param maxSendQueue The maximum size of the send queue to use on
   *                     the changelog server.
   * @param maxSendDelay The maximum send delay to use on the changelog server.
   *                     the replicationServer.
   * @param maxSendDelay The maximum send delay to use on the replicationServer.
   * @param window The size of the send and receive window to use.
   * @param heartbeatInterval The interval between heartbeats requested of the
   * changelog server, or zero if no heartbeats are requested.
   * replicationServer, or zero if no heartbeats are requested.
   */
  public ChangelogBroker(ServerState state, DN baseDn, short serverID,
  public ReplicationBroker(ServerState state, DN baseDn, short serverID,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval)
  {
@@ -153,7 +153,7 @@
  }
  /**
   * Start the ChangelogBroker.
   * Start the ReplicationBroker.
   *
   * @param servers list of servers used
   * @throws Exception : in case of errors
@@ -162,7 +162,7 @@
                    throws Exception
  {
    /*
     * Open Socket to the Changelog
     * Open Socket to the ReplicationServer
     * Send the Start message
     */
    shutdown = false;
@@ -182,14 +182,14 @@
  /**
   * Connect to a Changelog server.
   * Connect to a ReplicationServer.
   *
   * @throws NumberFormatException address was invalid
   * @throws IOException error during connection phase
   */
  private void connect() throws NumberFormatException, IOException
  {
    ChangelogStartMessage startMsg;
    ReplServerStartMessage startMsg;
    // Stop any existing heartbeat monitor from a previous session.
    if (heartbeatMonitor != null)
@@ -230,30 +230,30 @@
          /*
           * Read the ChangelogStartMessage that should come back.
           * Read the ReplServerStartMessage that should come back.
           */
          session.setSoTimeout(1000);
          startMsg = (ChangelogStartMessage) session.receive();
          startMsg = (ReplServerStartMessage) session.receive();
          session.setSoTimeout(timeout);
          /*
           * We must not publish changes to a changelog that has not
           * We must not publish changes to a replicationServer that has not
           * seen all our previous changes because this could cause some
           * other ldap servers to miss those changes.
           * Check that the Changelog has seen all our previous changes.
           * If not, try another changelog server.
           * If no other changelog server has seen all our changes, recover
           * those changes and send them again to any changelog server.
           * Check that the ReplicationServer has seen all our previous changes.
           * If not, try another replicationServer.
           * If no other replicationServer has seen all our changes, recover
           * those changes and send them again to any replicationServer.
           */
          ChangeNumber changelogMaxChangeNumber =
          ChangeNumber replServerMaxChangeNumber =
            startMsg.getServerState().getMaxChangeNumber(serverID);
          if (changelogMaxChangeNumber == null)
            changelogMaxChangeNumber = new ChangeNumber(0, 0, serverID);
          if (replServerMaxChangeNumber == null)
            replServerMaxChangeNumber = new ChangeNumber(0, 0, serverID);
          ChangeNumber ourMaxChangeNumber =  state.getMaxChangeNumber(serverID);
          if ((ourMaxChangeNumber == null) ||
              (ourMaxChangeNumber.olderOrEqual(changelogMaxChangeNumber)))
              (ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
          {
            changelogServer = ServerAddr.toString();
            replicationServer = ServerAddr.toString();
            maxSendWindow = startMsg.getWindowSize();
            this.sendWindow = new Semaphore(maxSendWindow);
            connected = true;
@@ -263,7 +263,7 @@
          {
            if (checkState == true)
            {
              /* This changelog server is missing some
              /* This replicationServer is missing some
               * of our changes, we are going to try another server
               * but before log a notice message
               */
@@ -277,14 +277,14 @@
            {
              replayOperations.clear();
              /*
               * Get all the changes that have not been seen by this changelog
               * server and update it
               * Get all the changes that have not been seen by this
               * replicationServer and update it
               */
              InternalClientConnection conn =
                  InternalClientConnection.getRootConnection();
              LDAPFilter filter = LDAPFilter.decode(
                  "("+ Historical.HISTORICALATTRIBUTENAME +
                  ">=dummy:" + changelogMaxChangeNumber + ")");
                  ">=dummy:" + replServerMaxChangeNumber + ")");
              LinkedHashSet<String> attrs = new LinkedHashSet<String>(1);
              attrs.add(Historical.HISTORICALATTRIBUTENAME);
              InternalSearchOperation op = conn.processSearch(
@@ -308,7 +308,7 @@
              }
              else
              {
                changelogServer = ServerAddr.toString();
                replicationServer = ServerAddr.toString();
                maxSendWindow = startMsg.getWindowSize();
                this.sendWindow = new Semaphore(maxSendWindow);
                connected = true;
@@ -325,7 +325,7 @@
        {
          /*
           * There was no server waiting on this host:port
           * Log a notice and try the next changelog server in the list
           * Log a notice and try the next replicationServer in the list
           */
          int    msgID   = MSGID_NO_CHANGELOG_SERVER_LISTENING;
          String message = getMessage(msgID, server);
@@ -363,9 +363,9 @@
        if (checkState == true)
        {
          /*
           * We could not find a changelog server that has seen all the
           * We could not find a replicationServer that has seen all the
           * changes that this server has already processed, start again
           * the loop looking for any changelog server.
           * the loop looking for any replicationServer.
           */
          try
          {
@@ -385,7 +385,7 @@
        else
        {
          /*
           * This server could not find any changelog server
           * This server could not find any replicationServer
           * Let's wait a little and try again.
           */
          synchronized (this)
@@ -419,7 +419,7 @@
  /**
   * Restart the Changelog broker after a failure.
   * Restart the ReplicationServer broker after a failure.
   *
   * @param failingSession the socket which failed
   */
@@ -545,14 +545,14 @@
   */
  public void stop()
  {
    changelogServer = "stopped";
    replicationServer = "stopped";
    shutdown = true;
    connected = false;
    try
    {
      if (debugEnabled())
      {
        debugInfo("ChangelogBroker Stop Closing session");
        debugInfo("ReplicationBroker Stop Closing session");
      }
      if (session != null)
@@ -599,15 +599,15 @@
  }
  /**
   * Get the name of the changelog server to which this broker is currently
   * Get the name of the replicationServer to which this broker is currently
   * connected.
   *
   * @return the name of the changelog server to which this domain
   * @return the name of the replicationServer to which this domain
   *         is currently connected.
   */
  public String getChangelogServer()
  public String getReplicationServer()
  {
    return changelogServer;
    return replicationServer;
  }
  /**
   * {@inheritDoc}
@@ -621,7 +621,7 @@
     * TODO : implement code for ADD, DEL, MODDN operation
     *
     * Parse all ds-sync-hist attribute values
     *   - for each Changenumber>changelogMaxChangeNumber :
     *   - for each Changenumber > replication server MaxChangeNumber :
     *          build an attribute mod
     *
     */
@@ -699,7 +699,7 @@
  /**
   * Change some config parameters.
   *
   * @param changelogServers    The new list of changelog servers.
   * @param replicationServers    The new list of replication servers.
   * @param maxReceiveQueue     The max size of receive queue.
   * @param maxReceiveDelay     The max receive delay.
   * @param maxSendQueue        The max send queue.
@@ -707,11 +707,11 @@
   * @param window              The max window size.
   * @param heartbeatInterval   The heartbeat interval.
   */
  public void changeConfig(Collection<String> changelogServers,
  public void changeConfig(Collection<String> replicationServers,
      int maxReceiveQueue, int maxReceiveDelay, int maxSendQueue,
      int maxSendDelay, int window, long heartbeatInterval)
  {
    this.servers = changelogServers;
    this.servers = replicationServers;
    this.maxRcvWindow = window;
    this.heartbeatInterval = heartbeatInterval;
    this.maxReceiveDelay = maxReceiveDelay;
@@ -719,7 +719,7 @@
    this.maxSendDelay = maxSendDelay;
    this.maxSendQueue = maxSendQueue;
    // TODO : Changing those parameters requires to either restart a new
    // session with the changelog server or renegociate the parameters that
    // session with the replicationServer or renegociate the parameters that
    // were sent in the ServerStart message
  }
}
opends/src/server/org/opends/server/replication/plugin/ReplicationDomain.java
@@ -119,9 +119,9 @@
 *  This class implements the bulk part of the.of the Directory Server side
 *  of the replication code.
 *  It contains the root method for publishing a change,
 *  processing a change received from the changelog service,
 *  processing a change received from the replicationServer service,
 *  handle conflict resolution,
 *  handle protocol messages from the changelog server.
 *  handle protocol messages from the replicationServer.
 */
public class ReplicationDomain extends DirectoryThread
       implements ConfigurationChangeListener<MultimasterDomainCfg>
@@ -129,7 +129,7 @@
  private ReplicationMonitor monitor;
  private ChangeNumberGenerator changeNumberGenerator;
  private ChangelogBroker broker;
  private ReplicationBroker broker;
  private List<ListenerThread> synchroThreads =
    new ArrayList<ListenerThread>();
@@ -248,7 +248,7 @@
  private int listenerThreadNumber = 10;
  private boolean receiveStatus = true;
  private Collection<String> changelogServers;
  private Collection<String> replicationServers;
  private DN baseDN;
@@ -277,7 +277,7 @@
    super("replication flush");
    // Read the configuration parameters.
    changelogServers = configuration.getChangelogServer();
    replicationServers = configuration.getChangelogServer();
    serverId = (short) configuration.getServerId();
    baseDN = configuration.getSynchronizationDN();
    maxReceiveQueue = configuration.getMaxReceiveQueue();
@@ -327,12 +327,12 @@
     */
    try
    {
      broker = new ChangelogBroker(state, baseDN, serverId, maxReceiveQueue,
      broker = new ReplicationBroker(state, baseDN, serverId, maxReceiveQueue,
          maxReceiveDelay, maxSendQueue, maxSendDelay, window,
          heartbeatInterval);
      synchronized (broker)
      {
        broker.start(changelogServers);
        broker.start(replicationServers);
        if (!receiveStatus)
          broker.suspendReceive();
      }
@@ -342,7 +342,7 @@
    } catch (Exception e)
    {
     /* TODO should mark that changelog service is
     /* TODO should mark that replicationServer service is
      * not available, log an error and retry upon timeout
      * should we stop the modifications ?
      */
@@ -627,7 +627,7 @@
  }
  /**
   * Receives an update message from the changelog.
   * Receives an update message from the replicationServer.
   * also responsible for updating the list of pending changes
   * @return the received message - null if none
   */
@@ -704,7 +704,7 @@
              // - either during an export
              // - or before an import really started
              //   For example, when we publish a request and the
              //  changelog did not find any import source.
              //  replicationServer did not find any import source.
              abandonImportExport((ErrorMessage)msg);
            }
          }
@@ -1027,20 +1027,20 @@
    DirectoryServer.deregisterMonitorProvider(monitor.getMonitorInstanceName());
    // stop the ChangelogBroker
    // stop the ReplicationBroker
    broker.stop();
  }
  /**
   * Get the name of the changelog server to which this domain is currently
   * Get the name of the replicationServer to which this domain is currently
   * connected.
   *
   * @return the name of the changelog server to which this domain
   * @return the name of the replicationServer to which this domain
   *         is currently connected.
   */
  public String getChangelogServer()
  public String getReplicationServer()
  {
    return broker.getChangelogServer();
    return broker.getReplicationServer();
  }
  /**
@@ -1604,7 +1604,7 @@
  }
  /**
   * Push all committed local changes to the changelog service.
   * Push all committed local changes to the replicationServer service.
   * PRECONDITION : The pendingChanges lock must be held before calling
   * this method.
   */
@@ -1734,10 +1734,10 @@
    try
    {
      broker.start(changelogServers);
      broker.start(replicationServers);
    } catch (Exception e)
    {
      /* TODO should mark that changelog service is
      /* TODO should mark that replicationServer service is
       * not available, log an error and retry upon timeout
       * should we stop the modifications ?
       */
@@ -2616,7 +2616,7 @@
      // Re-exchange state with SS
      broker.stop();
      broker.start(changelogServers);
      broker.start(replicationServers);
    }
    catch(Exception e)
@@ -2803,16 +2803,16 @@
         MultimasterDomainCfg configuration)
  {
    // server id and base dn are readonly.
    // The other parameters needs to be renegociated with the Changelog Server.
    // so that requires restarting the session with the Changelog Server.
    changelogServers = configuration.getChangelogServer();
    // The other parameters needs to be renegociated with the ReplicationServer.
    // so that requires restarting the session with the ReplicationServer.
    replicationServers = configuration.getChangelogServer();
    maxReceiveQueue = configuration.getMaxReceiveQueue();
    maxReceiveDelay = (int) configuration.getMaxReceiveDelay();
    maxSendQueue = configuration.getMaxSendQueue();
    maxSendDelay = (int) configuration.getMaxSendDelay();
    window = configuration.getWindowSize();
    heartbeatInterval = configuration.getHeartbeatInterval();
    broker.changeConfig(changelogServers, maxReceiveQueue, maxReceiveDelay,
    broker.changeConfig(replicationServers, maxReceiveQueue, maxReceiveDelay,
                        maxSendQueue, maxSendDelay, window, heartbeatInterval);
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
opends/src/server/org/opends/server/replication/plugin/ReplicationMonitor.java
@@ -94,7 +94,7 @@
    attributes.add(attr);
    /* get the base dn */
    attr = new Attribute("connected-to", domain.getChangelogServer());
    attr = new Attribute("connected-to", domain.getReplicationServer());
    attributes.add(attr);
    /* get number of lost connections */
opends/src/server/org/opends/server/replication/plugin/ReplicationServerListener.java
File was renamed from opends/src/server/org/opends/server/replication/plugin/ChangelogListener.java
@@ -33,33 +33,34 @@
import org.opends.server.admin.std.server.ChangelogServerCfg;
import org.opends.server.admin.std.server.MultimasterSynchronizationProviderCfg;
import org.opends.server.config.ConfigException;
import org.opends.server.replication.server.Changelog;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.ConfigChangeResult;
import org.opends.server.types.ResultCode;
/**
 * This class is used to create and object that can
 * register in the admin framework as a listener for changes, add and delete
 * on the Changelog Server configuration objects.
 * on the ReplicationServer configuration objects.
 *
 */
public class ChangelogListener
public class ReplicationServerListener
       implements ConfigurationAddListener<ChangelogServerCfg>,
       ConfigurationDeleteListener<ChangelogServerCfg>
{
  Changelog changelog = null;
  ReplicationServer replicationServer = null;
  /**
   * Build a Changelog Listener from the given Multimaster configuration.
   * Build a ReplicationServer Listener from the given Multimaster
   * configuration.
   *
   * @param configuration The configuration that will be used to listen
   *                      for changelog configuration changes.
   *                      for replicationServer configuration changes.
   *
   * @throws ConfigException if the ChangelogListener can't register for
   * @throws ConfigException if the ReplicationServerListener can't register for
   *                         listening to changes on the provided configuration
   *                         object.
   */
  public ChangelogListener(
  public ReplicationServerListener(
      MultimasterSynchronizationProviderCfg configuration)
      throws ConfigException
  {
@@ -69,7 +70,7 @@
    if (configuration.hasChangelogServer())
    {
      ChangelogServerCfg server = configuration.getChangelogServer();
      changelog = new Changelog(server);
      replicationServer = new ReplicationServer(server);
    }
  }
@@ -81,7 +82,7 @@
  {
    try
    {
      changelog = new Changelog(configuration);
      replicationServer = new ReplicationServer(configuration);
      return new ConfigChangeResult(ResultCode.SUCCESS, false);
    } catch (ConfigException e)
    {
@@ -97,17 +98,17 @@
  public boolean isConfigurationAddAcceptable(
      ChangelogServerCfg configuration, List<String> unacceptableReasons)
  {
    return Changelog.isConfigurationAcceptable(
    return ReplicationServer.isConfigurationAcceptable(
      configuration, unacceptableReasons);
  }
  /**
   * Shutdown the Changelog servers.
   * Shutdown the Replication servers.
   */
  public void shutdown()
  {
    if (changelog != null)
      changelog.shutdown();
    if (replicationServer != null)
      replicationServer.shutdown();
  }
  /**
@@ -116,11 +117,11 @@
  public ConfigChangeResult applyConfigurationDelete(
      ChangelogServerCfg configuration)
  {
    // There can be only one changelog, just shutdown the changelog
    // currently configured.
    if (changelog != null)
    // There can be only one replicationServer, just shutdown the
    // replicationServer currently configured.
    if (replicationServer != null)
    {
      changelog.shutdown();
      replicationServer.shutdown();
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
opends/src/server/org/opends/server/replication/plugin/package-info.java
@@ -41,9 +41,9 @@
 * <A HREF="ReplicationDomain.html"><B>ReplicationDomain</B></A>
 * contains the bulk of the Directory Server side of the
 * replication code. Most notably it contains the root method for
 * publishing a change, processing a change received from the changelog
 * publishing a change, processing a change received from the replicationServer
 * service, handle conflict resolution, handle protocol messages from the
 * changelog server.
 * replicationServer.
 * </li>
 * </ul>
 */
opends/src/server/org/opends/server/replication/protocol/AckMessage.java
@@ -32,7 +32,7 @@
import org.opends.server.replication.common.ChangeNumber;
/**
 * Used to send acks between LDAP and changelog servers.
 * Used to send acks between LDAP and replication servers.
 */
public class AckMessage extends ReplicationMessage
{
opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -52,7 +52,7 @@
/**
 * This class is used to exchange Add operation between LDAP servers
 * and changelog servers.
 * and replication servers.
 */
public class AddMsg extends UpdateMessage
{
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -38,7 +38,7 @@
import org.opends.server.types.Operation;
/**
 * Object used when sending delete information to Changelogs.
 * Object used when sending delete information to replication servers.
 */
public class DeleteMsg extends UpdateMessage
{
opends/src/server/org/opends/server/replication/protocol/ErrorMessage.java
@@ -32,7 +32,7 @@
/**
 * This message is part of the replication protocol.
 * This message is sent by a server or a changelog server when an error
 * This message is sent by a server or a replication server when an error
 * is detected in the context of a total update.
 */
public class ErrorMessage extends RoutableMessage implements
@@ -63,7 +63,7 @@
  /**
   * Create a InitializeMessage.
   *
   * @param destination changelog server id
   * @param destination replication server id
   * @param msgID error message ID
   * @param details details of the error
   */
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMessage.java
File was renamed from opends/src/server/org/opends/server/replication/protocol/ChangelogStartMessage.java
@@ -35,9 +35,10 @@
import org.opends.server.types.DN;
/**
 * Message sent by a changelog server to another changelog server at Startup.
 * Message sent by a replication server to another replication server
 * at Startup.
 */
public class ChangelogStartMessage extends ReplicationMessage implements
public class ReplServerStartMessage extends ReplicationMessage implements
    Serializable
{
  private static final long serialVersionUID = -5871385537169856856L;
@@ -50,15 +51,15 @@
  private int windowSize;
  /**
   * Create a ChangelogStartMessage.
   * Create a ReplServerStartMessage.
   *
   * @param serverId changelog server id
   * @param serverURL changelog server URL
   * @param baseDn base DN for which the ChangelogStartMessage is created.
   * @param serverId replication server id
   * @param serverURL replication server URL
   * @param baseDn base DN for which the ReplServerStartMessage is created.
   * @param windowSize The window size.
   * @param serverState our ServerState for this baseDn.
   */
  public ChangelogStartMessage(short serverId, String serverURL, DN baseDn,
  public ReplServerStartMessage(short serverId, String serverURL, DN baseDn,
                               int windowSize,
                               ServerState serverState)
  {
@@ -73,22 +74,23 @@
  }
  /**
   * Creates a new ChangelogStartMessage by decoding the provided byte array.
   * Creates a new ReplServerStartMessage by decoding the provided byte array.
   * @param in A byte array containing the encoded information for the
   *             ChangelogStartMessage
   *             ReplServerStartMessage
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ChangelogStartMessage.
   *                             encoded ReplServerStartMessage.
   */
  public ChangelogStartMessage(byte[] in) throws DataFormatException
  public ReplServerStartMessage(byte[] in) throws DataFormatException
  {
    /* The ChangelogStartMessage is encoded in the form :
    /* The ReplServerStartMessage is encoded in the form :
     * <baseDn><ServerId><ServerUrl><windowsize><ServerState>
     */
    try
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_CHANGELOG_START)
        throw new DataFormatException("input is not a valid ChangelogStartMsg");
      if (in[0] != MSG_TYPE_REPL_SERVER_START)
        throw new DataFormatException(
              "input is not a valid ReplServerStartMsg");
      int pos = 1;
      /* read the dn
@@ -149,9 +151,9 @@
  }
  /**
   * Get the base DN from this ChangelogStartMessage.
   * Get the base DN from this ReplServerStartMessage.
   *
   * @return the base DN from this ChangelogStartMessage.
   * @return the base DN from this ReplServerStartMessage.
   */
  public DN getBaseDn()
  {
@@ -181,7 +183,7 @@
  @Override
  public byte[] getBytes()
  {
    /* The ChangelogStartMessage is stored in the form :
    /* The ReplServerStartMessage is stored in the form :
     * <operation type><basedn><serverid><serverURL><windowsize><serverState>
     */
    try {
@@ -198,7 +200,7 @@
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_CHANGELOG_START;
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_START;
      int pos = 1;
      /* put the baseDN and a terminating 0 */
opends/src/server/org/opends/server/replication/protocol/ReplicationMessage.java
@@ -44,7 +44,7 @@
  static final byte MSG_TYPE_MODIFYDN_REQUEST = 4;
  static final byte MSG_TYPE_ACK = 5;
  static final byte MSG_TYPE_SERVER_START = 6;
  static final byte MSG_TYPE_CHANGELOG_START = 7;
  static final byte MSG_TYPE_REPL_SERVER_START = 7;
  static final byte MSG_TYPE_WINDOW = 8;
  static final byte MSG_TYPE_HEARTBEAT = 9;
  static final byte MSG_TYPE_INITIALIZE_REQUEST = 10;
@@ -64,7 +64,7 @@
   * MSG_TYPE_MODIFY_DN_REQUEST
   * MSG_TYPE_ACK
   * MSG_TYPE_SERVER_START
   * MSG_TYPE_CHANGELOG_START
   * MSG_TYPE_REPL_SERVER_START
   * MSG_TYPE_WINDOW
   * MSG_TYPE_HEARTBEAT
   * MSG_TYPE_INITIALIZE
@@ -110,8 +110,8 @@
      case MSG_TYPE_SERVER_START:
        msg = new ServerStartMessage(buffer);
      break;
      case MSG_TYPE_CHANGELOG_START:
        msg = new ChangelogStartMessage(buffer);
      case MSG_TYPE_REPL_SERVER_START:
        msg = new ReplServerStartMessage(buffer);
      break;
      case MSG_TYPE_WINDOW:
        msg = new WindowMessage(buffer);
opends/src/server/org/opends/server/replication/protocol/RoutableMessage.java
@@ -67,8 +67,8 @@
  /**
   * Creates a routable message.
   * @param senderID changelog server id
   * @param destination changelog server id
   * @param senderID replication server id
   * @param destination replication server id
   */
  public RoutableMessage(short senderID, short destination)
  {
opends/src/server/org/opends/server/replication/protocol/ServerStartMessage.java
@@ -38,7 +38,7 @@
/**
 * This message is used by LDAP server when they first connect.
 * to a changelog server to let them know who they are and what is their state
 * to a replication server to let them know who they are and what is their state
 * (their RUV)
 */
public class ServerStartMessage extends ReplicationMessage implements
opends/src/server/org/opends/server/replication/protocol/WindowMessage.java
@@ -33,7 +33,7 @@
/**
 * This message is used by LDAP server when they first connect.
 * to a changelog server to let them know who they are and what is their state
 * to a replication server to let them know who they are and what is their state
 * (their RUV)
 */
public class WindowMessage extends ReplicationMessage implements
opends/src/server/org/opends/server/replication/protocol/package-info.java
@@ -26,7 +26,7 @@
 */
/**
 * This package contains the code used by the changelog and by the
 * This package contains the code used by the replication server and by the
 * code running on the Directory Server side to exchange their information.
 * <br>
 * <br>
@@ -35,15 +35,15 @@
 * <ul>
 * <li><A HREF="SocketSession.html"><B>SocketSession</B></A>
 * implements the ProtocolSession interface that is
 * used by the changelog server and the directory server to communicate.
 * used by the replication server and the directory server to communicate.
 * This is done by using the innate encoding/decoding capabilities of the
 * ReplicationMessages objects. This class is used by both the
 * changelog and the replication package.
 * server and the plugin package.
 * </li>
 * <li><A HREF="ReplicationMessage.html"><B>ReplicationMessage</B></A>
 * This class and the class that inherit from it contain the
 * messages that are used for communication between the changelog and the
 * Directory Server as well as the methods fro encoding/decoding them.
 * messages that are used for communication between the replication server and
 * the Directory Server as well as the methods fro encoding/decoding them.
 * </li>
 *  </ul>
 */
opends/src/server/org/opends/server/replication/server/ChangelogAckMessageList.java
File was deleted
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -50,16 +50,16 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.server.ChangelogDB.ChangelogCursor;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
/**
 * This class is used for managing the changelog database for each servers
 * in the topology.
 * This class is used for managing the replicationServer database for each
 * server in the topology.
 * It is responsible for efficiently saving the updates that is received from
 * each master server into stable storage.
 * This class is also able to generate a ChangelogIterator that can be
 * This class is also able to generate a ReplicationIterator that can be
 * used to read all changes from a given ChangeNUmber.
 *
 * This class publish some monitoring information below cn=monitor.
@@ -70,10 +70,10 @@
  // This queue hold all the updates not yet saved to stable storage
  // it is only used as a temporary placeholder so that the write
  // in the stable storage can be grouped for efficiency reason.
  // it is never read back by changelog threads that are responsible
  // for pushing the changes to other changelog server or to LDAP server
  // it is never read back by replicationServer threads that are responsible
  // for pushing the changes to other replication server or to LDAP server
  private LinkedList<UpdateMessage> msgQueue = new LinkedList<UpdateMessage>();
  private ChangelogDB db;
  private ReplicationDB db;
  private ChangeNumber firstChange = null;
  private ChangeNumber lastChange = null;
  private short serverId;
@@ -93,21 +93,22 @@
   *
   * @param id Identifier of the DB.
   * @param baseDn the baseDn for which this DB was created.
   * @param changelog The Changelog that creates this dbHandler.
   * @param dbenv the Database Env to use to create the Changelog DB.
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn, Changelog changelog,
      ChangelogDbEnv dbenv)
  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
         throws DatabaseException
  {
    this.serverId = id;
    this.baseDn = baseDn;
    this.trimage = changelog.getTrimage();
    db = new ChangelogDB(id, baseDn, changelog, dbenv);
    this.trimage = replicationServer.getTrimage();
    db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
    thread = new DirectoryThread(this, "changelog db " + id + " " +  baseDn);
    thread = new DirectoryThread(this,
                                 "Replication Server db " + id + " " +  baseDn);
    thread.start();
    DirectoryServer.deregisterMonitorProvider(
@@ -194,13 +195,13 @@
  }
  /**
   * Generate a new ChangelogIterator that allows to browse the db
   * Generate a new ReplicationIterator that allows to browse the db
   * managed by this dbHandler and starting at the position defined
   * by a given changeNumber.
   *
   * @param changeNumber The position where the iterator must start.
   *
   * @return a new ChangelogIterator that allows to browse the db
   * @return a new ReplicationIterator that allows to browse the db
   *         managed by this dbHandler and starting at the position defined
   *         by a given changeNumber.
   *
@@ -208,7 +209,7 @@
   * @throws Exception  If there is no other change to push after change
   *         with changeNumber number.
   */
  public ChangelogIterator generateIterator(ChangeNumber changeNumber)
  public ReplicationIterator generateIterator(ChangeNumber changeNumber)
                           throws DatabaseException, Exception
  {
    /*
@@ -243,7 +244,7 @@
      flush();
    }
    return new ChangelogIterator(serverId, db, changeNumber);
    return new ReplicationIterator(serverId, db, changeNumber);
  }
  /**
@@ -297,7 +298,7 @@
  /**
   * Run method for this class.
   * Periodically Flushes the ChangelogCache from memory to the stable storage
   * Periodically Flushes the ReplicationCache from memory to the stable storage
   * and trims the old updates.
   */
  public void run()
@@ -337,7 +338,7 @@
  }
  /**
   * Flush old change information from this changelog database.
   * Flush old change information from this replicationServer database.
   * @throws DatabaseException In case of database problem.
   */
  private void trim() throws DatabaseException, Exception
@@ -352,7 +353,7 @@
    /* the trim is done by group in order to save some CPU and IO bandwidth
     * start the transaction then do a bunch of remove then commit
     */
    ChangelogCursor cursor;
    ReplServerDBCursor cursor;
    cursor = db.openDeleteCursor();
@@ -421,7 +422,7 @@
  {
    private DbMonitorProvider()
    {
      super("Changelog Database");
      super("ReplicationServer Database");
    }
    /**
@@ -431,7 +432,7 @@
    public ArrayList<Attribute> getMonitorData()
    {
      ArrayList<Attribute> attributes = new ArrayList<Attribute>();
      attributes.add(new Attribute("changelog-database",
      attributes.add(new Attribute("replicationServer-database",
                                   String.valueOf(serverId)));
      attributes.add(new Attribute("base-dn", baseDn.toString()));
      if (firstChange != null)
@@ -456,7 +457,7 @@
    @Override
    public String getMonitorInstanceName()
    {
      return "Changelog database " + baseDn.toString() +
      return "ReplicationServer database " + baseDn.toString() +
             " " + String.valueOf(serverId);
    }
opends/src/server/org/opends/server/replication/server/ReplServerAckMessageList.java
New file
@@ -0,0 +1,84 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE
 * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at
 * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
 * add the following below this CDDL HEADER, with the fields enclosed
 * by brackets "[]" replaced with your own identifying information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.server.replication.common.ChangeNumber;
/**
 * This class is used to store acks for update messages coming from
 * other replication servers.
 */
public class ReplServerAckMessageList extends AckMessageList
{
  private short replicationServerId;
  private ReplicationCache replicationCache;
  /**
   * Creates a new AckMessageList for a given ChangeNumber.
   *
   * @param changeNumber The ChangeNumber for which the ack list is created.
   * @param numExpectedAcks The number of acks waited before acking the
   *                        original change.
   * @param replicationServerId The Identifier of the replication server
   *                          from which the change was received.
   * @param replicationCache The ReplicationCache from which he change
   *                         was received.
   */
  public ReplServerAckMessageList(ChangeNumber changeNumber,
                                 int numExpectedAcks,
                                 short replicationServerId,
                                 ReplicationCache replicationCache)
  {
    super(changeNumber, numExpectedAcks);
    this.replicationServerId = replicationServerId;
    this.replicationCache = replicationCache;
  }
  /**
   * Get the Identifier of the replication server from which we received the
   * change.
   * @return Returns the Identifier of the replication server from which we
   *         received the change.
   */
  public short getReplicationServerId()
  {
    return replicationServerId;
  }
  /**
   * Get the replicationCache of the replication server from which we received
   * the change.
   * @return Returns the replicationCache of the replication server from which
   *         we received the change .
   */
  public ReplicationCache getChangelogCache()
  {
    return replicationCache;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationCache.java
File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogCache.java
@@ -55,7 +55,7 @@
/**
 * This class define an in-memory cache that will be used to store
 * the messages that have been received from an LDAP server or
 * from another changelog server and that should be forwarded to
 * from another replication server and that should be forwarded to
 * other servers.
 *
 * The size of the cache is set by configuration.
@@ -68,7 +68,7 @@
 * received to the disk and for trimming them
 * Decision to trim can be based on disk space or age of the message
 */
public class ChangelogCache
public class ReplicationCache
{
  private Object flowControlLock = new Object();
  private DN baseDn = null;
@@ -80,22 +80,22 @@
   * must push to this particular server
   *
   * We add new TreeSet in the HashMap when a new server register
   * to this changelog server.
   * to this replication server.
   *
   */
  private Map<Short, ServerHandler> connectedServers =
    new ConcurrentHashMap<Short, ServerHandler>();
  /*
   * This map contains one ServerHandler for each changelog servers
   * with which we are connected (so normally all the changelogs)
   * This map contains one ServerHandler for each replication servers
   * with which we are connected (so normally all the replication servers)
   * the first update in the balanced tree is the next change that we
   * must push to this particular server
   *
   * We add new TreeSet in the HashMap when a new changelog server register
   * to this changelog server.
   * We add new TreeSet in the HashMap when a new replication server register
   * to this replication server.
   */
  private Map<Short, ServerHandler> changelogServers =
  private Map<Short, ServerHandler> replicationServers =
    new ConcurrentHashMap<Short, ServerHandler>();
  /*
@@ -104,18 +104,19 @@
   */
  private Map<Short, DbHandler> sourceDbHandlers =
    new ConcurrentHashMap<Short, DbHandler>();
  private Changelog changelog;
  private ReplicationServer replicationServer;
  /**
   * Creates a new ChangelogCache associated to the DN baseDn.
   * Creates a new ReplicationCache associated to the DN baseDn.
   *
   * @param baseDn The baseDn associated to the ChangelogCache.
   * @param changelog the Changelog that created this changelog cache.
   * @param baseDn The baseDn associated to the ReplicationCache.
   * @param replicationServer the ReplicationServer that created this
   *                          replicationServer cache.
   */
  public ChangelogCache(DN baseDn, Changelog changelog)
  public ReplicationCache(DN baseDn, ReplicationServer replicationServer)
  {
    this.baseDn = baseDn;
    this.changelog = changelog;
    this.replicationServer = replicationServer;
  }
  /**
@@ -134,7 +135,7 @@
    /*
     * TODO : In case that the source server is a LDAP server this method
     * should check that change did get pushed to at least one
     * other changelog server before pushing it to the LDAP servers
     * other replication server before pushing it to the LDAP servers
     */
    sourceHandler.updateServerState(update);
@@ -145,7 +146,7 @@
      int count = this.NumServers();
      if (count > 1)
      {
        if (sourceHandler.isChangelogServer())
        if (sourceHandler.isReplicationServer())
          ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
                                      this, count - 1);
        else
@@ -168,13 +169,13 @@
      {
        try
        {
          dbHandler = changelog.newDbHandler(id, baseDn);
          dbHandler = replicationServer.newDbHandler(id, baseDn);
        } catch (DatabaseException e)
        {
          /*
           * Because of database problem we can't save any more changes
           * from at least one LDAP server.
           * This changelog therefore can't do it's job properly anymore
           * This replicationServer therefore can't do it's job properly anymore
           * and needs to close all its connections and shutdown itself.
           */
          int    msgID   = MSGID_CHANGELOG_SHUTDOWN_DATABASE_ERROR;
@@ -182,7 +183,7 @@
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          changelog.shutdown();
          replicationServer.shutdown();
          return;
        }
        sourceDbHandlers.put(id, dbHandler);
@@ -194,11 +195,11 @@
    /*
     * Push the message to the changelog servers
     * Push the message to the replication servers
     */
    if (!sourceHandler.isChangelogServer())
    if (!sourceHandler.isReplicationServer())
    {
      for (ServerHandler handler : changelogServers.values())
      for (ServerHandler handler : replicationServers.values())
      {
        handler.add(update, sourceHandler);
      }
@@ -224,7 +225,7 @@
  /**
   * Create initialize context necessary for finding the changes
   * that must be sent to a given LDAP or changelog server.
   * that must be sent to a given LDAP or replication server.
   *
   * @param handler handler for the server that must be started
   * @throws Exception when method has failed
@@ -254,32 +255,32 @@
  {
    handler.stopHandler();
    if (handler.isChangelogServer())
      changelogServers.remove(handler.getServerId());
    if (handler.isReplicationServer())
      replicationServers.remove(handler.getServerId());
    else
      connectedServers.remove(handler.getServerId());
  }
  /**
   * Create initialize context necessary for finding the changes
   * that must be sent to a given changelog server.
   * that must be sent to a given replication server.
   *
   * @param handler the server ID to which we want to forward changes
   * @throws Exception in case of errors
   */
  public void startChangelog(ServerHandler handler) throws Exception
  public void startReplicationServer(ServerHandler handler) throws Exception
  {
    /*
     * create the balanced tree that will be used to forward changes
     * TODO throw proper exception
     */
    synchronized (changelogServers)
    synchronized (replicationServers)
    {
      if (changelogServers.containsKey(handler.getServerId()))
      if (replicationServers.containsKey(handler.getServerId()))
      {
        throw new Exception("changelog Id already registered");
        throw new Exception("Replication Server Id already registered");
      }
      changelogServers.put(handler.getServerId(), handler);
      replicationServers.put(handler.getServerId(), handler);
    }
  }
@@ -317,7 +318,7 @@
  }
  /**
   * Return a Set of String containing the lists of Changelog servers
   * Return a Set of String containing the lists of Replication servers
   * connected to this server.
   * @return the set of connected servers
   */
@@ -325,7 +326,7 @@
  {
    LinkedHashSet<String> mySet = new LinkedHashSet<String>();
    for (ServerHandler handler : changelogServers.values())
    for (ServerHandler handler : replicationServers.values())
    {
      mySet.add(handler.getServerAddressURL());
    }
@@ -335,8 +336,8 @@
  /**
   * Return a Set containing the servers known by this changelog.
   * @return a set containing the servers known by this changelog.
   * Return a Set containing the servers known by this replicationServer.
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getServers()
  {
@@ -349,9 +350,9 @@
   *
   * @param serverId Identifier of the server for which the iterator is created.
   * @param changeNumber Starting point for the iterator.
   * @return the created ChangelogIterator.
   * @return the created ReplicationIterator.
   */
  public ChangelogIterator getChangelogIterator(short serverId,
  public ReplicationIterator getChangelogIterator(short serverId,
                    ChangeNumber changeNumber)
  {
    DbHandler handler = sourceDbHandlers.get(serverId);
@@ -377,8 +378,8 @@
  }
  /**
   * creates a new ChangelogDB with specified identifier.
   * @param id the identifier of the new ChangelogDB.
   * creates a new ReplicationDB with specified identifier.
   * @param id the identifier of the new ReplicationDB.
   * @param db the new db.
   *
   * @throws DatabaseException If a database error happened.
@@ -398,7 +399,7 @@
   */
  private int NumServers()
  {
    return changelogServers.size() + connectedServers.size();
    return replicationServers.size() + connectedServers.size();
  }
@@ -417,7 +418,7 @@
     *    In this case, we can find the handler from the connectedServers map
     *  - the message that was acked comes from a server to which we are not
     *    connected.
     *    In this case we need to find the changelog server that forwarded
     *    In this case we need to find the replication server that forwarded
     *    the change and send back the ack to this server.
     */
    ServerHandler handler = connectedServers.get(
@@ -455,10 +456,10 @@
    }
    else if (msg.getDestination() == RoutableMessage.ALL_SERVERS)
    {
      if (!senderHandler.isChangelogServer())
      if (!senderHandler.isReplicationServer())
      {
        // Send to all changelogServers
        for (ServerHandler destinationHandler : changelogServers.values())
        // Send to all replicationServers
        for (ServerHandler destinationHandler : replicationServers.values())
        {
          servers.add(destinationHandler);
        }
@@ -488,7 +489,7 @@
        if (senderHandler.isLDAPserver())
        {
          // let's forward to the other changelogs
          servers.addAll(changelogServers.values());
          servers.addAll(replicationServers.values());
        }
      }
    }
@@ -551,7 +552,7 @@
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     *                     change was an LDAP server or a ReplicationServer.
     */
    public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
    {
@@ -565,7 +566,7 @@
     *
     * @param changeNumber The ChangeNumber of the change that must be acked.
     * @param isLDAPserver This boolean indicates if the server that sent the
     *                     change was an LDAP server or a Changelog server.
     *                     change was an LDAP server or a ReplicationServer.
     * @param serverId     The identifier of the server from which we
     *                     received the change..
     */
@@ -576,7 +577,7 @@
      if (isLDAPserver)
        handler = connectedServers.get(serverId);
      else
        handler = changelogServers.get(serverId);
        handler = replicationServers.get(serverId);
      // TODO : check for null handler and log error
      try
@@ -599,12 +600,12 @@
    }
    /**
     * Shutdown this ChangelogCache.
     * Shutdown this ReplicationCache.
     */
    public void shutdown()
    {
      // Close session with other changelogs
      for (ServerHandler serverHandler : changelogServers.values())
      for (ServerHandler serverHandler : replicationServers.values())
      {
        serverHandler.shutdown();
      }
@@ -647,7 +648,7 @@
    @Override
    public String toString()
    {
      return "ChangelogCache " + baseDn;
      return "ReplicationCache " + baseDn;
    }
    /**
@@ -656,7 +657,7 @@
     */
    public void checkAllSaturation() throws IOException
    {
      for (ServerHandler handler : changelogServers.values())
      for (ServerHandler handler : replicationServers.values())
      {
        handler.checkWindow();
      }
@@ -676,7 +677,7 @@
     */
    public boolean restartAfterSaturation(ServerHandler sourceHandler)
    {
      for (ServerHandler handler : changelogServers.values())
      for (ServerHandler handler : replicationServers.values())
      {
        if (!handler.restartAfterSaturation(sourceHandler))
          return false;
opends/src/server/org/opends/server/replication/server/ReplicationDB.java
File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogDB.java
@@ -53,11 +53,11 @@
 * and the dbHandler class.
 * This is the only class that should have code using the BDB interfaces.
 */
public class ChangelogDB
public class ReplicationDB
{
  private Database db = null;
  private ChangelogDbEnv dbenv = null;
  private Changelog changelog;
  private ReplicationDbEnv dbenv = null;
  private ReplicationServer replicationServer;
  private Short serverId;
  private DN baseDn;
@@ -66,18 +66,19 @@
   * to store and retrieve changes from an LDAP server.
   * @param serverId Identifier of the LDAP server.
   * @param baseDn baseDn of the LDAP server.
   * @param changelog the Changelog that needs to be shutdown
   * @param replicationServer the ReplicationServer that needs to be shutdown
   * @param dbenv the Db encironemnet to use to create the db
   * @throws DatabaseException if a database problem happened
   */
  public ChangelogDB(Short serverId, DN baseDn, Changelog changelog,
                     ChangelogDbEnv dbenv)
  public ReplicationDB(Short serverId, DN baseDn,
                     ReplicationServer replicationServer,
                     ReplicationDbEnv dbenv)
                     throws DatabaseException
  {
    this.serverId = serverId;
    this.baseDn = baseDn;
    this.dbenv = dbenv;
    this.changelog = changelog;
    this.replicationServer = replicationServer;
    db = dbenv.getOrAddDb(serverId, baseDn);
  }
@@ -97,8 +98,8 @@
      for (UpdateMessage change : changes)
      {
        DatabaseEntry key = new ChangelogKey(change.getChangeNumber());
        DatabaseEntry data = new ChangelogData(change);
        DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
        DatabaseEntry data = new ReplicationData(change);
        try
        {
@@ -110,7 +111,7 @@
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          changelog.shutdown();
          replicationServer.shutdown();
        }
      }
@@ -124,7 +125,7 @@
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      changelog.shutdown();
      replicationServer.shutdown();
      if (txn != null)
      {
        try
@@ -132,7 +133,7 @@
          txn.abort();
        } catch (DatabaseException e1)
        {
          // can't do much more. The Changelog server is shuting down.
          // can't do much more. The ReplicationServer is shuting down.
        }
      }
    }
@@ -159,33 +160,34 @@
  }
  /**
   * Create a cursor that can be used to search or iterate on this Changelog DB.
   * Create a cursor that can be used to search or iterate on this
   * ReplicationServer DB.
   *
   * @param changeNumber The ChangeNumber from which the cursor must start.
   * @throws DatabaseException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ChangelogCursor creation failed.
   * @return The ChangelogCursor.
   * @throws Exception if the ReplServerDBCursor creation failed.
   * @return The ReplServerDBCursor.
   */
  public ChangelogCursor openReadCursor(ChangeNumber changeNumber)
  public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber)
                throws DatabaseException, Exception
  {
    return new ChangelogCursor(changeNumber);
    return new ReplServerDBCursor(changeNumber);
  }
  /**
   * Create a cursor that can be used to delete some record from this
   * Changelog database.
   * ReplicationServer database.
   *
   * @throws DatabaseException If a database error prevented the cursor
   *                           creation.
   * @throws Exception if the ChangelogCursor creation failed.
   * @return The ChangelogCursor.
   * @throws Exception if the ReplServerDBCursor creation failed.
   * @return The ReplServerDBCursor.
   */
  public ChangelogCursor openDeleteCursor()
  public ReplServerDBCursor openDeleteCursor()
                throws DatabaseException, Exception
  {
    return new ChangelogCursor();
    return new ReplServerDBCursor();
  }
  /**
@@ -237,7 +239,7 @@
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      changelog.shutdown();
      replicationServer.shutdown();
      return null;
    }
  }
@@ -278,7 +280,7 @@
      logError(ErrorLogCategory.SYNCHRONIZATION,
               ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
      changelog.shutdown();
      replicationServer.shutdown();
      return null;
    }
  }
@@ -293,10 +295,10 @@
  }
  /**
   * This Class implements a cursor that can be used to browse a changelog
   * database.
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
   */
  public class ChangelogCursor
  public class ReplServerDBCursor
  {
    private Cursor cursor = null;
    private Transaction txn = null;
@@ -304,20 +306,21 @@
    DatabaseEntry data = new DatabaseEntry();
    /**
     * Creates a ChangelogCursor that can be used for browsing a changelog db.
     * Creates a ReplServerDBCursor that can be used for browsing a
     * replicationServer db.
     *
     * @param startingChangeNumber The ChangeNumber from which the cursor must
     *        start.
     * @throws Exception When the startingChangeNumber does not exist.
     */
    private ChangelogCursor(ChangeNumber startingChangeNumber)
    private ReplServerDBCursor(ChangeNumber startingChangeNumber)
            throws Exception
    {
      cursor = db.openCursor(txn, null);
      if (startingChangeNumber != null)
      {
        key = new ChangelogKey(startingChangeNumber);
        key = new ReplicationKey(startingChangeNumber);
        data = new DatabaseEntry();
        if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
@@ -342,14 +345,14 @@
      }
    }
    private ChangelogCursor() throws DatabaseException
    private ReplServerDBCursor() throws DatabaseException
    {
      txn = dbenv.beginTransaction();
      cursor = db.openCursor(txn, null);
    }
    /**
     * Close the Changelog Cursor.
     * Close the ReplicationServer Cursor.
     */
    public void close()
    {
@@ -366,7 +369,7 @@
        logError(ErrorLogCategory.SYNCHRONIZATION,
                 ErrorLogSeverity.SEVERE_ERROR,
                 message, msgID);
        changelog.shutdown();
        replicationServer.shutdown();
      }
      if (txn != null)
      {
@@ -380,7 +383,7 @@
          logError(ErrorLogCategory.SYNCHRONIZATION,
                   ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          changelog.shutdown();
          replicationServer.shutdown();
        }
      }
    }
@@ -432,11 +435,11 @@
          return null;
        }
        try {
          currentChange = ChangelogData.generateChange(data.getData());
          currentChange = ReplicationData.generateChange(data.getData());
        } catch (Exception e) {
          /*
           * An error happening trying to convert the data from the changelog
           * database to an Update Message.
           * An error happening trying to convert the data from the
           * replicationServer database to an Update Message.
           * This can only happen if the database is corrupted.
           * There is not much more that we can do at this point except trying
           * to continue with the next record.
opends/src/server/org/opends/server/replication/server/ReplicationDBException.java
File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogDBException.java
@@ -34,24 +34,24 @@
/**
 * This class define an Exception that must be used when some error
 * condition was detected in the changelog database that cannot be recovered
 * automatically.
 * condition was detected in the replicationServer database that cannot be
 * recovered automatically.
 */
public class ChangelogDBException extends IdentifiedException
public class ReplicationDBException extends IdentifiedException
{
  private int messageID;
  private static final long serialVersionUID = -8812600147768060090L;
  /**
   * Creates a new Changelog db exception with the provided message.
   * This Exception must be used when the full changelog service is
   * Creates a new ReplicationServer db exception with the provided message.
   * This Exception must be used when the full replicationServer service is
   * compromised by the exception
   *
   * @param  messageID  The unique message ID for the provided message.
   * @param  message    The message to use for this exception.
   */
  public ChangelogDBException(int messageID, String message)
  public ReplicationDBException(int messageID, String message)
  {
    super(message);
opends/src/server/org/opends/server/replication/server/ReplicationData.java
File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogData.java
@@ -32,15 +32,16 @@
import org.opends.server.replication.protocol.UpdateMessage;
/**
 * SuperClass of DatabaseEntry used for data stored in the Changelog Databases.
 * SuperClass of DatabaseEntry used for data stored in the ReplicationServer
 * Databases.
 */
public class ChangelogData extends DatabaseEntry
public class ReplicationData extends DatabaseEntry
{
  /**
   * Creates a new ChangelogData object from an UpdateMessage.
   * @param change the UpdateMessage used to create the ChangelogData.
   * Creates a new ReplicationData object from an UpdateMessage.
   * @param change the UpdateMessage used to create the ReplicationData.
   */
  public ChangelogData(UpdateMessage change)
  public ReplicationData(UpdateMessage change)
  {
    this.setData(change.getBytes());
  }
opends/src/server/org/opends/server/replication/server/ReplicationDbEnv.java
File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogDbEnv.java
@@ -52,13 +52,13 @@
/**
 * This class is used to represent a Db environement that can be used
 * to create ChangelogDB.
 * to create ReplicationDB.
 */
public class ChangelogDbEnv
public class ReplicationDbEnv
{
  private Environment dbEnvironment = null;
  private Database stateDb = null;
  private Changelog changelog = null;
  private ReplicationServer replicationServer = null;
  /**
   * Initialize this class.
@@ -66,20 +66,21 @@
   * It also reads the currently known databases from the "changelogstate"
   * database.
   * @param path Path where the backing files must be created.
   * @param changelog the Changelog that creates this ChangelogDbEnv.
   * @param replicationServer the ReplicationServer that creates this
   *                          ReplicationDbEnv.
   * @throws DatabaseException If a DatabaseException occured that prevented
   *                           the initialization to happen.
   * @throws ChangelogDBException If a changelog internal error caused
   *                              a failure of the changelog processing.
   * @throws ReplicationDBException If a replicationServer internal error caused
   *                              a failure of the replicationServer processing.
   */
  public ChangelogDbEnv(String path, Changelog changelog)
         throws DatabaseException, ChangelogDBException
  public ReplicationDbEnv(String path, ReplicationServer replicationServer)
         throws DatabaseException, ReplicationDBException
  {
    this.changelog = changelog;
    this.replicationServer = replicationServer;
    EnvironmentConfig envConfig = new EnvironmentConfig();
    /* Create the DB Environment that will be used for all
     * the Changelog activities related to the db
     * the ReplicationServer activities related to the db
     */
    envConfig.setAllowCreate(true);
    envConfig.setTransactional(true);
@@ -109,10 +110,10 @@
   * for each of them.
   *
   * @throws DatabaseException in case of underlying DatabaseException
   * @throws ChangelogDBException when the information from the database
   * @throws ReplicationDBException when the information from the database
   *                              cannot be decoded correctly.
   */
  private void start() throws DatabaseException, ChangelogDBException
  private void start() throws DatabaseException, ReplicationDBException
  {
    Cursor cursor = stateDb.openCursor(null, null);
    DatabaseEntry key = new DatabaseEntry();
@@ -140,17 +141,18 @@
                     message, msgID);
          }
          DbHandler dbHandler =
            new DbHandler(serverId, baseDn, changelog, this);
          changelog.getChangelogCache(baseDn).newDb(serverId, dbHandler);
            new DbHandler(serverId, baseDn, replicationServer, this);
          replicationServer.getReplicationCache(baseDn).newDb(serverId,
                                                            dbHandler);
        } catch (NumberFormatException e)
        {
          // should never happen
          throw new ChangelogDBException(0,
              "changelog state database has a wrong format");
          throw new ReplicationDBException(0,
              "replicationServer state database has a wrong format");
        } catch (UnsupportedEncodingException e)
        {
          // should never happens
          throw new ChangelogDBException(0, "need UTF-8 support");
          throw new ReplicationDBException(0, "need UTF-8 support");
        }
        status = cursor.getNext(key, data, LockMode.DEFAULT);
      }
opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogIterator.java
@@ -30,19 +30,19 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMessage;
import org.opends.server.replication.server.ChangelogDB.ChangelogCursor;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
/**
 * This class allows to iterate through the changes received from a given
 * LDAP Server Identifier.
 */
public class ChangelogIterator
public class ReplicationIterator
{
  private UpdateMessage currentChange = null;
  private ChangelogCursor cursor = null;
  private ReplServerDBCursor cursor = null;
  /**
   * Creates a new ChangelogIterator.
   * Creates a new ReplicationIterator.
   * @param id the Identifier of the server on which the iterator applies.
   * @param db The db where the iterator must be created.
   * @param changeNumber The ChangeNumber after which the iterator must start.
@@ -50,8 +50,9 @@
   *         with changeNumber number.
   * @throws DatabaseException if a database problem happened.
   */
  public ChangelogIterator(short id, ChangelogDB db, ChangeNumber changeNumber)
                           throws Exception, DatabaseException
  public ReplicationIterator(
          short id, ReplicationDB db, ChangeNumber changeNumber)
          throws Exception, DatabaseException
  {
    cursor = db.openReadCursor(changeNumber);
    if (cursor == null)
@@ -74,7 +75,7 @@
  }
  /**
   * Go to the next change in the ChangelogDB or in the server Queue.
   * Go to the next change in the ReplicationDB or in the server Queue.
   * @return false if the iterator is already on the last change before
   *         this call.
   */
opends/src/server/org/opends/server/replication/server/ReplicationIteratorComparator.java
File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogIteratorComparator.java
@@ -31,21 +31,21 @@
import org.opends.server.replication.common.ChangeNumber;
/**
 * This Class define a Comparator that allows to know which ChangelogIterator
 * This Class define a Comparator that allows to know which ReplicationIterator
 * contain the next UpdateMessage in the order defined by the ChangeNumber
 * of the UpdateMessage.
 */
public class ChangelogIteratorComparator
              implements Comparator<ChangelogIterator>
public class ReplicationIteratorComparator
              implements Comparator<ReplicationIterator>
{
  /**
   * Compare the ChangeNumber of the ChangelogIterators.
   * Compare the ChangeNumber of the ReplicationIterator.
   *
   * @param o1 first ChangelogIterator.
   * @param o2 second ChangelogIterator.
   * @param o1 first ReplicationIterator.
   * @param o2 second ReplicationIterator.
   * @return result of the comparison.
   */
  public int compare(ChangelogIterator o1, ChangelogIterator o2)
  public int compare(ReplicationIterator o1, ReplicationIterator o2)
  {
    ChangeNumber csn1 = o1.getChange().getChangeNumber();
    ChangeNumber csn2 = o2.getChange().getChangeNumber();
opends/src/server/org/opends/server/replication/server/ReplicationKey.java
File was renamed from opends/src/server/org/opends/server/replication/server/ChangelogKey.java
@@ -34,15 +34,15 @@
/**
 * Superclass of DatabaseEntry.
 * Useful to create Changelog keys from ChangeNumbers.
 * Useful to create ReplicationServer keys from ChangeNumbers.
 */
public class ChangelogKey extends DatabaseEntry
public class ReplicationKey extends DatabaseEntry
{
  /**
   * Creates a new ChangelogKey from the given ChangeNumber.
   * Creates a new ReplicationKey from the given ChangeNumber.
   * @param changeNumber The changeNumber to use.
   */
  public ChangelogKey(ChangeNumber changeNumber)
  public ReplicationKey(ChangeNumber changeNumber)
  {
    try
    {
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
File was renamed from opends/src/server/org/opends/server/replication/server/Changelog.java
@@ -62,16 +62,16 @@
import com.sleepycat.je.DatabaseException;
/**
 * Changelog Listener.
 * ReplicationServer Listener.
 *
 * This singleton is the main object of the changelog server
 * This singleton is the main object of the replication server
 * It waits for the incoming connections and create listener
 * and publisher objects for
 * connection with LDAP servers and with changelog servers
 * connection with LDAP servers and with replication servers
 *
 * It is responsible for creating the changelog cache and managing it
 * It is responsible for creating the replication server cache and managing it
 */
public class Changelog
public class ReplicationServer
  implements Runnable, ConfigurableComponent,
             ConfigurationChangeListener<ChangelogServerCfg>
{
@@ -84,14 +84,14 @@
  private boolean runListen = true;
  /* The list of changelog servers configured by the administrator */
  private Collection<String> changelogServers;
  /* The list of replication servers configured by the administrator */
  private Collection<String> replicationServers;
  /* This table is used to store the list of dn for which we are currently
   * handling servers.
   */
  private HashMap<DN, ChangelogCache> baseDNs =
          new HashMap<DN, ChangelogCache>();
  private HashMap<DN, ReplicationCache> baseDNs =
          new HashMap<DN, ReplicationCache>();
  private String localURL = "null";
  private boolean shutdown = false;
@@ -99,7 +99,7 @@
  private DN configDn;
  private List<ConfigAttribute> configAttributes =
          new ArrayList<ConfigAttribute>();
  private ChangelogDbEnv dbEnv;
  private ReplicationDbEnv dbEnv;
  private int rcvWindow;
  private int queueSize;
  private String dbDirname = null;
@@ -107,20 +107,21 @@
                        // de deleted from the persistent storage.
  /**
   * Creates a new Changelog using the provided configuration entry.
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this changelog.
   * @param configuration The configuration of this replication server.
   * @throws ConfigException When Configuration is invalid.
   */
  public Changelog(ChangelogServerCfg configuration) throws ConfigException
  public ReplicationServer(ChangelogServerCfg configuration)
         throws ConfigException
  {
    shutdown = false;
    runListen = true;
    int changelogPort = configuration.getChangelogPort();
    changelogServerId = (short) configuration.getChangelogServerId();
    changelogServers = configuration.getChangelogServer();
    if (changelogServers == null)
      changelogServers = new ArrayList<String>();
    replicationServers = configuration.getChangelogServer();
    if (replicationServers == null)
      replicationServers = new ArrayList<String>();
    queueSize = configuration.getQueueSize();
    trimAge = configuration.getChangelogPurgeDelay();
    dbDirname = configuration.getChangelogDbDirectory();
@@ -201,8 +202,8 @@
  /**
   * The run method for the Listen thread.
   * This thread accept incoming connections on the changelog server
   * ports from other changelog servers or from LDAP servers
   * This thread accept incoming connections on the replication server
   * ports from other replication servers or from LDAP servers
   * and spawn further thread responsible for handling those connections
   */
@@ -211,9 +212,9 @@
    Socket newSocket = null;
    while (shutdown == false)
    {
      // Wait on the changelog port.
      // Read incoming messages and create LDAP or Changelog listener and
      // Publisher.
      // Wait on the replicationServer port.
      // Read incoming messages and create LDAP or ReplicationServer listener
      // and Publisher.
      try
      {
@@ -232,9 +233,9 @@
  }
  /**
   * This method manages the connection with the other changelog servers.
   * It periodically checks that this changelog server is indeed connected
   * to all the other changelog servers and if not attempts to
   * This method manages the connection with the other replication servers.
   * It periodically checks that this replication server is indeed connected
   * to all the other replication servers and if not attempts to
   * make the connection.
   */
  private void runConnect()
@@ -243,21 +244,21 @@
    {
      /*
       * periodically check that we are connected to all other
       * changelog servers and if not establish the connection
       * replication servers and if not establish the connection
       */
      for (ChangelogCache changelogCache: baseDNs.values())
      for (ReplicationCache replicationCache: baseDNs.values())
      {
        Set<String> connectedChangelogs = changelogCache.getChangelogs();
        Set<String> connectedChangelogs = replicationCache.getChangelogs();
        /*
         * check that all changelog in the config are in the connected Set
         * if not create the connection
         * check that all replication server in the config are in the connected
         * Set. If not create the connection
         */
        for (String serverURL : changelogServers)
        for (String serverURL : replicationServers)
        {
          if ((serverURL.compareTo(this.serverURL) != 0) &&
              (!connectedChangelogs.contains(serverURL)))
          {
            this.connect(serverURL, changelogCache.getBaseDn());
            this.connect(serverURL, replicationCache.getBaseDn());
          }
        }
      }
@@ -309,10 +310,11 @@
  }
  /**
   * initialization function for the changelog.
   * initialization function for the replicationServer.
   *
   * @param  changelogId       The unique identifier for this changelog.
   * @param  changelogPort     The port on which the changelog should listen.
   * @param  changelogId       The unique identifier for this replicationServer.
   * @param  changelogPort     The port on which the replicationServer should
   *                           listen.
   *
   */
  private void initialize(short changelogId, int changelogPort)
@@ -320,18 +322,18 @@
    try
    {
      /*
       * Initialize the changelog database.
       * Initialize the replicationServer database.
       */
      dbEnv = new ChangelogDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
      dbEnv = new ReplicationDbEnv(getFileForPath(dbDirname).getAbsolutePath(),
          this);
      /*
       * create changelog cache
       * create replicationServer cache
       */
      serverId = changelogId;
      /*
       * Open changelog socket
       * Open replicationServer socket
       */
      String localhostname = InetAddress.getLocalHost().getHostName();
      String localAdddress = InetAddress.getLocalHost().getHostAddress();
@@ -344,9 +346,9 @@
      /*
       * create working threads
       */
      myListenThread = new DirectoryThread(this, "Changelog Listener");
      myListenThread = new DirectoryThread(this, "Replication Server Listener");
      myListenThread.start();
      myConnectThread = new DirectoryThread(this, "Changelog Connect");
      myConnectThread = new DirectoryThread(this, "Replication Server Connect");
      myConnectThread.start();
    } catch (DatabaseException e)
@@ -355,7 +357,7 @@
      String message = getMessage(msgID, dbDirname);
      logError(ErrorLogCategory.SYNCHRONIZATION, ErrorLogSeverity.SEVERE_ERROR,
               message, msgID);
    } catch (ChangelogDBException e)
    } catch (ReplicationDBException e)
    {
      int msgID = MSGID_COULD_NOT_READ_DB;
      String message = getMessage(msgID, dbDirname);
@@ -378,28 +380,28 @@
  }
  /**
   * Get the ChangelogCache associated to the base DN given in parameter.
   * Get the ReplicationCache associated to the base DN given in parameter.
   *
   * @param baseDn The base Dn for which the ChangelogCache must be returned.
   * @return The ChangelogCache associated to the base DN given in parameter.
   * @param baseDn The base Dn for which the ReplicationCache must be returned.
   * @return The ReplicationCache associated to the base DN given in parameter.
   */
  public ChangelogCache getChangelogCache(DN baseDn)
  public ReplicationCache getReplicationCache(DN baseDn)
  {
    ChangelogCache changelogCache;
    ReplicationCache replicationCache;
    synchronized (baseDNs)
    {
      changelogCache = baseDNs.get(baseDn);
      if (changelogCache == null)
        changelogCache = new ChangelogCache(baseDn, this);
      baseDNs.put(baseDn, changelogCache);
      replicationCache = baseDNs.get(baseDn);
      if (replicationCache == null)
        replicationCache = new ReplicationCache(baseDn, this);
      baseDNs.put(baseDn, replicationCache);
    }
    return changelogCache;
    return replicationCache;
  }
  /**
   * Shutdown the Changelog service and all its connections.
   * Shutdown the Replication Server service and all its connections.
   */
  public void shutdown()
  {
@@ -421,13 +423,13 @@
      listenSocket.close();
    } catch (IOException e)
    {
      // changelog service is closing anyway.
      // replication Server service is closing anyway.
    }
    // shutdown all the ChangelogCaches
    for (ChangelogCache changelogCache : baseDNs.values())
    for (ReplicationCache replicationCache : baseDNs.values())
    {
      changelogCache.shutdown();
      replicationCache.shutdown();
    }
    dbEnv.shutdown();
@@ -435,12 +437,12 @@
  /**
   * Creates a new DB handler for this Changelog and the serverId and
   * Creates a new DB handler for this ReplicationServer and the serverId and
   * DN given in parameter.
   *
   * @param id The serverId for which the dbHandler must be created.
   * @param baseDn The DN for which the dbHandler muste be created.
   * @return The new DB handler for this Changelog and the serverId and
   * @return The new DB handler for this ReplicationServer and the serverId and
   *         DN given in parameter.
   * @throws DatabaseException in case of underlying database problem.
   */
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,7 +50,7 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.ChangelogStartMessage;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMessage;
@@ -69,7 +69,7 @@
/**
 * This class defines a server handler, which handles all interaction with a
 * changelog server.
 * replication server.
 */
public class ServerHandler extends MonitorProvider
{
@@ -79,7 +79,7 @@
  private MsgQueue lateQueue = new MsgQueue();
  private final Map<ChangeNumber, AckMessageList> waitingAcks  =
          new HashMap<ChangeNumber, AckMessageList>();
  private ChangelogCache changelogCache = null;
  private ReplicationCache replicationCache = null;
  private String serverURL;
  private int outCount = 0; // number of update sent to the server
  private int inCount = 0;  // number of updates received from the server
@@ -111,7 +111,7 @@
                                       // flow controled and should
                                       // be stopped from sending messsages.
  private int saturationCount = 0;
  private short changelogId;
  private short replicationServerId;
  /**
   * The time in milliseconds between heartbeats from the replication
@@ -124,8 +124,9 @@
   */
  HeartbeatThread heartbeatThread = null;
  private static final Map<ChangeNumber, ChangelogAckMessageList>
   changelogsWaitingAcks = new HashMap<ChangeNumber, ChangelogAckMessageList>();
  private static final Map<ChangeNumber, ReplServerAckMessageList>
   changelogsWaitingAcks =
       new HashMap<ChangeNumber, ReplServerAckMessageList>();
  /**
   * Creates a new server handler instance with the provided socket.
@@ -144,22 +145,24 @@
  /**
   * Do the exchange of start messages to know if the remote
   * server is an LDAP or changelog server and to exchange serverID.
   * server is an LDAP or replication server and to exchange serverID.
   * Then create the reader and writer thread.
   *
   * @param baseDn baseDn of the ServerHandler when this is an outgoing conn.
   *               null if this is an incoming connection.
   * @param changelogId The identifier of the changelog that creates this
   *                    server handler.
   * @param changelogURL The URL of the changelog that creates this
   *                    server handler.
   * @param replicationServerId The identifier of the replicationServer that
   *                            creates this server handler.
   * @param replicationServerURL The URL of the replicationServer that creates
   *                             this server handler.
   * @param windowSize the window size that this server handler must use.
   * @param changelog the Changelog that created this server handler.
   * @param replicationServer the ReplicationServer that created this server
   *                          handler.
   */
  public void start(DN baseDn, short changelogId, String changelogURL,
                    int windowSize, Changelog changelog)
  public void start(DN baseDn, short replicationServerId,
                    String replicationServerURL,
                    int windowSize, ReplicationServer replicationServer)
  {
    this.changelogId = changelogId;
    this.replicationServerId = replicationServerId;
    rcvWindowSizeHalf = windowSize/2;
    maxRcvWindow = windowSize;
    rcvWindow = windowSize;
@@ -168,10 +171,10 @@
      if (baseDn != null)
      {
        this.baseDn = baseDn;
        changelogCache = changelog.getChangelogCache(baseDn);
        ServerState localServerState = changelogCache.getDbServerState();
        ChangelogStartMessage msg =
          new ChangelogStartMessage(changelogId, changelogURL,
        replicationCache = replicationServer.getReplicationCache(baseDn);
        ServerState localServerState = replicationCache.getDbServerState();
        ReplServerStartMessage msg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    baseDn, windowSize, localServerState);
        session.publish(msg);
@@ -225,17 +228,17 @@
        serverIsLDAPserver = true;
        changelogCache = changelog.getChangelogCache(this.baseDn);
        ServerState localServerState = changelogCache.getDbServerState();
        ChangelogStartMessage myStartMsg =
          new ChangelogStartMessage(changelogId, changelogURL,
        replicationCache = replicationServer.getReplicationCache(this.baseDn);
        ServerState localServerState = replicationCache.getDbServerState();
        ReplServerStartMessage myStartMsg =
          new ReplServerStartMessage(replicationServerId, replicationServerURL,
                                    this.baseDn, windowSize, localServerState);
        session.publish(myStartMsg);
        sendWindowSize = receivedMsg.getWindowSize();
      }
      else if (msg instanceof ChangelogStartMessage)
      else if (msg instanceof ReplServerStartMessage)
      {
        ChangelogStartMessage receivedMsg = (ChangelogStartMessage) msg;
        ReplServerStartMessage receivedMsg = (ReplServerStartMessage) msg;
        serverId = receivedMsg.getServerId();
        serverURL = receivedMsg.getServerURL();
        String[] splittedURL = serverURL.split(":");
@@ -244,11 +247,12 @@
        this.baseDn = receivedMsg.getBaseDn();
        if (baseDn == null)
        {
          changelogCache = changelog.getChangelogCache(this.baseDn);
          ServerState serverState = changelogCache.getDbServerState();
          ChangelogStartMessage outMsg =
            new ChangelogStartMessage(changelogId, changelogURL,
                                      this.baseDn, windowSize, serverState);
          replicationCache = replicationServer.getReplicationCache(this.baseDn);
          ServerState serverState = replicationCache.getDbServerState();
          ReplServerStartMessage outMsg =
            new ReplServerStartMessage(replicationServerId,
                                       replicationServerURL,
                                       this.baseDn, windowSize, serverState);
          session.publish(outMsg);
        }
        else
@@ -262,21 +266,21 @@
        return;   // we did not recognize the message, ignore it
      }
      changelogCache = changelog.getChangelogCache(this.baseDn);
      replicationCache = replicationServer.getReplicationCache(this.baseDn);
      if (serverIsLDAPserver)
      {
        changelogCache.startServer(this);
        replicationCache.startServer(this);
      }
      else
      {
        changelogCache.startChangelog(this);
        replicationCache.startReplicationServer(this);
      }
      writer = new ServerWriter(session, serverId, this, changelogCache);
      writer = new ServerWriter(session, serverId, this, replicationCache);
      reader = new ServerReader(session, serverId, this,
                                             changelogCache);
                                             replicationCache);
      reader.start();
      writer.start();
@@ -486,11 +490,12 @@
  }
  /**
   * Check if the server associated to this ServerHandler is a changelog server.
   * Check if the server associated to this ServerHandler is a replication
   * server.
   * @return true if the server associated to this ServerHandler is a
   *         changelog server.
   *         replication server.
   */
  public boolean isChangelogServer()
  public boolean isReplicationServer()
  {
    return (!serverIsLDAPserver);
  }
@@ -520,7 +525,7 @@
        * the sum of the number of missing changes for every dbHandler.
        */
       int totalCount = 0;
       ServerState dbState = changelogCache.getDbServerState();
       ServerState dbState = replicationCache.getDbServerState();
       for (short id : dbState)
       {
         int max = dbState.getMaxChangeNumber(id).getSeqnum();
@@ -554,7 +559,7 @@
   * Get an approximation of the delay by looking at the age of the odest
   * message that has not been sent to this server.
   * This is an approximation because the age is calculated using the
   * clock of the servee where the changelog is currently running
   * clock of the servee where the replicationServer is currently running
   * while it should be calculated using the clock of the server
   * that originally processed the change.
   *
@@ -686,7 +691,7 @@
      saturationCount = 0;
      try
      {
        changelogCache.checkAllSaturation();
        replicationCache.checkAllSaturation();
      }
      catch (IOException e)
      {
@@ -747,16 +752,16 @@
           *   load this change on the delayList
           *
           */
          ChangelogIteratorComparator comparator =
            new ChangelogIteratorComparator();
          SortedSet<ChangelogIterator> iteratorSortedSet =
            new TreeSet<ChangelogIterator>(comparator);
          ReplicationIteratorComparator comparator =
            new ReplicationIteratorComparator();
          SortedSet<ReplicationIterator> iteratorSortedSet =
            new TreeSet<ReplicationIterator>(comparator);
          /* fill the lateQueue */
          for (short serverId : changelogCache.getServers())
          for (short serverId : replicationCache.getServers())
          {
            ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
            ChangelogIterator iterator =
              changelogCache.getChangelogIterator(serverId, lastCsn);
            ReplicationIterator iterator =
              replicationCache.getChangelogIterator(serverId, lastCsn);
            if ((iterator != null) && (iterator.getChange() != null))
            {
              iteratorSortedSet.add(iterator);
@@ -764,7 +769,7 @@
          }
          while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))
          {
            ChangelogIterator iterator = iteratorSortedSet.first();
            ReplicationIterator iterator = iteratorSortedSet.first();
            iteratorSortedSet.remove(iterator);
            lateQueue.add(iterator.getChange());
            if (iterator.next())
@@ -772,7 +777,7 @@
            else
              iterator.releaseCursor();
          }
          for (ChangelogIterator iterator : iteratorSortedSet)
          for (ReplicationIterator iterator : iteratorSortedSet)
          {
            iterator.releaseCursor();
          }
@@ -928,13 +933,13 @@
    }
    if (completedFlag)
    {
      changelogCache.sendAck(changeNumber, true);
      replicationCache.sendAck(changeNumber, true);
    }
  }
  /**
   * Process reception of an for an update that was received from a
   * Changelog Server.
   * ReplicationServer.
   *
   * @param message the ack message that was received.
   * @param ackingServerId The  id of the server that acked the change.
@@ -942,7 +947,7 @@
  public static void ackChangelog(AckMessage message, short ackingServerId)
  {
    ChangeNumber changeNumber = message.getChangeNumber();
    ChangelogAckMessageList ackList;
    ReplServerAckMessageList ackList;
    boolean completedFlag;
    synchronized (changelogsWaitingAcks)
    {
@@ -958,9 +963,9 @@
    }
    if (completedFlag)
    {
      ChangelogCache changelogCache = ackList.getChangelogCache();
      changelogCache.sendAck(changeNumber, false,
                             ackList.getChangelogServerId());
      ReplicationCache replicationCache = ackList.getChangelogCache();
      replicationCache.sendAck(changeNumber, false,
                             ackList.getReplicationServerId());
    }
  }
@@ -982,24 +987,26 @@
  }
  /**
   * Add an update to the list of update received from a changelog server and
   * Add an update to the list of update received from a replicationServer and
   * waiting for acks.
   *
   * @param update The update that must be added to the list.
   * @param ChangelogServerId The identifier of the changelog that sent the
   *                          update.
   * @param changelogCache The ChangelogCache from which the change was
   *                       processed and to which the ack must later be sent.
   * @param ChangelogServerId The identifier of the replicationServer that sent
   *                          the update.
   * @param replicationCache The ReplicationCache from which the change was
   *                         processed and to which the ack must later be sent.
   * @param nbWaitedAck The number of ack that must be received before
   *                    the update is fully acked.
   */
  public static void addWaitingAck(UpdateMessage update,
      short ChangelogServerId, ChangelogCache changelogCache, int nbWaitedAck)
  public static void addWaitingAck(
      UpdateMessage update,
      short ChangelogServerId, ReplicationCache replicationCache,
      int nbWaitedAck)
  {
    ChangelogAckMessageList ackList =
          new ChangelogAckMessageList(update.getChangeNumber(),
    ReplServerAckMessageList ackList =
          new ReplServerAckMessageList(update.getChangeNumber(),
                                      nbWaitedAck,
                                      ChangelogServerId, changelogCache);
                                      ChangelogServerId, replicationCache);
    synchronized(changelogsWaitingAcks)
    {
      changelogsWaitingAcks.put(update.getChangeNumber(), ackList);
@@ -1031,7 +1038,7 @@
   * Check type of server handled.
   *
   * @return true if the handled server is an LDAP server.
   *         false if the handled server is a changelog server
   *         false if the handled server is a replicationServer
   */
  public boolean isLDAPserver()
  {
@@ -1074,7 +1081,7 @@
    if (serverIsLDAPserver)
      return "LDAP Server " + str;
    else
      return "Changelog Server " + str;
      return "Replication Server " + str;
  }
  /**
@@ -1122,7 +1129,7 @@
    if (serverIsLDAPserver)
      attributes.add(new Attribute("LDAP-Server", serverURL));
    else
      attributes.add(new Attribute("Changelog-Server", serverURL));
      attributes.add(new Attribute("ReplicationServer-Server", serverURL));
    attributes.add(new Attribute("server-id",
                                 String.valueOf(serverId)));
    attributes.add(new Attribute("base-dn",
@@ -1199,7 +1206,7 @@
      if (serverIsLDAPserver)
        localString = "Directory Server ";
      else
        localString = "Changelog Server ";
        localString = "Replication Server ";
      localString += serverId + " " + serverURL + " " + baseDn;
@@ -1233,7 +1240,7 @@
    {
      if (flowControl)
      {
        if (changelogCache.restartAfterSaturation(this))
        if (replicationCache.restartAfterSaturation(this))
        {
          flowControl = false;
        }
@@ -1277,13 +1284,15 @@
  public void process(RoutableMessage msg)
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
      debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") receives " + msg + " from " + serverId, 1);
        "SH(" + replicationServerId + ") receives " + msg +
            " from " + serverId, 1);
    changelogCache.process(msg, this);
    replicationCache.process(msg, this);
  }
  /**
@@ -1296,11 +1305,13 @@
  public void send(RoutableMessage msg) throws IOException
  {
    if (debugEnabled())
      debugInfo("SH(" + changelogId + ") forwards " + msg + " to " + serverId);
      debugInfo("SH(" + replicationServerId + ") forwards " +
                 msg + " to " + serverId);
    logError(ErrorLogCategory.SYNCHRONIZATION,
        ErrorLogSeverity.SEVERE_ERROR,
        "SH(" + changelogId + ") forwards " + msg + " to " + serverId, 1);
        "SH(" + replicationServerId + ") forwards " +
             msg + " to " + serverId, 1);
    session.publish(msg);
  }
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -48,13 +48,13 @@
/**
 * This class implement the part of the changelog that is reading
 * This class implement the part of the replicationServer that is reading
 * the connection from the LDAP servers to get all the updates that
 * were done on this replica and forward them to other servers.
 *
 * A single thread is dedicated to this work.
 * It waits in a blocking mode on the connection from the LDAP server
 * and upon receiving an update puts in into the changelog cache
 * and upon receiving an update puts in into the replicationServer cache
 * from where the other servers will grab it.
 */
public class ServerReader extends DirectoryThread
@@ -62,24 +62,24 @@
  private short serverId;
  private ProtocolSession session;
  private ServerHandler handler;
  private ChangelogCache changelogCache;
  private ReplicationCache replicationCache;
  /**
   * Constructor for the LDAP server reader part of the changelog.
   * Constructor for the LDAP server reader part of the replicationServer.
   *
   * @param session The ProtocolSession from which to read the data.
   * @param serverId The server ID of the server from which we read changes.
   * @param handler The server handler for this server reader.
   * @param changelogCache The ChangelogCache for this server reader.
   * @param replicationCache The ReplicationCache for this server reader.
   */
  public ServerReader(ProtocolSession session, short serverId,
                      ServerHandler handler, ChangelogCache changelogCache)
                      ServerHandler handler, ReplicationCache replicationCache)
  {
    super(handler.toString() + " reader");
    this.session = session;
    this.serverId = serverId;
    this.handler = handler;
    this.changelogCache = changelogCache;
    this.replicationCache = replicationCache;
  }
  /**
@@ -90,7 +90,7 @@
    /*
     * TODO : catch exceptions in case of bugs
     * wait on input stream
     * grab all incoming messages and publish them to the changelogCache
     * grab all incoming messages and publish them to the replicationCache
     */
    try
    {
@@ -108,13 +108,13 @@
        {
          AckMessage ack = (AckMessage) msg;
          handler.checkWindow();
          changelogCache.ack(ack, serverId);
          replicationCache.ack(ack, serverId);
        }
        else if (msg instanceof UpdateMessage)
        {
          UpdateMessage update = (UpdateMessage) msg;
          handler.decAndCheckWindow();
          changelogCache.put(update, handler);
          replicationCache.put(update, handler);
        }
        else if (msg instanceof WindowMessage)
        {
@@ -190,7 +190,7 @@
      {
       // ignore
      }
      changelogCache.stopServer(handler);
      replicationCache.stopServer(handler);
    }
  }
}
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -49,7 +49,7 @@
{
  private ProtocolSession session;
  private ServerHandler handler;
  private ChangelogCache changelogCache;
  private ReplicationCache replicationCache;
  /**
   * Create a ServerWriter.
@@ -59,21 +59,21 @@
   * @param session the ProtocolSession that will be used to send updates.
   * @param serverId the Identifier of the server.
   * @param handler handler for which the ServerWriter is created.
   * @param changelogCache The ChangelogCache of this ServerWriter.
   * @param replicationCache The ReplicationCache of this ServerWriter.
   */
  public ServerWriter(ProtocolSession session, short serverId,
                      ServerHandler handler, ChangelogCache changelogCache)
                      ServerHandler handler, ReplicationCache replicationCache)
  {
    super(handler.toString() + " writer");
    this.session = session;
    this.handler = handler;
    this.changelogCache = changelogCache;
    this.replicationCache = replicationCache;
  }
  /**
   * Run method for the ServerWriter.
   * Loops waiting for changes from the ChangelogCache and forward them
   * Loops waiting for changes from the ReplicationCache and forward them
   * to the other servers
   */
  public void run()
@@ -81,7 +81,7 @@
    try {
      while (true)
      {
        UpdateMessage update = changelogCache.take(this.handler);
        UpdateMessage update = replicationCache.take(this.handler);
        if (update == null)
          return;       /* this connection is closing */
        session.publish(update);
@@ -131,7 +131,7 @@
      {
       // Can't do much more : ignore
      }
      changelogCache.stopServer(handler);
      replicationCache.stopServer(handler);
    }
  }
}
opends/src/server/org/opends/server/replication/server/package-info.java
@@ -26,17 +26,17 @@
 */
/**
 * This package contains the code for the changelog service part
 * This package contains the code for the Replication Server part
 * of the Multimaster replication feature.
 * <br>
 *
 * A changelog server is responsible for :
 * A replication server is responsible for :
 * <br>
 * <ul>
 * <li>listen for connections from ldap servers.</li>
 * <li>Connect/manage connection to other changelog servers.</li>
 * <li>Connect/manage connection to other replication servers.</li>
 * <li>Receive changes from ldap servers.</li>
 * <li>Forward changes to ldap server and other changelog servers.</li>
 * <li>Forward changes to ldap server and other replication servers.</li>
 * <li>Save changes to stable storage (includes trimming of older operations).
 * </li>
 * </ul>
@@ -46,19 +46,19 @@
 * <ul>
 * <li><A HREF="SocketSession.html"><B>SocketSession</B></A>
 * implements the ProtocolSession interface that is
 * used by the changelog server and the directory server to communicate.
 * used by the replication server and the directory server to communicate.
 * This is done by using the innate encoding/decoding capabilities of the
 * ReplicationMessages objects. This class is used by both the
 * changelog and the replication package.
 * replicationServer and the replication package.
 * </li>
 * <li><A HREF="ChangelogCache.html"><B>ChangelogCache</B></A>
 * implements the multiplexing part of the changelog
 * <li><A HREF="ReplicationCache.html"><B>ReplicationCache</B></A>
 * implements the multiplexing part of the replication
 * server. It contains method for forwarding all the received messages to
 * the ServerHandler and to the dbHandler objects.<br>
 * </li>
 * <li><A HREF="ServerHandler.html"><B>ServerHandler</B></A>
 * contains the code related to handler of remote
 * server. It can manage changelog servers of directory servers (may be it
 * server. It can manage replication servers of directory servers (may be it
 * shoudl be splitted in two different classes, one for each of these).<br>
 * </li>
 * <li><A HREF="ServerWriter.html"><B>ServerWriter</B></A>
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -46,7 +46,7 @@
import org.opends.server.messages.TaskMessages;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.plugin.ChangelogBroker;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.ReplicationDomain;
import org.opends.server.replication.protocol.DoneMessage;
import org.opends.server.replication.protocol.EntryMessage;
@@ -56,8 +56,8 @@
import org.opends.server.replication.protocol.RoutableMessage;
import org.opends.server.replication.protocol.SocketSession;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.server.Changelog;
import org.opends.server.replication.server.ChangelogFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DN;
@@ -118,9 +118,9 @@
  int changelogPort = 8989;
  private DN baseDn;
  ChangelogBroker server2 = null;
  Changelog changelog1 = null;
  Changelog changelog2 = null;
  ReplicationBroker server2 = null;
  ReplicationServer changelog1 = null;
  ReplicationServer changelog2 = null;
  boolean emptyOldChanges = true;
  ReplicationDomain sd = null;
@@ -626,7 +626,7 @@
   * @param destinationServerID The target server.
   * @param requestorID The initiator server.
   */
  private void makeBrokerPublishEntries(ChangelogBroker broker,
  private void makeBrokerPublishEntries(ReplicationBroker broker,
      short senderID, short destinationServerID, short requestorID)
  {
    // Send entries
@@ -658,7 +658,7 @@
    }
  }
  void receiveUpdatedEntries(ChangelogBroker broker, short serverID,
  void receiveUpdatedEntries(ReplicationBroker broker, short serverID,
      String[] updatedEntries)
  {
    // Expect the broker to receive the entries
@@ -715,11 +715,11 @@
  }
  /**
   * Creates a new changelog server.
   * @param changelogId The serverID of the changelog to create.
   * @return The new changelog server.
   * Creates a new replicationServer.
   * @param changelogId The serverID of the replicationServer to create.
   * @return The new replicationServer.
   */
  private Changelog createChangelogServer(short changelogId)
  private ReplicationServer createChangelogServer(short changelogId)
  {
    try
    {
@@ -732,13 +732,13 @@
      {
        int chPort = getChangelogPort(changelogId);
        ChangelogFakeConfiguration conf =
          new ChangelogFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
        ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(chPort, null, 0, changelogId, 0, 100,
                                         null);
        Changelog changelog = new Changelog(conf);
        ReplicationServer replicationServer = new ReplicationServer(conf);
        Thread.sleep(1000);
        return changelog;
        return replicationServer;
      }
    }
    catch (Exception e)
@@ -750,12 +750,12 @@
  /**
   * Create a synchronized suffix in the current server providing the
   * changelog serverID.
   * replication Server ID.
   * @param changelogID
   */
  private void connectServer1ToChangelog(short changelogID)
  {
    // Connect DS to the changelog
    // Connect DS to the replicationServer
    try
    {
      // suffix synchronized
@@ -820,7 +820,7 @@
    {
      changelog1 = createChangelogServer(changelog1ID);
      // Connect DS to the changelog
      // Connect DS to the replicationServer
      connectServer1ToChangelog(changelog1ID);
      if (server2 == null)
@@ -874,7 +874,7 @@
    changelog1 = createChangelogServer(changelog1ID);
    // Connect DS to the changelog
    // Connect DS to the replicationServer
    connectServer1ToChangelog(changelog1ID);
    addTestEntriesToDB();
@@ -959,7 +959,7 @@
      server2 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server2ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    ChangelogBroker server3 = openChangelogSession(DN.decode("dc=example,dc=com"),
    ReplicationBroker server3 = openChangelogSession(DN.decode("dc=example,dc=com"),
        server3ID, 100, getChangelogPort(changelog1ID), 1000, emptyOldChanges);
    Thread.sleep(1000);
@@ -1217,7 +1217,7 @@
    changelog2 = createChangelogServer(changelog2ID);
    Thread.sleep(3000);
    // Connect DS to the changelog 1
    // Connect DS to the replicationServer 1
    connectServer1ToChangelog(changelog1ID);
    // Put entries in DB
@@ -1435,7 +1435,7 @@
      server2.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      // fromthe changelog server.
      // from the replicationServer.
      server2 = null;
    }
    super.cleanRealEntries();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ProtocolWindowTest.java
@@ -43,7 +43,7 @@
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.plugin.ChangelogBroker;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.Attribute;
@@ -82,11 +82,11 @@
  /**
   * Test the window mechanism by :
   *  - creating a Changelog service client using the ChangelogBroker class.
   *  - creating a ReplicationServer service client using the ReplicationBroker class.
   *  - set a small window size.
   *  - perform more than the window size operations.
   *  - check that the Changelog has not sent more than window size operations.
   *  - receive all messages from the ChangelogBroker, check that
   *  - check that the ReplicationServer has not sent more than window size operations.
   *  - receive all messages from the ReplicationBroker, check that
   *    the client receives the correct number of operations.
   */
  @Test(enabled=true, groups="slow")
@@ -98,12 +98,12 @@
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    ChangelogBroker broker = openChangelogSession(baseDn, (short) 13,
    ReplicationBroker broker = openChangelogSession(baseDn, (short) 13,
        WINDOW_SIZE, 8989, 1000, true);
    try {
      /* Test that changelog monitor and synchro plugin monitor informations
      /* Test that replicationServer monitor and synchro plugin monitor informations
       * publish the correct window size.
       * This allows both the check the monitoring code and to test that
       * configuration is working.
@@ -137,15 +137,15 @@
      assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
        "The received ADD Replication message is not for the excepted DN");
      // send (2 * window + changelog queue) modify operations
      // so that window + changelog queue get stuck in the changelog queue
      // send (2 * window + replicationServer queue) modify operations
      // so that window + replicationServer queue get stuck in the replicationServer queue
      int count = WINDOW_SIZE * 2 + CHANGELOG_QUEUE_SIZE;
      processModify(count);
      // let some time to the message to reach the changelog client
      // let some time to the message to reach the replicationServer client
      Thread.sleep(500);
      // check that the changelog only sent WINDOW_SIZE messages
      // check that the replicationServer only sent WINDOW_SIZE messages
      assertTrue(searchUpdateSent());
      int rcvCount=0;
@@ -171,7 +171,7 @@
  }
  /**
   * Check that the Changelog queue size has correctly been configured
   * Check that the ReplicationServer queue size has correctly been configured
   * by reading the monitoring information.
   * @throws LDAPException
   */
@@ -202,7 +202,7 @@
  }
  /**
   * Search that the changelog has stopped sending changes after
   * Search that the replicationServer has stopped sending changes after
   * having reach the limit of the window size.
   * And that the number of waiting changes is accurate.
   * Do this by checking the monitoring information.
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -66,12 +66,12 @@
  public void setup() throws Exception
  {
   /*
    * - Start a server and a changelog server, configure replication
    * - Start a server and a replicationServer, configure replication
    * - Do some changes.
    */
    TestCaseUtils.startServer();
    // find  a free port for the changelog server
    // find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    socket.close();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -40,7 +40,7 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ChangelogBroker;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.core.DeleteOperation;
@@ -115,10 +115,10 @@
  }
  /**
   * Open a changelog session to the local Changelog server.
   * Open a replicationServer session to the local ReplicationServer.
   *
   */
  protected ChangelogBroker openChangelogSession(
  protected ReplicationBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, boolean emptyOldChanges)
          throws Exception, SocketException
@@ -129,7 +129,7 @@
    else
       state = new ServerState();
    ChangelogBroker broker = new ChangelogBroker(
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
@@ -137,7 +137,7 @@
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    TestCaseUtils.sleep(100); // give some time to the broker to connect
                              // to the changelog server.
                              // to the replicationServer.
    if (emptyOldChanges)
    {
      /*
@@ -162,15 +162,15 @@
  }
  /**
   * Open a new session to the Changelog Server
   * Open a new session to the ReplicationServer
   * starting with a given ServerState.
   */
  protected ChangelogBroker openChangelogSession(
  protected ReplicationBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, ServerState state)
          throws Exception, SocketException
  {
    ChangelogBroker broker = new ChangelogBroker(
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, 0, 0, 0, 0, window_size, 0);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
@@ -182,10 +182,11 @@
  }
  /**
   * Open a changelog session with flow control to the local Changelog server.
   * Open a replicationServer session with flow control to the local
   * ReplicationServer.
   *
   */
  protected ChangelogBroker openChangelogSession(
  protected ReplicationBroker openChangelogSession(
      final DN baseDn, short serverId, int window_size,
      int port, int timeout, int maxSendQueue, int maxRcvQueue,
      boolean emptyOldChanges)
@@ -197,7 +198,7 @@
    else
       state = new ServerState();
    ChangelogBroker broker = new ChangelogBroker(
    ReplicationBroker broker = new ReplicationBroker(
        state, baseDn, serverId, maxRcvQueue, 0,
        maxSendQueue, 0, window_size, 0);
    ArrayList<String> servers = new ArrayList<String>(1);
@@ -336,7 +337,7 @@
      "Unable to add the Multimaster replication plugin");
      
    // Add the changelog server
    // Add the replication server
    DirectoryServer.getConfigHandler().addEntry(changeLogEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(changeLogEntry.getDN()),
       "Unable to add the changeLog server");
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -44,7 +44,7 @@
import org.opends.server.core.ModifyOperation;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ChangelogBroker;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.Attribute;
@@ -84,7 +84,7 @@
    TestCaseUtils.startServer();
    schemaCheck = DirectoryServer.checkSchema();
    // find  a free port for the changelog server
    // find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    changelogPort = socket.getLocalPort();
    socket.close();
@@ -124,7 +124,7 @@
  }
  /**
   * Checks that changes done to the schema are pushed to the changelog
   * Checks that changes done to the schema are pushed to the replicationServer
   * clients.
   */
  @Test()
@@ -136,7 +136,7 @@
    final DN baseDn = DN.decode("cn=schema");
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, changelogPort, 5000, true);
    try
@@ -210,7 +210,7 @@
  }
  /**
   * Checks that changes to the schema pushed to the changelog
   * Checks that changes to the schema pushed to the replicationServer
   * are received and correctly replayed by replication plugin.
   */
  @Test(dependsOnMethods = { "pushSchemaChange" })
@@ -222,7 +222,7 @@
    final DN baseDn = DN.decode("cn=schema");
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, changelogPort, 5000, true);
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)2, 0);
@@ -241,7 +241,7 @@
  /**
   * Checks that changes done to the schema files are pushed to the
   * Changelog servers and that the ServerState is updated in the schema
   * ReplicationServers and that the ServerState is updated in the schema
   * file.
   * FIXME: This test is disabled because it has side effects.
   * It causes schema tests in org.opends.server.core.AddOperationTestCase
@@ -256,7 +256,7 @@
    final DN baseDn = DN.decode("cn=schema");
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 3, 100, changelogPort, 5000, true);
    // create a schema change Notification
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/StressTest.java
@@ -45,7 +45,7 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyOperation;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.replication.plugin.ChangelogBroker;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.types.Attribute;
@@ -65,7 +65,7 @@
import org.testng.annotations.Test;
/**
 * Stress test for the synchronization code using the ChangelogBroker API.
 * Stress test for the synchronization code using the ReplicationBroker API.
 */
public class StressTest extends ReplicationTestCase
{
@@ -97,7 +97,7 @@
  // WORKAROUND FOR BUG #639 - END -
  /**
   * Stress test from LDAP server to client using the ChangelogBroker API.
   * Stress test from LDAP server to client using the ReplicationBroker API.
   */
  @Test(enabled=true, groups="slow")
  public void fromServertoBroker() throws Exception
@@ -109,7 +109,7 @@
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    final int TOTAL_MESSAGES = 1000;
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 18, 100, 8989, 5000, true);
    Monitor monitor = new Monitor("stress test monitor");
    DirectoryServer.registerMonitorProvider(monitor);
@@ -117,7 +117,7 @@
    try {
      /*
       * Test that operations done on this server are sent to the
       * changelog server and forwarded to our changelog broker session.
       * replicationServer and forwarded to our replicationServer broker session.
       */
      // Create an Entry (add operation) that will be later used in the test.
@@ -320,12 +320,12 @@
  }
  /**
   * Continuously reads messages from a changelog broker until there is nothing
   * Continuously reads messages from a replicationServer broker until there is nothing
   * left. Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    private ReplicationBroker broker;
    private int count = 0;
    private Boolean finished = false;
@@ -333,7 +333,7 @@
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    public BrokerReader(ReplicationBroker broker)
    {
      this.broker = broker;
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -38,7 +38,7 @@
import org.opends.server.TestCaseUtils;
import org.opends.server.plugins.ShortCircuitPlugin;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.plugin.ChangelogBroker;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
@@ -232,16 +232,16 @@
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    /*
     * Open a session to the changelog server using the broker API.
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
     */
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the changelog server.
     * when we need to send operation messages to the replicationServer.
     */
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
@@ -315,16 +315,16 @@
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    /*
     * Open a session to the changelog server using the broker API.
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
     */
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operation messages to the changelog server.
     * when we need to send operation messages to the replicationServer.
     */
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
@@ -398,10 +398,10 @@
  /**
   * Tests the naming conflict resolution code.
   * In this test, the local server act both as an LDAP server and
   * a changelog server that are inter-connected.
   * a replicationServer that are inter-connected.
   *
   * The test creates an other session to the changelog server using
   * directly the ChangelogBroker API.
   * The test creates an other session to the replicationServer using
   * directly the ReplicationBroker API.
   * It then uses this session to siomulate conflicts and therefore
   * test the naming conflict resolution code.
   */
@@ -415,15 +415,15 @@
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    /*
     * Open a session to the changelog server using the Changelog broker API.
     * Open a session to the replicationServer using the ReplicationServer broker API.
     * This must use a serverId different from the LDAP server ID
     */
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
    /*
     * Create a Change number generator to generate new changenumbers
     * when we need to send operations messages to the changelog server.
     * when we need to send operations messages to the replicationServer.
     */
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 2, 0);
@@ -812,7 +812,7 @@
  }
  /**
   * Tests done using directly the ChangelogBroker interface.
   * Tests done using directly the ReplicationBroker interface.
   */
  @Test(enabled=false, dataProvider="assured")
  public void updateOperations(boolean assured) throws Exception
@@ -825,14 +825,14 @@
    cleanRealEntries();
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 27, 100, 8989, 1000, true);
    try {
      ChangeNumberGenerator gen = new ChangeNumberGenerator((short) 27, 0);
      /*
       * Test that operations done on this server are sent to the
       * changelog server and forwarded to our changelog broker session.
       * replicationServer and forwarded to our replicationServer broker session.
       */
      // Create an Entry (add operation)
@@ -925,7 +925,7 @@
      "The received DELETE message is not for the excepted DN");
      /*
       * Now check that when we send message to the Changelog server
       * Now check that when we send message to the ReplicationServer
       * and that they are received and correctly replayed by the server.
       *
       * Start by testing the Add message reception
@@ -1109,7 +1109,7 @@
    final DN baseDn = DN.decode("ou=People,dc=example,dc=com");
    Thread.sleep(2000);
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 11, 100, 8989, 1000, true);
    try
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -29,7 +29,7 @@
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.plugin.ChangelogBroker;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.plugin.Historical;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.TestCaseUtils;
@@ -199,10 +199,10 @@
         DirectoryServer.getAttributeType("entryuuid");
    /*
     * Open a session to the changelog server using the broker API.
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
     */
    ChangelogBroker broker =
    ReplicationBroker broker =
      openChangelogSession(baseDn, (short) 2, 100, 8989, 1000, true);
@@ -310,7 +310,7 @@
  }
  private static
  void publishModify(ChangelogBroker broker, ChangeNumber changeNum,
  void publishModify(ReplicationBroker broker, ChangeNumber changeNum,
                     DN dn, String entryuuid, Modification mod)
  {
    List<Modification> mods = new ArrayList<Modification>(1);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -51,7 +51,7 @@
import org.opends.server.replication.protocol.AckMessage;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ChangelogStartMessage;
import org.opends.server.replication.protocol.ReplServerStartMessage;
import org.opends.server.replication.protocol.DeleteContext;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.DoneMessage;
@@ -510,16 +510,16 @@
  /**
   * Test that changelogStartMessage encoding and decoding works
   * by checking that : msg == new ChangelogStartMessage(msg.getBytes()).
   * by checking that : msg == new ReplServerStartMessage(msg.getBytes()).
   */
  @Test(dataProvider="changelogStart")
  public void ChangelogStartMessageTest(short serverId, DN baseDN, int window,
         String url, ServerState state) throws Exception
  {
    state.update(new ChangeNumber((long)1, 1,(short)1));
    ChangelogStartMessage msg = new ChangelogStartMessage(serverId,
    ReplServerStartMessage msg = new ReplServerStartMessage(serverId,
        url, baseDN, window, state);
    ChangelogStartMessage newMsg = new ChangelogStartMessage(msg.getBytes());
    ReplServerStartMessage newMsg = new ReplServerStartMessage(msg.getBytes());
    assertEquals(msg.getServerId(), newMsg.getServerId());
    assertEquals(msg.getBaseDn(), newMsg.getBaseDn());
    assertEquals(msg.getWindowSize(), newMsg.getWindowSize());
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ChangelogFakeConfiguration.java
@@ -37,9 +37,9 @@
/**
 * This Class implements an object that can be used to instantiate
 * The Changelog class for tests purpose.
 * The ReplicationServer class for tests purpose.
 */
public class ChangelogFakeConfiguration implements ChangelogServerCfg
public class ReplServerFakeConfiguration implements ChangelogServerCfg
{
  int port;
  String dirName;
@@ -49,7 +49,7 @@
  int windowSize;
  private SortedSet<String> servers;
  public ChangelogFakeConfiguration(
  public ReplServerFakeConfiguration(
      int port, String dirName, int purgeDelay, int serverId,
      int queueSize, int windowSize, SortedSet<String> servers)
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
File was renamed from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ChangelogTest.java
@@ -43,14 +43,14 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.plugin.ChangelogBroker;
import org.opends.server.replication.plugin.ReplicationBroker;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ReplicationMessage;
import org.opends.server.replication.server.Changelog;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.Attribute;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
@@ -63,18 +63,18 @@
import org.testng.annotations.Test;
/**
 * Tests for the changelog service code.
 * Tests for the replicationServer code.
 */
public class ChangelogTest extends ReplicationTestCase
public class ReplicationServerTest extends ReplicationTestCase
{
  /**
   * The changelog server that will be used in this test.
   * The replicationServer that will be used in this test.
   */
  private Changelog changelog = null;
  private ReplicationServer replicationServer = null;
  /**
   * The port of the changelog server.
   * The port of the replicationServer.
   */
  private int changelogPort;
@@ -88,26 +88,26 @@
  /**
   * Before starting the tests, start the server and configure a
   * changelog server.
   * replicationServer.
   */
  @BeforeClass()
  public void configure() throws Exception
  {
    TestCaseUtils.startServer();
    //  find  a free port for the changelog server
    //  find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    changelogPort = socket.getLocalPort();
    socket.close();
    ChangelogFakeConfiguration conf =
      new ChangelogFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null);
    changelog = new Changelog(conf);
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(changelogPort, null, 0, 1, 0, 0, null);
    replicationServer = new ReplicationServer(conf);
  }
  /**
   * Basic test of the changelog code :
   *  Connect 2 clients to the changelog server and exchange messages
   * Basic test of the replicationServer code :
   *  Connect 2 clients to the replicationServer and exchange messages
   *  between the clients.
   *
   * Note : Other tests in this file depends on this test and may need to
@@ -116,12 +116,12 @@
  @Test()
  public void changelogBasic() throws Exception
  {
    ChangelogBroker server1 = null;
    ChangelogBroker server2 = null;
    ReplicationBroker server1 = null;
    ReplicationBroker server2 = null;
    try {
      /*
       * Open a sender session and a receiver session to the changelog
       * Open a sender session and a receiver session to the replicationServer
       */
      server1 = openChangelogSession(
          DN.decode("dc=example,dc=com"), (short) 1, 100, changelogPort,
@@ -148,9 +148,9 @@
      /*
       * Create a ChangeNumber between firstChangeNumberServer1 and  
       * secondChangeNumberServer1 that will not be used to create a
       * change sent to the changelog server but that will be used
       * change sent to the replicationServer but that will be used
       * in the Server State when opening a connection to the 
       * Changelog Server to make sure that the Changelog server is
       * ReplicationServer to make sure that the ReplicationServer is
       * able to accept such clients.
       */
      unknownChangeNumberServer1 = new ChangeNumber(time+1, 1, (short) 1);
@@ -167,10 +167,10 @@
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
            "ReplicationServer basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
        fail("ReplicationServer basic : incorrect message type received.");
      /*
       * Send and receive a second Delete Msg
@@ -182,10 +182,10 @@
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
            "ReplicationServer basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
        fail("ReplicationServer basic : incorrect message type received.");
      /*
       * Send and receive a Delete Msg from server 1 to server 2
@@ -199,10 +199,10 @@
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
            "ReplicationServer basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
        fail("ReplicationServer basic : incorrect message type received.");
      /*
       * Send and receive a second Delete Msg
@@ -214,10 +214,10 @@
      {
        DeleteMsg del = (DeleteMsg) msg2;
        assertTrue(del.toString().equals(msg.toString()),
            "Changelog basic : incorrect message body received.");
            "ReplicationServer basic : incorrect message body received.");
      }
      else
        fail("Changelog basic : incorrect message type received.");
        fail("ReplicationServer basic : incorrect message type received.");
    }
    finally
    {
@@ -235,7 +235,7 @@
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClient() throws Exception
  {
    ChangelogBroker broker = null;
    ReplicationBroker broker = null;
    try {
      broker =
@@ -244,7 +244,7 @@
      ReplicationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("Changelog basic transmission failed");
        fail("ReplicationServer basic transmission failed");
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -269,10 +269,10 @@
  private void newClientWithChanges(
      ServerState state, ChangeNumber nextChangeNumber) throws Exception
  {
    ChangelogBroker broker = null;
    ReplicationBroker broker = null;
    /*
     * Connect to the changelog server using the state created above.
     * Connect to the replicationServer using the state created above.
     */
    try {
      broker =
@@ -281,7 +281,7 @@
      ReplicationMessage msg2 = broker.receive();
      if (!(msg2 instanceof DeleteMsg))
        fail("Changelog basic transmission failed");
        fail("ReplicationServer basic transmission failed");
      else
      {
        DeleteMsg del = (DeleteMsg) msg2;
@@ -317,7 +317,7 @@
  
  /**
   * Test with a client that has already seen a Change that the
   * Changelog server has not seen.
   * ReplicationServer has not seen.
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void newClientWithUnknownChanges() throws Exception
@@ -383,12 +383,12 @@
  /**
   * Test that newClient() and newClientWithFirstChange() still works
   * after stopping and restarting the changelog server.
   * after stopping and restarting the replicationServer.
   */
  @Test(enabled=true, dependsOnMethods = { "changelogBasic" })
  public void stopChangelog() throws Exception
  {
    changelog.shutdown();
    replicationServer.shutdown();
    configure();
    newClient();
    newClientWithFirstChanges();
@@ -397,10 +397,10 @@
  }
  /**
   * Stress test from client using the ChangelogBroker API
   * to the changelog server.
   * Stress test from client using the ReplicationBroker API
   * to the replicationServer.
   * This test allow to investigate the behaviour of the
   * Changelog server when it needs to distribute the load of
   * ReplicationServer when it needs to distribute the load of
   * updates from a single LDAP server to a number of LDAP servers.
   *
   * This test i sconfigured by a relatively low stress
@@ -409,7 +409,7 @@
  @Test(enabled=true, groups="slow")
  public void oneWriterMultipleReader() throws Exception
  {
    ChangelogBroker server = null;
    ReplicationBroker server = null;
    int TOTAL_MSG = 1000;     // number of messages to send during the test
    int CLIENT_THREADS = 2;   // number of threads that will try to read
                              // the messages
@@ -417,7 +417,7 @@
      new ChangeNumberGenerator((short)5 , (long) 0);
    BrokerReader client[] = new BrokerReader[CLIENT_THREADS];
    ChangelogBroker clientBroker[] = new ChangelogBroker[CLIENT_THREADS];
    ReplicationBroker clientBroker[] = new ReplicationBroker[CLIENT_THREADS];
    try
    {
@@ -449,7 +449,7 @@
      /*
       * Simple loop creating changes and sending them
       * to the changelog server.
       * to the replicationServer.
       */
      for (int i = 0; i< TOTAL_MSG; i++)
      {
@@ -477,11 +477,11 @@
  }
  /**
   * Stress test from client using the ChangelogBroker API
   * to the changelog server.
   * Stress test from client using the ReplicationBroker API
   * to the replicationServer.
   *
   * This test allow to investigate the behaviour of the
   * Changelog server when it needs to distribute the load of
   * ReplicationServer when it needs to distribute the load of
   * updates from multiple LDAP server to a number of LDAP servers.
   *
   * This test is sconfigured for a relatively low stress
@@ -490,7 +490,7 @@
  @Test(enabled=true, groups="slow")
  public void multipleWriterMultipleReader() throws Exception
  {
    ChangelogBroker server = null;
    ReplicationBroker server = null;
    final int TOTAL_MSG = 1000;   // number of messages to send during the test
    final int THREADS = 2;       // number of threads that will produce
                               // and read the messages.
@@ -508,7 +508,7 @@
        short serverId = (short) (10+i);
        ChangeNumberGenerator gen =
          new ChangeNumberGenerator(serverId , (long) 0);
        ChangelogBroker broker =
        ReplicationBroker broker =
          openChangelogSession( DN.decode("dc=example,dc=com"), serverId,
            100, changelogPort, 1000, 1000, 0, true);
@@ -542,23 +542,23 @@
  /**
   * Chaining tests of the changelog code with 2 changelog servers involved
   * Chaining tests of the replication Server code with 2 replication servers involved
   * 2 tests are done here (itest=0 or itest=1)
   *
   * Test 1
   * - Create changelog server 1
   * - Create changelog server 2 connected with changelog server 1
   * - Create and connect client 1 to changelog server 1
   * - Create and connect client 2 to changelog server 2
   * - Create replication server 1
   * - Create replication server 2 connected with replication server 1
   * - Create and connect client 1 to replication server 1
   * - Create and connect client 2 to replication server 2
   * - Make client1 publish changes
   * - Check that client 2 receives the changes published by client 1
   *
   * Test 2
   * - Create changelog server 1
   * - Create and connect client1 to changelog server 1
   * - Create replication server 1
   * - Create and connect client1 to replication server 1
   * - Make client1 publish changes
   * - Create changelog server 2 connected with changelog server 1
   * - Create and connect client 2 to changelog server 2
   * - Create replication server 2 connected with replication server 1
   * - Create and connect client 2 to replication server 2
   * - Check that client 2 receives the changes published by client 1
   *
   */
@@ -567,11 +567,11 @@
  {
    for (int itest = 0; itest <2; itest++)
    {
      ChangelogBroker broker2 = null;
      ReplicationBroker broker2 = null;
      boolean emptyOldChanges = true;
      // - Create 2 connected changelog servers
      Changelog[] changelogs = new Changelog[2];
      // - Create 2 connected replicationServer
      ReplicationServer[] changelogs = new ReplicationServer[2];
      int[] changelogPorts = new int[2];
      int[] changelogIds = new int[2];
      short[] brokerIds = new short[2];
@@ -593,19 +593,19 @@
      {
        changelogs[i] = null;
        // for itest=0, create the 2 connected changelog servers
        // for itest=1, create the 1rst changelog server, the second
        // for itest=0, create the 2 connected replicationServer
        // for itest=1, create the 1rst replicationServer, the second
        // one will be created later
        SortedSet<String> servers = new TreeSet<String>();
        servers.add(
          "localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
        ChangelogFakeConfiguration conf =
          new ChangelogFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0,
        ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(changelogPorts[i], "changelogDb"+i, 0,
                                         changelogIds[i], 0, 100, servers); 
        changelog = new Changelog(conf);
        replicationServer = new ReplicationServer(conf);
      }
      ChangelogBroker broker1 = null;
      ReplicationBroker broker1 = null;
      try
      {
@@ -672,10 +672,10 @@
          
          SortedSet<String> servers = new TreeSet<String>();
          servers.add("localhost:"+changelogPorts[0]);
          ChangelogFakeConfiguration conf =
            new ChangelogFakeConfiguration(changelogPorts[1], null, 0,
          ReplServerFakeConfiguration conf =
            new ReplServerFakeConfiguration(changelogPorts[1], null, 0,
                                           changelogIds[1], 0, 0, null); 
          changelogs[1] = new Changelog(conf);
          changelogs[1] = new ReplicationServer(conf);
          // Connect broker 2 to changelog2
          broker2 = openChangelogSession(DN.decode("dc=example,dc=com"),
@@ -724,7 +724,7 @@
          }
          else
          {
            fail("Changelog transmission failed: no expected message class.");
            fail("ReplicationServer transmission failed: no expected message class.");
            break;
          }
        }
@@ -747,30 +747,30 @@
  }
  /**
   * After the tests stop the changelog server.
   * After the tests stop the replicationServer.
   */
  @AfterClass()
  public void shutdown() throws Exception
  {
    if (changelog != null)
      changelog.shutdown();
    if (replicationServer != null)
      replicationServer.shutdown();
  }
  /**
   * This class allows to creater reader thread.
   * They continuously reads messages from a changelog broker until
   * They continuously reads messages from a replication broker until
   * there is nothing left.
   * They Count the number of received messages.
   */
  private class BrokerReader extends Thread
  {
    private ChangelogBroker broker;
    private ReplicationBroker broker;
    /**
     * Creates a new Stress Test Reader
     * @param broker
     */
    public BrokerReader(ChangelogBroker broker)
    public BrokerReader(ReplicationBroker broker)
    {
      this.broker = broker;
    }
@@ -798,15 +798,15 @@
  /**
   * This class allows to create writer thread that can
   * be used as producers for the Changelog stress tests.
   * be used as producers for the ReplicationServer stress tests.
   */
  private class BrokerWriter extends Thread
  {
    int count;
    private ChangelogBroker broker;
    private ReplicationBroker broker;
    ChangeNumberGenerator gen;
    public BrokerWriter(ChangelogBroker broker, ChangeNumberGenerator gen,
    public BrokerWriter(ReplicationBroker broker, ChangeNumberGenerator gen,
        int count)
    {
      this.broker = broker;
@@ -822,7 +822,7 @@
    {
      /*
       * Simple loop creating changes and sending them
       * to the changelog server.
       * to the replicationServer.
       */
      while (count>0)
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/dbHandlerTest.java
@@ -34,8 +34,8 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.server.Changelog;
import org.opends.server.replication.server.ChangelogDbEnv;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationDbEnv;
import org.opends.server.replication.server.DbHandler;
import org.opends.server.types.DN;
import org.testng.annotations.Test;
@@ -51,16 +51,16 @@
  {
    TestCaseUtils.startServer();
    //  find  a free port for the changelog server
    //  find  a free port for the replicationServer
    ServerSocket socket = TestCaseUtils.bindFreePort();
    int changelogPort = socket.getLocalPort();
    socket.close();
    // configure a Changelog server.
    ChangelogFakeConfiguration conf =
      new ChangelogFakeConfiguration(changelogPort, null, 0,
    // configure a ReplicationServer.
    ReplServerFakeConfiguration conf =
      new ReplServerFakeConfiguration(changelogPort, null, 0,
                                     2, 0, 100, null);
    Changelog changelog = new Changelog(conf);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    // create or clean a directory for the dbHandler
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
@@ -73,10 +73,10 @@
    }
    testRoot.mkdirs();
    ChangelogDbEnv dbEnv = new ChangelogDbEnv(path, changelog);
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(path, replicationServer);
    DbHandler handler =
      new DbHandler((short) 1, DN.decode("o=test"), changelog, dbEnv);
      new DbHandler((short) 1, DN.decode("o=test"), replicationServer, dbEnv);
    ChangeNumberGenerator gen = new ChangeNumberGenerator((short)1, 0);
    ChangeNumber changeNumber1 = gen.NewChangeNumber();
@@ -116,7 +116,7 @@
    handler.shutdown();
    dbEnv.shutdown();
    changelog.shutdown();
    replicationServer.shutdown();
    TestCaseUtils.deleteDirectory(testRoot);
  }