mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
06.11.2009 3a9e211d36ee94ff99941943b3b51e0f768624f5
In order to support a more clever algorithm for the DS to choose his RS, 
we introduce:



- a weigth, which is an integer affected to each RS that combined with
each others will define a percentage value which matches the number of
DSs (compared with total number od DSs in the topology) that can be
connected to the RS at a time in the topology. In these modif, this
configuration of the weight is added as well as dynamic changes. Also
transported in Topo messages. No modification of the connection
algorithm yet



- Also to support the future connection algorithm, these modifs
introduces a Monitoring Publisher thread which is a thread that sens
every 3 seconds a Monitoring message (unchanged format) to every DSs
that are connected to him. These information will be used by the DSs to
potentially reconnect to another RSs with a newer server state (info
included in monitoring messages)



The new connection algorithm will take into account:

- group id

- generation id

- server states

- locality (same VM)

- weight (load)

28 files modified
1268 ■■■■ changed files
opends/resource/schema/02-config.ldif 8 ●●●● patch | view | raw | blame | history
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml 23 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/DSInfo.java 18 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/RSInfo.java 30 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java 18 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java 27 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 8 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 230 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 316 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 21 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 92 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 57 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 6 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 77 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 203 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java 6 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java 55 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java 6 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java 2 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java 6 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java 8 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 6 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java 18 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 6 ●●●● patch | view | raw | blame | history
opends/resource/schema/02-config.ldif
@@ -2458,6 +2458,11 @@
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.27
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
attributeTypes: ( 1.3.6.1.4.1.26027.1.1.605
  NAME 'ds-cfg-monitoring-period'
  SYNTAX 1.3.6.1.4.1.1466.115.121.1.15
  SINGLE-VALUE
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.1
  NAME 'ds-cfg-access-control-handler'
  SUP top
@@ -3137,7 +3142,8 @@
        ds-cfg-group-id $
        ds-cfg-assured-timeout $
        ds-cfg-degraded-status-threshold $
        ds-cfg-weight)
        ds-cfg-weight $
        ds-cfg-monitoring-period)
  X-ORIGIN 'OpenDS Directory Server' )
objectClasses: ( 1.3.6.1.4.1.26027.1.2.65
  NAME 'ds-backup-directory'
opends/src/admin/defn/org/opends/server/admin/std/ReplicationServerConfiguration.xml
@@ -296,4 +296,27 @@
      </ldap:attribute>
    </adm:profile>
  </adm:property>
  <adm:property name="monitoring-period" mandatory="false">
    <adm:synopsis>
      The period between sending of monitoring messages.
    </adm:synopsis>
    <adm:description>
      Defines the amount of milliseconds the replication server will wait before
      sending new monitoring messages to its peers (replication servers and
      directory servers).
    </adm:description>
    <adm:default-behavior>
      <adm:defined>
        <adm:value>3000ms</adm:value>
      </adm:defined>
    </adm:default-behavior>
    <adm:syntax>
      <adm:duration base-unit="ms" lower-limit="1000" />
    </adm:syntax>
    <adm:profile name="ldap">
      <ldap:attribute>
        <ldap:name>ds-cfg-monitoring-period</ldap:name>
      </ldap:attribute>
    </adm:profile>
  </adm:property>
</adm:managed-object>
opends/src/messages/messages/replication.properties
opends/src/server/org/opends/server/replication/common/DSInfo.java
@@ -247,23 +247,23 @@
    StringBuffer sb = new StringBuffer();
    sb.append("DS id: ");
    sb.append(dsId);
    sb.append(" RS id: ");
    sb.append(" ; RS id: ");
    sb.append(rsId);
    sb.append(" Generation id: ");
    sb.append(" ; Generation id: ");
    sb.append(generationId);
    sb.append(" Status: ");
    sb.append(" ; Status: ");
    sb.append(status);
    sb.append(" Assured replication: ");
    sb.append(" ; Assured replication: ");
    sb.append(assuredFlag);
    sb.append(" Assured mode: ");
    sb.append(" ; Assured mode: ");
    sb.append(assuredMode);
    sb.append(" Safe data level: ");
    sb.append(" ; Safe data level: ");
    sb.append(safeDataLevel);
    sb.append(" Group id: ");
    sb.append(" ; Group id: ");
    sb.append(groupId);
    sb.append(" Referral URLs: ");
    sb.append(" ; Referral URLs: ");
    sb.append(refUrls);
    sb.append(" ECL Include: ");
    sb.append(" ; ECL Include: ");
    sb.append(eclIncludes);
    return sb.toString();
  }
opends/src/server/org/opends/server/replication/common/RSInfo.java
@@ -40,6 +40,11 @@
  private long generationId = -1;
  // Group id of the RS
  private byte groupId = (byte) -1;
  // The weight of the RS
  // It is important to keep the default value to 1 so that it is used as
  // default value for a RS using protocol V3: this default value vill be used
  // in algorithms that use weight
  private int weight = 1;
  /**
   * Creates a new instance of RSInfo with every given info.
@@ -47,12 +52,14 @@
   * @param id The RS id
   * @param generationId The generation id the RS is using
   * @param groupId RS group id
   * @param weight RS weight
   */
  public RSInfo(int id, long generationId, byte groupId)
  public RSInfo(int id, long generationId, byte groupId, int weight)
  {
    this.id = id;
    this.generationId = generationId;
    this.groupId = groupId;
    this.weight = weight;
  }
  /**
@@ -83,6 +90,16 @@
  }
  /**
   * Get the RS weight.
   * @return The RS weight
   */
  public int getWeight()
  {
    return weight;
  }
  /**
   * Test if the passed object is equal to this one.
   * @param obj The object to test
   * @return True if both objects are equal
@@ -99,7 +116,8 @@
      RSInfo rsInfo = (RSInfo) obj;
      return ((id == rsInfo.getId()) &&
        (generationId == rsInfo.getGenerationId()) &&
        (groupId == rsInfo.getGroupId()));
        (groupId == rsInfo.getGroupId()) &&
        (weight == rsInfo.getWeight()));
    } else
    {
      return false;
@@ -117,6 +135,7 @@
    hash = 37 * hash + this.id;
    hash = 37 * hash + (int) (this.generationId ^ (this.generationId >>> 32));
    hash = 37 * hash + this.groupId;
    hash = 37 * hash + this.weight;
    return hash;
  }
@@ -130,11 +149,12 @@
    StringBuffer sb = new StringBuffer();
    sb.append("Id: ");
    sb.append(id);
    sb.append(" Generation id: ");
    sb.append(" ; Generation id: ");
    sb.append(generationId);
    sb.append(" Group id: ");
    sb.append(" ; Group id: ");
    sb.append(groupId);
    sb.append(" ; Weight: ");
    sb.append(weight);
    return sb.toString();
  }
}
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -93,6 +93,24 @@
  }
  /**
   * Sets the sender ID.
   * @param senderID The sender ID.
   */
  public void setSenderID(int senderID)
  {
    this.senderID = senderID;
  }
  /**
   * Sets the destination.
   * @param destination The destination.
   */
  public void setDestination(int destination)
  {
    this.destination = destination;
  }
  /**
   * Sets the state of the replication server.
   * @param state The state.
   */
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -31,7 +31,7 @@
/**
 * This message is part of the replication protocol.
 * RS1 sends a MonitorRequestMsg to RS2 to requests its monitoring
 * RS1 sends a MonitorRequestMsg to RS2 to request its monitoring
 * informations.
 * When RS2 receives a MonitorRequestMsg from RS1, RS2 responds with a
 * MonitorMessage.
opends/src/server/org/opends/server/replication/protocol/RoutableMsg.java
@@ -29,7 +29,7 @@
/**
 * This is an abstract class of messages of the replication protocol
 * for message that needs to contain information about the server that
 * send them and the destination servers to whitch they should be sent.
 * send them and the destination servers to which they should be sent.
 */
public abstract class RoutableMsg extends ReplicationMsg
{
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -167,6 +167,7 @@
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put ECL includes
          Set<String> attrs = dsInfo.getEclIncludes();
          oStream.write(attrs.size());
          for (String attr : attrs)
@@ -192,8 +193,15 @@
        oStream.write(String.valueOf(rsInfo.getGenerationId()).
          getBytes("UTF-8"));
        oStream.write(0);
        // Put DS group id
        // Put RS group id
        oStream.write(rsInfo.getGroupId());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put RS weight
          oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
          oStream.write(0);
        }
      }
      return oStream.toByteArray();
@@ -332,23 +340,30 @@
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int id = Integer.valueOf(serverIdString);
        pos +=
          length + 1;
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        pos +=
          length + 1;
        pos += length + 1;
        /* Read RS group id */
        byte groupId = in[pos++];
        int weight = 1;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          /* Read RS weight */
          length = getNextLength(in, pos);
          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
          pos += length + 1;
        }
        /* Now create RSInfo and store it in list */
        RSInfo rsInfo = new RSInfo(id, generationId, groupId);
        RSInfo rsInfo = new RSInfo(id, generationId, groupId, weight);
        rsList.add(rsInfo);
        nRsInfo--;
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -309,8 +309,7 @@
    try
    {
      MonitorData md;
      md = replicationServerDomain.computeMonitorData();
      MonitorData md = replicationServerDomain.computeMonitorData();
      // Oldest missing update
      Long approxFirstMissingDate = md.getApproxFirstMissingDate(serverId);
@@ -538,7 +537,7 @@
          return;
        }
        // Send our own TopologyMsg to remote RS
        // Send our own TopologyMsg to remote DS
        TopologyMsg outTopoMsg = sendTopoToRemoteDS();
        logStartSessionHandshake(inStartSessionMsg, outTopoMsg);
@@ -572,6 +571,9 @@
      // Create the status analyzer for the domain if not already started
      createStatusAnalyzer();
      // Create the monitoring publisher for the domain if not already started
      createMonitoringPublisher();
      registerIntoDomain();
      super.finalizeStart();
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -49,8 +49,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -101,6 +99,7 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
import java.util.Collections;
/**
 * ReplicationServer Listener.
@@ -173,6 +172,10 @@
  // the DS in DEGRADED_STATUS. If value is 0, status analyzer is disabled
  private int degradedStatusThreshold = 5000;
  // Number of milliseconds to wait before sending new monitoring messages.
  // If value is 0, monitoring publisher is disabled
  private long monitoringPublisherPeriod = 3000;
  // The handler of the draft change numbers database, the database used to
  // store the relation between a draft change number ('seqnum') and the
  // associated cookie.
@@ -211,6 +214,13 @@
  private int weight = 1;
  /**
   * Holds the list of all replication servers instantiated in this VM.
   * This allows to perform clean up of the RS databases in unit tests.
   */
  private static List<ReplicationServer> allInstances =
    new ArrayList<ReplicationServer>();
  /**
   * Creates a new Replication server using the provided configuration entry.
   *
   * @param configuration The configuration of this replication server.
@@ -254,6 +264,7 @@
    groupId = (byte)configuration.getGroupId();
    assuredTimeout = configuration.getAssuredTimeout();
    degradedStatusThreshold = configuration.getDegradedStatusThreshold();
    monitoringPublisherPeriod = configuration.getMonitoringPeriod();
    replSessionSecurity = new ReplSessionSecurity();
    initialize(replicationPort);
@@ -274,8 +285,20 @@
    DirectoryServer.registerImportTaskListener(this);
    localPorts.add(replicationPort);
    // Keep track of this new instance
    allInstances.add(this);
  }
  /**
   * Get the list of every replication servers instantiated in the current VM.
   * @return The list of every replication servers instantiated in the current
   * VM.
   */
  public static List<ReplicationServer> getAllInstances()
  {
    return allInstances;
  }
  /**
   * The run method for the Listen thread.
@@ -850,6 +873,8 @@
      dbEnv.shutdown();
    }
    // Remove this instance from the global instance list
    allInstances.remove(this);
}
@@ -1028,6 +1053,32 @@
      }
    }
    // Update period value for monitoring publishers (stop them if requested
    // value is 0)
    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
    {
      long oldMonitoringPeriod = monitoringPublisherPeriod;
      monitoringPublisherPeriod = configuration.getMonitoringPeriod();
      for(ReplicationServerDomain rsd : baseDNs.values())
      {
        if (monitoringPublisherPeriod == 0L)
        {
          // Requested to stop monitoring publishers
          rsd.stopMonitoringPublisher();
        } else if (rsd.isRunningMonitoringPublisher())
        {
          // Update the threshold value for this running monitoring publisher
          rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
        } else if (oldMonitoringPeriod == 0L)
        {
          // Requested to start monitoring publishers with provided period value
          if ( (rsd.getConnectedDSs().size() > 0) ||
            (rsd.getConnectedRSs().size() > 0) )
            rsd.startMonitoringPublisher();
        }
      }
    }
    // Changed the group id ?
    byte newGroupId = (byte)configuration.getGroupId();
    if (newGroupId != groupId)
@@ -1044,7 +1095,10 @@
    if (weight != configuration.getWeight())
    {
      weight = configuration.getWeight();
      // TODO: send new TopologyMsg
      // Broadcast the new weight the the whole topology. This will make some
      // DSs reconnect (if needed) to other RSs according to the new weight of
      // this RS.
      broadcastConfigChange();
    }
    if ((configuration.getReplicationDBDirectory() != null) &&
@@ -1057,6 +1111,19 @@
  }
  /**
   * Broadcast a configuration change that just happened to the whole topology
   * by sending a TopologyMsg to every entity in the topology.
   */
  private void broadcastConfigChange()
  {
    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
    {
      replicationServerDomain.buildAndSendTopoInfoToDSs(null);
      replicationServerDomain.buildAndSendTopoInfoToRSs();
    }
  }
  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationChangeAcceptable(
@@ -1345,6 +1412,15 @@
  }
  /**
   * Get the monitoring publisher period value.
   * @return the monitoring publisher period value.
   */
  public long getMonitoringPublisherPeriod()
  {
    return monitoringPublisherPeriod;
  }
  /**
   * Compute the list of replication servers that are not any
   * more connected to this Replication Server and stop the
   * corresponding handlers.
@@ -1411,12 +1487,80 @@
  /* The date of the last time they have been elaborated */
  private long monitorDataLastBuildDate = 0;
  /* Search op on monitor data is processed by a worker thread.
   * Requests are sent to the other RS,and responses are received by the
   * listener threads.
   * The worker thread is awoke on this semaphore, or on timeout.
  /**
   * This uniquely identifies a server (handler) in the cross-domain topology.
   * Represents an identifier of a handler (in the whole RS) we have to wait a
   * monitoring message from before answering to a monitor request.
   */
  Semaphore remoteMonitorResponsesSemaphore = new Semaphore(0);
  public static class GlobalServerId {
    private int serverId = -1;
    private String baseDn = null;
    /**
     * Constructor for a global server id.
     * @param baseDn The dn of the RSD owning the handler.
     * @param serverId The handler id in the matching RSD.
     */
    public GlobalServerId(String baseDn, int serverId) {
      this.baseDn = baseDn;
      this.serverId = serverId;
    }
    /**
     * Get the server handler id.
     * @return the serverId
     */
    public int getServerId()
    {
      return serverId;
    }
    /**
     * Get the base dn.
     * @return the baseDn
     */
    public String getBaseDn()
    {
      return baseDn;
    }
    /**
     * Get the hascode.
     * @return The hashcode.
     */
    @Override
    public int hashCode()
    {
      int hash = 7;
      hash = 43 * hash + this.serverId;
      hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
      return hash;
    }
    /**
     * Tests if the passed global server handler id represents the same server
     * handler as this one.
     * @param obj The object to test.
     * @return True if both identifiers are the same.
     */
    public boolean equals(Object obj) {
      if ( (obj == null) || (obj instanceof GlobalServerId))
        return false;
      GlobalServerId globalServerId = (GlobalServerId)obj;
      return ( globalServerId.baseDn.equals(baseDn) &&
        (globalServerId.serverId == serverId) );
    }
  }
  /**
   * This gives the list of server handlers we are willing to wait monitoring
   * message from. Each time a monitoring message is received by a server
   * handler, the matching server handler id is retired from the list. When the
   * list is empty, we received all expected monitoring messages.
   */
  private List<GlobalServerId> expectedMonitoringMsg = null;
  /**
   * Trigger the computation of the Global Monitoring Data.
@@ -1429,7 +1573,7 @@
   *
   * @throws DirectoryException If the computation cannot be achieved.
   */
  public void computeMonitorData() throws DirectoryException
  public synchronized void computeMonitorData() throws DirectoryException
  {
    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
    {
@@ -1440,15 +1584,17 @@
      return;
    }
    remoteMonitorResponsesSemaphore.drainPermits();
    int count = 0;
    // Initialize the list of server handlers we expect monitoring messages from
    expectedMonitoringMsg =
      Collections.synchronizedList(new ArrayList<GlobalServerId>());
    for (ReplicationServerDomain domain : baseDNs.values())
    {
      count += domain.initializeMonitorData();
      domain.initializeMonitorData(expectedMonitoringMsg);
    }
    // Wait for responses
    waitMonitorDataResponses(count);
    waitMonitorDataResponses();
    for (ReplicationServerDomain domain : baseDNs.values())
    {
@@ -1457,38 +1603,51 @@
  }
  /**
   * Wait for the expected count of received MonitorMsg.
   * @param expectedResponses The number of expected answers.
   * Wait for the expected received MonitorMsg.
   * @throws DirectoryException When an error occurs.
   */
  private void waitMonitorDataResponses(int expectedResponses)
  private void waitMonitorDataResponses()
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " baseDn=" +
          " waiting for " + expectedResponses + " expected monitor messages");
          "In " + getMonitorInstanceName() +
          " waiting for " + expectedMonitoringMsg.size() +
          " expected monitor messages");
      boolean allPermitsAcquired =
        remoteMonitorResponsesSemaphore.tryAcquire(
        expectedResponses,
        (long) 5000, TimeUnit.MILLISECONDS);
      if (!allPermitsAcquired)
      // Wait up to 5 seconds for every expected monitoring message to come
      // back.
      boolean allReceived = false;
      long startTime = TimeThread.getTime();
      long curTime = startTime;
      int maxTime = 5000;
      while ( (curTime - startTime) < maxTime )
      {
        // Have every expected monitoring messages arrived ?
        if (expectedMonitoringMsg.size() == 0)
        {
          // Ok break the loop
          allReceived = true;
          break;
        }
        Thread.sleep(100);
        curTime = TimeThread.getTime();
      }
        monitorDataLastBuildDate = TimeThread.getTime();
      if (!allReceived)
      {
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
      // let's go on in best effort even with limited data received.
      } else
      {
        monitorDataLastBuildDate = TimeThread.getTime();
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + getMonitorInstanceName() + " baseDn=" +
            " Successfully received all " + expectedResponses +
            " expected monitor messages");
            "In " + getMonitorInstanceName() +
            " Successfully received all expected monitor messages");
      }
    } catch (Exception e)
    {
@@ -1499,11 +1658,18 @@
  /**
   * This should be called by each ReplicationServerDomain that receives
   * a response to a monitor request message.
   * a response to a monitor request message. This may also be called when a
   * monitoring message is coming from a RS whose monitoring publisher thread
   * sent it. As monitoring messages (sent because of monitoring request or
   * because of monitoring publisher) have the same content, this is also ok
   * to mark ok the server when the monitoring message coms from a monitoring
   * publisher thread.
   * @param globalServerId The server handler that is receiving the
   * monitoring message.
   */
  public void responseReceived()
  public void responseReceived(GlobalServerId globalServerId)
  {
    remoteMonitorResponsesSemaphore.release();
    expectedMonitoringMsg.remove(globalServerId);
  }
@@ -1513,7 +1679,7 @@
   */
  public void responseReceivedAll()
  {
    remoteMonitorResponsesSemaphore.notifyAll();
    expectedMonitoringMsg.clear();
  }
  /**
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -85,6 +85,8 @@
import org.opends.server.types.ResultCode;
import com.sleepycat.je.DatabaseException;
import org.opends.server.replication.server.
  ReplicationServer.GlobalServerId;
/**
 * This class define an in-memory cache that will be used to store
@@ -109,6 +111,10 @@
  // late or not
  private StatusAnalyzer statusAnalyzer = null;
  // The monitoring publisher that periodically sends monitoring messages to the
  // topology
  private MonitoringPublisher monitoringPublisher = null;
  /*
   * The following map contains one balanced tree for each replica ID
   * to which we are currently publishing
@@ -1066,6 +1072,17 @@
          // Try doing job anyway...
        }
        // Stop useless monitoring publisher if no more RS or DS in domain
        if ( (directoryServers.size() + replicationServers.size() )== 1)
        {
          if (debugEnabled())
            TRACER.debugInfo("In " +
              replicationServer.getMonitorInstanceName() +
              " remote server " + handler.getMonitorInstanceName() + " is " +
              "the last RS/DS to be stopped: stopping monitoring publisher");
          stopMonitoringPublisher();
        }
        if (handler.isReplicationServer())
        {
          if (replicationServers.containsValue(handler))
@@ -1082,9 +1099,7 @@
              buildAndSendTopoInfoToDSs(null);
            }
          }
        } else
        {
          if (directoryServers.containsValue(handler))
        } else if (directoryServers.containsValue(handler))
          {
            // If this is the last DS for the domain,
            // shutdown the status analyzer
@@ -1103,24 +1118,21 @@
            // Check if generation id has to be reset
            mayResetGenerationId();
            // Update the remote replication servers with our list
            // of connected LDAP servers
            if (!shutdown)
            {
            // Update the remote replication servers with our list
            // of connected LDAP servers
              buildAndSendTopoInfoToRSs();
              // Warn our DSs that a RS or DS has quit (does not use this
              // handler as already removed from list)
              buildAndSendTopoInfoToDSs(null);
            }
          }
          else if (otherHandlers.contains(handler))
        } else if (otherHandlers.contains(handler))
          {
            unRegisterHandler(handler);
            handler.shutdown();
          }
        }
      }
      catch(Exception e)
      {
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
@@ -1581,83 +1593,31 @@
        // in the topology.
        if (senderHandler.isDataServer())
        {
          MonitorMsg returnMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
          // Monitoring information requested by a DS
          MonitorMsg monitorMsg =
            createGlobalTopologyMonitorMsg(msg.getDestination(),
            msg.getsenderID());
           if (monitorMsg != null)
          {
          try
          {
            returnMsg.setReplServerDbState(getDbServerState());
            // Update the information we have about all servers
            // in the topology.
            MonitorData md = computeMonitorData();
            // Add the informations about the Replicas currently in
            // the topology.
            Iterator<Integer> it = md.ldapIterator();
            while (it.hasNext())
            {
              int replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getLDAPServerState(replicaId),
                  md.getApproxFirstMissingDate(replicaId), true);
            }
            // Add the informations about the Replication Servers
            // currently in the topology.
            it = md.rsIterator();
            while (it.hasNext())
            {
              int replicaId = it.next();
              returnMsg.setServerState(
                  replicaId, md.getRSStates(replicaId),
                  md.getRSApproxFirstMissingDate(replicaId), false);
            }
          }
          catch (DirectoryException e)
          {
            // If we can't compute the Monitor Information, send
            // back an empty message.
          }
          try
          {
            senderHandler.send(returnMsg);
              senderHandler.send(monitorMsg);
          } catch (IOException e)
          {
            // the connection was closed.
          }
          }
          return;
        }
        } else
        {
          // Monitoring information requested by a RS
        MonitorMsg monitorMsg =
          new MonitorMsg(msg.getDestination(), msg.getsenderID());
            createLocalTopologyMonitorMsg(msg.getDestination(),
            msg.getsenderID());
        // Populate for each connected LDAP Server
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (DataServerHandler lsh : this.directoryServers.values())
          if (monitorMsg != null)
        {
          monitorMsg.setServerState(
            lsh.getServerId(),
            lsh.getServerState(),
            lsh.getApproxFirstMissingDate(),
            true);
        }
        // Same for the connected RS
        for (ReplicationServerHandler rsh : this.replicationServers.values())
        {
          monitorMsg.setServerState(
            rsh.getServerId(),
            rsh.getServerState(),
            rsh.getApproxFirstMissingDate(),
            false);
        }
        // Populate the RS state in the msg from the DbState
        monitorMsg.setReplServerDbState(this.getDbServerState());
        try
        {
          senderHandler.send(monitorMsg);
@@ -1668,12 +1628,16 @@
          logError(ERR_CHANGELOG_ERROR_SENDING_MSG.get(
              Integer.toString((msg.getDestination()))));
        }
          }
        }
      } else if (msg instanceof MonitorMsg)
      {
        MonitorMsg monitorMsg =
          (MonitorMsg) msg;
        receivesMonitorDataResponse(monitorMsg);
        GlobalServerId globalServerId =
          new GlobalServerId(baseDn, senderHandler.getServerId());
        receivesMonitorDataResponse(monitorMsg, globalServerId);
      } else
      {
        logError(NOTE_ERR_ROUTING_TO_SERVER.get(
@@ -1775,6 +1739,116 @@
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * whole topology.
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   */
  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
  {
    MonitorMsg returnMsg =
      new MonitorMsg(sender, destination);
    try
    {
      returnMsg.setReplServerDbState(getDbServerState());
      // Update the information we have about all servers
      // in the topology.
      MonitorData md = computeMonitorData();
      // Add the informations about the Replicas currently in
      // the topology.
      Iterator<Integer> it = md.ldapIterator();
      while (it.hasNext())
      {
        int replicaId = it.next();
        returnMsg.setServerState(
            replicaId, md.getLDAPServerState(replicaId),
            md.getApproxFirstMissingDate(replicaId), true);
      }
      // Add the informations about the Replication Servers
      // currently in the topology.
      it = md.rsIterator();
      while (it.hasNext())
      {
        int replicaId = it.next();
        returnMsg.setServerState(
            replicaId, md.getRSStates(replicaId),
            md.getRSApproxFirstMissingDate(replicaId), false);
      }
    }
    catch (DirectoryException e)
    {
      // If we can't compute the Monitor Information, send
      // back an empty message.
    }
    return returnMsg;
  }
  /**
   * Creates a new monitor message including monitoring information for the
   * topology directly connected to this RS. This includes information for:
   * - local RS
   * - all direct DSs
   * - all direct RSs
   * @param sender The sender of this message.
   * @param destination The destination of this message.
   * @return The newly created and filled MonitorMsg. Null if a problem occurred
   * during message creation.
   */
  public MonitorMsg createLocalTopologyMonitorMsg(int sender, int destination)
  {
    MonitorMsg monitorMsg = null;
    try {
      // Lock domain as we need to go through connected servers list
      lock();
      monitorMsg = new MonitorMsg(sender, destination);
      // Populate for each connected LDAP Server
      // from the states stored in the serverHandler.
      // - the server state
      // - the older missing change
      for (DataServerHandler lsh : this.directoryServers.values())
      {
        monitorMsg.setServerState(
          lsh.getServerId(),
          lsh.getServerState(),
          lsh.getApproxFirstMissingDate(),
          true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.replicationServers.values())
      {
        monitorMsg.setServerState(
          rsh.getServerId(),
          rsh.getServerState(),
          rsh.getApproxFirstMissingDate(),
          false);
      }
      // Populate the RS state in the msg from the DbState
      monitorMsg.setReplServerDbState(this.getDbServerState());
    } catch(InterruptedException e)
    {
      // At lock, too bad...
    } finally
    {
      if (hasLock())
        release();
    }
    return monitorMsg;
  }
  /**
   * Shutdown this ReplicationServerDomain.
   */
  public void shutdown()
@@ -1831,8 +1905,7 @@
  /**
   * Send a TopologyMsg to all the connected directory servers in order to
   * let.
   * them know the topology (every known DSs and RSs)
   * let them know the topology (every known DSs and RSs).
   * @param notThisOne If not null, the topology message will not be sent to
   * this passed server.
   */
@@ -1931,10 +2004,11 @@
      dsInfos.add(serverHandler.toDSInfo());
    }
    // Create info for us (local RS)
    // Create info for the local RS
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      generationId, replicationServer.getGroupId());
      generationId, replicationServer.getGroupId(),
      replicationServer.getWeight());
    rsInfos.add(localRSInfo);
    return new TopologyMsg(dsInfos, rsInfos);
@@ -1965,7 +2039,8 @@
    // Add our own info (local RS)
    RSInfo localRSInfo = new RSInfo(replicationServer.getServerId(),
      generationId, replicationServer.getGroupId());
      generationId, replicationServer.getGroupId(),
      replicationServer.getWeight());
    rsInfos.add(localRSInfo);
    // Go through every peer RSs (and get their connected DSs), also add info
@@ -2471,13 +2546,15 @@
   * Start collecting global monitoring information for this
   * ReplicationServerDomain.
   *
   * @return The number of response that should come back.
   * @param expectedMonitoringMsg The list of server handler we have to wait a
   * monitoring message from. Will be filled as necessary by this method.
   *
   * @throws DirectoryException In case the monitoring information could
   *                            not be collected.
   */
  int initializeMonitorData() throws DirectoryException
  void initializeMonitorData(List<GlobalServerId> expectedMonitoringMsg)
    throws DirectoryException
  {
    synchronized (monitorDataLock)
    {
@@ -2539,7 +2616,7 @@
    }
    // Send the request for remote monitor data to the
    return sendMonitorDataRequest();
    sendMonitorDataRequest(expectedMonitoringMsg);
  }
  /**
@@ -2566,22 +2643,25 @@
  /**
   * Sends a MonitorRequest message to all connected RS.
   * @return the number of requests sent.
   * @param expectedMonitoringMsg The list of server handler we have to wait a
   * monitoring message from. Will be filled as necessary by this method.
   * @throws DirectoryException when a problem occurs.
   */
  protected int sendMonitorDataRequest()
  protected void sendMonitorDataRequest(
    List<GlobalServerId> expectedMonitoringMsg)
    throws DirectoryException
  {
    int sent = 0;
    try
    {
      for (ServerHandler rs : replicationServers.values())
      {
        int serverId = rs.getServerId();
        MonitorRequestMsg msg =
          new MonitorRequestMsg(this.replicationServer.getServerId(),
          rs.getServerId());
          serverId);
        rs.send(msg);
        sent++;
        // Store the fact that we expect a MonitoringMsg back from this server
        expectedMonitoringMsg.add(new GlobalServerId(baseDn, serverId));
      }
    } catch (Exception e)
    {
@@ -2590,7 +2670,6 @@
      throw new DirectoryException(ResultCode.OTHER,
        message, e);
    }
    return sent;
  }
  /**
@@ -2598,8 +2677,10 @@
   * and stores the data received.
   *
   * @param msg The message to be processed.
   * @param globalServerHandlerId server handler that is receiving the message.
   */
  public void receivesMonitorDataResponse(MonitorMsg msg)
  private void receivesMonitorDataResponse(MonitorMsg msg,
    GlobalServerId globalServerId)
  {
    try
    {
@@ -2677,7 +2758,7 @@
      // Decreases the number of expected responses and potentially
      // wakes up the waiting requestor thread.
      replicationServer.responseReceived();
      replicationServer.responseReceived(globalServerId);
    } catch (Exception e)
    {
@@ -2832,6 +2913,57 @@
  }
  /**
   * Starts the monitoring publisher for the domain.
   */
  public void startMonitoringPublisher()
  {
    if (monitoringPublisher == null)
    {
      long period =
        replicationServer.getMonitoringPublisherPeriod();
      if (period > 0) // 0 means no monitoring publisher
      {
        monitoringPublisher = new MonitoringPublisher(this, period);
        monitoringPublisher.start();
      }
    }
  }
  /**
   * Stops the monitoring publisher for the domain.
   */
  public void stopMonitoringPublisher()
  {
    if (monitoringPublisher != null)
    {
      monitoringPublisher.shutdown();
      monitoringPublisher.waitForShutdown();
      monitoringPublisher = null;
    }
  }
  /**
   * Tests if the monitoring publisher for this domain is running.
   * @return True if the monitoring publisher is running, false otherwise.
   */
  public boolean isRunningMonitoringPublisher()
  {
    return (monitoringPublisher != null);
  }
  /**
   * Update the monitoring publisher with the new period value.
   * @param period The new period value.
   */
  public void updateMonitoringPublisher(long period)
  {
    if (monitoringPublisher != null)
    {
      monitoringPublisher.setPeriod(period);
    }
  }
  /**
   * {@inheritDoc}
   */
  @Override
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -240,6 +240,9 @@
        logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
        // Create the monitoring publisher for the domain if not already started
        createMonitoringPublisher();
        // FIXME: i think this should be done for all protocol version !!
        // not only those > V1
        registerIntoDomain();
@@ -408,6 +411,10 @@
        // other servers.
      }
      // Create the monitoring publisher for the domain if not already started
      createMonitoringPublisher();
      registerIntoDomain();
      // Process TopologyMsg sent by remote RS: store matching new info
@@ -497,7 +504,18 @@
    // Remote RS sent his topo msg
    TopologyMsg inTopoMsg = (TopologyMsg) msg;
    // CONNECTION WITH A RS
    // Store remore RS weight if it has one
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      // List should only contain RS info for sender
      RSInfo rsInfo = inTopoMsg.getRsList().get(0);
      weight = rsInfo.getWeight();
    }
    else
    {
      // Remote RS uses protocol version prior to 4 : use default value for
      // weight: 1
    }
    // if the remote RS and the local RS have the same genID
    // then it's ok and nothing else to do
@@ -646,6 +664,7 @@
    RSInfo rsInfo = rsInfos.get(0);
    generationId = rsInfo.getGenerationId();
    groupId = rsInfo.getGroupId();
    weight = rsInfo.getWeight();
    /**
     * Store info for DSs connected to the peer RS
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -244,6 +244,10 @@
   */
  private AtomicBoolean shuttingDown = new AtomicBoolean(false);
  /**
   * Weight of this remote server.
   */
  protected int weight = 1;
  /**
   * Creates a new server handler instance with the provided socket.
@@ -1215,12 +1219,23 @@
   */
  public RSInfo toRSInfo()
  {
    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
    RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, weight);
    return rsInfo;
  }
  /**
   * Starts the monitoring publisher for the domain if not already started.
   */
  protected void createMonitoringPublisher()
  {
    if (!replicationServerDomain.isRunningMonitoringPublisher())
    {
      replicationServerDomain.startMonitoringPublisher();
    }
  }
  /**
   * Performs any processing periodic processing that may be desired to update
   * the information associated with this monitor.  Note that best-effort
   * attempts will be made to ensure that calls to this method come
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -55,11 +55,14 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.HeartbeatMonitor;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
@@ -116,6 +119,28 @@
  private ReplicationDomain domain = null;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  /**
   * A Map containing the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled or the last
   * time it published monitoring information.
   */
  private HashMap<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  /**
   * A Map containing the ServerStates of all the replication servers in the
   * topology as seen by the ReplicationServer the last time it was polled or
   * the last time it published monitoring information.
   */
  private HashMap<Integer, ServerState> rsStates =
    new HashMap<Integer, ServerState>();
  /**
   * The expected duration in milliseconds between heartbeats received
   * from the replication server.  Zero means heartbeats are off.
   */
@@ -1918,6 +1943,37 @@
          // Try to find a suitable RS
          this.reStart(failingSession);
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier or
          // the regular message of the monitoring publisher of the RS.
          // Extract and store replicas ServerStates
          replicaStates = new HashMap<Integer, ServerState>();
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          Iterator<Integer> it = monitorMsg.ldapIterator();
          while (it.hasNext())
          {
            int srvId = it.next();
            replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
          }
          // Notify the sender that the response was received.
          synchronized (monitorResponse)
          {
            monitorResponse.set(true);
            monitorResponse.notify();
          }
          // Extract and store replication servers ServerStates
          rsStates = new HashMap<Integer, ServerState>();
          it = monitorMsg.rsIterator();
          while (it.hasNext())
          {
            int srvId = it.next();
            rsStates.put(srvId, monitorMsg.getRSServerState(srvId));
          }
        }
        else
        {
          return msg;
@@ -1949,6 +2005,40 @@
  }
  /**
   * Gets the States of all the Replicas currently in the
   * Topology.
   * When this method is called, a Monitoring message will be sent
   * to the Replication Server to which this domain is currently connected
   * so that it computes a table containing information about
   * all Directory Servers in the topology.
   * This Computation involves communications will all the servers
   * currently connected and
   *
   * @return The States of all Replicas in the topology (except us)
   */
  public Map<Integer, ServerState> getReplicaStates()
  {
    monitorResponse.set(false);
    // publish Monitor Request Message to the Replication Server
    publish(new MonitorRequestMsg(serverId, getRsServerId()));
    // wait for Response up to 10 seconds.
    try
    {
      synchronized (monitorResponse)
      {
        if (monitorResponse.get() == false)
        {
          monitorResponse.wait(10000);
        }
      }
    } catch (InterruptedException e)
    {}
    return replicaStates;
  }
  /**
   * This method allows to do the necessary computing for the window
   * management after treatment by the worker threads.
   *
@@ -2440,7 +2530,7 @@
    {
      ctHeartbeatPublisherThread =
        new CTHeartbeatPublisherThread(
            "Replication CN Heartbeat Thread started for " +
            "Replication CN Heartbeat sender for " +
            baseDn + " with " + getReplicationServer(),
            session, changeTimeHeartbeatSendInterval, serverId);
      ctHeartbeatPublisherThread.start();
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -65,7 +65,6 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
@@ -79,7 +78,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.MutableBoolean;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachine;
@@ -92,8 +90,6 @@
import org.opends.server.replication.protocol.HeartbeatMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -306,20 +302,6 @@
   */
  private final ChangeNumberGenerator generator;
  /**
   * This object is used as a conditional event to be notified about
   * the reception of monitor information from the Replication Server.
   */
  private final MutableBoolean monitorResponse = new MutableBoolean(false);
  /**
   * A Map containing of the ServerStates of all the replicas in the topology
   * as seen by the ReplicationServer the last time it was polled.
   */
  private HashMap<Integer, ServerState> replicaStates =
    new HashMap<Integer, ServerState>();
  Set<String> cfgEclIncludes = new HashSet<String>();
  Set<String>    eClIncludes = new HashSet<String>();
@@ -586,24 +568,7 @@
   */
  public Map<Integer, ServerState> getReplicaStates()
  {
    monitorResponse.set(false);
    // publish Monitor Request Message to the Replication Server
    broker.publish(new MonitorRequestMsg(serverID, broker.getRsServerId()));
    // wait for Response up to 10 seconds.
    try
    {
      synchronized (monitorResponse)
      {
        if (monitorResponse.get() == false)
        {
          monitorResponse.wait(10000);
        }
      }
    } catch (InterruptedException e)
    {}
    return replicaStates;
    return broker.getReplicaStates();
  }
  /**
@@ -834,26 +799,6 @@
          update = (UpdateMsg) msg;
          generator.adjust(update.getChangeNumber());
        }
        else if (msg instanceof MonitorMsg)
        {
          // This is the response to a MonitorRequest that was sent earlier
          // build the replicaStates Map.
          replicaStates = new HashMap<Integer, ServerState>();
          MonitorMsg monitorMsg = (MonitorMsg) msg;
          Iterator<Integer> it = monitorMsg.ldapIterator();
          while (it.hasNext())
          {
            int serverId = it.next();
            replicaStates.put(
                serverId, monitorMsg.getLDAPServerState(serverId));
          }
          // Notify the sender that the response was received.
          synchronized (monitorResponse)
          {
            monitorResponse.set(true);
            monitorResponse.notify();
          }
        }
      }
      catch (SocketTimeoutException e)
      {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -27,7 +27,6 @@
package org.opends.server.replication;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.OperationContext.SYNCHROCONTEXT;
@@ -37,7 +36,6 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import static org.opends.server.loggers.ErrorLogger.logError;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
@@ -56,13 +54,9 @@
import java.util.SortedSet;
import java.util.TreeSet;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.Backend;
import org.opends.server.api.ConnectionHandler;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.MemoryBackend;
import org.opends.server.config.ConfigException;
import org.opends.server.controls.ExternalChangelogRequestControl;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -784,6 +784,8 @@
      catch(SocketTimeoutException e)
      {
        // This is the expected result
        // Note that timeout should be lower than RS montoring publisher period
        // so that timeout occurs
      }
      //===========================================================
@@ -889,49 +891,21 @@
      // Broker 2 and 3 should receive 1 change status message to order them
      // to enter the bad gen id status
      try
      {
        ReplicationMsg msg = broker2.receive();
        if (!(msg instanceof ChangeStatusMsg))
        {
          fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
            " to enter the bad gen id status"
              + msg);
        }
        ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
      ChangeStatusMsg csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker2,
        ChangeStatusMsg.class.getName());
        if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
        {
          fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
            " to enter the bad gen id status"
              + msg);
            + csMsg);
        }
      }
      catch(SocketTimeoutException se)
      {
        fail("DS2 is expected to receive 1 ChangeStatusMsg to enter the " +
          "bad gen id status.");
      }
      try
      {
        ReplicationMsg msg = broker3.receive();
        if (!(msg instanceof ChangeStatusMsg))
        {
          fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" +
            " to enter the bad gen id status"
              + msg);
        }
        ChangeStatusMsg csMsg = (ChangeStatusMsg)msg;
      csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker3,
        ChangeStatusMsg.class.getName());
        if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
        {
          fail("Broker 3 connection is expected to receive 1 ChangeStatusMsg" +
        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
            " to enter the bad gen id status"
              + msg);
        }
      }
      catch(SocketTimeoutException se)
      {
        fail("DS3 is expected to receive 1 ChangeStatusMsg to enter the " +
          "bad gen id status.");
            + csMsg);
      }
      debugInfo("DS1 root entry must contain the new gen ID");
@@ -988,7 +962,8 @@
      debugInfo("DS2 is publishing a change and RS1 must ignore this change, DS3 must not receive it.");
      broker2.publish(createAddMsg());
      AddMsg emsg = (AddMsg)createAddMsg();
      broker2.publish(emsg);
      // Updates count in RS1 must stay unchanged = to 1
      Thread.sleep(500);
@@ -1060,8 +1035,30 @@
          isDegradedDueToGenerationId(server3ID),
      "Expecting that DS3 is not in bad gen id from RS1");
      debugInfo("Verify that DS2 receives the add message stored in RS1 DB");
      try
      {
        ReplicationMsg msg = broker2.receive();
        assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg);
      }
      catch(SocketTimeoutException e)
      {
        fail("The msg stored in RS1 DB is expected to be received by DS2)");
      }
      debugInfo("Verify that DS3 receives the add message stored in RS1 DB");
      try
      {
        ReplicationMsg msg = broker3.receive();
        assertTrue(msg instanceof AddMsg, "Excpected to receive an AddMsg but received: " + msg);
      }
      catch(SocketTimeoutException e)
      {
        fail("The msg stored in RS1 DB is expected to be received by DS3)");
      }
      debugInfo("DS2 is publishing a change and RS1 must store this change, DS3 must receive it.");
      AddMsg emsg = (AddMsg)createAddMsg();
      emsg = (AddMsg)createAddMsg();
      broker2.publish(emsg);
      Thread.sleep(500);
@@ -1190,7 +1187,7 @@
      assertEquals(replServer2.getGenerationId(baseDn.toNormalizedString()), genId);
      assertEquals(replServer3.getGenerationId(baseDn.toNormalizedString()), genId);
      debugInfo("Connecting broker2 to replServer1 with a bad genId");
      debugInfo("Connecting broker3 to replServer1 with a bad genId");
      try
      {
        long badgenId = 1;
@@ -1215,7 +1212,7 @@
      debugInfo("Connecting DS to replServer1.");
      connectServer1ToChangelog(changelog1ID);
      Thread.sleep(1000);
      Thread.sleep(3000);
      debugInfo("Adding reset task to DS.");
@@ -1373,7 +1370,7 @@
  /**
   * Loop opening sessions to the Replication Server
   * to check that it handle correctly deconnection and reconnection.
   * to check that it handle correctly disconnection and reconnection.
   */
  @Test(enabled=false, groups="slow")
  public void testLoop() throws Exception
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -47,7 +47,6 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -65,9 +64,10 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.schema.DirectoryStringSyntax;
@@ -260,34 +260,6 @@
      broker.setSoTimeout(timeout);
    checkConnection(30, broker, port); // give some time to the broker to connect
                                       // to the replicationServer.
    if (emptyOldChanges)
    {
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      {
        logError(new MessageBuilder(
            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
            .append(" when emptying old changes").toMessage());
      }
    }
    return broker;
  }
@@ -313,32 +285,6 @@
      broker.setSoTimeout(timeout);
    checkConnection(30, broker, port); // give some time to the broker to connect
                                       // to the replicationServer.
    if (emptyOldChanges)
    {
      // loop receiving update until there is nothing left
      // to make sure that message from previous tests have been consumed.
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      {
        logError(new MessageBuilder(
            "ReplicationTestCase/openReplicationSession ").append(e.getMessage())
            .append(" when emptying old changes").toMessage());
      }
    }
    return broker;
  }
  */
@@ -435,17 +381,6 @@
      boolean emptyOldChanges)
      throws Exception, SocketException
  {
    return openReplicationSession(baseDn, serverId, window_size,
        port, timeout, maxSendQueue, maxRcvQueue, emptyOldChanges,
        getGenerationId(baseDn));
  }
  protected ReplicationBroker openReplicationSession(
      final DN baseDn, int serverId, int window_size,
        int port, int timeout, int maxSendQueue, int maxRcvQueue,
        boolean emptyOldChanges, long generationId)
            throws Exception, SocketException
  {
    ServerState state = new ServerState();
    if (emptyOldChanges)
@@ -453,37 +388,13 @@
    ReplicationBroker broker = new ReplicationBroker(null,
        state, baseDn.toNormalizedString(), serverId, window_size,
        generationId, 0, getReplSessionSecurity(), (byte)1, 500);
        getGenerationId(baseDn), 0, getReplSessionSecurity(), (byte)1, 500);
    ArrayList<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    checkConnection(30, broker, port);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    if (emptyOldChanges)
    {
      /*
       * loop receiving update until there is nothing left
       * to make sure that message from previous tests have been consumed.
       */
      try
      {
        while (true)
        {
          ReplicationMsg rMsg = broker.receive();
          if (rMsg instanceof ErrorMsg)
          {
            ErrorMsg eMsg = (ErrorMsg)rMsg;
            logError(new MessageBuilder(
                "ReplicationTestCase/openReplicationSession ").append(
                " received ErrorMessage when emptying old changes ").append(
                eMsg.getDetails()).toMessage());
          }
        }
      }
      catch (Exception e)
      { }
    }
    return broker;
  }
@@ -575,11 +486,14 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
      " ##### Calling ReplicationTestCase.classCleanUp ##### "));
    // Clean RS databases
    cleanUpReplicationServersDB();
    cleanConfigEntries();
    configEntryList = null;
    configEntryList = new LinkedList<DN>();
    cleanRealEntries();
    entryList = null;
    entryList = new LinkedList<DN>();
    // Clear the test backend (TestCaseUtils.TEST_ROOT_DN_STRING)
    // (in case our test created some emtries in it)
@@ -631,6 +545,10 @@
    assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-server)",
      "Found unexpected replication server config left");
    // Be sure that no replication server instance is left
    List<ReplicationServer> allRSInstances = ReplicationServer.getAllInstances();
    assertTrue(allRSInstances.size() == 0, "Some replication servers left: " + allRSInstances);
    // Check for config entries for replication domain
    assertNoConfigEntriesWithFilter("(objectclass=ds-cfg-replication-domain)",
      "Found unexpected replication domain config left");
@@ -648,6 +566,17 @@
  }
  /**
   * Cleanup databases of the currently instantiated replication servers in the
   * VM
   */
  protected void cleanUpReplicationServersDB() {
    for (ReplicationServer rs : ReplicationServer.getAllInstances()) {
      rs.clearDb();
    }
  }
  /**
   * Performs a search on the config backend with the specified filter.
   * Fails if a config entry is found.
   * @param filter The filter to apply for the search
@@ -1266,4 +1195,90 @@
      // done
    }
  }
  /**
   * Wait for the arrival of a specific message type on the provided session
   * before going in timeout and failing.
   * @param session Session from which we should receive the message.
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      try
      {
        replMsg = session.receive();
      } catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : " +
          ex.getClass().getName()  + " : " + ex.getMessage());
      }
      // Get message type
      String rcvMsgType = replMsg.getClass().getName();
      if (rcvMsgType.equals(msgType))
      {
        // Ok, got it, let's return the expected message
        return replMsg;
      }
      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
      nMsg++;
      curTime = System.currentTimeMillis();
    }
    // Timeout
    fail("Failed to receive an expected " + msgType +
      " message after 5 seconds : also received " + nMsg +
      " other messages during wait time.");
    return null;
  }
  /**
   * Wait for the arrival of a specific message type on the provided broker
   * before going in timeout and failing.
   * @param broker Broker from which we should receive the message.
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      try
      {
        replMsg = broker.receive();
      } catch (Exception ex)
      {
        fail("Exception waiting for " + msgType + " message : " +
          ex.getClass().getName()  + " : " + ex.getMessage());
      }
      // Get message type
      String rcvMsgType = replMsg.getClass().getName();
      if (rcvMsgType.equals(msgType))
      {
        // Ok, got it, let's return the expected message
        return replMsg;
      }
      TRACER.debugInfo("waitForSpecificMsg received : " + replMsg);
      nMsg++;
      curTime = System.currentTimeMillis();
    }
    // Timeout
    fail("Failed to receive an expected " + msgType +
      " message after 5 seconds : also received " + nMsg +
      " other messages during wait time.");
    return null;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/SchemaReplicationTest.java
@@ -128,6 +128,8 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "Starting replication test : pushSchemaChange "));
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("cn=schema");
    ReplicationBroker broker =
@@ -216,6 +218,8 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "Starting replication test : replaySchemaChange "));
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("cn=schema");
    ReplicationBroker broker =
@@ -253,6 +257,8 @@
    logError(Message.raw(Category.SYNC, Severity.NOTICE,
        "Starting replication test : pushSchemaFilesChange "));
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("cn=schema");
    ReplicationBroker broker =
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/UpdateOperationTest.java
@@ -67,6 +67,7 @@
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.types.*;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -300,6 +301,9 @@
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting synchronization test : toggleReceiveStatus"));
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    /*
@@ -379,6 +383,9 @@
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting replication test : lostHeartbeatFailover"));
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    /*
@@ -483,6 +490,9 @@
         DirectoryServer.getAttributeType("entryuuid");
    String monitorAttr = "resolved-modify-conflicts";
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    /*
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
@@ -610,6 +620,9 @@
    String resolvedMonitorAttr = "resolved-naming-conflicts";
    String unresolvedMonitorAttr = "unresolved-naming-conflicts";
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    /*
     * Open a session to the replicationServer using the ReplicationServer broker API.
     * This must use a serverId different from the LDAP server ID
@@ -1302,6 +1315,18 @@
    return new Object[][] { { false }, {true} };
  }
  private void cleanupTest() {
    try
    {
      classCleanUp();
      setUp();
    } catch (Exception e)
    {
      fail("Test cleanup failed: " + e.getClass().getName() + " : " +
        e.getMessage() + " : " + StaticUtils.stackTraceToSingleLineString(e));
    }
  }
  /**
   * Tests done using directly the ReplicationBroker interface.
   */
@@ -1312,6 +1337,9 @@
        Category.SYNC, Severity.INFORMATION,
        "Starting replication test : updateOperations " + assured));
    // Cleanup from previous run
    cleanupTest();
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    ReplicationBroker broker =
@@ -1341,15 +1369,15 @@
        // Check if the client has received the msg
        ReplicationMsg msg = broker.receive();
        assertTrue(msg instanceof AddMsg,
        "The received replication message is not an ADD msg");
        "The received replication message is not an ADD msg : " + msg);
        AddMsg addMsg =  (AddMsg) msg;
        Operation receivedOp = addMsg.createOperation(connection);
        assertTrue(OperationType.ADD.compareTo(receivedOp.getOperationType()) == 0,
        "The received replication message is not an ADD msg");
        "The received replication message is not an ADD msg : " + addMsg);
        assertEquals(DN.decode(addMsg.getDn()),personEntry.getDN(),
        "The received ADD replication message is not for the excepted DN");
        "The received ADD replication message is not for the excepted DN : " + addMsg);
      }
      // Modify the entry
@@ -1364,12 +1392,12 @@
      // See if the client has received the msg
      ReplicationMsg msg = broker.receive();
      assertTrue(msg instanceof ModifyMsg,
      "The received replication message is not a MODIFY msg");
      "The received replication message is not a MODIFY msg : " + msg);
      ModifyMsg modMsg = (ModifyMsg) msg;
      modMsg.createOperation(connection);
      assertTrue(DN.decode(modMsg.getDn()).compareTo(personEntry.getDN()) == 0,
      "The received MODIFY replication message is not for the excepted DN");
      "The received MODIFY replication message is not for the excepted DN : " + modMsg);
      // Modify the entry DN
      DN newDN = DN.decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING) ;
@@ -1387,12 +1415,12 @@
      // See if the client has received the msg
      msg = broker.receive();
      assertTrue(msg instanceof ModifyDNMsg,
      "The received replication message is not a MODIFY DN msg");
      "The received replication message is not a MODIFY DN msg : " + msg);
      ModifyDNMsg moddnMsg = (ModifyDNMsg) msg;
      moddnMsg.createOperation(connection);
      assertTrue(DN.decode(moddnMsg.getDn()).compareTo(personEntry.getDN()) == 0,
      "The received MODIFY_DN message is not for the excepted DN");
      "The received MODIFY_DN message is not for the excepted DN : " + moddnMsg);
      // Delete the entry
      DeleteOperationBasis delOp = new DeleteOperationBasis(connection,
@@ -1406,12 +1434,12 @@
      // See if the client has received the msg
      msg = broker.receive();
      assertTrue(msg instanceof DeleteMsg,
      "The received replication message is not a MODIFY DN msg");
      "The received replication message is not a MODIFY DN msg : " + msg);
      DeleteMsg delMsg = (DeleteMsg) msg;
      delMsg.createOperation(connection);
      assertTrue(DN.decode(delMsg.getDn()).compareTo(DN
          .decode("uid= new person,ou=People," + TEST_ROOT_DN_STRING)) == 0,
      "The received DELETE message is not for the excepted DN");
      "The received DELETE message is not for the excepted DN : " + delMsg);
      /*
       * Now check that when we send message to the ReplicationServer
@@ -1512,6 +1540,9 @@
    logError(Message.raw(Category.SYNC, Severity.INFORMATION,
        "Starting replication test : deleteNoSuchObject"));
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    DN dn = DN.decode("cn=No Such Object,ou=People," + TEST_ROOT_DN_STRING);
    DeleteOperationBasis op =
         new DeleteOperationBasis(connection,
@@ -1535,6 +1566,9 @@
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    Thread.sleep(2000);
    ReplicationBroker broker =
      openReplicationSession(baseDn,  11, 100, replServerPort, 1000, true);
@@ -1675,6 +1709,9 @@
    final DN baseDn = DN.decode("ou=People," + TEST_ROOT_DN_STRING);
    // Clean replication server database from previous run
    cleanUpReplicationServersDB();
    /*
     * Open a session to the replicationServer using the broker API.
     * This must use a different serverId to that of the directory server.
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -501,7 +501,7 @@
        // Send topo view
        List<RSInfo> rsList = new ArrayList<RSInfo>();
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
        rsList.add(rsInfo);
        TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
          rsList);
@@ -719,7 +719,7 @@
    }
    /**
     * Read the coming seaf read mode updates and send back acks with errors
     * Read the coming safe read mode updates and send back acks with errors
     */
    private void executeSafeReadManyErrorsScenario()
    {
@@ -1058,7 +1058,7 @@
  }
  /**
   * Tests parameters sent in session handshake an updates, when not using
   * Tests parameters sent in session handshake and updates, when not using
   * assured replication
   */
  @Test
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/TopologyViewTest.java
@@ -836,7 +836,7 @@
        fail("Unknown replication server id.");
    }
    return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId);
    return new RSInfo(rsId, TEST_DN_WITH_ROOT_ENTRY_GENID, (byte)groupId, 1);
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ProtocolCompatibilityTest.java
@@ -1092,11 +1092,11 @@
    dsList4.add(dsInfo2);
    dsList4.add(dsInfo1);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
    rsList1.add(rsInfo1);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -1026,13 +1026,13 @@
    dsList4.add(dsInfo2);
    dsList4.add(dsInfo1);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103);
    RSInfo rsInfo1 = new RSInfo(4527, (long)45316, (byte)103, 1);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0);
    RSInfo rsInfo2 = new RSInfo(4527, (long)0, (byte)0, 1);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98);
    RSInfo rsInfo3 = new RSInfo(0, (long)-21113, (byte)98, 1);
    RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98);
    RSInfo rsInfo4 = new RSInfo(45678, (long)-21113, (byte)98, 1);
    List<RSInfo> rsList1 = new ArrayList<RSInfo>();
    rsList1.add(rsInfo1);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -595,6 +594,9 @@
      ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
        replServers, groupId, assuredTimeout, 5000);
      // No monitoring publisher to not interfer with some SocketTimeoutException
      // expected at some points in these tests
      conf.setMonitoringPeriod(0L);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      return replicationServer;
@@ -967,7 +969,7 @@
        }
        // Send our topo mesg
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId, 1);
        List<RSInfo> rsInfos = new ArrayList<RSInfo>();
        rsInfos.add(rsInfo);
        TopologyMsg topoMsg = new TopologyMsg(null, rsInfos);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplServerFakeConfiguration.java
@@ -63,8 +63,11 @@
  // The weight of the server
  private int weight = 1;
  // The monitoring publisher period
  private long monitoringPeriod = 3000;
  /**
   * Constructor without goup id, assured info and weight
   * Constructor without group id, assured info and weight
   */
  public ReplServerFakeConfiguration(
      int port, String dirName, int purgeDelay, int serverId,
@@ -254,4 +257,17 @@
    return weight;
  }
  public long getMonitoringPeriod()
  {
    return monitoringPeriod;
  }
  /**
   * @param monitoringPeriod the monitoringPeriod to set
   */
  public void setMonitoringPeriod(long monitoringPeriod)
  {
    this.monitoringPeriod = monitoringPeriod;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -74,7 +74,6 @@
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
@@ -1003,7 +1002,7 @@
            ProtocolVersion.getCurrentVersion(), 0, sslEncryption, (byte)-1);
      session.publish(msg);
      // Read the Replication Server state from the ReplServerStartMsg that
      // Read the Replication Server state from the ReplServerStartDSMsg that
      // comes back.
      ReplServerStartDSMsg replStartDSMsg =
        (ReplServerStartDSMsg) session.receive();
@@ -1079,7 +1078,8 @@
      // check that this did not change the window by sending a probe again.
      session.publish(new WindowProbeMsg());
      windowMsg = (WindowMsg) session.receive();
      // We may receive some MonitoringMsg so use filter method
      windowMsg = (WindowMsg)waitForSpecificMsg(session, WindowMsg.class.getName());
      assertEquals(serverwindow, windowMsg.getNumAck());
      debugInfo("Ending windowProbeTest");
    }