mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
10.43.2009 ccc4127f23f63214f4dc2f94d26a021a3ec2eec6
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -26,56 +26,59 @@
 */
package org.opends.server.replication.server;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.Iterator;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeBuilder;
import org.opends.server.types.Attributes;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
/**
 * This class define an in-memory cache that will be used to store
@@ -111,8 +114,8 @@
   * to this replication server.
   *
   */
  private final Map<Short, ServerHandler> directoryServers =
    new ConcurrentHashMap<Short, ServerHandler>();
  private final Map<Short, DataServerHandler> directoryServers =
    new ConcurrentHashMap<Short, DataServerHandler>();
  /*
   * This map contains one ServerHandler for each replication servers
@@ -123,8 +126,11 @@
   * We add new TreeSet in the HashMap when a new replication server register
   * to this replication server.
   */
  private final Map<Short, ServerHandler> replicationServers =
    new ConcurrentHashMap<Short, ServerHandler>();
  private final Map<Short, ReplicationServerHandler> replicationServers =
    new ConcurrentHashMap<Short, ReplicationServerHandler>();
  private final ConcurrentLinkedQueue<MessageHandler> otherHandlers =
    new ConcurrentLinkedQueue<MessageHandler>();
  /*
   * This map contains the List of updates received from each
@@ -134,16 +140,14 @@
    new ConcurrentHashMap<Short, DbHandler>();
  private ReplicationServer replicationServer;
  /* GenerationId management */
  // GenerationId management
  private long generationId = -1;
  private boolean generationIdSavedStatus = false;
  /**
   * The tracer object for the debug logger.
   */
  // The tracer object for the debug logger.
  private static final DebugTracer TRACER = getTracer();
  /* Monitor data management */
  // Monitor data management
  /**
   * The monitor data consolidated over the topology.
   */
@@ -346,9 +350,9 @@
    /*
     * Push the message to the replication servers
     */
    if (sourceHandler.isLDAPserver())
    if (sourceHandler.isDataServer())
    {
      for (ServerHandler handler : replicationServers.values())
      for (ReplicationServerHandler handler : replicationServers.values())
      {
        /**
         * Ignore updates to RS with bad gen id
@@ -397,7 +401,7 @@
    /*
     * Push the message to the LDAP servers
     */
    for (ServerHandler handler : directoryServers.values())
    for (DataServerHandler handler : directoryServers.values())
    {
      // Don't forward the change to the server that just sent it
      if (handler == sourceHandler)
@@ -467,6 +471,14 @@
        handler.add(update, sourceHandler);
      }
    }
    // Push the message to the other subscribing handlers
    Iterator<MessageHandler> otherIter = otherHandlers.iterator();
    while (otherIter.hasNext())
    {
      MessageHandler handler = otherIter.next();
      handler.add(update, sourceHandler);
    }
  }
  /**
@@ -522,10 +534,10 @@
    if (sourceGroupId == groupId)
      // Assured feature does not cross different group ids
    {
      if (sourceHandler.isLDAPserver())
      if (sourceHandler.isDataServer())
      {
        // Look for RS eligible for assured
        for (ServerHandler handler : replicationServers.values())
        for (ReplicationServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
            // No ack expected from a RS with different group id
@@ -541,7 +553,7 @@
      }
      // Look for DS eligible for assured
      for (ServerHandler handler : directoryServers.values())
      for (DataServerHandler handler : directoryServers.values())
      {
        // Don't forward the change to the server that just sent it
        if (handler == sourceHandler)
@@ -636,7 +648,7 @@
        (generationId == sourceHandler.getGenerationId()))
        // Ignore assured updates from wrong generationId servers
      {
        if (sourceHandler.isLDAPserver())
        if (sourceHandler.isDataServer())
        {
          if (safeDataLevel == (byte) 1)
          {
@@ -689,10 +701,10 @@
    List<Short> expectedServers = new ArrayList<Short>();
    if (interestedInAcks)
    {
      if (sourceHandler.isLDAPserver())
      if (sourceHandler.isDataServer())
      {
        // Look for RS eligible for assured
        for (ServerHandler handler : replicationServers.values())
        for (ReplicationServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
            // No ack expected from a RS with different group id
@@ -879,7 +891,7 @@
            origServer.incrementAssuredSrReceivedUpdatesTimeout();
          } else
          {
            if (origServer.isLDAPserver())
            if (origServer.isDataServer())
            {
              origServer.incrementAssuredSdReceivedUpdatesTimeout();
            }
@@ -957,7 +969,7 @@
   */
  public void stopReplicationServers(Collection<String> replServers)
  {
    for (ServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      if (replServers.contains(handler.getServerAddressURL()))
        stopServer(handler);
@@ -970,13 +982,13 @@
  public void stopAllServers()
  {
    // Close session with other replication servers
    for (ServerHandler serverHandler : replicationServers.values())
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    {
      stopServer(serverHandler);
    }
    // Close session with other LDAP servers
    for (ServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : directoryServers.values())
    {
      stopServer(serverHandler);
    }
@@ -988,14 +1000,13 @@
   * @param handler the DS we want to check
   * @return true if this is not a duplicate server
   */
  public boolean checkForDuplicateDS(ServerHandler handler)
  public boolean checkForDuplicateDS(DataServerHandler handler)
  {
    ServerHandler oldHandler = directoryServers.get(handler.getServerId());
    DataServerHandler oldHandler = directoryServers.get(handler.getServerId());
    if (directoryServers.containsKey(handler.getServerId()))
    {
      // looks like two LDAP servers have the same serverId
      // log an error message and drop this connection.
      Message message = ERR_DUPLICATE_SERVER_ID.get(
        replicationServer.getMonitorInstanceName(), oldHandler.toString(),
        handler.toString(), handler.getServerId());
@@ -1012,6 +1023,12 @@
   */
  public void stopServer(ServerHandler handler)
  {
      if (debugEnabled())
        TRACER.debugInfo(
            "In RS " + this.replicationServer.getMonitorInstanceName() +
            " domain=" + this +
            " stopServer(SH)" + handler.getMonitorInstanceName() +
          " " + stackTraceToSingleLineString(new Exception()));
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
@@ -1020,14 +1037,103 @@
     * or not (is already being processed or not).
     */
    if (!handler.engageShutdown())
    // Only do this once (prevent other thread to enter here again)
      // Only do this once (prevent other thread to enter here again)
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In RS " + this.replicationServer.getMonitorInstanceName() +
          " for " + baseDn + " " +
          " stopServer " + handler.getMonitorInstanceName());
      try
      {
        try
        {
          // Acquire lock on domain (see more details in comment of start()
          // method of ServerHandler)
          lock();
        } catch (InterruptedException ex)
        {
          // Try doing job anyway...
        }
        if (handler.isReplicationServer())
        {
          if (replicationServers.containsValue(handler))
          {
            unregisterServerHandler(handler);
            handler.shutdown();
            // Check if generation id has to be resetted
            mayResetGenerationId();
            // Warn our DSs that a RS or DS has quit (does not use this
            // handler as already removed from list)
            buildAndSendTopoInfoToDSs(null);
          }
        } else
        {
          if (directoryServers.containsValue(handler))
          {
            // If this is the last DS for the domain,
            // shutdown the status analyzer
            if (directoryServers.size() == 1)
            {
              if (debugEnabled())
                TRACER.debugInfo("In " +
                    replicationServer.getMonitorInstanceName() +
                    " remote server " + handler.getMonitorInstanceName() +
                " is the last DS to be stopped: stopping status analyzer");
              stopStatusAnalyzer();
            }
            unregisterServerHandler(handler);
            handler.shutdown();
            // Check if generation id has to be resetted
            mayResetGenerationId();
            // Update the remote replication servers with our list
            // of connected LDAP servers
            buildAndSendTopoInfoToRSs();
            // Warn our DSs that a RS or DS has quit (does not use this
            // handler as already removed from list)
            buildAndSendTopoInfoToDSs(null);
          }
          else if (otherHandlers.contains(handler))
          {
            unRegisterHandler(handler);
            handler.shutdown();
          }
        }
      }
      catch(Exception e)
      {
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
            stackTraceToSingleLineString(e)));
      }
      finally
      {
        release();
      }
    }
  }
  /**
   * Stop the handler.
   * @param handler The handler to stop.
   */
  public void stopServer(MessageHandler handler)
  {
    if (debugEnabled())
      TRACER.debugInfo(
          "In RS " + this.replicationServer.getMonitorInstanceName() +
          " domain=" + this +
          " stopServer(MH)" + handler.getMonitorInstanceName() +
          " " + stackTraceToSingleLineString(new Exception()));
    /*
     * We must prevent deadlock on replication server domain lock, when for
     * instance this code is called from dying ServerReader but also dying
     * ServerWriter at the same time, or from a thread that wants to shut down
     * the handler. So use a thread safe flag to know if the job must be done
     * or not (is already being processed or not).
     */
    if (!handler.engageShutdown())
      // Only do this once (prevent other thread to enter here again)
    {
      try
      {
        // Acquire lock on domain (see more details in comment of start()
@@ -1037,57 +1143,40 @@
      {
        // Try doing job anyway...
      }
      if (handler.isReplicationServer())
      if (otherHandlers.contains(handler))
      {
        if (replicationServers.containsValue(handler))
        {
          replicationServers.remove(handler.getServerId());
          handler.shutdown();
          // Check if generation id has to be resetted
          mayResetGenerationId();
          // Warn our DSs that a RS or DS has quit (does not use this
          // handler as already removed from list)
          sendTopoInfoToDSs(null);
        }
      } else
      {
        if (directoryServers.containsValue(handler))
        {
          // If this is the last DS for the domain, shutdown the status analyzer
          if (directoryServers.size() == 1)
          {
            if (debugEnabled())
              TRACER.debugInfo("In " +
                replicationServer.getMonitorInstanceName() +
                " remote server " + handler.getMonitorInstanceName() +
                " is the last DS to be stopped: stopping status analyzer");
            stopStatusAnalyzer();
          }
          directoryServers.remove(handler.getServerId());
          handler.shutdown();
          // Check if generation id has to be resetted
          mayResetGenerationId();
          // Update the remote replication servers with our list
          // of connected LDAP servers
          sendTopoInfoToRSs();
          // Warn our DSs that a RS or DS has quit (does not use this
          // handler as already removed from list)
          sendTopoInfoToDSs(null);
        }
        unRegisterHandler(handler);
        handler.shutdown();
      }
    }
    release();
  }
      release();
  /**
   * Unregister this handler from the list of handlers registered to this
   * domain.
   * @param handler the provided handler to unregister.
   */
  protected void unregisterServerHandler(ServerHandler handler)
  {
    if (handler.isReplicationServer())
    {
      replicationServers.remove(handler.getServerId());
    }
    else
    {
      directoryServers.remove(handler.getServerId());
    }
  }
  /**
   * Resets the generationId for this domain if there is no LDAP
   * server currently connected and if the generationId has never
   * been saved.
   * This method resets the generationId for this domain if there is no LDAP
   * server currently connected in the whole topology on this domain and
   * if the generationId has never been saved.
   *
   * - test emtpyness of directoryServers list
   * - traverse replicationServers list and test for each if DS are connected
   * So it strongly relies on the directoryServers list
   */
  protected void mayResetGenerationId()
  {
@@ -1104,7 +1193,7 @@
    boolean lDAPServersConnectedInTheTopology = false;
    if (directoryServers.isEmpty())
    {
      for (ServerHandler rsh : replicationServers.values())
      for (ReplicationServerHandler rsh : replicationServers.values())
      {
        if (generationId != rsh.getGenerationId())
        {
@@ -1149,14 +1238,16 @@
  }
  /**
   * Checks that a RS is not already connected.
   *
   * @param handler the RS we want to check
   * @return true if this is not a duplicate server
   * Checks that a remote RS is not already connected to this hosting RS.
   * @param handler The handler for the remote RS.
   * @return flag specifying whether the remote RS is already connected.
   * @throws DirectoryException when a problem occurs.
   */
  public boolean checkForDuplicateRS(ServerHandler handler)
  public boolean checkForDuplicateRS(ReplicationServerHandler handler)
  throws DirectoryException
  {
    ServerHandler oldHandler = replicationServers.get(handler.getServerId());
    ReplicationServerHandler oldHandler =
      replicationServers.get(handler.getServerId());
    if ((oldHandler != null))
    {
      if (oldHandler.getServerAddressURL().equals(
@@ -1166,7 +1257,9 @@
        // have been sent at about the same time and 2 connections
        // have been established.
        // Silently drop this connection.
        } else
        return false;
      }
      else
      {
        // looks like two replication servers have the same serverId
        // log an error message and drop this connection.
@@ -1174,9 +1267,8 @@
          replicationServer.getMonitorInstanceName(), oldHandler.
          getServerAddressURL(), handler.getServerAddressURL(),
          handler.getServerId());
        logError(message);
        throw new DirectoryException(ResultCode.OTHER, message);
      }
      return false;
    }
    return true;
  }
@@ -1223,7 +1315,7 @@
  {
    LinkedHashSet<String> mySet = new LinkedHashSet<String>();
    for (ServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      mySet.add(handler.getServerAddressURL());
    }
@@ -1232,7 +1324,9 @@
  }
  /**
   * Return a Set containing the servers known by this replicationServer.
   * Return a set containing the server that produced update and known by
   * this replicationServer from all over the topology,
   * whatever directly connected of connected to another RS.
   * @return a set containing the servers known by this replicationServer.
   */
  public Set<Short> getServers()
@@ -1250,7 +1344,7 @@
  {
    List<String> mySet = new ArrayList<String>(0);
    for (ServerHandler handler : directoryServers.values())
    for (DataServerHandler handler : directoryServers.values())
    {
      mySet.add(String.valueOf(handler.getServerId()));
    }
@@ -1348,7 +1442,7 @@
      {
        // Send to all replication servers with a least one remote
        // server connected
        for (ServerHandler rsh : replicationServers.values())
        for (ReplicationServerHandler rsh : replicationServers.values())
        {
          if (rsh.hasRemoteLDAPServers())
          {
@@ -1358,7 +1452,7 @@
      }
      // Sends to all connected LDAP servers
      for (ServerHandler destinationHandler : directoryServers.values())
      for (DataServerHandler destinationHandler : directoryServers.values())
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
@@ -1368,7 +1462,7 @@
    } else
    {
      // Destination is one server
      ServerHandler destinationHandler =
      DataServerHandler destinationHandler =
        directoryServers.get(msg.getDestination());
      if (destinationHandler != null)
      {
@@ -1378,9 +1472,9 @@
        // the targeted server is NOT connected
        // Let's search for THE changelog server that MAY
        // have the targeted server connected.
        if (senderHandler.isLDAPserver())
        if (senderHandler.isDataServer())
        {
          for (ServerHandler h : replicationServers.values())
          for (ReplicationServerHandler h : replicationServers.values())
          {
            // Send to all replication servers with a least one remote
            // server connected
@@ -1421,7 +1515,7 @@
        // build the full list of all servers in the topology
        // and send back a MonitorMsg with the full list of all the servers
        // in the topology.
        if (senderHandler.isLDAPserver())
        if (senderHandler.isDataServer())
        {
          MonitorMsg returnMsg =
            new MonitorMsg(msg.getDestination(), msg.getsenderID());
@@ -1481,7 +1575,7 @@
        // from the states stored in the serverHandler.
        // - the server state
        // - the older missing change
        for (ServerHandler lsh : this.directoryServers.values())
        for (DataServerHandler lsh : this.directoryServers.values())
        {
          monitorMsg.setServerState(
            lsh.getServerId(),
@@ -1491,7 +1585,7 @@
        }
        // Same for the connected RS
        for (ServerHandler rsh : this.replicationServers.values())
        for (ReplicationServerHandler rsh : this.replicationServers.values())
        {
          monitorMsg.setServerState(
            rsh.getServerId(),
@@ -1657,12 +1751,12 @@
   */
  public void checkAllSaturation() throws IOException
  {
    for (ServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      handler.checkWindow();
    }
    for (ServerHandler handler : directoryServers.values())
    for (DataServerHandler handler : directoryServers.values())
    {
      handler.checkWindow();
    }
@@ -1675,15 +1769,15 @@
   * @return true if the server can restart sending changes.
   *         false if the server can't restart sending changes.
   */
  public boolean restartAfterSaturation(ServerHandler sourceHandler)
  public boolean restartAfterSaturation(MessageHandler sourceHandler)
  {
    for (ServerHandler handler : replicationServers.values())
    for (MessageHandler handler : replicationServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
        return false;
    }
    for (ServerHandler handler : directoryServers.values())
    for (MessageHandler handler : directoryServers.values())
    {
      if (!handler.restartAfterSaturation(sourceHandler))
        return false;
@@ -1698,9 +1792,9 @@
   * @param notThisOne If not null, the topology message will not be sent to
   * this passed server.
   */
  public void sendTopoInfoToDSs(ServerHandler notThisOne)
  public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
  {
    for (ServerHandler handler : directoryServers.values())
    for (DataServerHandler handler : directoryServers.values())
    {
      if ((notThisOne == null) || // All DSs requested
        ((notThisOne != null) && (handler != notThisOne)))
@@ -1725,10 +1819,10 @@
   * Send a TopologyMsg to all the connected replication servers
   * in order to let them know our connected LDAP servers.
   */
  public void sendTopoInfoToRSs()
  public void buildAndSendTopoInfoToRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : replicationServers.values())
    {
      try
      {
@@ -1755,7 +1849,7 @@
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    // Go through every DSs
    for (ServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : directoryServers.values())
    {
      dsInfos.add(serverHandler.toDSInfo());
    }
@@ -1785,7 +1879,7 @@
    List<RSInfo> rsInfos = new ArrayList<RSInfo>();
    // Go through every DSs (except recipient of msg)
    for (ServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : directoryServers.values())
    {
      if (serverHandler.getServerId() == destDsId)
        continue;
@@ -1799,7 +1893,7 @@
    // Go through every peer RSs (and get their connected DSs), also add info
    // for RSs
    for (ServerHandler serverHandler : replicationServers.values())
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    {
      // Put RS info
      rsInfos.add(serverHandler.toRSInfo());
@@ -1840,12 +1934,6 @@
  synchronized public long setGenerationId(long generationId,
    boolean savedStatus)
  {
    if (debugEnabled())
      TRACER.debugInfo(
        "In " + this.replicationServer.getMonitorInstanceName() +
        " baseDN=" + baseDn +
        " RCache.set GenerationId=" + generationId);
    long oldGenerationId = this.generationId;
    if (this.generationId != generationId)
@@ -1916,7 +2004,7 @@
        // After we'll have sent the message , the remote RS will adopt
        // the new genId
        rsHandler.setGenerationId(newGenId);
        if (senderHandler.isLDAPserver())
        if (senderHandler.isDataServer())
        {
          rsHandler.forwardGenerationIdToRS(genIdMsg);
        }
@@ -1929,7 +2017,7 @@
    // Change status of the connected DSs according to the requested new
    // reference generation id
    for (ServerHandler dsHandler : directoryServers.values())
    for (DataServerHandler dsHandler : directoryServers.values())
    {
      try
      {
@@ -1948,8 +2036,8 @@
    // (consecutive to reset gen id message), we prefer advertising once for
    // all after changes (less packet sent), here at the end of the reset msg
    // treatment.
    sendTopoInfoToDSs(null);
    sendTopoInfoToRSs();
    buildAndSendTopoInfoToDSs(null);
    buildAndSendTopoInfoToRSs();
    Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
      Long.toString(newGenId));
@@ -1964,7 +2052,7 @@
   *        that changed his status.
   * @param csMsg The message containing the new status
   */
  public void processNewStatus(ServerHandler senderHandler,
  public void processNewStatus(DataServerHandler senderHandler,
    ChangeStatusMsg csMsg)
  {
    if (debugEnabled())
@@ -1995,8 +2083,8 @@
    }
    // Update every peers (RS/DS) with topology changes
    sendTopoInfoToDSs(senderHandler);
    sendTopoInfoToRSs();
    buildAndSendTopoInfoToDSs(senderHandler);
    buildAndSendTopoInfoToRSs();
    Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
      Short.toString(senderHandler.getServerId()),
@@ -2014,7 +2102,7 @@
   * @param event The event to be used for new status computation
   * @return True if we have been interrupted (must stop), false otherwise
   */
  public boolean changeStatusFromStatusAnalyzer(ServerHandler serverHandler,
  public boolean changeStatusFromStatusAnalyzer(DataServerHandler serverHandler,
    StatusMachineEvent event)
  {
    try
@@ -2066,8 +2154,8 @@
    }
    // Update every peers (RS/DS) with topology changes
    sendTopoInfoToDSs(serverHandler);
    sendTopoInfoToRSs();
    buildAndSendTopoInfoToDSs(serverHandler);
    buildAndSendTopoInfoToRSs();
    release();
    return false;
@@ -2169,10 +2257,12 @@
   * @param allowResetGenId True for allowing to reset the generation id (
   * when called after initial handshake)
   * @throws IOException If an error occurred.
   * @throws DirectoryException If an error occurred.
   */
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg, ServerHandler handler,
  public void receiveTopoInfoFromRS(TopologyMsg topoMsg,
      ReplicationServerHandler handler,
    boolean allowResetGenId)
    throws IOException
    throws IOException, DirectoryException
  {
    if (debugEnabled())
    {
@@ -2186,7 +2276,8 @@
    {
      // Acquire lock on domain (see more details in comment of start() method
      // of ServerHandler)
      lock();
      if (!hasLock())
        lock();
    } catch (InterruptedException ex)
    {
      // Try doing job anyway...
@@ -2228,7 +2319,7 @@
     * Sends the currently known topology information to every connected
     * DS we have.
     */
    sendTopoInfoToDSs(null);
    buildAndSendTopoInfoToDSs(null);
    release();
  }
@@ -2441,7 +2532,7 @@
          {
            // this is the latency of the remote RSi regarding another RSj
            // let's update the latency of the LSes connected to RSj
            ServerHandler rsjHdr = replicationServers.get(rsid);
            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
            if (rsjHdr != null)
            {
              for (short remotelsid : rsjHdr.getConnectedDirectoryServerIds())
@@ -2496,7 +2587,7 @@
   * Get the map of connected DSs.
   * @return The map of connected DSs
   */
  public Map<Short, ServerHandler> getConnectedDSs()
  public Map<Short, DataServerHandler> getConnectedDSs()
  {
    return directoryServers;
  }
@@ -2505,7 +2596,7 @@
   * Get the map of connected RSs.
   * @return The map of connected RSs
   */
  public Map<Short, ServerHandler> getConnectedRSs()
  public Map<Short, ReplicationServerHandler> getConnectedRSs()
  {
    return replicationServers;
  }
@@ -2708,5 +2799,140 @@
    return attributes;
  }
}
  /**
   * Register in the domain an handler that subscribes to changes.
   * @param handler the provided subscribing handler.
   */
  public void registerHandler(MessageHandler handler)
  {
    this.otherHandlers.add(handler);
  }
  /**
   * Unregister from the domain an handler.
   * @param handler the provided unsubscribing handler.
   * @return Whether this handler has been unregistered with success.
   */
  public boolean unRegisterHandler(MessageHandler handler)
  {
    return this.otherHandlers.remove(handler);
  }
  /**
   * Return the state that contain for each server the time of eligibility.
   * @return the state.
   */
  public ServerState getHeartbeatState()
  {
    // TODO:ECL Eligility must be supported
    return this.getDbServerState();
  }
  /**
   * Computes the change number eligible to the ECL.
   * @return null if the domain does not play in eligibility.
   */
  public ChangeNumber computeEligibleCN()
  {
    ChangeNumber elligibleCN = null;
    ServerState heartbeatState = getHeartbeatState();
    if (heartbeatState==null)
      return null;
    // compute elligible CN
    ServerState hbState = heartbeatState.duplicate();
    Iterator<Short> it = hbState.iterator();
    while (it.hasNext())
    {
      short sid = it.next();
      ChangeNumber storedCN = hbState.getMaxChangeNumber(sid);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the server is considered down and not considered for eligibility
      if (TimeThread.getTime()-storedCN.getTime()>2000)
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "For RSD." + this.baseDn + " Server " + sid
            + " is not considered for eligibility ... potentially down");
        continue;
      }
      if ((elligibleCN == null) || (storedCN.older(elligibleCN)))
      {
        elligibleCN = storedCN;
      }
    }
    if (debugEnabled())
      TRACER.debugInfo(
        "For RSD." + this.baseDn + " ElligibleCN()=" + elligibleCN);
    return elligibleCN;
  }
  /**
   * Computes the eligible server state by minimizing the dbServerState and the
   * elligibleCN.
   * @return The computed eligible server state.
   */
  public ServerState getCLElligibleState()
  {
    // ChangeNumber elligibleCN = computeEligibleCN();
    ServerState res = new ServerState();
    ServerState dbState = this.getDbServerState();
    res = dbState;
    /* TODO:ECL Eligibility is not yet implemented
    Iterator<Short> it = dbState.iterator();
    while (it.hasNext())
    {
      Short sid = it.next();
      DbHandler h = sourceDbHandlers.get(sid);
      ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
      try
      {
        if ((elligibleCN!=null)&&(elligibleCN.older(dbCN)))
        {
          // some CN exist in the db newer than elligible CN
          ReplicationIterator ri = h.generateIterator(elligibleCN);
          ChangeNumber newCN = ri.getCurrentCN();
          res.update(newCN);
          ri.releaseCursor();
        }
        else
        {
          // no CN exist in the db newer than elligible CN
          res.update(dbCN);
        }
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    */
    if (debugEnabled())
      TRACER.debugInfo("In " + this.getName()
        + " getCLElligibleState returns:" + res);
    return res;
  }
  /**
   * Returns the start state of the domain, made of the first (oldest)
   * change stored for each serverId.
   * @return t start state of the domain.
   */
  public ServerState getStartState()
  {
    ServerState domainStartState = new ServerState();
    Iterator<Short> it = this.getDbServerState().iterator();
    for (DbHandler dbHandler : sourceDbHandlers.values())
    {
      domainStartState.update(dbHandler.getFirstChange());
    }
    return domainStartState;
  }
}