mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
08.03.2008 7adb93986ace907531875e25be1f94d735fbb068
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -50,7 +50,6 @@
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -58,17 +57,21 @@
import org.opends.server.replication.protocol.MonitorMsg;
import org.opends.server.replication.protocol.MonitorRequestMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.ReentrantLock;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeStatusMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
/**
 * This class define an in-memory cache that will be used to store
@@ -88,9 +91,8 @@
 */
public class ReplicationServerDomain
{
  private final Object flowControlLock = new Object();
  private final DN baseDn;
  private final String baseDn;
  // The Status analyzer that periodically verifis if the connected DSs are
  // late or not
  private StatusAnalyzer statusAnalyzer = null;
@@ -154,16 +156,38 @@
  private MonitorData wrkMonitorData;
  /**
   * The needed info for each received assured update message we are waiting
   * acks for.
   * Key: a change number matching a received update message which requested
   * assured mode usage (either safe read or safe data mode)
   * Value: The object holding every info needed about the already received acks
   * as well as the acks to be received.
   * For more details, see ExpectedAcksInfo and its sub classes javadoc.
   */
  private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks =
    new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>();
  // The timer used to run the timeout code (timer tasks) for the assured update
  // messages we are waiting acks for.
  private Timer assuredTimeoutTimer = null;
  // Counter used to purge the timer tasks referemces in assuredTimeoutTimer,
  // every n number of treated assured messages
  private int assuredTimeoutTimerPurgeCounter = 0;
  /**
   * Creates a new ReplicationServerDomain associated to the DN baseDn.
   *
   * @param baseDn The baseDn associated to the ReplicationServerDomain.
   * @param replicationServer the ReplicationServer that created this
   *                          replicationServer cache.
   */
  public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer)
  public ReplicationServerDomain(
      String baseDn, ReplicationServer replicationServer)
  {
    this.baseDn = baseDn;
    this.replicationServer = replicationServer;
    this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " +
      baseDn + " in RS " + replicationServer.getServerId(), true);
  }
  /**
@@ -179,32 +203,11 @@
  public void put(UpdateMsg update, ServerHandler sourceHandler)
    throws IOException
  {
    /*
     * TODO : In case that the source server is a LDAP server this method
     * should check that change did get pushed to at least one
     * other replication server before pushing it to the LDAP servers
     */
    short id = update.getChangeNumber().getServerId();
    ChangeNumber cn = update.getChangeNumber();
    short id = cn.getServerId();
    sourceHandler.updateServerState(update);
    sourceHandler.incrementInCount();
    if (update.isAssured())
    {
      int count = this.NumServers();
      if (count > 1)
      {
        if (sourceHandler.isReplicationServer())
          ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
            this, count - 1);
        else
          sourceHandler.addWaitingAck(update, count - 1);
      } else
      {
        sourceHandler.sendAck(update.getChangeNumber());
      }
    }
    if (generationId < 0)
    {
      generationId = sourceHandler.getGenerationId();
@@ -244,11 +247,103 @@
    // Publish the messages to the source handler
    dbHandler.add(update);
    /**
     * If this is an assured message (a message requesting ack), we must
     * construct the ExpectedAcksInfo object with the right number of expected
     * acks before posting message to the writers. Otherwise some writers may
     * have time to post, receive the ack and increment received ack counter
     * (kept in ExpectedAcksInfo object) and we could think the acknowledgment
     * is fully processed although it may be not (some other acks from other
     * servers are not yet arrived). So for that purpose we do a pre-loop
     * to determine to who we will post an assured message.
     * Whether the assured mode is safe read or safe data, we anyway do not
     * support the assured replication feature across topologies with different
     * group ids. The assured feature insures assured replication based on the
     * same locality (group id). For instance in double data center deployment
     * (2 group id usage) with assured replication enabled, an assured message
     * sent from data center 1 (group id = 1) will be sent to servers of both
     * data centers, but one will request and wait acks only from servers of the
     * data center 1.
     */
    boolean assuredMessage = update.isAssured();
    PreparedAssuredInfo preparedAssuredInfo = null;
    if (assuredMessage)
    {
      // Assured feature is supported starting from replication protocol V2
      if (sourceHandler.getProtocolVersion() >=
        ProtocolVersion.REPLICATION_PROTOCOL_V2)
      {
        // According to assured sub-mode, prepare structures to keep track of
        // the acks we are interested in.
        AssuredMode assuredMode = update.getAssuredMode();
        if (assuredMode != AssuredMode.SAFE_DATA_MODE)
        {
          preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
        } else if (assuredMode != AssuredMode.SAFE_READ_MODE)
        {
          preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
        } else
        {
          // Unknown assured mode: should never happen
          Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
            Short.toString(replicationServer.getServerId()),
            assuredMode.toString(), baseDn, update.toString());
          logError(errorMsg);
          assuredMessage = false;
        }
      } else
      {
        assuredMessage = false;
      }
    }
    List<Short> expectedServers = null;
    if (assuredMessage)
    {
      expectedServers = preparedAssuredInfo.expectedServers;
      if (expectedServers != null)
      {
        // Store the expected acks info into the global map.
        // The code for processing reception of acks for this update will update
        // info kept in this object and if enough acks received, it will send
        // back the final ack to the requester and remove the object from this
        // map
        // OR
        // The following timer will time out and send an timeout ack to the
        // requester if the acks are not received in time. The timer will also
        // remove the object from this map.
        waitingAcks.put(cn, preparedAssuredInfo.expectedAcksInfo);
        // Arm timer for this assured update message (wait for acks until it
        // times out)
        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
        assuredTimeoutTimer.schedule(assuredTimeoutTask,
          replicationServer.getAssuredTimeout());
        // Purge timer every 100 treated messages
        assuredTimeoutTimerPurgeCounter++;
        if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
          assuredTimeoutTimer.purge();
      }
    }
    /**
     * The update message equivalent to the originally received update message,
     * but with assured flag disabled. This message is the one that should be
     * sent to non elligible servers for assured mode.
     * We need a clone like of the original message with assured flag off, to be
     * posted to servers we don't want to wait the ack from (not normal status
     * servers or servers with different group id). This must be done because
     * the posted message is a reference so each writer queue gets the same
     * reference, thus, changing the assured flag of an object is done for every
     * references posted on every writer queues. That is why we need a message
     * version with assured flag on and another one with assured flag off.
     */
    NotAssuredUpdateMsg notAssuredUpdate = null;
    /*
     * Push the message to the replication servers
     */
    if (!sourceHandler.isReplicationServer())
    if (sourceHandler.isLDAPserver())
    {
      for (ServerHandler handler : replicationServers.values())
      {
@@ -261,7 +356,7 @@
          if (debugEnabled())
            TRACER.debugInfo("In RS " +
              replicationServer.getServerId() +
              " for dn " + baseDn.toNormalizedString() + ", update " +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to replication server " +
              Short.toString(handler.getServerId()) + " with generation id " +
@@ -272,7 +367,27 @@
          continue;
        }
        handler.add(update, sourceHandler);
        if (assuredMessage)
        {
          // Assured mode: post an assured or not assured matching update
          // message according to what has been computed for the destination
          // server
          if ((expectedServers != null) && expectedServers.contains(handler.
            getServerId()))
          {
            handler.add(update, sourceHandler);
          } else
          {
            if (notAssuredUpdate == null)
            {
              notAssuredUpdate = new NotAssuredUpdateMsg(update);
            }
            handler.add(notAssuredUpdate, sourceHandler);
          }
        } else
        {
          handler.add(update, sourceHandler);
        }
      }
    }
@@ -281,7 +396,7 @@
     */
    for (ServerHandler handler : directoryServers.values())
    {
      // don't forward the change to the server that just sent it
      // Don't forward the change to the server that just sent it
      if (handler == sourceHandler)
      {
        continue;
@@ -307,7 +422,7 @@
          if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
            TRACER.debugInfo("In RS " +
              replicationServer.getServerId() +
              " for dn " + baseDn.toNormalizedString() + ", update " +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to directory server " +
              Short.toString(handler.getServerId()) + " with generation id " +
@@ -317,7 +432,7 @@
          if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
            TRACER.debugInfo("In RS " +
              replicationServer.getServerId() +
              " for dn " + baseDn.toNormalizedString() + ", update " +
              " for dn " + baseDn + ", update " +
              update.getChangeNumber().toString() +
              " will not be sent to directory server " +
              Short.toString(handler.getServerId()) +
@@ -327,9 +442,389 @@
        continue;
      }
      handler.add(update, sourceHandler);
      if (assuredMessage)
      {
        // Assured mode: post an assured or not assured matching update
        // message according to what has been computed for the destination
        // server
        if ((expectedServers != null) && expectedServers.contains(handler.
          getServerId()))
        {
          handler.add(update, sourceHandler);
        } else
        {
          if (notAssuredUpdate == null)
          {
            notAssuredUpdate = new NotAssuredUpdateMsg(update);
          }
          handler.add(notAssuredUpdate, sourceHandler);
        }
      } else
      {
        handler.add(update, sourceHandler);
      }
    }
  }
  /**
   * Helper class to be the return type of a method that processes a just
   * received assured update message:
   * - processSafeReadUpdateMsg
   * - processSafeDataUpdateMsg
   * This is a facility to pack many interesting returned object.
   */
  private class PreparedAssuredInfo
  {
      /**
       * The list of servers identified as servers we are interested in
       * receiving acks from. If this list is not null, then expectedAcksInfo
       * should be not null.
       * Servers that are not in this list are servers not elligible for an ack
       * request.
       *
       */
      public List<Short> expectedServers = null;
      /**
       * The constructed ExpectedAcksInfo object to be used when acks will be
       * received. Null if expectedServers is null.
       */
      public ExpectedAcksInfo expectedAcksInfo = null;
  }
  /**
   * Process a just received assured update message in Safe Read mode. If the
   * ack can be sent immediately, it is done here. This will also determine to
   * which suitable servers an ack should be requested from, and which ones are
   * not elligible for an ack request.
   * This method is an helper method for the put method. Have a look at the put
   * method for a better understanding.
   * @param update The just received assured update to process.
   * @param sourceHandler The ServerHandler for the server from which the
   *        update was received
   * @return A suitable PreparedAssuredInfo object that contains every needed
   * info to proceed with post to server writers.
   * @throws IOException When an IO exception happens during the update
   *         processing.
   */
  private PreparedAssuredInfo processSafeReadUpdateMsg(
    UpdateMsg update, ServerHandler sourceHandler) throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    byte groupId = replicationServer.getGroupId();
    byte sourceGroupId = sourceHandler.getGroupId();
    List<Short> expectedServers = new ArrayList<Short>();
    List<Short> wrongStatusServers = new ArrayList<Short>();
    if (sourceGroupId != groupId)
      // Assured feature does not cross different group ids
    {
      if (sourceHandler.isLDAPserver())
      {
        // Look for RS elligible for assured
        for (ServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
            expectedServers.add(handler.getServerId());
        }
      }
      // Look for DS elligible for assured
      for (ServerHandler handler : directoryServers.values())
      {
        // Don't forward the change to the server that just sent it
        if (handler == sourceHandler)
        {
          continue;
        }
        if (handler.getGroupId() == groupId)
        {
          if (handler.getStatus() == ServerStatus.NORMAL_STATUS)
          {
            expectedServers.add(handler.getServerId());
          } else
          {
            wrongStatusServers.add(handler.getServerId());
          }
        }
      }
    }
    // Return computed structures
    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
    if (expectedServers.size() > 0)
    {
      // Some other acks to wait for
      preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(cn,
        sourceHandler, expectedServers, wrongStatusServers);
      preparedAssuredInfo.expectedServers = expectedServers;
    }
    if (preparedAssuredInfo.expectedServers == null)
    {
      // No elligible servers found, send the ack immediatly
      AckMsg ack = new AckMsg(cn);
      sourceHandler.sendAck(ack);
    }
    return preparedAssuredInfo;
  }
  /**
   * Process a just received assured update message in Safe Data mode. If the
   * ack can be sent immediately, it is done here. This will also determine to
   * which suitable servers an ack should be requested from, and which ones are
   * not elligible for an ack request.
   * This method is an helper method for the put method. Have a look at the put
   * method for a better understanding.
   * @param update The just received assured update to process.
   * @param sourceHandler The ServerHandler for the server from which the
   *        update was received
   * @return A suitable PreparedAssuredInfo object that contains every needed
   * info to proceed with post to server writers.
   * @throws IOException When an IO exception happens during the update
   *         processing.
   */
  private PreparedAssuredInfo processSafeDataUpdateMsg(
    UpdateMsg update, ServerHandler sourceHandler) throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    boolean interestedInAcks = true;
    byte safeDataLevel = update.getSafeDataLevel();
    byte groupId = replicationServer.getGroupId();
    byte sourceGroupId = sourceHandler.getGroupId();
    if (safeDataLevel < (byte) 1)
    {
      // Should never happen
      Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
        Short.toString(replicationServer.getServerId()),
        Byte.toString(safeDataLevel), baseDn, update.toString());
      logError(errorMsg);
      interestedInAcks = false;
    } else if (sourceGroupId != groupId)
    {
      // Assured feature does not cross different group ids
      interestedInAcks = false;
    } else
    {
      if (sourceHandler.isLDAPserver())
      {
        if (safeDataLevel == (byte) 1)
        {
          // Immediatly return the ack for an assured message in safe data mode
          // with safe data level 1, coming from a DS. No need to wait for more
          // acks
          AckMsg ack = new AckMsg(cn);
          sourceHandler.sendAck(ack);
          interestedInAcks = false; // No further acks to obtain
        } else
        {
          // level > 1 : We need further acks
          // The message will be posted in assured mode to elligible servers.
          // The embedded safe data level is not changed, and his value will be
          // used by a remote RS to determine if he must send an ack (level > 1)
          // or not (level = 1)
        }
      } else
      { // A RS sent us the safe data message, for sure no futher acks to wait
        interestedInAcks = false;
        if (safeDataLevel == (byte) 1)
        {
          // The original level was 1 so the RS that sent us this message should
          // have already sent his ack to the sender DS. Level 1 has already
          // been reached so no further acks to wait
          // This should not happen in theory as the sender RS server should
          // have sent us a matching not assured message so we should not come
          // to here.
        } else
        {
          // level > 1, so Ack this message to originator RS
          AckMsg ack = new AckMsg(cn);
          sourceHandler.sendAck(ack);
        }
      }
    }
    List<Short> expectedServers = new ArrayList<Short>();
    if (interestedInAcks)
    {
      if (sourceHandler.isLDAPserver())
      {
        // Look for RS elligible for assured
        for (ServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
            expectedServers.add(handler.getServerId());
        }
      }
      // Look for DS elligible for assured
      for (ServerHandler handler : directoryServers.values())
      {
        // Don't forward the change to the server that just sent it
        if (handler == sourceHandler)
        {
          continue;
        }
        if (handler.getGroupId() == groupId)
          expectedServers.add(handler.getServerId());
      }
    }
    // Return computed structures
    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
    if (interestedInAcks && (expectedServers.size() > 0))
    {
      // Some other acks to wait for
      preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
        sourceHandler, update.getSafeDataLevel());
      preparedAssuredInfo.expectedServers = expectedServers;
    }
    if (interestedInAcks && (preparedAssuredInfo.expectedServers == null))
    {
      // level > 1 and source is a DS but no elligible servers found, send the
      // ack immediatly
      AckMsg ack = new AckMsg(cn);
      sourceHandler.sendAck(ack);
    }
    return preparedAssuredInfo;
  }
  /**
   * Process an ack received from a given server.
   *
   * @param ack The ack message received.
   * @param ackingServer The server handler of the server that sent the ack.
   */
  public void processAck(AckMsg ack, ServerHandler ackingServer)
  {
    // Retrieve the expected acks info for the update matching the original
    // sent update.
    ChangeNumber cn = ack.getChangeNumber();
    ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
    if (expectedAcksInfo != null)
    {
      // Prevent concurrent access from processAck() or AssuredTimeoutTask.run()
      synchronized (expectedAcksInfo)
      {
        if (expectedAcksInfo.isCompleted())
        {
          // Timeout code is sending a timeout ack, do nothing and let him
          // remove object from the map
          return;
        }
        // If this is the last ack we were waiting from, immediatly create and
        // send the final ack to the original server
        if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
        {
          // Remove the object from the map as no more needed
          waitingAcks.remove(cn);
          AckMsg finalAck = expectedAcksInfo.createAck(false);
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          try
          {
            origServer.sendAck(finalAck);
          } catch (IOException e)
          {
            /*
             * An error happened trying the send back an ack to the server.
             * Log an error and close the connection to this server.
             */
            MessageBuilder mb = new MessageBuilder();
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
              Short.toString(replicationServer.getServerId()),
              Short.toString(origServer.getServerId()), cn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
            stopServer(origServer);
          }
          // Mark the ack info object as completed to prevent potential timeout
          // code parallel run
          expectedAcksInfo.completed();
        }
      }
    } else
    {
      // The timeout occured for the update matching this change number and the
      // ack with timeout error has probably already been sent.
    }
  }
  /**
   * The code run when the timeout occurs while waiting for acks of the
   * elligible servers. This basically sends a timeout ack (with any additional
   * error info) to the original server that sent an assured update message.
   */
  private class AssuredTimeoutTask extends TimerTask
  {
    private ChangeNumber cn = null;
    /**
     * Constructor for the timer task.
     * @param cn The changenumber of the assured update we are waiting acks for
     */
    public AssuredTimeoutTask(ChangeNumber cn)
    {
      this.cn = cn;
    }
    /**
     * Run when the assured timeout for an assured update message we are waiting
     * acks for occurs.
     */
    public void run()
    {
      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
      if (expectedAcksInfo != null)
      {
        synchronized (expectedAcksInfo)
        {
          if (expectedAcksInfo.isCompleted())
          {
            // processAck() code is sending the ack, do nothing and let him
            // remove object from the map
            return;
          }
          // Remove the object from the map as no more needed
          waitingAcks.remove(cn);
          // Create the timeout ack and send him to the server the assured
          // update message came from
          AckMsg finalAck = expectedAcksInfo.createAck(true);
          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
          if (debugEnabled())
            TRACER.debugInfo(
              "In RS " + Short.toString(replicationServer.getServerId()) +
              " for " + baseDn +
              ", sending timeout for assured update with change " + " number " +
              cn.toString() + " to server id " +
              Short.toString(origServer.getServerId()));
          try
          {
            origServer.sendAck(finalAck);
          } catch (IOException e)
          {
            /*
             * An error happened trying the send back an ack to the server.
             * Log an error and close the connection to this server.
             */
            MessageBuilder mb = new MessageBuilder();
            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
              Short.toString(replicationServer.getServerId()),
              Short.toString(origServer.getServerId()), cn.toString(), baseDn));
            mb.append(stackTraceToSingleLineString(e));
            logError(mb.toMessage());
            stopServer(origServer);
          }
          // Mark the ack info object as completed to prevent potential
          // processAck() code parallel run
          expectedAcksInfo.completed();
        }
      }
    }
  }
  /**
@@ -687,7 +1182,7 @@
   * Get the baseDn.
   * @return Returns the baseDn.
   */
  public DN getBaseDn()
  public String getBaseDn()
  {
    return baseDn;
  }
@@ -711,44 +1206,6 @@
  }
  /**
   * Get the number of currently connected servers.
   *
   * @return the number of currently connected servers.
   */
  private int NumServers()
  {
    return replicationServers.size() + directoryServers.size();
  }
  /**
   * Add an ack to the list of ack received for a given change.
   *
   * @param message The ack message received.
   * @param fromServerId The identifier of the server that sent the ack.
   */
  public void ack(AckMsg message, short fromServerId)
  {
    /*
     * there are 2 possible cases here :
     *  - the message that was acked comes from a server to which
     *    we are directly connected.
     *    In this case, we can find the handler from the directoryServers map
     *  - the message that was acked comes from a server to which we are not
     *    connected.
     *    In this case we need to find the replication server that forwarded
     *    the change and send back the ack to this server.
     */
    ServerHandler handler = directoryServers.get(
      message.getChangeNumber().getServerId());
    if (handler != null)
      handler.ack(message, fromServerId);
    else
    {
      ServerHandler.ackChangelog(message, fromServerId);
    }
  }
  /**
   * Retrieves the destination handlers for a routable message.
   *
   * @param msg The message to route.
@@ -975,56 +1432,6 @@
  }
  /**
   * Send back an ack to the server that sent the change.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a ReplicationServer.
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
  {
    short serverId = changeNumber.getServerId();
    sendAck(changeNumber, isLDAPserver, serverId);
  }
  /**
   *
   * Send back an ack to a server that sent the change.
   *
   * @param changeNumber The ChangeNumber of the change that must be acked.
   * @param isLDAPserver This boolean indicates if the server that sent the
   *                     change was an LDAP server or a ReplicationServer.
   * @param serverId     The identifier of the server from which we
   *                     received the change..
   */
  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
    short serverId)
  {
    ServerHandler handler;
    if (isLDAPserver)
      handler = directoryServers.get(serverId);
    else
      handler = replicationServers.get(serverId);
    // TODO : check for null handler and log error
    try
    {
      handler.sendAck(changeNumber);
    } catch (IOException e)
    {
      /*
       * An error happened trying the send back an ack to this server.
       * Log an error and close the connection to this server.
       */
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString()));
      mb.append(stackTraceToSingleLineString(e));
      logError(mb.toMessage());
      stopServer(handler);
    }
  }
  /**
   * Shutdown this ReplicationServerDomain.
   */
  public void shutdown()
@@ -1228,13 +1635,8 @@
    {
      // Put RS info
      rsInfos.add(serverHandler.toRSInfo());
      // Put his DSs info
      Map<Short, LightweightServerHandler> lsList =
        serverHandler.getConnectedDSs();
      for (LightweightServerHandler ls : lsList.values())
      {
        dsInfos.add(ls.toDSInfo());
      }
      serverHandler.addDSInfos(dsInfos);
    }
    return new TopologyMsg(dsInfos, rsInfos);
@@ -1641,7 +2043,7 @@
    if (generationId > 0 && (generationId != handler.getGenerationId()))
    {
      Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
        baseDn.toNormalizedString(),
        baseDn,
        Short.toString(handler.getServerId()),
        Long.toString(handler.getGenerationId()),
        Long.toString(generationId));
@@ -1879,10 +2281,13 @@
      synchronized (wrkMonitorData)
      {
        // Here is the RS state : list <serverID, lastChangeNumber>
        // For each LDAP Server, we keep the max CN accross the RSes
        // For each LDAP Server, we keep the max CN across the RSes
        ServerState replServerState = msg.getReplServerDbState();
        wrkMonitorData.setMaxCNs(replServerState);
        // store the remote RS states.
        wrkMonitorData.setRSState(msg.getsenderID(), replServerState);
        // Store the remote LDAP servers states
        Iterator<Short> lsidIterator = msg.ldapIterator();
        while (lsidIterator.hasNext())
@@ -2092,3 +2497,4 @@
    }
  }
}