From 45eb21b1354b6925fc058f834f505a9699d1bbbe Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 10 Jun 2009 08:43:50 +0000
Subject: [PATCH] External Changelog - first step - related issues 495,  519

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  542 ++++++++++++++++++++++++++++++++++++++---------------
 1 files changed, 384 insertions(+), 158 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index ea99e4b..76aa95b 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -26,56 +26,59 @@
  */
 package org.opends.server.replication.server;
 
-import org.opends.messages.Message;
-import org.opends.messages.MessageBuilder;
-
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
-import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.MonitorProvider;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.Iterator;
+import java.util.concurrent.locks.ReentrantLock;
 
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.messages.Severity;
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.common.StatusMachineEvent;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.ChangeStatusMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
 import org.opends.server.replication.protocol.MonitorMsg;
 import org.opends.server.replication.protocol.MonitorRequestMsg;
+import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ResetGenerationIdMsg;
+import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeBuilder;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
+import org.opends.server.util.TimeThread;
 import com.sleepycat.je.DatabaseException;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.locks.ReentrantLock;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.ProtocolVersion;
 
 /**
  * This class define an in-memory cache that will be used to store
@@ -111,8 +114,8 @@
    * to this replication server.
    *
    */
-  private final Map<Short, ServerHandler> directoryServers =
-    new ConcurrentHashMap<Short, ServerHandler>();
+  private final Map<Short, DataServerHandler> directoryServers =
+    new ConcurrentHashMap<Short, DataServerHandler>();
 
   /*
    * This map contains one ServerHandler for each replication servers
@@ -123,8 +126,11 @@
    * We add new TreeSet in the HashMap when a new replication server register
    * to this replication server.
    */
-  private final Map<Short, ServerHandler> replicationServers =
-    new ConcurrentHashMap<Short, ServerHandler>();
+  private final Map<Short, ReplicationServerHandler> replicationServers =
+    new ConcurrentHashMap<Short, ReplicationServerHandler>();
+
+  private final ConcurrentLinkedQueue<MessageHandler> otherHandlers =
+    new ConcurrentLinkedQueue<MessageHandler>();
 
   /*
    * This map contains the List of updates received from each
@@ -134,16 +140,14 @@
     new ConcurrentHashMap<Short, DbHandler>();
   private ReplicationServer replicationServer;
 
-  /* GenerationId management */
+  // GenerationId management
   private long generationId = -1;
   private boolean generationIdSavedStatus = false;
-  /**
-   * The tracer object for the debug logger.
-   */
+
+  // The tracer object for the debug logger.
   private static final DebugTracer TRACER = getTracer();
 
-  /* Monitor data management */
-
+  // Monitor data management
   /**
    * The monitor data consolidated over the topology.
    */
@@ -346,9 +350,9 @@
     /*
      * Push the message to the replication servers
      */
-    if (sourceHandler.isLDAPserver())
+    if (sourceHandler.isDataServer())
     {
-      for (ServerHandler handler : replicationServers.values())
+      for (ReplicationServerHandler handler : replicationServers.values())
       {
         /**
          * Ignore updates to RS with bad gen id
@@ -397,7 +401,7 @@
     /*
      * Push the message to the LDAP servers
      */
-    for (ServerHandler handler : directoryServers.values())
+    for (DataServerHandler handler : directoryServers.values())
     {
       // Don't forward the change to the server that just sent it
       if (handler == sourceHandler)
@@ -467,6 +471,14 @@
         handler.add(update, sourceHandler);
       }
     }
+
+    // Push the message to the other subscribing handlers
+    Iterator<MessageHandler> otherIter = otherHandlers.iterator();
+    while (otherIter.hasNext())
+    {
+      MessageHandler handler = otherIter.next();
+      handler.add(update, sourceHandler);
+    }
   }
 
   /**
@@ -522,10 +534,10 @@
     if (sourceGroupId == groupId)
       // Assured feature does not cross different group ids
     {
-      if (sourceHandler.isLDAPserver())
+      if (sourceHandler.isDataServer())
       {
         // Look for RS eligible for assured
-        for (ServerHandler handler : replicationServers.values())
+        for (ReplicationServerHandler handler : replicationServers.values())
         {
           if (handler.getGroupId() == groupId)
             // No ack expected from a RS with different group id
@@ -541,7 +553,7 @@
       }
 
       // Look for DS eligible for assured
-      for (ServerHandler handler : directoryServers.values())
+      for (DataServerHandler handler : directoryServers.values())
       {
         // Don't forward the change to the server that just sent it
         if (handler == sourceHandler)
@@ -636,7 +648,7 @@
         (generationId == sourceHandler.getGenerationId()))
         // Ignore assured updates from wrong generationId servers
       {
-        if (sourceHandler.isLDAPserver())
+        if (sourceHandler.isDataServer())
         {
           if (safeDataLevel == (byte) 1)
           {
@@ -689,10 +701,10 @@
     List<Short> expectedServers = new ArrayList<Short>();
     if (interestedInAcks)
     {
-      if (sourceHandler.isLDAPserver())
+      if (sourceHandler.isDataServer())
       {
         // Look for RS eligible for assured
-        for (ServerHandler handler : replicationServers.values())
+        for (ReplicationServerHandler handler : replicationServers.values())
         {
           if (handler.getGroupId() == groupId)
             // No ack expected from a RS with different group id
@@ -879,7 +891,7 @@
             origServer.incrementAssuredSrReceivedUpdatesTimeout();
           } else
           {
-            if (origServer.isLDAPserver())
+            if (origServer.isDataServer())
             {
               origServer.incrementAssuredSdReceivedUpdatesTimeout();
             }
@@ -957,7 +969,7 @@
    */
   public void stopReplicationServers(Collection<String> replServers)
   {
-    for (ServerHandler handler : replicationServers.values())
+    for (ReplicationServerHandler handler : replicationServers.values())
     {
       if (replServers.contains(handler.getServerAddressURL()))
         stopServer(handler);
@@ -970,13 +982,13 @@
   public void stopAllServers()
   {
     // Close session with other replication servers
-    for (ServerHandler serverHandler : replicationServers.values())
+    for (ReplicationServerHandler serverHandler : replicationServers.values())
     {
       stopServer(serverHandler);
     }
 
     // Close session with other LDAP servers
-    for (ServerHandler serverHandler : directoryServers.values())
+    for (DataServerHandler serverHandler : directoryServers.values())
     {
       stopServer(serverHandler);
     }
@@ -988,14 +1000,13 @@
    * @param handler the DS we want to check
    * @return true if this is not a duplicate server
    */
-  public boolean checkForDuplicateDS(ServerHandler handler)
+  public boolean checkForDuplicateDS(DataServerHandler handler)
   {
-    ServerHandler oldHandler = directoryServers.get(handler.getServerId());
+    DataServerHandler oldHandler = directoryServers.get(handler.getServerId());
 
     if (directoryServers.containsKey(handler.getServerId()))
     {
       // looks like two LDAP servers have the same serverId
-      // log an error message and drop this connection.
       Message message = ERR_DUPLICATE_SERVER_ID.get(
         replicationServer.getMonitorInstanceName(), oldHandler.toString(),
         handler.toString(), handler.getServerId());
@@ -1012,6 +1023,12 @@
    */
   public void stopServer(ServerHandler handler)
   {
+      if (debugEnabled())
+        TRACER.debugInfo(
+            "In RS " + this.replicationServer.getMonitorInstanceName() +
+            " domain=" + this +
+            " stopServer(SH)" + handler.getMonitorInstanceName() +
+          " " + stackTraceToSingleLineString(new Exception()));
     /*
      * We must prevent deadlock on replication server domain lock, when for
      * instance this code is called from dying ServerReader but also dying
@@ -1020,14 +1037,103 @@
      * or not (is already being processed or not).
      */
     if (!handler.engageShutdown())
-    // Only do this once (prevent other thread to enter here again)
+      // Only do this once (prevent other thread to enter here again)
     {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In RS " + this.replicationServer.getMonitorInstanceName() +
-          " for " + baseDn + " " +
-          " stopServer " + handler.getMonitorInstanceName());
+      try
+      {
+        try
+        {
+          // Acquire lock on domain (see more details in comment of start()
+          // method of ServerHandler)
+          lock();
+        } catch (InterruptedException ex)
+        {
+          // Try doing job anyway...
+        }
 
+        if (handler.isReplicationServer())
+        {
+          if (replicationServers.containsValue(handler))
+          {
+            unregisterServerHandler(handler);
+            handler.shutdown();
+
+            // Check if generation id has to be resetted
+            mayResetGenerationId();
+            // Warn our DSs that a RS or DS has quit (does not use this
+            // handler as already removed from list)
+            buildAndSendTopoInfoToDSs(null);
+          }
+        } else
+        {
+          if (directoryServers.containsValue(handler))
+          {
+            // If this is the last DS for the domain,
+            // shutdown the status analyzer
+            if (directoryServers.size() == 1)
+            {
+              if (debugEnabled())
+                TRACER.debugInfo("In " +
+                    replicationServer.getMonitorInstanceName() +
+                    " remote server " + handler.getMonitorInstanceName() +
+                " is the last DS to be stopped: stopping status analyzer");
+              stopStatusAnalyzer();
+            }
+
+            unregisterServerHandler(handler);
+            handler.shutdown();
+
+            // Check if generation id has to be resetted
+            mayResetGenerationId();
+            // Update the remote replication servers with our list
+            // of connected LDAP servers
+            buildAndSendTopoInfoToRSs();
+            // Warn our DSs that a RS or DS has quit (does not use this
+            // handler as already removed from list)
+            buildAndSendTopoInfoToDSs(null);
+          }
+          else if (otherHandlers.contains(handler))
+          {
+            unRegisterHandler(handler);
+            handler.shutdown();
+          }
+        }
+
+      }
+      catch(Exception e)
+      {
+        logError(Message.raw(Category.SYNC, Severity.NOTICE,
+            stackTraceToSingleLineString(e)));
+      }
+      finally
+      {
+        release();
+      }
+    }
+  }
+
+  /**
+   * Stop the handler.
+   * @param handler The handler to stop.
+   */
+  public void stopServer(MessageHandler handler)
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+          "In RS " + this.replicationServer.getMonitorInstanceName() +
+          " domain=" + this +
+          " stopServer(MH)" + handler.getMonitorInstanceName() +
+          " " + stackTraceToSingleLineString(new Exception()));
+    /*
+     * We must prevent deadlock on replication server domain lock, when for
+     * instance this code is called from dying ServerReader but also dying
+     * ServerWriter at the same time, or from a thread that wants to shut down
+     * the handler. So use a thread safe flag to know if the job must be done
+     * or not (is already being processed or not).
+     */
+    if (!handler.engageShutdown())
+      // Only do this once (prevent other thread to enter here again)
+    {
       try
       {
         // Acquire lock on domain (see more details in comment of start()
@@ -1037,57 +1143,40 @@
       {
         // Try doing job anyway...
       }
-
-      if (handler.isReplicationServer())
+      if (otherHandlers.contains(handler))
       {
-        if (replicationServers.containsValue(handler))
-        {
-          replicationServers.remove(handler.getServerId());
-          handler.shutdown();
-
-          // Check if generation id has to be resetted
-          mayResetGenerationId();
-          // Warn our DSs that a RS or DS has quit (does not use this
-          // handler as already removed from list)
-          sendTopoInfoToDSs(null);
-        }
-      } else
-      {
-        if (directoryServers.containsValue(handler))
-        {
-          // If this is the last DS for the domain, shutdown the status analyzer
-          if (directoryServers.size() == 1)
-          {
-            if (debugEnabled())
-              TRACER.debugInfo("In " +
-                replicationServer.getMonitorInstanceName() +
-                " remote server " + handler.getMonitorInstanceName() +
-                " is the last DS to be stopped: stopping status analyzer");
-            stopStatusAnalyzer();
-          }
-
-          directoryServers.remove(handler.getServerId());
-          handler.shutdown();
-
-          // Check if generation id has to be resetted
-          mayResetGenerationId();
-          // Update the remote replication servers with our list
-          // of connected LDAP servers
-          sendTopoInfoToRSs();
-          // Warn our DSs that a RS or DS has quit (does not use this
-          // handler as already removed from list)
-          sendTopoInfoToDSs(null);
-        }
+        unRegisterHandler(handler);
+        handler.shutdown();
       }
+    }
+    release();
+  }
 
-      release();
+  /**
+   * Unregister this handler from the list of handlers registered to this
+   * domain.
+   * @param handler the provided handler to unregister.
+   */
+  protected void unregisterServerHandler(ServerHandler handler)
+  {
+    if (handler.isReplicationServer())
+    {
+      replicationServers.remove(handler.getServerId());
+    }
+    else
+    {
+      directoryServers.remove(handler.getServerId());
     }
   }
 
   /**
-   * Resets the generationId for this domain if there is no LDAP
-   * server currently connected and if the generationId has never
-   * been saved.
+   * This method resets the generationId for this domain if there is no LDAP
+   * server currently connected in the whole topology on this domain and
+   * if the generationId has never been saved.
+   *
+   * - test emtpyness of directoryServers list
+   * - traverse replicationServers list and test for each if DS are connected
+   * So it strongly relies on the directoryServers list
    */
   protected void mayResetGenerationId()
   {
@@ -1104,7 +1193,7 @@
     boolean lDAPServersConnectedInTheTopology = false;
     if (directoryServers.isEmpty())
     {
-      for (ServerHandler rsh : replicationServers.values())
+      for (ReplicationServerHandler rsh : replicationServers.values())
       {
         if (generationId != rsh.getGenerationId())
         {
@@ -1149,14 +1238,16 @@
   }
 
   /**
-   * Checks that a RS is not already connected.
-   *
-   * @param handler the RS we want to check
-   * @return true if this is not a duplicate server
+   * Checks that a remote RS is not already connected to this hosting RS.
+   * @param handler The handler for the remote RS.
+   * @return flag specifying whether the remote RS is already connected.
+   * @throws DirectoryException when a problem occurs.
    */
-  public boolean checkForDuplicateRS(ServerHandler handler)
+  public boolean checkForDuplicateRS(ReplicationServerHandler handler)
+  throws DirectoryException
   {
-    ServerHandler oldHandler = replicationServers.get(handler.getServerId());
+    ReplicationServerHandler oldHandler =
+      replicationServers.get(handler.getServerId());
     if ((oldHandler != null))
     {
       if (oldHandler.getServerAddressURL().equals(
@@ -1166,7 +1257,9 @@
         // have been sent at about the same time and 2 connections
         // have been established.
         // Silently drop this connection.
-        } else
+        return false;
+      }
+      else
       {
         // looks like two replication servers have the same serverId
         // log an error message and drop this connection.
@@ -1174,9 +1267,8 @@
           replicationServer.getMonitorInstanceName(), oldHandler.
           getServerAddressURL(), handler.getServerAddressURL(),
           handler.getServerId());
-        logError(message);
+        throw new DirectoryException(ResultCode.OTHER, message);
       }
-      return false;
     }
     return true;
   }
@@ -1223,7 +1315,7 @@
   {
     LinkedHashSet<String> mySet = new LinkedHashSet<String>();
 
-    for (ServerHandler handler : replicationServers.values())
+    for (ReplicationServerHandler handler : replicationServers.values())
     {
       mySet.add(handler.getServerAddressURL());
     }
@@ -1232,7 +1324,9 @@
   }
 
   /**
-   * Return a Set containing the servers known by this replicationServer.
+   * Return a set containing the server that produced update and known by
+   * this replicationServer from all over the topology,
+   * whatever directly connected of connected to another RS.
    * @return a set containing the servers known by this replicationServer.
    */
   public Set<Short> getServers()
@@ -1250,7 +1344,7 @@
   {
     List<String> mySet = new ArrayList<String>(0);
 
-    for (ServerHandler handler : directoryServers.values())
+    for (DataServerHandler handler : directoryServers.values())
     {
       mySet.add(String.valueOf(handler.getServerId()));
     }
@@ -1348,7 +1442,7 @@
       {
         // Send to all replication servers with a least one remote
         // server connected
-        for (ServerHandler rsh : replicationServers.values())
+        for (ReplicationServerHandler rsh : replicationServers.values())
         {
           if (rsh.hasRemoteLDAPServers())
           {
@@ -1358,7 +1452,7 @@
       }
 
       // Sends to all connected LDAP servers
-      for (ServerHandler destinationHandler : directoryServers.values())
+      for (DataServerHandler destinationHandler : directoryServers.values())
       {
         // Don't loop on the sender
         if (destinationHandler == senderHandler)
@@ -1368,7 +1462,7 @@
     } else
     {
       // Destination is one server
-      ServerHandler destinationHandler =
+      DataServerHandler destinationHandler =
         directoryServers.get(msg.getDestination());
       if (destinationHandler != null)
       {
@@ -1378,9 +1472,9 @@
         // the targeted server is NOT connected
         // Let's search for THE changelog server that MAY
         // have the targeted server connected.
-        if (senderHandler.isLDAPserver())
+        if (senderHandler.isDataServer())
         {
-          for (ServerHandler h : replicationServers.values())
+          for (ReplicationServerHandler h : replicationServers.values())
           {
             // Send to all replication servers with a least one remote
             // server connected
@@ -1421,7 +1515,7 @@
         // build the full list of all servers in the topology
         // and send back a MonitorMsg with the full list of all the servers
         // in the topology.
-        if (senderHandler.isLDAPserver())
+        if (senderHandler.isDataServer())
         {
           MonitorMsg returnMsg =
             new MonitorMsg(msg.getDestination(), msg.getsenderID());
@@ -1481,7 +1575,7 @@
         // from the states stored in the serverHandler.
         // - the server state
         // - the older missing change
-        for (ServerHandler lsh : this.directoryServers.values())
+        for (DataServerHandler lsh : this.directoryServers.values())
         {
           monitorMsg.setServerState(
             lsh.getServerId(),
@@ -1491,7 +1585,7 @@
         }
 
         // Same for the connected RS
-        for (ServerHandler rsh : this.replicationServers.values())
+        for (ReplicationServerHandler rsh : this.replicationServers.values())
         {
           monitorMsg.setServerState(
             rsh.getServerId(),
@@ -1657,12 +1751,12 @@
    */
   public void checkAllSaturation() throws IOException
   {
-    for (ServerHandler handler : replicationServers.values())
+    for (ReplicationServerHandler handler : replicationServers.values())
     {
       handler.checkWindow();
     }
 
-    for (ServerHandler handler : directoryServers.values())
+    for (DataServerHandler handler : directoryServers.values())
     {
       handler.checkWindow();
     }
@@ -1675,15 +1769,15 @@
    * @return true if the server can restart sending changes.
    *         false if the server can't restart sending changes.
    */
-  public boolean restartAfterSaturation(ServerHandler sourceHandler)
+  public boolean restartAfterSaturation(MessageHandler sourceHandler)
   {
-    for (ServerHandler handler : replicationServers.values())
+    for (MessageHandler handler : replicationServers.values())
     {
       if (!handler.restartAfterSaturation(sourceHandler))
         return false;
     }
 
-    for (ServerHandler handler : directoryServers.values())
+    for (MessageHandler handler : directoryServers.values())
     {
       if (!handler.restartAfterSaturation(sourceHandler))
         return false;
@@ -1698,9 +1792,9 @@
    * @param notThisOne If not null, the topology message will not be sent to
    * this passed server.
    */
-  public void sendTopoInfoToDSs(ServerHandler notThisOne)
+  public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
   {
-    for (ServerHandler handler : directoryServers.values())
+    for (DataServerHandler handler : directoryServers.values())
     {
       if ((notThisOne == null) || // All DSs requested
         ((notThisOne != null) && (handler != notThisOne)))
@@ -1725,10 +1819,10 @@
    * Send a TopologyMsg to all the connected replication servers
    * in order to let them know our connected LDAP servers.
    */
-  public void sendTopoInfoToRSs()
+  public void buildAndSendTopoInfoToRSs()
   {
     TopologyMsg topoMsg = createTopologyMsgForRS();
-    for (ServerHandler handler : replicationServers.values())
+    for (ReplicationServerHandler handler : replicationServers.values())
     {
       try
       {
@@ -1755,7 +1849,7 @@
     List<DSInfo> dsInfos = new ArrayList<DSInfo>();
 
     // Go through every DSs
-    for (ServerHandler serverHandler : directoryServers.values())
+    for (DataServerHandler serverHandler : directoryServers.values())
     {
       dsInfos.add(serverHandler.toDSInfo());
     }
@@ -1785,7 +1879,7 @@
     List<RSInfo> rsInfos = new ArrayList<RSInfo>();
 
     // Go through every DSs (except recipient of msg)
-    for (ServerHandler serverHandler : directoryServers.values())
+    for (DataServerHandler serverHandler : directoryServers.values())
     {
       if (serverHandler.getServerId() == destDsId)
         continue;
@@ -1799,7 +1893,7 @@
 
     // Go through every peer RSs (and get their connected DSs), also add info
     // for RSs
-    for (ServerHandler serverHandler : replicationServers.values())
+    for (ReplicationServerHandler serverHandler : replicationServers.values())
     {
       // Put RS info
       rsInfos.add(serverHandler.toRSInfo());
@@ -1840,12 +1934,6 @@
   synchronized public long setGenerationId(long generationId,
     boolean savedStatus)
   {
-    if (debugEnabled())
-      TRACER.debugInfo(
-        "In " + this.replicationServer.getMonitorInstanceName() +
-        " baseDN=" + baseDn +
-        " RCache.set GenerationId=" + generationId);
-
     long oldGenerationId = this.generationId;
 
     if (this.generationId != generationId)
@@ -1916,7 +2004,7 @@
         // After we'll have sent the message , the remote RS will adopt
         // the new genId
         rsHandler.setGenerationId(newGenId);
-        if (senderHandler.isLDAPserver())
+        if (senderHandler.isDataServer())
         {
           rsHandler.forwardGenerationIdToRS(genIdMsg);
         }
@@ -1929,7 +2017,7 @@
 
     // Change status of the connected DSs according to the requested new
     // reference generation id
-    for (ServerHandler dsHandler : directoryServers.values())
+    for (DataServerHandler dsHandler : directoryServers.values())
     {
       try
       {
@@ -1948,8 +2036,8 @@
     // (consecutive to reset gen id message), we prefer advertising once for
     // all after changes (less packet sent), here at the end of the reset msg
     // treatment.
-    sendTopoInfoToDSs(null);
-    sendTopoInfoToRSs();
+    buildAndSendTopoInfoToDSs(null);
+    buildAndSendTopoInfoToRSs();
 
     Message message = NOTE_RESET_GENERATION_ID.get(baseDn.toString(),
       Long.toString(newGenId));
@@ -1964,7 +2052,7 @@
    *        that changed his status.
    * @param csMsg The message containing the new status
    */
-  public void processNewStatus(ServerHandler senderHandler,
+  public void processNewStatus(DataServerHandler senderHandler,
     ChangeStatusMsg csMsg)
   {
     if (debugEnabled())
@@ -1995,8 +2083,8 @@
     }
 
     // Update every peers (RS/DS) with topology changes
-    sendTopoInfoToDSs(senderHandler);
-    sendTopoInfoToRSs();
+    buildAndSendTopoInfoToDSs(senderHandler);
+    buildAndSendTopoInfoToRSs();
 
     Message message = NOTE_DIRECTORY_SERVER_CHANGED_STATUS.get(
       Short.toString(senderHandler.getServerId()),
@@ -2014,7 +2102,7 @@
    * @param event The event to be used for new status computation
    * @return True if we have been interrupted (must stop), false otherwise
    */
-  public boolean changeStatusFromStatusAnalyzer(ServerHandler serverHandler,
+  public boolean changeStatusFromStatusAnalyzer(DataServerHandler serverHandler,
     StatusMachineEvent event)
   {
     try
@@ -2066,8 +2154,8 @@
     }
 
     // Update every peers (RS/DS) with topology changes
-    sendTopoInfoToDSs(serverHandler);
-    sendTopoInfoToRSs();
+    buildAndSendTopoInfoToDSs(serverHandler);
+    buildAndSendTopoInfoToRSs();
 
     release();
     return false;
@@ -2169,10 +2257,12 @@
    * @param allowResetGenId True for allowing to reset the generation id (
    * when called after initial handshake)
    * @throws IOException If an error occurred.
+   * @throws DirectoryException If an error occurred.
    */
-  public void receiveTopoInfoFromRS(TopologyMsg topoMsg, ServerHandler handler,
+  public void receiveTopoInfoFromRS(TopologyMsg topoMsg,
+      ReplicationServerHandler handler,
     boolean allowResetGenId)
-    throws IOException
+    throws IOException, DirectoryException
   {
     if (debugEnabled())
     {
@@ -2186,7 +2276,8 @@
     {
       // Acquire lock on domain (see more details in comment of start() method
       // of ServerHandler)
-      lock();
+      if (!hasLock())
+        lock();
     } catch (InterruptedException ex)
     {
       // Try doing job anyway...
@@ -2228,7 +2319,7 @@
      * Sends the currently known topology information to every connected
      * DS we have.
      */
-    sendTopoInfoToDSs(null);
+    buildAndSendTopoInfoToDSs(null);
 
     release();
   }
@@ -2441,7 +2532,7 @@
           {
             // this is the latency of the remote RSi regarding another RSj
             // let's update the latency of the LSes connected to RSj
-            ServerHandler rsjHdr = replicationServers.get(rsid);
+            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
             if (rsjHdr != null)
             {
               for (short remotelsid : rsjHdr.getConnectedDirectoryServerIds())
@@ -2496,7 +2587,7 @@
    * Get the map of connected DSs.
    * @return The map of connected DSs
    */
-  public Map<Short, ServerHandler> getConnectedDSs()
+  public Map<Short, DataServerHandler> getConnectedDSs()
   {
     return directoryServers;
   }
@@ -2505,7 +2596,7 @@
    * Get the map of connected RSs.
    * @return The map of connected RSs
    */
-  public Map<Short, ServerHandler> getConnectedRSs()
+  public Map<Short, ReplicationServerHandler> getConnectedRSs()
   {
     return replicationServers;
   }
@@ -2708,5 +2799,140 @@
 
     return attributes;
   }
-}
 
+  /**
+   * Register in the domain an handler that subscribes to changes.
+   * @param handler the provided subscribing handler.
+   */
+  public void registerHandler(MessageHandler handler)
+  {
+    this.otherHandlers.add(handler);
+  }
+
+  /**
+   * Unregister from the domain an handler.
+   * @param handler the provided unsubscribing handler.
+   * @return Whether this handler has been unregistered with success.
+   */
+  public boolean unRegisterHandler(MessageHandler handler)
+  {
+    return this.otherHandlers.remove(handler);
+  }
+
+  /**
+   * Return the state that contain for each server the time of eligibility.
+   * @return the state.
+   */
+  public ServerState getHeartbeatState()
+  {
+    // TODO:ECL Eligility must be supported
+    return this.getDbServerState();
+  }
+  /**
+   * Computes the change number eligible to the ECL.
+   * @return null if the domain does not play in eligibility.
+   */
+  public ChangeNumber computeEligibleCN()
+  {
+    ChangeNumber elligibleCN = null;
+    ServerState heartbeatState = getHeartbeatState();
+
+    if (heartbeatState==null)
+      return null;
+
+    // compute elligible CN
+    ServerState hbState = heartbeatState.duplicate();
+
+    Iterator<Short> it = hbState.iterator();
+    while (it.hasNext())
+    {
+      short sid = it.next();
+      ChangeNumber storedCN = hbState.getMaxChangeNumber(sid);
+
+      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
+      // then the server is considered down and not considered for eligibility
+      if (TimeThread.getTime()-storedCN.getTime()>2000)
+      {
+        if (debugEnabled())
+          TRACER.debugInfo(
+            "For RSD." + this.baseDn + " Server " + sid
+            + " is not considered for eligibility ... potentially down");
+        continue;
+      }
+
+      if ((elligibleCN == null) || (storedCN.older(elligibleCN)))
+      {
+        elligibleCN = storedCN;
+      }
+    }
+
+    if (debugEnabled())
+      TRACER.debugInfo(
+        "For RSD." + this.baseDn + " ElligibleCN()=" + elligibleCN);
+    return elligibleCN;
+  }
+
+  /**
+   * Computes the eligible server state by minimizing the dbServerState and the
+   * elligibleCN.
+   * @return The computed eligible server state.
+   */
+  public ServerState getCLElligibleState()
+  {
+    // ChangeNumber elligibleCN = computeEligibleCN();
+    ServerState res = new ServerState();
+    ServerState dbState = this.getDbServerState();
+    res = dbState;
+
+    /* TODO:ECL Eligibility is not yet implemented
+    Iterator<Short> it = dbState.iterator();
+    while (it.hasNext())
+    {
+      Short sid = it.next();
+      DbHandler h = sourceDbHandlers.get(sid);
+      ChangeNumber dbCN = dbState.getMaxChangeNumber(sid);
+      try
+      {
+        if ((elligibleCN!=null)&&(elligibleCN.older(dbCN)))
+        {
+          // some CN exist in the db newer than elligible CN
+          ReplicationIterator ri = h.generateIterator(elligibleCN);
+          ChangeNumber newCN = ri.getCurrentCN();
+          res.update(newCN);
+          ri.releaseCursor();
+        }
+        else
+        {
+          // no CN exist in the db newer than elligible CN
+          res.update(dbCN);
+        }
+      }
+      catch(Exception e)
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
+    }
+    */
+
+    if (debugEnabled())
+      TRACER.debugInfo("In " + this.getName()
+        + " getCLElligibleState returns:" + res);
+    return res;
+  }
+
+  /**
+   * Returns the start state of the domain, made of the first (oldest)
+   * change stored for each serverId.
+   * @return t start state of the domain.
+   */
+  public ServerState getStartState()
+  {
+    ServerState domainStartState = new ServerState();
+    Iterator<Short> it = this.getDbServerState().iterator();
+    for (DbHandler dbHandler : sourceDbHandlers.values())
+    {
+      domainStartState.update(dbHandler.getFirstChange());
+    }
+    return domainStartState;
+  }
+}

--
Gitblit v1.10.0