From d04fb0f282e0fd9a4bc80d3f9d5ee15506a3b83b Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Mon, 08 Dec 2008 08:03:33 +0000
Subject: [PATCH] Merge the replication-service branch with the OpenDS trunk

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  672 ++++++++++++++++++++++++++++++++++++++++++++-----------
 1 files changed, 539 insertions(+), 133 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index a7c3dae..cf68944 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -50,7 +50,6 @@
 
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
@@ -58,17 +57,21 @@
 import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.MonitorRequestMsg;
 import org.opends.server.replication.protocol.ResetGenerationIdMsg;
-import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
 import org.opends.server.util.TimeThread;
 import com.sleepycat.je.DatabaseException;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.locks.ReentrantLock;
+import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.DSInfo;
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.ChangeStatusMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
 
 /**
  * This class define an in-memory cache that will be used to store
@@ -88,9 +91,8 @@
  */
 public class ReplicationServerDomain
 {
-
   private final Object flowControlLock = new Object();
-  private final DN baseDn;
+  private final String baseDn;
   // The Status analyzer that periodically verifis if the connected DSs are
   // late or not
   private StatusAnalyzer statusAnalyzer = null;
@@ -154,16 +156,38 @@
   private MonitorData wrkMonitorData;
 
   /**
+   * The needed info for each received assured update message we are waiting
+   * acks for.
+   * Key: a change number matching a received update message which requested
+   * assured mode usage (either safe read or safe data mode)
+   * Value: The object holding every info needed about the already received acks
+   * as well as the acks to be received.
+   * For more details, see ExpectedAcksInfo and its sub classes javadoc.
+   */
+  private final ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo> waitingAcks =
+    new ConcurrentHashMap<ChangeNumber, ExpectedAcksInfo>();
+
+  // The timer used to run the timeout code (timer tasks) for the assured update
+  // messages we are waiting acks for.
+  private Timer assuredTimeoutTimer = null;
+  // Counter used to purge the timer tasks referemces in assuredTimeoutTimer,
+  // every n number of treated assured messages
+  private int assuredTimeoutTimerPurgeCounter = 0;
+
+  /**
    * Creates a new ReplicationServerDomain associated to the DN baseDn.
    *
    * @param baseDn The baseDn associated to the ReplicationServerDomain.
    * @param replicationServer the ReplicationServer that created this
    *                          replicationServer cache.
    */
-  public ReplicationServerDomain(DN baseDn, ReplicationServer replicationServer)
+  public ReplicationServerDomain(
+      String baseDn, ReplicationServer replicationServer)
   {
     this.baseDn = baseDn;
     this.replicationServer = replicationServer;
+    this.assuredTimeoutTimer = new Timer("Replication Assured Timer for " +
+      baseDn + " in RS " + replicationServer.getServerId(), true);
   }
 
   /**
@@ -179,32 +203,11 @@
   public void put(UpdateMsg update, ServerHandler sourceHandler)
     throws IOException
   {
-    /*
-     * TODO : In case that the source server is a LDAP server this method
-     * should check that change did get pushed to at least one
-     * other replication server before pushing it to the LDAP servers
-     */
-
-    short id = update.getChangeNumber().getServerId();
+    ChangeNumber cn = update.getChangeNumber();
+    short id = cn.getServerId();
     sourceHandler.updateServerState(update);
     sourceHandler.incrementInCount();
 
-    if (update.isAssured())
-    {
-      int count = this.NumServers();
-      if (count > 1)
-      {
-        if (sourceHandler.isReplicationServer())
-          ServerHandler.addWaitingAck(update, sourceHandler.getServerId(),
-            this, count - 1);
-        else
-          sourceHandler.addWaitingAck(update, count - 1);
-      } else
-      {
-        sourceHandler.sendAck(update.getChangeNumber());
-      }
-    }
-
     if (generationId < 0)
     {
       generationId = sourceHandler.getGenerationId();
@@ -244,11 +247,103 @@
     // Publish the messages to the source handler
     dbHandler.add(update);
 
+    /**
+     * If this is an assured message (a message requesting ack), we must
+     * construct the ExpectedAcksInfo object with the right number of expected
+     * acks before posting message to the writers. Otherwise some writers may
+     * have time to post, receive the ack and increment received ack counter
+     * (kept in ExpectedAcksInfo object) and we could think the acknowledgment
+     * is fully processed although it may be not (some other acks from other
+     * servers are not yet arrived). So for that purpose we do a pre-loop
+     * to determine to who we will post an assured message.
+     * Whether the assured mode is safe read or safe data, we anyway do not
+     * support the assured replication feature across topologies with different
+     * group ids. The assured feature insures assured replication based on the
+     * same locality (group id). For instance in double data center deployment
+     * (2 group id usage) with assured replication enabled, an assured message
+     * sent from data center 1 (group id = 1) will be sent to servers of both
+     * data centers, but one will request and wait acks only from servers of the
+     * data center 1.
+     */
+    boolean assuredMessage = update.isAssured();
+    PreparedAssuredInfo preparedAssuredInfo = null;
+    if (assuredMessage)
+    {
+      // Assured feature is supported starting from replication protocol V2
+      if (sourceHandler.getProtocolVersion() >=
+        ProtocolVersion.REPLICATION_PROTOCOL_V2)
+      {
+        // According to assured sub-mode, prepare structures to keep track of
+        // the acks we are interested in.
+        AssuredMode assuredMode = update.getAssuredMode();
+        if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+        {
+          preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
+        } else if (assuredMode != AssuredMode.SAFE_READ_MODE)
+        {
+          preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
+        } else
+        {
+          // Unknown assured mode: should never happen
+          Message errorMsg = ERR_RS_UNKNOWN_ASSURED_MODE.get(
+            Short.toString(replicationServer.getServerId()),
+            assuredMode.toString(), baseDn, update.toString());
+          logError(errorMsg);
+          assuredMessage = false;
+        }
+      } else
+      {
+        assuredMessage = false;
+      }
+    }
+
+    List<Short> expectedServers = null;
+    if (assuredMessage)
+    {
+      expectedServers = preparedAssuredInfo.expectedServers;
+      if (expectedServers != null)
+      {
+        // Store the expected acks info into the global map.
+        // The code for processing reception of acks for this update will update
+        // info kept in this object and if enough acks received, it will send
+        // back the final ack to the requester and remove the object from this
+        // map
+        // OR
+        // The following timer will time out and send an timeout ack to the
+        // requester if the acks are not received in time. The timer will also
+        // remove the object from this map.
+        waitingAcks.put(cn, preparedAssuredInfo.expectedAcksInfo);
+
+        // Arm timer for this assured update message (wait for acks until it
+        // times out)
+        AssuredTimeoutTask assuredTimeoutTask = new AssuredTimeoutTask(cn);
+        assuredTimeoutTimer.schedule(assuredTimeoutTask,
+          replicationServer.getAssuredTimeout());
+        // Purge timer every 100 treated messages
+        assuredTimeoutTimerPurgeCounter++;
+        if ((assuredTimeoutTimerPurgeCounter % 100) == 0)
+          assuredTimeoutTimer.purge();
+      }
+    }
+
+    /**
+     * The update message equivalent to the originally received update message,
+     * but with assured flag disabled. This message is the one that should be
+     * sent to non elligible servers for assured mode.
+     * We need a clone like of the original message with assured flag off, to be
+     * posted to servers we don't want to wait the ack from (not normal status
+     * servers or servers with different group id). This must be done because
+     * the posted message is a reference so each writer queue gets the same
+     * reference, thus, changing the assured flag of an object is done for every
+     * references posted on every writer queues. That is why we need a message
+     * version with assured flag on and another one with assured flag off.
+     */
+    NotAssuredUpdateMsg notAssuredUpdate = null;
 
     /*
      * Push the message to the replication servers
      */
-    if (!sourceHandler.isReplicationServer())
+    if (sourceHandler.isLDAPserver())
     {
       for (ServerHandler handler : replicationServers.values())
       {
@@ -261,7 +356,7 @@
           if (debugEnabled())
             TRACER.debugInfo("In RS " +
               replicationServer.getServerId() +
-              " for dn " + baseDn.toNormalizedString() + ", update " +
+              " for dn " + baseDn + ", update " +
               update.getChangeNumber().toString() +
               " will not be sent to replication server " +
               Short.toString(handler.getServerId()) + " with generation id " +
@@ -272,7 +367,27 @@
           continue;
         }
 
-        handler.add(update, sourceHandler);
+        if (assuredMessage)
+        {
+          // Assured mode: post an assured or not assured matching update
+          // message according to what has been computed for the destination
+          // server
+          if ((expectedServers != null) && expectedServers.contains(handler.
+            getServerId()))
+          {
+            handler.add(update, sourceHandler);
+          } else
+          {
+            if (notAssuredUpdate == null)
+            {
+              notAssuredUpdate = new NotAssuredUpdateMsg(update);
+            }
+            handler.add(notAssuredUpdate, sourceHandler);
+          }
+        } else
+        {
+          handler.add(update, sourceHandler);
+        }
       }
     }
 
@@ -281,7 +396,7 @@
      */
     for (ServerHandler handler : directoryServers.values())
     {
-      // don't forward the change to the server that just sent it
+      // Don't forward the change to the server that just sent it
       if (handler == sourceHandler)
       {
         continue;
@@ -307,7 +422,7 @@
           if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
             TRACER.debugInfo("In RS " +
               replicationServer.getServerId() +
-              " for dn " + baseDn.toNormalizedString() + ", update " +
+              " for dn " + baseDn + ", update " +
               update.getChangeNumber().toString() +
               " will not be sent to directory server " +
               Short.toString(handler.getServerId()) + " with generation id " +
@@ -317,7 +432,7 @@
           if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
             TRACER.debugInfo("In RS " +
               replicationServer.getServerId() +
-              " for dn " + baseDn.toNormalizedString() + ", update " +
+              " for dn " + baseDn + ", update " +
               update.getChangeNumber().toString() +
               " will not be sent to directory server " +
               Short.toString(handler.getServerId()) +
@@ -327,9 +442,389 @@
         continue;
       }
 
-      handler.add(update, sourceHandler);
+      if (assuredMessage)
+      {
+        // Assured mode: post an assured or not assured matching update
+        // message according to what has been computed for the destination
+        // server
+        if ((expectedServers != null) && expectedServers.contains(handler.
+          getServerId()))
+        {
+          handler.add(update, sourceHandler);
+        } else
+        {
+          if (notAssuredUpdate == null)
+          {
+            notAssuredUpdate = new NotAssuredUpdateMsg(update);
+          }
+          handler.add(notAssuredUpdate, sourceHandler);
+        }
+      } else
+      {
+        handler.add(update, sourceHandler);
+      }
+    }
+  }
+
+  /**
+   * Helper class to be the return type of a method that processes a just
+   * received assured update message:
+   * - processSafeReadUpdateMsg
+   * - processSafeDataUpdateMsg
+   * This is a facility to pack many interesting returned object.
+   */
+  private class PreparedAssuredInfo
+  {
+      /**
+       * The list of servers identified as servers we are interested in
+       * receiving acks from. If this list is not null, then expectedAcksInfo
+       * should be not null.
+       * Servers that are not in this list are servers not elligible for an ack
+       * request.
+       *
+       */
+      public List<Short> expectedServers = null;
+
+      /**
+       * The constructed ExpectedAcksInfo object to be used when acks will be
+       * received. Null if expectedServers is null.
+       */
+      public ExpectedAcksInfo expectedAcksInfo = null;
+  }
+
+  /**
+   * Process a just received assured update message in Safe Read mode. If the
+   * ack can be sent immediately, it is done here. This will also determine to
+   * which suitable servers an ack should be requested from, and which ones are
+   * not elligible for an ack request.
+   * This method is an helper method for the put method. Have a look at the put
+   * method for a better understanding.
+   * @param update The just received assured update to process.
+   * @param sourceHandler The ServerHandler for the server from which the
+   *        update was received
+   * @return A suitable PreparedAssuredInfo object that contains every needed
+   * info to proceed with post to server writers.
+   * @throws IOException When an IO exception happens during the update
+   *         processing.
+   */
+  private PreparedAssuredInfo processSafeReadUpdateMsg(
+    UpdateMsg update, ServerHandler sourceHandler) throws IOException
+  {
+    ChangeNumber cn = update.getChangeNumber();
+    byte groupId = replicationServer.getGroupId();
+    byte sourceGroupId = sourceHandler.getGroupId();
+    List<Short> expectedServers = new ArrayList<Short>();
+    List<Short> wrongStatusServers = new ArrayList<Short>();
+
+    if (sourceGroupId != groupId)
+      // Assured feature does not cross different group ids
+    {
+      if (sourceHandler.isLDAPserver())
+      {
+        // Look for RS elligible for assured
+        for (ServerHandler handler : replicationServers.values())
+        {
+          if (handler.getGroupId() == groupId)
+            expectedServers.add(handler.getServerId());
+        }
+      }
+
+      // Look for DS elligible for assured
+      for (ServerHandler handler : directoryServers.values())
+      {
+        // Don't forward the change to the server that just sent it
+        if (handler == sourceHandler)
+        {
+          continue;
+        }
+        if (handler.getGroupId() == groupId)
+        {
+          if (handler.getStatus() == ServerStatus.NORMAL_STATUS)
+          {
+            expectedServers.add(handler.getServerId());
+          } else
+          {
+            wrongStatusServers.add(handler.getServerId());
+          }
+        }
+      }
     }
 
+    // Return computed structures
+    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
+    if (expectedServers.size() > 0)
+    {
+      // Some other acks to wait for
+      preparedAssuredInfo.expectedAcksInfo = new SafeReadExpectedAcksInfo(cn,
+        sourceHandler, expectedServers, wrongStatusServers);
+      preparedAssuredInfo.expectedServers = expectedServers;
+    }
+
+    if (preparedAssuredInfo.expectedServers == null)
+    {
+      // No elligible servers found, send the ack immediatly
+      AckMsg ack = new AckMsg(cn);
+      sourceHandler.sendAck(ack);
+    }
+
+    return preparedAssuredInfo;
+  }
+
+  /**
+   * Process a just received assured update message in Safe Data mode. If the
+   * ack can be sent immediately, it is done here. This will also determine to
+   * which suitable servers an ack should be requested from, and which ones are
+   * not elligible for an ack request.
+   * This method is an helper method for the put method. Have a look at the put
+   * method for a better understanding.
+   * @param update The just received assured update to process.
+   * @param sourceHandler The ServerHandler for the server from which the
+   *        update was received
+   * @return A suitable PreparedAssuredInfo object that contains every needed
+   * info to proceed with post to server writers.
+   * @throws IOException When an IO exception happens during the update
+   *         processing.
+   */
+  private PreparedAssuredInfo processSafeDataUpdateMsg(
+    UpdateMsg update, ServerHandler sourceHandler) throws IOException
+  {
+    ChangeNumber cn = update.getChangeNumber();
+    boolean interestedInAcks = true;
+    byte safeDataLevel = update.getSafeDataLevel();
+    byte groupId = replicationServer.getGroupId();
+    byte sourceGroupId = sourceHandler.getGroupId();
+    if (safeDataLevel < (byte) 1)
+    {
+      // Should never happen
+      Message errorMsg = ERR_UNKNOWN_ASSURED_SAFE_DATA_LEVEL.get(
+        Short.toString(replicationServer.getServerId()),
+        Byte.toString(safeDataLevel), baseDn, update.toString());
+      logError(errorMsg);
+      interestedInAcks = false;
+    } else if (sourceGroupId != groupId)
+    {
+      // Assured feature does not cross different group ids
+      interestedInAcks = false;
+    } else
+    {
+      if (sourceHandler.isLDAPserver())
+      {
+        if (safeDataLevel == (byte) 1)
+        {
+          // Immediatly return the ack for an assured message in safe data mode
+          // with safe data level 1, coming from a DS. No need to wait for more
+          // acks
+          AckMsg ack = new AckMsg(cn);
+          sourceHandler.sendAck(ack);
+          interestedInAcks = false; // No further acks to obtain
+        } else
+        {
+          // level > 1 : We need further acks
+          // The message will be posted in assured mode to elligible servers.
+          // The embedded safe data level is not changed, and his value will be
+          // used by a remote RS to determine if he must send an ack (level > 1)
+          // or not (level = 1)
+        }
+      } else
+      { // A RS sent us the safe data message, for sure no futher acks to wait
+        interestedInAcks = false;
+        if (safeDataLevel == (byte) 1)
+        {
+          // The original level was 1 so the RS that sent us this message should
+          // have already sent his ack to the sender DS. Level 1 has already
+          // been reached so no further acks to wait
+          // This should not happen in theory as the sender RS server should
+          // have sent us a matching not assured message so we should not come
+          // to here.
+        } else
+        {
+          // level > 1, so Ack this message to originator RS
+          AckMsg ack = new AckMsg(cn);
+          sourceHandler.sendAck(ack);
+        }
+      }
+    }
+
+    List<Short> expectedServers = new ArrayList<Short>();
+    if (interestedInAcks)
+    {
+      if (sourceHandler.isLDAPserver())
+      {
+        // Look for RS elligible for assured
+        for (ServerHandler handler : replicationServers.values())
+        {
+          if (handler.getGroupId() == groupId)
+            expectedServers.add(handler.getServerId());
+        }
+      }
+
+      // Look for DS elligible for assured
+      for (ServerHandler handler : directoryServers.values())
+      {
+        // Don't forward the change to the server that just sent it
+        if (handler == sourceHandler)
+        {
+          continue;
+        }
+        if (handler.getGroupId() == groupId)
+          expectedServers.add(handler.getServerId());
+      }
+    }
+
+    // Return computed structures
+    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
+    if (interestedInAcks && (expectedServers.size() > 0))
+    {
+      // Some other acks to wait for
+      preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
+        sourceHandler, update.getSafeDataLevel());
+      preparedAssuredInfo.expectedServers = expectedServers;
+    }
+
+    if (interestedInAcks && (preparedAssuredInfo.expectedServers == null))
+    {
+      // level > 1 and source is a DS but no elligible servers found, send the
+      // ack immediatly
+      AckMsg ack = new AckMsg(cn);
+      sourceHandler.sendAck(ack);
+    }
+
+    return preparedAssuredInfo;
+  }
+
+  /**
+   * Process an ack received from a given server.
+   *
+   * @param ack The ack message received.
+   * @param ackingServer The server handler of the server that sent the ack.
+   */
+  public void processAck(AckMsg ack, ServerHandler ackingServer)
+  {
+    // Retrieve the expected acks info for the update matching the original
+    // sent update.
+    ChangeNumber cn = ack.getChangeNumber();
+    ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
+
+    if (expectedAcksInfo != null)
+    {
+      // Prevent concurrent access from processAck() or AssuredTimeoutTask.run()
+      synchronized (expectedAcksInfo)
+      {
+        if (expectedAcksInfo.isCompleted())
+        {
+          // Timeout code is sending a timeout ack, do nothing and let him
+          // remove object from the map
+          return;
+        }
+        // If this is the last ack we were waiting from, immediatly create and
+        // send the final ack to the original server
+        if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
+        {
+          // Remove the object from the map as no more needed
+          waitingAcks.remove(cn);
+          AckMsg finalAck = expectedAcksInfo.createAck(false);
+          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
+          try
+          {
+            origServer.sendAck(finalAck);
+          } catch (IOException e)
+          {
+            /*
+             * An error happened trying the send back an ack to the server.
+             * Log an error and close the connection to this server.
+             */
+            MessageBuilder mb = new MessageBuilder();
+            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
+              Short.toString(replicationServer.getServerId()),
+              Short.toString(origServer.getServerId()), cn.toString(), baseDn));
+            mb.append(stackTraceToSingleLineString(e));
+            logError(mb.toMessage());
+            stopServer(origServer);
+          }
+          // Mark the ack info object as completed to prevent potential timeout
+          // code parallel run
+          expectedAcksInfo.completed();
+        }
+      }
+    } else
+    {
+      // The timeout occured for the update matching this change number and the
+      // ack with timeout error has probably already been sent.
+    }
+  }
+
+  /**
+   * The code run when the timeout occurs while waiting for acks of the
+   * elligible servers. This basically sends a timeout ack (with any additional
+   * error info) to the original server that sent an assured update message.
+   */
+  private class AssuredTimeoutTask extends TimerTask
+  {
+    private ChangeNumber cn = null;
+
+    /**
+     * Constructor for the timer task.
+     * @param cn The changenumber of the assured update we are waiting acks for
+     */
+    public AssuredTimeoutTask(ChangeNumber cn)
+    {
+      this.cn = cn;
+    }
+
+    /**
+     * Run when the assured timeout for an assured update message we are waiting
+     * acks for occurs.
+     */
+    public void run()
+    {
+      ExpectedAcksInfo expectedAcksInfo = waitingAcks.get(cn);
+
+      if (expectedAcksInfo != null)
+      {
+        synchronized (expectedAcksInfo)
+        {
+          if (expectedAcksInfo.isCompleted())
+          {
+            // processAck() code is sending the ack, do nothing and let him
+            // remove object from the map
+            return;
+          }
+          // Remove the object from the map as no more needed
+          waitingAcks.remove(cn);
+          // Create the timeout ack and send him to the server the assured
+          // update message came from
+          AckMsg finalAck = expectedAcksInfo.createAck(true);
+          ServerHandler origServer = expectedAcksInfo.getRequesterServer();
+          if (debugEnabled())
+            TRACER.debugInfo(
+              "In RS " + Short.toString(replicationServer.getServerId()) +
+              " for " + baseDn +
+              ", sending timeout for assured update with change " + " number " +
+              cn.toString() + " to server id " +
+              Short.toString(origServer.getServerId()));
+          try
+          {
+            origServer.sendAck(finalAck);
+          } catch (IOException e)
+          {
+            /*
+             * An error happened trying the send back an ack to the server.
+             * Log an error and close the connection to this server.
+             */
+            MessageBuilder mb = new MessageBuilder();
+            mb.append(ERR_RS_ERROR_SENDING_ACK.get(
+              Short.toString(replicationServer.getServerId()),
+              Short.toString(origServer.getServerId()), cn.toString(), baseDn));
+            mb.append(stackTraceToSingleLineString(e));
+            logError(mb.toMessage());
+            stopServer(origServer);
+          }
+          // Mark the ack info object as completed to prevent potential
+          // processAck() code parallel run
+          expectedAcksInfo.completed();
+        }
+      }
+    }
   }
 
   /**
@@ -687,7 +1182,7 @@
    * Get the baseDn.
    * @return Returns the baseDn.
    */
-  public DN getBaseDn()
+  public String getBaseDn()
   {
     return baseDn;
   }
@@ -711,44 +1206,6 @@
   }
 
   /**
-   * Get the number of currently connected servers.
-   *
-   * @return the number of currently connected servers.
-   */
-  private int NumServers()
-  {
-    return replicationServers.size() + directoryServers.size();
-  }
-
-  /**
-   * Add an ack to the list of ack received for a given change.
-   *
-   * @param message The ack message received.
-   * @param fromServerId The identifier of the server that sent the ack.
-   */
-  public void ack(AckMsg message, short fromServerId)
-  {
-    /*
-     * there are 2 possible cases here :
-     *  - the message that was acked comes from a server to which
-     *    we are directly connected.
-     *    In this case, we can find the handler from the directoryServers map
-     *  - the message that was acked comes from a server to which we are not
-     *    connected.
-     *    In this case we need to find the replication server that forwarded
-     *    the change and send back the ack to this server.
-     */
-    ServerHandler handler = directoryServers.get(
-      message.getChangeNumber().getServerId());
-    if (handler != null)
-      handler.ack(message, fromServerId);
-    else
-    {
-      ServerHandler.ackChangelog(message, fromServerId);
-    }
-  }
-
-  /**
    * Retrieves the destination handlers for a routable message.
    *
    * @param msg The message to route.
@@ -975,56 +1432,6 @@
   }
 
   /**
-   * Send back an ack to the server that sent the change.
-   *
-   * @param changeNumber The ChangeNumber of the change that must be acked.
-   * @param isLDAPserver This boolean indicates if the server that sent the
-   *                     change was an LDAP server or a ReplicationServer.
-   */
-  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver)
-  {
-    short serverId = changeNumber.getServerId();
-    sendAck(changeNumber, isLDAPserver, serverId);
-  }
-
-  /**
-   *
-   * Send back an ack to a server that sent the change.
-   *
-   * @param changeNumber The ChangeNumber of the change that must be acked.
-   * @param isLDAPserver This boolean indicates if the server that sent the
-   *                     change was an LDAP server or a ReplicationServer.
-   * @param serverId     The identifier of the server from which we
-   *                     received the change..
-   */
-  public void sendAck(ChangeNumber changeNumber, boolean isLDAPserver,
-    short serverId)
-  {
-    ServerHandler handler;
-    if (isLDAPserver)
-      handler = directoryServers.get(serverId);
-    else
-      handler = replicationServers.get(serverId);
-
-    // TODO : check for null handler and log error
-    try
-    {
-      handler.sendAck(changeNumber);
-    } catch (IOException e)
-    {
-      /*
-       * An error happened trying the send back an ack to this server.
-       * Log an error and close the connection to this server.
-       */
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_ERROR_SENDING_ACK.get(this.toString()));
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      stopServer(handler);
-    }
-  }
-
-  /**
    * Shutdown this ReplicationServerDomain.
    */
   public void shutdown()
@@ -1228,13 +1635,8 @@
     {
       // Put RS info
       rsInfos.add(serverHandler.toRSInfo());
-      // Put his DSs info
-      Map<Short, LightweightServerHandler> lsList =
-        serverHandler.getConnectedDSs();
-      for (LightweightServerHandler ls : lsList.values())
-      {
-        dsInfos.add(ls.toDSInfo());
-      }
+
+      serverHandler.addDSInfos(dsInfos);
     }
 
     return new TopologyMsg(dsInfos, rsInfos);
@@ -1641,7 +2043,7 @@
     if (generationId > 0 && (generationId != handler.getGenerationId()))
     {
       Message message = NOTE_BAD_GENERATION_ID_FROM_RS.get(
-        baseDn.toNormalizedString(),
+        baseDn,
         Short.toString(handler.getServerId()),
         Long.toString(handler.getGenerationId()),
         Long.toString(generationId));
@@ -1879,10 +2281,13 @@
       synchronized (wrkMonitorData)
       {
         // Here is the RS state : list <serverID, lastChangeNumber>
-        // For each LDAP Server, we keep the max CN accross the RSes
+        // For each LDAP Server, we keep the max CN across the RSes
         ServerState replServerState = msg.getReplServerDbState();
         wrkMonitorData.setMaxCNs(replServerState);
 
+        // store the remote RS states.
+        wrkMonitorData.setRSState(msg.getsenderID(), replServerState);
+
         // Store the remote LDAP servers states
         Iterator<Short> lsidIterator = msg.ldapIterator();
         while (lsidIterator.hasNext())
@@ -2092,3 +2497,4 @@
     }
   }
 }
+

--
Gitblit v1.10.0