mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
06.11.2009 3a9e211d36ee94ff99941943b3b51e0f768624f5
opends/resource/schema/02-config.ldif
@@ -2458,6 +2458,11 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.605
  NAME 'ds-cfg-monitoring-period'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler'
  SUP top
@@ -3137,7 +3142,8 @@
        ds-cfg-group-id $
        ds-cfg-assured-timeout $
        ds-cfg-degraded-status-threshold $
        ds-cfg-weight)
        ds-cfg-weight $
        ds-cfg-monitoring-period)
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65
  NAME 'ds-backup-directory'
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -296,4 +296,27 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="monitoring-period" mandatory="false">
    <adm:synopsis>
      The period between sending of monitoring messages.
    </adm:synopsis>
    <adm:description>
      Defines the amount of milliseconds the replication server will wait before
      sending new monitoring messages to its peers (replication servers and
      directory servers).
    </adm:description>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>3000ms</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:duration base-unit="ms" lower-limit="1000" />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-monitoring-period</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
</adm:managed-object>
opends/src/messages/messages/replication.properties
@@ -170,7 +170,7 @@
 UTF-8. This is required to be able to encode the changes in the database. \
 This replication server will now shutdown
SEVERE_ERR_REPLICATION_COULD_NOT_CONNECT_61=The Replication is configured for \
 suffix  %s but was not able to connect to any Replication Server
 suffix %s but was not able to connect to any Replication Server
NOTICE_NOW_FOUND_SAME_GENERATION_CHANGELOG_62=Replication is up and running \
 for domain %s with replication server id %s %s - local server id is %s - data \
 generation is %s
opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -247,23 +247,23 @@
    StringBuffer sb = new StringBuffer();
    sb.append("DS id: ");
    sb.append(dsId);
    sb.append(" RS id: ");
    sb.append(" ; RS id: ");
    sb.append(rsId);
    sb.append(" Generation id: ");
    sb.append(" ; Generation id: ");
    sb.append(generationId);
    sb.append(" Status: ");
    sb.append(" ; Status: ");
    sb.append(status);
    sb.append(" Assured replication: ");
    sb.append(" ; Assured replication: ");
    sb.append(assuredFlag);
    sb.append(" Assured mode: ");
    sb.append(" ; Assured mode: ");
    sb.append(assuredMode);
    sb.append(" Safe data level: ");
    sb.append(" ; Safe data level: ");
    sb.append(safeDataLevel);
    sb.append(" Group id: ");
    sb.append(" ; Group id: ");
    sb.append(groupId);
    sb.append(" Referral URLs: ");
    sb.append(" ; Referral URLs: ");
    sb.append(refUrls);
    sb.append(" ECL Include: ");
    sb.append(" ; ECL Include: ");
    sb.append(eclIncludes);
    return sb.toString();
  }
opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -40,6 +40,11 @@
  private long generationId = -1;
  // Group id of the RS
  private byte groupId = (byte) -1;
  // The weight of the RS
  // It is important to keep the default value to 1 so that it is used as
  // default value for a RS using protocol V3: this default value vill be used
  // in algorithms that use weight
  private int weight = 1;
  /**
   * Creates a new instance of RSInfo with every given info.
@@ -47,12 +52,14 @@
   * @param id The RS id
   * @param generationId The generation id the RS is using
   * @param groupId RS group id
   * @param weight RS weight
   */
  public RSInfo(int id, long generationId, byte groupId)
  public RSInfo(int id, long generationId, byte groupId, int weight)
  {
    this.id = id;
    this.generationId = generationId;
    this.groupId = groupId;
    this.weight = weight;
  }
  /**
@@ -83,6 +90,16 @@
  }
  /**
   * Get the RS weight.
   * @return The RS weight
   */
  public int getWeight()
  {
    return weight;
  }
  /**
   * Test if the passed object is equal to this one.
   * @param obj The object to test
   * @return True if both objects are equal
@@ -99,7 +116,8 @@
      RSInfo rsInfo = (RSInfo) obj;
      return ((id == rsInfo.getId()) &&
        (generationId == rsInfo.getGenerationId()) &&
        (groupId == rsInfo.getGroupId()));
        (groupId == rsInfo.getGroupId()) &&
        (weight == rsInfo.getWeight()));
    } else
    {
      return false;
@@ -117,6 +135,7 @@
    hash = 37 * hash + this.id;
    hash = 37 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
    hash = 37 * hash + this.groupId;
    hash = 37 * hash + this.weight;
    return hash;
  }
@@ -130,11 +149,12 @@
    StringBuffer sb = new StringBuffer();
    sb.append("Id: ");
    sb.append(id);
    sb.append(" Generation id: ");
    sb.append(" ; Generation id: ");
    sb.append(generationId);
    sb.append(" Group id: ");
    sb.append(" ; Group id: ");
    sb.append(groupId);
    sb.append(" ; Weight: ");
    sb.append(weight);
    return sb.toString();
  }
}
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -93,6 +93,24 @@
  }
  /**
   * Sets the sender ID.
   * @param senderID The sender ID.
   */
  public void setSenderID(int senderID)
  {
    this.senderID = senderID;
  }
  /**
   * Sets the destination.
   * @param destination The destination.
   */
  public void setDestination(int destination)
  {
    this.destination = destination;
  }
  /**
   * Sets the state of the replication server.
   * @param state The state.
   */
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -31,7 +31,7 @@
/**
 * This message is part of the replication protocol.
 * RS1 sends a MonitorRequestMsg to RS2 to requests its monitoring
 * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring
 * informations.
 * When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a
 * MonitorMessage.
opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -29,7 +29,7 @@
/**
 * This is an abstract class of messages of the replication protocol
 * for message that needs to contain information about the server that
 * send them and the destination servers to whitch they should be sent.
 * send them and the destination servers to which they should be sent.
 */
public abstract class RoutableMsg extends ReplicationMsg
{
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -167,6 +167,7 @@
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put ECL includes
          Set<String> attrs = dsInfo.getEclIncludes();
          oStream.write(attrs.size());
          for (String attr : attrs)
@@ -192,8 +193,15 @@
        oStream.write(String.valueOf(rsInfo.getGenerationId()).
          getBytes("UTF-8"));
        oStream.write(0);
        // Put DS group id
        // Put RS group id
        oStream.write(rsInfo.getGroupId());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put RS weight
          oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
          oStream.write(0);
        }
      }
      return oStream.toByteArray();
@@ -332,23 +340,30 @@
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int id = Integer.valueOf(serverIdString);
        pos +=
          length + 1;
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        pos +=
          length + 1;
        pos += length + 1;
        /* Read RS group id */
        byte groupId = in[pos++];
        int weight = 1;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          /* Read RS weight */
          length = getNextLength(in, pos);
          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
          pos += length + 1;
        }
        /* Now create RSInfo and store it in list */
        RSInfo rsInfo = new RSInfo(id, generationId, groupId);
        RSInfo rsInfo = new RSInfo(id, generationId, groupId, weight);
        rsList.add(rsInfo);
        nRsInfo--;
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -309,8 +309,7 @@
    try
    {
      MonitorData md;
      md = replicationServerDomain.computeMonitorData();
      MonitorData md = replicationServerDomain.computeMonitorData();
      // Oldest missing update
      Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
@@ -538,7 +537,7 @@
          return;
        }
        // Send our own TopologyMsg to remote RS
        // Send our own TopologyMsg to remote DS
        TopologyMsg outTopoMsg = sendTopoToRemoteDS();
        logStartSessionHandshake(inStartSessionMsg, outTopoMsg);
@@ -572,6 +571,9 @@
      // Create the status analyzer for the domain if not already started
      createStatusAnalyzer();
      // Create the monitoring publisher for the domain if not already started
      createMonitoringPublisher();
      registerIntoDomain();
      super.finalizeStart();
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -49,8 +49,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -101,6 +99,7 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
import java.util.Collections;
/**
 * ReplicationServer Listener.
@@ -173,6 +172,10 @@
  // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
  private int degradedStatusThreshold = 5000;
  // Number of milliseconds to wait before sending new monitoring messages.
  // If value is 0, monitoring publisher is disabled
  private long monitoringPublisherPeriod = 3000;
  // The handler of the draft change numbers database, the database used to
  // store the relation between a draft change number ('seqnum') and the
  // associated cookie.
@@ -211,6 +214,13 @@
  private int weight = 1;
  /**
   * Holds the list of all replication servers instantiated in this VM.
   * This allows to perform clean up of the RS databases in unit tests.
   */
  private static List<ReplicationServer> allInstances =
    new ArrayList<ReplicationServer>();
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this replication server.
@@ -254,6 +264,7 @@
    groupId = (byte)configuration.getGroupId();
    assuredTimeout = configuration.getAssuredTimeout();
    degradedStatusThreshold = configuration.getDegradedStatusThreshold();
    monitoringPublisherPeriod = configuration.getMonitoringPeriod();
    replSessionSecurity = new ReplSessionSecurity();
    initialize(replicationPort);
@@ -274,8 +285,20 @@
    DirectoryServer.registerImportTaskListener(this);
    localPorts.add(replicationPort);
    // Keep track of this new instance
    allInstances.add(this);
  }
  /**
   * Get the list of every replication servers instantiated in the current VM.
   * @return The list of every replication servers instantiated in the current
   * VM.
   */
  public static List<ReplicationServer> getAllInstances()
  {
    return allInstances;
  }
  /**
   * The run method for the Listen thread.
@@ -850,7 +873,9 @@
      dbEnv.shutdown();
    }
}
    // Remove this instance from the global instance list
    allInstances.remove(this);
  }
  /**
@@ -1028,6 +1053,32 @@
      }
    }
    // Update period value for monitoring publishers (stop them if requested
    // value is 0)
    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
    {
      long oldMonitoringPeriod = monitoringPublisherPeriod;
      monitoringPublisherPeriod = configuration.getMonitoringPeriod();
      for(ReplicationServerDomain rsd : baseDNs.values())
      {
        if (monitoringPublisherPeriod == 0L)
        {
          // Requested to stop monitoring publishers
          rsd.stopMonitoringPublisher();
        } else if (rsd.isRunningMonitoringPublisher())
        {
          // Update the threshold value for this running monitoring publisher
          rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
        } else if (oldMonitoringPeriod == 0L)
        {
          // Requested to start monitoring publishers with provided period value
          if ( (rsd.getConnectedDSs().size() > 0) ||
            (rsd.getConnectedRSs().size() > 0) )
            rsd.startMonitoringPublisher();
        }
      }
    }
    // Changed the group id ?
    byte newGroupId = (byte)configuration.getGroupId();
    if (newGroupId != groupId)
@@ -1044,7 +1095,10 @@
    if (weight != configuration.getWeight())
    {
      weight = configuration.getWeight();
      // TODO: send new TopologyMsg
      // Broadcast the new weight the the whole topology. This will make some
      // DSs reconnect (if needed) to other RSs according to the new weight of
      // this RS.
      broadcastConfigChange();
    }
    if ((configuration.getReplicationDBDirectory() != null) &&
@@ -1057,6 +1111,19 @@
  }
  /**
   * Broadcast a configuration change that just happened to the whole topology
   * by sending a TopologyMsg to every entity in the topology.
   */
  private void broadcastConfigChange()
  {
    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
    {
      replicationServerDomain.buildAndSendTopoInfoToDSs(null);
      replicationServerDomain.buildAndSendTopoInfoToRSs();
    }
  }
  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
@@ -1345,6 +1412,15 @@
  }
  /**
   * Get the monitoring publisher period value.
   * @return the monitoring publisher period value.
   */
  public long getMonitoringPublisherPeriod()
  {
    return monitoringPublisherPeriod;
  }
  /**
   * Compute the list of replication servers that are not any
   * more connected to this Replication Server and stop the
   * corresponding handlers.
@@ -1411,12 +1487,80 @@
  /* The date of the last time they have been elaborated */
  private long monitorDataLastBuildDate = 0;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
  /**
   * This uniquely identifies a server (handler) in the cross-domain topology.
   * Represents an identifier of a handler (in the whole RS) we have to wait a
   * monitoring message from before answering to a monitor request.
   */
  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
  public static class GlobalServerId {
    private int serverId = -1;
    private String baseDn = null;
    /**
     * Constructor for a global server id.
     * @param baseDn The dn of the RSD owning the handler.
     * @param serverId The handler id in the matching RSD.
     */
    public GlobalServerId(String baseDn, int serverId) {
      this.baseDn = baseDn;
      this.serverId = serverId;
    }
    /**
     * Get the server handler id.
     * @return the serverId
     */
    public int getServerId()
    {
      return serverId;
    }
    /**
     * Get the base dn.
     * @return the baseDn
     */
    public String getBaseDn()
    {
      return baseDn;
    }
    /**
     * Get the hascode.
     * @return The hashcode.
     */
    @Override
    public int hashCode()
    {
      int hash = 7;
      hash = 43 * hash + this.serverId;
      hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
      return hash;
    }
    /**
     * Tests if the passed global server handler id represents the same server
     * handler as this one.
     * @param obj The object to test.
     * @return True if both identifiers are the same.
     */
    public boolean equals(Object obj) {
      if ( (obj == null) || (obj instanceof GlobalServerId))
        return false;
      GlobalServerId globalServerId = (GlobalServerId)obj;
      return ( globalServerId.baseDn.equals(baseDn) &&
        (globalServerId.serverId == serverId) );
    }
  }
  /**
   * This gives the list of server handlers we are willing to wait monitoring
   * message from. Each time a monitoring message is received by a server
   * handler, the matching server handler id is retired from the list. When the
   * list is empty, we received all expected monitoring messages.
   */
  private List<GlobalServerId> expectedMonitoringMsg = null;
  /**
   * Trigger the computation of the Global Monitoring Data.
@@ -1429,7 +1573,7 @@
   *
   * @throws DirectoryException If the computation cannot be achieved.
   */
  public void computeMonitorData() throws DirectoryException
  public synchronized void computeMonitorData() throws DirectoryException
  {
    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
    {
@@ -1440,15 +1584,17 @@
      return;
    }
    remoteMonitorResponsesSemaphore.drainPermits();
    int count = 0;
    // Initialize the list of server handlers we expect monitoring messages from
    expectedMonitoringMsg =
      Collections.synchronizedList(new ArrayList<GlobalServerId>());
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      count += domain.initializeMonitorData();
      domain.initializeMonitorData(expectedMonitoringMsg);
    }
    // Wait for responses
    waitMonitorDataResponses(count);
    waitMonitorDataResponses();
    for (ReplicationServerDomain domain : baseDNs.values())
    {
@@ -1457,38 +1603,51 @@
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * Wait for the expected received MonitorMsg.
   * @throws DirectoryException When an error occurs.
   */
  private void waitMonitorDataResponses(int expectedResponses)
  private void waitMonitorDataResponses()
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " baseDn=" +
          " waiting for " + expectedResponses + " expected monitor messages");
          "In " + getMonitorInstanceName() +
          " waiting for " + expectedMonitoringMsg.size() +
          " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      // Wait up to 5 seconds for every expected monitoring message to come
      // back.
      boolean allReceived = false;
      long startTime = TimeThread.getTime();
      long curTime = startTime;
      int maxTime = 5000;
      while ( (curTime - startTime) < maxTime )
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        // Have every expected monitoring messages arrived ?
        if (expectedMonitoringMsg.size() == 0)
        {
          // Ok break the loop
          allReceived = true;
          break;
        }
        Thread.sleep(100);
        curTime = TimeThread.getTime();
      }
      monitorDataLastBuildDate = TimeThread.getTime();
      if (!allReceived)
      {
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
        // let's go on in best effort even with limited data received.
      } else
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + getMonitorInstanceName() + " baseDn=" +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
            "In " + getMonitorInstanceName() +
            " Successfully received all expected monitor messages");
      }
    } catch (Exception e)
    {
@@ -1499,11 +1658,18 @@
  /**
   * This should be called by each ReplicationServerDomain that receives
   * a response to a monitor request message.
   * a response to a monitor request message. This may also be called when a
   * monitoring message is coming from a RS whose monitoring publisher thread
   * sent it. As monitoring messages (sent because of monitoring request or
   * because of monitoring publisher) have the same content, this is also ok
   * to mark ok the server when the monitoring message coms from a monitoring
   * publisher thread.
   * @param globalServerId The server handler that is receiving the
   * monitoring message.
   */
  public void responseReceived()
  public void responseReceived(GlobalServerId globalServerId)
  {
    remoteMonitorResponsesSemaphore.release();
    expectedMonitoringMsg.remove(globalServerId);
  }
@@ -1513,7 +1679,7 @@
   */
  public void responseReceivedAll()
  {
    remoteMonitorResponsesSemaphore.notifyAll();
    expectedMonitoringMsg.clear();
  }
  /**
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -85,6 +85,8 @@
import org.opends.server.types.ResultCode;
import com.sleepycat.je.DatabaseException;
import org.opends.server.replication.server.
  ReplicationServer.GlobalServerId;
/**
 * This class define an in-memory cache that will be used to store
@@ -109,6 +111,10 @@
  // late or not
  private StatusAnalyzer statusAnalyzer = null;
  // The monitoring publisher that periodically sends monitoring messages to the
  // topology
  private MonitoringPublisher monitoringPublisher = null;
  /*
   * The following map contains one balanced tree for each replica ID
   * to which we are currently publishing
@@ -1066,6 +1072,17 @@
          // Try doing job anyway...
        }
        // Stop useless monitoring publisher if no more RS or DS in domain
        if ( (directoryServers.size() + replicationServers.size() )== 1)
        {
          if (debugEnabled())
            TRACER.debugInfo("In " +
              replicationServer.getMonitorInstanceName() +
              " remote server " + handler.getMonitorInstanceName() + " is " +
              "the last RS/DS to be stopped: stopping monitoring publisher");
          stopMonitoringPublisher();
        }
        if (handler.isReplicationServer())
        {
          if (replicationServers.containsValue(handler))
@@ -1082,44 +1099,39 @@
              buildAndSendTopoInfoToDSs(null);
            }
          }
        } else
        } else if (directoryServers.containsValue(handler))
        {
          if (directoryServers.containsValue(handler))
          // If this is the last DS for the domain,
          // shutdown the status analyzer
          if (directoryServers.size() == 1)
          {
            // If this is the last DS for the domain,
            // shutdown the status analyzer
            if (directoryServers.size() == 1)
            {
              if (debugEnabled())
                TRACER.debugInfo("In " +
                    replicationServer.getMonitorInstanceName() +
                    " remote server " + handler.getMonitorInstanceName() +
            if (debugEnabled())
              TRACER.debugInfo("In " +
                replicationServer.getMonitorInstanceName() +
                " remote server " + handler.getMonitorInstanceName() +
                " is the last DS to be stopped: stopping status analyzer");
              stopStatusAnalyzer();
            }
            stopStatusAnalyzer();
          }
            unregisterServerHandler(handler);
            handler.shutdown();
          unregisterServerHandler(handler);
          handler.shutdown();
            // Check if generation id has to be reset
            mayResetGenerationId();
          // Check if generation id has to be reset
          mayResetGenerationId();
          if (!shutdown)
          {
            // Update the remote replication servers with our list
            // of connected LDAP servers
            if (!shutdown)
            {
              buildAndSendTopoInfoToRSs();
              // Warn our DSs that a RS or DS has quit (does not use this
              // handler as already removed from list)
              buildAndSendTopoInfoToDSs(null);
            }
            buildAndSendTopoInfoToRSs();
            // Warn our DSs that a RS or DS has quit (does not use this
            // handler as already removed from list)
            buildAndSendTopoInfoToDSs(null);
          }
          else if (otherHandlers.contains(handler))
          {
            unRegisterHandler(handler);
            handler.shutdown();
          }
        } else if (otherHandlers.contains(handler))
        {
          unRegisterHandler(handler);
          handler.shutdown();
        }
      }
      catch(Exception e)
      {
@@ -1581,99 +1593,51 @@
        // in the topology.
        if (senderHandler.isDataServer())
        {
          MonitorMsg returnMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
          // Monitoring information requested by a DS
          MonitorMsg monitorMsg =
            createGlobalTopologyMonitorMsg(msg.getDestination(),
            msg.getsenderID());
          try
           if (monitorMsg != null)
          {
            returnMsg.setReplServerDbState(getDbServerState());
            // Update the information we have about all servers
            // in the topology.
            MonitorData md = computeMonitorData();
            // Add the informations about the Replicas currently in
            // the topology.
            Iterator<Integer> it = md.ldapIterator();
            while (it.hasNext())
            try
            {
              int replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getLDAPServerState(replicaId),
                  md.getApproxFirstMissingDate(replicaId), true);
            }
            // Add the informations about the Replication Servers
            // currently in the topology.
            it = md.rsIterator();
            while (it.hasNext())
              senderHandler.send(monitorMsg);
            } catch (IOException e)
            {
              int replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getRSStates(replicaId),
                  md.getRSApproxFirstMissingDate(replicaId), false);
              // the connection was closed.
            }
          }
          catch (DirectoryException e)
          {
            // If we can't compute the Monitor Information, send
            // back an empty message.
          }
          try
          {
            senderHandler.send(returnMsg);
          } catch (IOException e)
          {
            // the connection was closed.
          }
          return;
        }
        MonitorMsg monitorMsg =
          new MonitorMsg(msg.getDestination(), msg.getsenderID());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (DataServerHandler lsh : this.directoryServers.values())
        } else
        {
          monitorMsg.setServerState(
            lsh.getServerId(),
            lsh.getServerState(),
            lsh.getApproxFirstMissingDate(),
            true);
        }
          // Monitoring information requested by a RS
          MonitorMsg monitorMsg =
            createLocalTopologyMonitorMsg(msg.getDestination(),
            msg.getsenderID());
        // Same for the connected RS
        for (ReplicationServerHandler rsh : this.replicationServers.values())
        {
          monitorMsg.setServerState(
            rsh.getServerId(),
            rsh.getServerState(),
            rsh.getApproxFirstMissingDate(),
            false);
        }
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerDbState(this.getDbServerState());
        try
        {
          senderHandler.send(monitorMsg);
        } catch (Exception e)
        {
          // We log the error. The requestor will detect a timeout or
          // any other failure on the connection.
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
              Integer.toString((msg.getDestination()))));
          if (monitorMsg != null)
          {
            try
            {
              senderHandler.send(monitorMsg);
            } catch (Exception e)
            {
              // We log the error. The requestor will detect a timeout or
              // any other failure on the connection.
              logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
                  Integer.toString((msg.getDestination()))));
            }
          }
        }
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg =
          (MonitorMsg) msg;
        receivesMonitorDataResponse(monitorMsg);
        GlobalServerId globalServerId =
          new GlobalServerId(baseDn, senderHandler.getServerId());
        receivesMonitorDataResponse(monitorMsg, globalServerId);
      } else
      {
        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1775,6 +1739,116 @@
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * whole topology.
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   */
  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
  {
    MonitorMsg returnMsg =
      new MonitorMsg(sender, destination);
    try
    {
      returnMsg.setReplServerDbState(getDbServerState());
      // Update the information we have about all servers
      // in the topology.
      MonitorData md = computeMonitorData();
      // Add the informations about the Replicas currently in
      // the topology.
      Iterator<Integer> it = md.ldapIterator();
      while (it.hasNext())
      {
        int replicaId = it.next();
        returnMsg.setServerState(
            replicaId, md.getLDAPServerState(replicaId),
            md.getApproxFirstMissingDate(replicaId), true);
      }
      // Add the informations about the Replication Servers
      // currently in the topology.
      it = md.rsIterator();
      while (it.hasNext())
      {
        int replicaId = it.next();
        returnMsg.setServerState(
            replicaId, md.getRSStates(replicaId),
            md.getRSApproxFirstMissingDate(replicaId), false);
      }
    }
    catch (DirectoryException e)
    {
      // If we can't compute the Monitor Information, send
      // back an empty message.
    }
    return returnMsg;
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * topology directly connected to this RS. This includes information for:
   * - local RS
   * - all direct DSs
   * - all direct RSs
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    MonitorMsg monitorMsg = null;
    try {
      // Lock domain as we need to go through connected servers list
      lock();
      monitorMsg = new MonitorMsg(sender, destination);
      // Populate for each connected LDAP Server
      // from the states stored in the serverHandler.
      // - the server state
      // - the older missing change
      for (DataServerHandler lsh : this.directoryServers.values())
      {
        monitorMsg.setServerState(
          lsh.getServerId(),
          lsh.getServerState(),
          lsh.getApproxFirstMissingDate(),
          true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.replicationServers.values())
      {
        monitorMsg.setServerState(
          rsh.getServerId(),
          rsh.getServerState(),
          rsh.getApproxFirstMissingDate(),
          false);
      }
      // Populate the RS state in the msg from the DbState
      monitorMsg.setReplServerDbState(this.getDbServerState());
    } catch(InterruptedException e)
    {
      // At lock, too bad...
    } finally
    {
      if (hasLock())
        release();
    }
    return monitorMsg;
  }
  /**
   * Shutdown this ReplicationServerDomain.
   */
  public void shutdown()
@@ -1831,8 +1905,7 @@
  /**
   * Send a TopologyMsg to all the connected directory servers in order to
   * let.
   * them know the topology (every known DSs and RSs)
   * let them know the topology (every known DSs and RSs).
   * @param notThisOne If not null, the topology message will not be sent to
   * this passed server.
   */
@@ -1931,10 +2004,11 @@
      dsInfos.add(serverHandler.toDSInfo());
    }
    // Create info for us (local RS)
    // Create info for the local RS
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      generationId, replicationServer.getGroupId());
      generationId, replicationServer.getGroupId(),
      replicationServer.getWeight());
    rsInfos.add(localRSInfo);
    return new TopologyMsg(dsInfos, rsInfos);
@@ -1965,7 +2039,8 @@
    // Add our own info (local RS)
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      generationId, replicationServer.getGroupId());
      generationId, replicationServer.getGroupId(),
      replicationServer.getWeight());
    rsInfos.add(localRSInfo);
    // Go through every peer RSs (and get their connected DSs), also add info
@@ -2471,13 +2546,15 @@
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   *
   * @return The number of response that should come back.
   * @param expectedMonitoringMsg The list of server handler we have to wait a
   * monitoring message from. Will be filled as necessary by this method.
   *
   * @throws DirectoryException In case the monitoring information could
   *                            not be collected.
   */
  int initializeMonitorData() throws DirectoryException
  void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
    throws DirectoryException
  {
    synchronized (monitorDataLock)
    {
@@ -2539,7 +2616,7 @@
    }
    // Send the request for remote monitor data to the
    return sendMonitorDataRequest();
    sendMonitorDataRequest(expectedMonitoringMsg);
  }
  /**
@@ -2566,22 +2643,25 @@
  /**
   * Sends a MonitorRequest message to all connected RS.
   * @return the number of requests sent.
   * @param expectedMonitoringMsg The list of server handler we have to wait a
   * monitoring message from. Will be filled as necessary by this method.
   * @throws DirectoryException when a problem occurs.
   */
  protected int sendMonitorDataRequest()
  protected void sendMonitorDataRequest(
    List<GlobalServerId> expectedMonitoringMsg)
    throws DirectoryException
  {
    int sent = 0;
    try
    {
      for (ServerHandler rs : replicationServers.values())
      {
        int serverId = rs.getServerId();
        MonitorRequestMsg msg =
          new MonitorRequestMsg(this.replicationServer.getServerId(),
          rs.getServerId());
          serverId);
        rs.send(msg);
        sent++;
        // Store the fact that we expect a MonitoringMsg back from this server
        expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
      }
    } catch (Exception e)
    {
@@ -2590,7 +2670,6 @@
      throw new DirectoryException(ResultCode.OTHER,
        message, e);
    }
    return sent;
  }
  /**
@@ -2598,8 +2677,10 @@
   * and stores the data received.
   *
   * @param msg The message to be processed.
   * @param globalServerHandlerId server handler that is receiving the message.
   */
  public void receivesMonitorDataResponse(MonitorMsg msg)
  private void receivesMonitorDataResponse(MonitorMsg msg,
    GlobalServerId globalServerId)
  {
    try
    {
@@ -2677,7 +2758,7 @@
      // Decreases the number of expected responses and potentially
      // wakes up the waiting requestor thread.
      replicationServer.responseReceived();
      replicationServer.responseReceived(globalServerId);
    } catch (Exception e)
    {
@@ -2832,6 +2913,57 @@
  }
  /**
   * Starts the monitoring publisher for the domain.
   */
  public void startMonitoringPublisher()
  {
    if (monitoringPublisher == null)
    {
      long period =
        replicationServer.getMonitoringPublisherPeriod();
      if (period > 0) // 0 means no monitoring publisher
      {
        monitoringPublisher = new MonitoringPublisher(this, period);
        monitoringPublisher.start();
      }
    }
  }
  /**
   * Stops the monitoring publisher for the domain.
   */
  public void stopMonitoringPublisher()
  {
    if (monitoringPublisher != null)
    {
      monitoringPublisher.shutdown();
      monitoringPublisher.waitForShutdown();
      monitoringPublisher = null;
    }
  }
  /**
   * Tests if the monitoring publisher for this domain is running.
   * @return True if the monitoring publisher is running, false otherwise.
   */
  public boolean isRunningMonitoringPublisher()
  {
    return (monitoringPublisher != null);
  }
  /**
   * Update the monitoring publisher with the new period value.
   * @param period The new period value.
   */
  public void updateMonitoringPublisher(long period)
  {
    if (monitoringPublisher != null)
    {
      monitoringPublisher.setPeriod(period);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -240,6 +240,9 @@
        logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
        // Create the monitoring publisher for the domain if not already started
        createMonitoringPublisher();
        // FIXME: i think this should be done for all protocol version !!
        // not only those > V1
        registerIntoDomain();
@@ -408,6 +411,10 @@
        // other servers.
      }
      // Create the monitoring publisher for the domain if not already started
      createMonitoringPublisher();
      registerIntoDomain();
      // Process TopologyMsg sent by remote RS: store matching new info
@@ -497,7 +504,18 @@
    // Remote RS sent his topo msg
    TopologyMsg inTopoMsg = (TopologyMsg) msg;
    // CONNECTION WITH A RS
    // Store remore RS weight if it has one
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // List should only contain RS info for sender
      RSInfo rsInfo = inTopoMsg.getRsList().get(0);
      weight = rsInfo.getWeight();
    }
    else
    {
      // Remote RS uses protocol version prior to 4 : use default value for
      // weight: 1
    }
    // if the remote RS and the local RS have the same genID
    // then it's ok and nothing else to do
@@ -646,6 +664,7 @@
    RSInfo rsInfo = rsInfos.get(0);
    generationId = rsInfo.getGenerationId();
    groupId = rsInfo.getGroupId();
    weight = rsInfo.getWeight();
    /**
     * Store info for DSs connected to the peer RS
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -244,6 +244,10 @@
   */
  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
  /**
   * Weight of this remote server.
   */
  protected int weight = 1;
  /**
   * Creates a new server handler instance with the provided socket.
@@ -1215,12 +1219,23 @@
   */
  public RSInfo toRSInfo()
  {
    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, weight);
    return rsInfo;
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  protected void createMonitoringPublisher()
  {
    if (!replicationServerDomain.isRunningMonitoringPublisher())
    {
      replicationServerDomain.startMonitoringPublisher();
    }
  }
  /**
   * Performs any processing periodic processing that may be desired to update
   * the information associated with this monitor.  Note that best-effort
   * attempts will be made to ensure that calls to this method come
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -55,11 +55,14 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
@@ -116,6 +119,28 @@
  private ReplicationDomain domain = null;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
   * time it published monitoring information.
   */
  private HashMap<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  /**
   * A Map containing the ServerStates of all the replication servers in the
   * topology as seen by the ReplicationServer the last time it was polled or
   * the last time it published monitoring information.
   */
  private HashMap<Integer, ServerState> rsStates =
    new HashMap<Integer, ServerState>();
  /**
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
   */
@@ -1918,6 +1943,37 @@
          // Try to find a suitable RS
          this.reStart(failingSession);
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
          // Extract and store replicas ServerStates
          replicaStates = new HashMap<Integer, ServerState>();
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          Iterator<Integer> it = monitorMsg.ldapIterator();
          while (it.hasNext())
          {
            int srvId = it.next();
            replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
          }
          // Notify the sender that the response was received.
          synchronized (monitorResponse)
          {
            monitorResponse.set(true);
            monitorResponse.notify();
          }
          // Extract and store replication servers ServerStates
          rsStates = new HashMap<Integer, ServerState>();
          it = monitorMsg.rsIterator();
          while (it.hasNext())
          {
            int srvId = it.next();
            rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
          }
        }
        else
        {
          return msg;
@@ -1949,6 +2005,40 @@
  }
  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
   * to the Replication Server to which this domain is currently connected
   * so that it computes a table containing information about
   * all Directory Servers in the topology.
   * This Computation involves communications will all the servers
   * currently connected and
   *
   * @return The States of all Replicas in the topology (except us)
   */
  public Map<Integer, ServerState> getReplicaStates()
  {
    monitorResponse.set(false);
    // publish Monitor Request Message to the Replication Server
    publish(new MonitorRequestMsg(serverId, getRsServerId()));
    // wait for Response up to 10 seconds.
    try
    {
      synchronized (monitorResponse)
      {
        if (monitorResponse.get() == false)
        {
          monitorResponse.wait(10000);
        }
      }
    } catch (InterruptedException e)
    {}
    return replicaStates;
  }
  /**
   * This method allows to do the necessary computing for the window
   * management after treatment by the worker threads.
   *
@@ -2440,7 +2530,7 @@
    {
      ctHeartbeatPublisherThread =
        new CTHeartbeatPublisherThread(
            "Replication CN Heartbeat Thread started for " +
            "Replication CN Heartbeat sender for " +
            baseDn + " with " + getReplicationServer(),
            session, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -65,7 +65,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -79,7 +78,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -92,8 +90,6 @@
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -306,20 +302,6 @@
   */
  private final ChangeNumberGenerator generator;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  /**
   * A Map containing of the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled.
   */
  private HashMap<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  Set<String> cfgEclIncludes = new HashSet<String>();
  Set<String>    eClIncludes = new HashSet<String>();
@@ -586,24 +568,7 @@
   */
  public Map<Integer, ServerState> getReplicaStates()
  {
    monitorResponse.set(false);
    // publish Monitor Request Message to the Replication Server
    broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
    // wait for Response up to 10 seconds.
    try
    {
      synchronized (monitorResponse)
      {
        if (monitorResponse.get() == false)
        {
          monitorResponse.wait(10000);
        }
      }
    } catch (InterruptedException e)
    {}
    return replicaStates;
    return broker.getReplicaStates();
  }
  /**
@@ -834,26 +799,6 @@
          update = (UpdateMsg) msg;
          generator.adjust(update.getChangeNumber());
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier
          // build the replicaStates Map.
          replicaStates = new HashMap<Integer, ServerState>();
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          Iterator<Integer> it = monitorMsg.ldapIterator();
          while (it.hasNext())
          {
            int serverId = it.next();
            replicaStates.put(
                serverId, monitorMsg.getLDAPServerState(serverId));
          }
          // Notify the sender that the response was received.
          synchronized (monitorResponse)
          {
            monitorResponse.set(true);
            monitorResponse.notify();
          }
        }
      }
      catch (SocketTimeoutException e)
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -27,7 +27,6 @@
package org.opends.server.replication;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
@@ -37,7 +36,6 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.opends.server.loggers.ErrorLogger.logError;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
@@ -56,13 +54,9 @@
import java.util.SortedSet;
import java.util.TreeSet;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.Backend;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.MemoryBackend;
import org.opends.server.config.ConfigException;
import org.opends.server.controls.ExternalChangelogRequestControl;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -784,6 +784,8 @@
      catch(SocketTimeoutException e)
      {
        // This is the expected result
        // Note that timeout should be lower than RS montoring publisher period
        // so that timeout occurs
      }
      //===========================================================
@@ -889,49 +891,21 @@
      // Broker 2 and 3 should receive 1 change status message to order them
      // to enter the bad gen id status
      try
      ChangeStatusMsg csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker2,
        ChangeStatusMsg.class.getName());
      if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
      {
        ReplicationMsg msg = broker2.receive();
        if (!(msg instanceof ChangeStatusMsg))
        {
          fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
            " to enter the bad gen id status"
              + msg);
        }
        ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
        if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
        {
          fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
            " to enter the bad gen id status"
              + msg);
        }
        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
          " to enter the bad gen id status"
            + csMsg);
      }
      catch(SocketTimeoutException se)
      csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker3,
        ChangeStatusMsg.class.getName());
      if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
      {
        fail("DS2 is expected to receive 1 ChangeStatusMsg to enter the " +
          "bad gen id status.");
      }
      try
      {
        ReplicationMsg msg = broker3.receive();
        if (!(msg instanceof ChangeStatusMsg))
        {
          fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" +
            " to enter the bad gen id status"
              + msg);
        }
        ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
        if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
        {
          fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" +
            " to enter the bad gen id status"
              + msg);
        }
      }
      catch(SocketTimeoutException se)
      {
        fail("DS3 is expected to receive 1 ChangeStatusMsg to enter the " +
          "bad gen id status.");
        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
          " to enter the bad gen id status"
            + csMsg);
      }
      debugInfo("DS1 root entry must contain the new gen ID");
@@ -988,7 +962,8 @@
      debugInfo("DS2 is publishing a change and RS1 must ignore this change, DS3 must not receive it.");
      broker2.publish(createAddMsg());
      AddMsg emsg = (AddMsg)createAddMsg();
      broker2.publish(emsg);
      // Updates count in RS1 must stay unchanged = to 1
      Thread.sleep(500);
@@ -1060,8 +1035,30 @@
          isDegradedDueToGenerationId(server3ID),
      "Expecting that DS3 is not in bad gen id from RS1");
      debugInfo("Verify that DS2 receives the add message stored in RS1 DB");
      try
      {
        ReplicationMsg msg = broker2.receive();
        assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg);
      }
      catch(SocketTimeoutException e)
      {
        fail("The msg stored in RS1 DB is expected to be received by DS2)");
      }
      debugInfo("Verify that DS3 receives the add message stored in RS1 DB");
      try
      {
        ReplicationMsg msg = broker3.receive();
        assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg);
      }
      catch(SocketTimeoutException e)
      {
        fail("The msg stored in RS1 DB is expected to be received by DS3)");
      }
      debugInfo("DS2 is publishing a change and RS1 must store this change, DS3 must receive it.");
      AddMsg emsg = (AddMsg)createAddMsg();
      emsg = (AddMsg)createAddMsg();
      broker2.publish(emsg);
      Thread.sleep(500);
@@ -1105,7 +1102,7 @@
   * The following test focus on:
   * - genId checking across multiple starting RS (replication servers)
   * - genId setting propagation from one RS to the others
   * - genId reset   propagation from one RS to the others
   * - genId reset propagation from one RS to the others
   */
  @Test(enabled=false)
  public void testMultiRS() throws Exception
@@ -1190,7 +1187,7 @@
      assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), genId);
      assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), genId);
      debugInfo("Connecting broker2 to replServer1 with a bad genId");
      debugInfo("Connecting broker3 to replServer1 with a bad genId");
      try
      {
        long badgenId = 1;
@@ -1215,7 +1212,7 @@
      debugInfo("Connecting DS to replServer1.");
      connectServer1ToChangelog(changelog1ID);
      Thread.sleep(1000);
      Thread.sleep(3000);
      debugInfo("Adding reset task to DS.");
@@ -1373,7 +1370,7 @@
  /**
   * Loop opening sessions to the Replication Server
   * to check that it handle correctly deconnection and reconnection.
   * to check that it handle correctly disconnection and reconnection.
   */
  @Test(enabled=false, groups="slow")
  public void testLoop() throws Exception
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -47,7 +47,6 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -65,9 +64,10 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.schema.DirectoryStringSyntax;
@@ -260,34 +260,6 @@
      broker.setSoTimeout(timeout);
    checkConnection(30, broker, port); // give some time to the broker to connect
                                       // to the replicationServer.
    if (emptyOldChanges)
    {
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      {
        logError(new MessageBuilder(
            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
            .append(" when emptying old changes").toMessage());
      }
    }
    return broker;
  }
@@ -313,32 +285,6 @@
      broker.setSoTimeout(timeout);
    checkConnection(30, broker, port); // give some time to the broker to connect
                                       // to the replicationServer.
    if (emptyOldChanges)
    {
      // loop receiving update until there is nothing left
      // to make sure that message from previous tests have been consumed.
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      {
        logError(new MessageBuilder(
            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
            .append(" when emptying old changes").toMessage());
      }
    }
    return broker;
  }
  */
@@ -435,17 +381,6 @@
      boolean emptyOldChanges)
      throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
        getGenerationId(baseDn));
  }
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, int serverId, int window_size,
        int port, int timeout, int maxSendQueue, int maxRcvQueue,
        boolean emptyOldChanges, long generationId)
            throws Exception, SocketException
  {
    ServerState state = new ServerState();
    if (emptyOldChanges)
@@ -453,37 +388,13 @@
    ReplicationBroker broker = new ReplicationBroker(null,
        state, baseDn.toNormalizedString(), serverId, window_size,
        generationId, 0, getReplSessionSecurity(), (byte)1, 500);
        getGenerationId(baseDn), 0, getReplSessionSecurity(), (byte)1, 500);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    checkConnection(30, broker, port);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    if (emptyOldChanges)
    {
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      { }
    }
    return broker;
  }
@@ -575,11 +486,14 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
      " ##### Calling ReplicationTestCase.classCleanUp ##### "));
    // Clean RS databases
    cleanUpReplicationServersDB();
    cleanConfigEntries();
    configEntryList = null;
    configEntryList = new LinkedList<DN>();
    cleanRealEntries();
    entryList = null;
    entryList = new LinkedList<DN>();
    // Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING)
    // (in case our test created some emtries in it)
@@ -631,6 +545,10 @@
    assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-server)",
      "Found unexpected replication server config left");
    // Be sure that no replication server instance is left
    List<ReplicationServer> allRSInstances = ReplicationServer.getAllInstances();
    assertTrue(allRSInstances.size() == 0, "Some replication servers left: " + allRSInstances);
    // Check for config entries for replication domain
    assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-domain)",
      "Found unexpected replication domain config left");
@@ -648,6 +566,17 @@
  }
  /**
   * Cleanup databases of the currently instantiated replication servers in the
   * VM
   */
  protected void cleanUpReplicationServersDB() {
    for (ReplicationServer rs : ReplicationServer.getAllInstances()) {
      rs.clearDb();
    }
  }
  /**
   * Performs a search on the config backend with the specified filter.
   * Fails if a config entry is found.
   * @param filter The filter to apply for the search
@@ -1266,4 +1195,90 @@
      // done
    }
  }
  /**
   * Wait for the arrival of a specific message type on the provided session
   * before going in timeout and failing.
   * @param session Session from which we should receive the message.
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      try
      {
        replMsg = session.receive();
      } catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : " +
          ex.getClass().getName()  + " : " + ex.getMessage());
      }
      // Get message type
      String rcvMsgType = replMsg.getClass().getName();
      if (rcvMsgType.equals(msgType))
      {
        // Ok, got it, let's return the expected message
        return replMsg;
      }
      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
      nMsg++;
      curTime = System.currentTimeMillis();
    }
    // Timeout
    fail("Failed to receive an expected " + msgType +
      " message after 5 seconds : also received " + nMsg +
      " other messages during wait time.");
    return null;
  }
  /**
   * Wait for the arrival of a specific message type on the provided broker
   * before going in timeout and failing.
   * @param broker Broker from which we should receive the message.
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      try
      {
        replMsg = broker.receive();
      } catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : " +
          ex.getClass().getName()  + " : " + ex.getMessage());
      }
      // Get message type
      String rcvMsgType = replMsg.getClass().getName();
      if (rcvMsgType.equals(msgType))
      {
        // Ok, got it, let's return the expected message
        return replMsg;
      }
      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
      nMsg++;
      curTime = System.currentTimeMillis();
    }
    // Timeout
    fail("Failed to receive an expected " + msgType +
      " message after 5 seconds : also received " + nMsg +
      " other messages during wait time.");
    return null;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -128,6 +128,8 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "Starting replication test : pushSchemaChange "));
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("cn=schema");
    ReplicationBroker broker =
@@ -216,6 +218,8 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "Starting replication test : replaySchemaChange "));
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("cn=schema");
    ReplicationBroker broker =
@@ -253,6 +257,8 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "Starting replication test : pushSchemaFilesChange "));
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("cn=schema");
    ReplicationBroker broker =
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -67,6 +67,7 @@
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.types.*;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -300,6 +301,9 @@
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting synchronization test : toggleReceiveStatus"));
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    /*
@@ -379,6 +383,9 @@
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting replication test : lostHeartbeatFailover"));
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    /*
@@ -483,6 +490,9 @@
         DirectoryServer.getAttributeType("entryuuid");
    String monitorAttr = "resolved-modify-conflicts";
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    /*
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
@@ -610,6 +620,9 @@
    String resolvedMonitorAttr = "resolved-naming-conflicts";
    String unresolvedMonitorAttr = "unresolved-naming-conflicts";
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    /*
     * Open a session to the replicationServer using the ReplicationServer broker API.
     * This must use a serverId different from the LDAP server ID
@@ -1302,6 +1315,18 @@
    return new Object[][] { { false }, {true} };
  }
  private void cleanupTest() {
    try
    {
      classCleanUp();
      setUp();
    } catch (Exception e)
    {
      fail("Test cleanup failed: " + e.getClass().getName() + " : " +
        e.getMessage() + " : " + StaticUtils.stackTraceToSingleLineString(e));
    }
  }
  /**
   * Tests done using directly the ReplicationBroker interface.
   */
@@ -1312,6 +1337,9 @@
        Category.SYNC, Severity.INFORMATION,
        "Starting replication test : updateOperations " + assured));
    // Cleanup from previous run
    cleanupTest();
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    ReplicationBroker broker =
@@ -1341,15 +1369,15 @@
        // Check if the client has received the msg
        ReplicationMsg msg = broker.receive();
        assertTrue(msg instanceof AddMsg,
        "The received replication message is not an ADD msg");
        "The received replication message is not an ADD msg : " + msg);
        AddMsg addMsg =  (AddMsg) msg;
        Operation receivedOp = addMsg.createOperation(connection);
        assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
        "The received replication message is not an ADD msg");
        "The received replication message is not an ADD msg : " + addMsg);
        assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
        "The received ADD replication message is not for the excepted DN");
        "The received ADD replication message is not for the excepted DN : " + addMsg);
      }
      // Modify the entry
@@ -1364,12 +1392,12 @@
      // See if the client has received the msg
      ReplicationMsg msg = broker.receive();
      assertTrue(msg instanceof ModifyMsg,
      "The received replication message is not a MODIFY msg");
      "The received replication message is not a MODIFY msg : " + msg);
      ModifyMsg modMsg = (ModifyMsg) msg;
      modMsg.createOperation(connection);
      assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
      "The received MODIFY replication message is not for the excepted DN");
      "The received MODIFY replication message is not for the excepted DN : " + modMsg);
      // Modify the entry DN
      DN newDN = DN.decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING) ;
@@ -1387,12 +1415,12 @@
      // See if the client has received the msg
      msg = broker.receive();
      assertTrue(msg instanceof ModifyDNMsg,
      "The received replication message is not a MODIFY DN msg");
      "The received replication message is not a MODIFY DN msg : " + msg);
      ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
      moddnMsg.createOperation(connection);
      assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
      "The received MODIFY_DN message is not for the excepted DN");
      "The received MODIFY_DN message is not for the excepted DN : " + moddnMsg);
      // Delete the entry
      DeleteOperationBasis delOp = new DeleteOperationBasis(connection,
@@ -1406,12 +1434,12 @@
      // See if the client has received the msg
      msg = broker.receive();
      assertTrue(msg instanceof DeleteMsg,
      "The received replication message is not a MODIFY DN msg");
      "The received replication message is not a MODIFY DN msg : " + msg);
      DeleteMsg delMsg = (DeleteMsg) msg;
      delMsg.createOperation(connection);
      assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
          .decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING)) == 0,
      "The received DELETE message is not for the excepted DN");
      "The received DELETE message is not for the excepted DN : " + delMsg);
      /*
       * Now check that when we send message to the ReplicationServer
@@ -1512,6 +1540,9 @@
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting replication test : deleteNoSuchObject"));
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    DN dn = DN.decode("cn=No Such Object,ou=People," + TEST_ROOT_DN_STRING);
    DeleteOperationBasis op =
         new DeleteOperationBasis(connection,
@@ -1535,6 +1566,9 @@
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    Thread.sleep(2000);
    ReplicationBroker broker =
      openReplicationSession(baseDn,  11, 100, replServerPort, 1000, true);
@@ -1675,6 +1709,9 @@
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    /*
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -501,7 +501,7 @@
        // Send topo view
        List<RSInfo> rsList = new ArrayList<RSInfo>();
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
        rsList.add(rsInfo);
        TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
          rsList);
@@ -719,7 +719,7 @@
    }
    /**
     * Read the coming seaf read mode updates and send back acks with errors
     * Read the coming safe read mode updates and send back acks with errors
     */
    private void executeSafeReadManyErrorsScenario()
    {
@@ -1058,7 +1058,7 @@
  }
  /**
   * Tests parameters sent in session handshake an updates, when not using
   * Tests parameters sent in session handshake and updates, when not using
   * assured replication
   */
  @Test
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -836,7 +836,7 @@
        fail("Unknown replication server id.");
    }
    return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId);
    return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1);
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -1092,11 +1092,11 @@
    dsList4.add(dsInfo2);
    dsList4.add(dsInfo1);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
    rsList1.add(rsInfo1);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -1026,13 +1026,13 @@
    dsList4.add(dsInfo2);
    dsList4.add(dsInfo1);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
    RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98);
    RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98, 1);
    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
    rsList1.add(rsInfo1);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -595,6 +594,9 @@
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
        replServers, groupId, assuredTimeout, 5000);
      // No monitoring publisher to not interfer with some SocketTimeoutException
      // expected at some points in these tests
      conf.setMonitoringPeriod(0L);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      return replicationServer;
@@ -908,7 +910,7 @@
        ReplicationMsg replMsg = session.receive();
        if (replMsg instanceof ErrorMsg)
        {
          // Support for connection done with bad gen id : we receive an error
        // Support for connection done with bad gen id : we receive an error
          // message that we must throw away before reading our ack.
          replMsg = session.receive();
        }
@@ -967,7 +969,7 @@
        }
        // Send our topo mesg
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
        List<RSInfo> rsInfos = new ArrayList<RSInfo>();
        rsInfos.add(rsInfo);
        TopologyMsg topoMsg = new TopologyMsg(null, rsInfos);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -63,8 +63,11 @@
  // The weight of the server
  private int weight = 1;
  // The monitoring publisher period
  private long monitoringPeriod = 3000;
  /**
   * Constructor without goup id, assured info and weight
   * Constructor without group id, assured info and weight
   */
  public ReplServerFakeConfiguration(
      int port, String dirName, int purgeDelay, int serverId,
@@ -254,4 +257,17 @@
    return weight;
  }
  public long getMonitoringPeriod()
  {
    return monitoringPeriod;
  }
  /**
   * @param monitoringPeriod the monitoringPeriod to set
   */
  public void setMonitoringPeriod(long monitoringPeriod)
  {
    this.monitoringPeriod = monitoringPeriod;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -74,7 +74,6 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
@@ -1003,7 +1002,7 @@
            ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1);
      session.publish(msg);
      // Read the Replication Server state from the ReplServerStartMsg that
      // Read the Replication Server state from the ReplServerStartDSMsg that
      // comes back.
      ReplServerStartDSMsg replStartDSMsg =
        (ReplServerStartDSMsg) session.receive();
@@ -1079,7 +1078,8 @@
      // check that this did not change the window by sending a probe again.
      session.publish(new WindowProbeMsg());
      windowMsg = (WindowMsg) session.receive();
      // We may receive some MonitoringMsg so use filter method
      windowMsg = (WindowMsg)waitForSpecificMsg(session, WindowMsg.class.getName());
      assertEquals(serverwindow, windowMsg.getNumAck());
      debugInfo("Ending windowProbeTest");
    }