From 4d90aff1b4e079be6e32e3f880e328883dd534ee Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 30 Mar 2011 19:21:16 +0000
Subject: [PATCH] Fix issue OpenDJ-96: Replication server monitor data computation takes too long / blocks rest of server when another RS is cannot be reached

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  758 +++++++++++++++++++++++++---------------------------------
 1 files changed, 325 insertions(+), 433 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 5e73527..4358278 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -41,15 +41,7 @@
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.*;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -70,7 +62,6 @@
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.WorkflowImpl;
 import org.opends.server.core.networkgroups.NetworkGroup;
-import org.opends.server.loggers.LogLevel;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.*;
 import org.opends.server.replication.protocol.ProtocolSession;
@@ -100,7 +91,6 @@
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
 import com.sleepycat.je.DatabaseException;
-import java.util.Collections;
 
 /**
  * ReplicationServer Listener.
@@ -113,7 +103,7 @@
  * It is responsible for creating the replication server replicationServerDomain
  * and managing it
  */
-public class ReplicationServer
+public final class ReplicationServer
   implements ConfigurationChangeListener<ReplicationServerCfg>,
              BackupTaskListener, RestoreTaskListener, ImportTaskListener,
              ExportTaskListener
@@ -131,8 +121,8 @@
   /* This table is used to store the list of dn for which we are currently
    * handling servers.
    */
-  private ConcurrentHashMap<String, ReplicationServerDomain> baseDNs =
-          new ConcurrentHashMap<String, ReplicationServerDomain>();
+  private final Map<String, ReplicationServerDomain> baseDNs =
+          new HashMap<String, ReplicationServerDomain>();
 
   private String localURL = "null";
   private volatile boolean shutdown = false;
@@ -155,11 +145,6 @@
   // ID of the backend
   private static final String backendId = "replicationChanges";
 
-  // At startup, the listen thread wait on this flag for the connect
-  // thread to look for other servers in the topology.
-  private boolean connectedInTopology = false;
-  private final Object connectedInTopologyLock = new Object();
-
   /*
    * Assured mode properties
    */
@@ -180,10 +165,20 @@
   // The handler of the draft change numbers database, the database used to
   // store the relation between a draft change number ('seqnum') and the
   // associated cookie.
+  //
+  // Guarded by draftCNLock
+  //
   private DraftCNDbHandler draftCNDbHandler;
+
   // The last value generated of the draft change number.
+  //
+  // Guarded by draftCNLock
+  //
   private int lastGeneratedDraftCN = 0;
 
+  // Used for protecting draft CN related state.
+  private final Object draftCNLock = new Object();
+
   /**
    * The tracer object for the debug logger.
    */
@@ -191,13 +186,15 @@
 
   private static String externalChangeLogWorkflowID =
     "External Changelog Workflow ID";
-  ECLWorkflowElement eclwe;
-  WorkflowImpl externalChangeLogWorkflowImpl = null;
+  private ECLWorkflowElement eclwe;
+  private WorkflowImpl externalChangeLogWorkflowImpl = null;
 
   private static HashSet<Integer> localPorts = new HashSet<Integer>();
 
-  // used to synchronize the domain creation with the connect thread.
-  final private Object domainMonitor = new Object();
+  // Monitors for synchronizing domain creation with the connect thread.
+  private final Object domainTicketLock = new Object();
+  private final Object connectThreadLock = new Object();
+  private long domainTicket = 0L;
 
   // ServiceIDs excluded for ECL
   private  ArrayList<String> excludedServiceIDs = new ArrayList<String>();
@@ -314,22 +311,6 @@
 
   void runListen()
   {
-    // wait for the connect thread to find other replication
-    // servers in the topology before starting to accept connections
-    // from the ldap servers.
-    synchronized (connectedInTopologyLock)
-    {
-      if (connectedInTopology == false)
-      {
-        try
-        {
-          connectedInTopologyLock.wait(1000);
-        } catch (InterruptedException e)
-        {
-        }
-      }
-    }
-
     while ((shutdown == false) && (stopListen  == false))
     {
       // Wait on the replicationServer port.
@@ -419,88 +400,91 @@
    */
   void runConnect()
   {
-    while (shutdown == false)
+    synchronized (connectThreadLock)
     {
-      /*
-       * periodically check that we are connected to all other
-       * replication servers and if not establish the connection
-       */
-      for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
+      while (!shutdown)
       {
-        Set<String> connectedReplServers =
-                replicationServerDomain.getChangelogs();
         /*
-         * check that all replication server in the config are in the connected
-         * Set. If not create the connection
+         * periodically check that we are connected to all other replication
+         * servers and if not establish the connection
          */
-        for (String serverURL : replicationServers)
+        for (ReplicationServerDomain domain : getReplicationServerDomains())
         {
-          int separator = serverURL.lastIndexOf(':');
-          String port = serverURL.substring(separator + 1);
-          String hostname = serverURL.substring(0, separator);
+          Set<String> connectedReplServers = domain.getChangelogs();
 
-          try
+          /*
+           * check that all replication server in the config are in the
+           * connected Set. If not create the connection
+           */
+          for (String serverURL : replicationServers)
           {
-            InetAddress inetAddress = InetAddress.getByName(hostname);
-            String serverAddress = inetAddress.getHostAddress() + ":" + port;
-            String alternServerAddress = null;
-            if (hostname.equalsIgnoreCase("localhost"))
+            int separator = serverURL.lastIndexOf(':');
+            String port = serverURL.substring(separator + 1);
+            String hostname = serverURL.substring(0, separator);
 
+            try
             {
-              // if "localhost" was used as the hostname in the configuration
-              // also check is the connection is already opened with the
-              // local address.
-              alternServerAddress =
-                InetAddress.getLocalHost().getHostAddress() + ":" + port;
-            }
-            if (inetAddress.equals(InetAddress.getLocalHost()))
-            {
-              // if the host address is the local one, also check
-              // if the connection is already opened with the "localhost"
-              // address
-              alternServerAddress = "127.0.0.1" + ":" + port;
-            }
+              InetAddress inetAddress = InetAddress
+                  .getByName(hostname);
+              String serverAddress = inetAddress.getHostAddress()
+                  + ":" + port;
+              String alternServerAddress = null;
 
-            if ((serverAddress.compareTo("127.0.0.1:" + replicationPort) != 0)
-                && (serverAddress.compareTo(this.localURL) != 0)
-                && (!connectedReplServers.contains(serverAddress)
-                && ((alternServerAddress == null)
-                    || !connectedReplServers.contains(alternServerAddress))))
+              if (hostname.equalsIgnoreCase("localhost"))
+              {
+                // if "localhost" was used as the hostname in the configuration
+                // also check is the connection is already opened with the
+                // local address.
+                alternServerAddress = InetAddress.getLocalHost()
+                    .getHostAddress() + ":" + port;
+              }
+
+              if (inetAddress.equals(InetAddress.getLocalHost()))
+              {
+                // if the host address is the local one, also check
+                // if the connection is already opened with the "localhost"
+                // address
+                alternServerAddress = "127.0.0.1" + ":" + port;
+              }
+
+              if ((serverAddress.compareTo("127.0.0.1:"
+                  + replicationPort) != 0)
+                  && (serverAddress.compareTo(this.localURL) != 0)
+                  && (!connectedReplServers.contains(serverAddress)
+                      && ((alternServerAddress == null) || !connectedReplServers
+                      .contains(alternServerAddress))))
+              {
+                connect(serverURL, domain.getBaseDn());
+              }
+            }
+            catch (IOException e)
             {
-              this.connect(serverURL, replicationServerDomain.getBaseDn());
+              Message message = ERR_COULD_NOT_SOLVE_HOSTNAME
+                  .get(hostname);
+              logError(message);
             }
           }
-          catch (IOException e)
-          {
-            Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname);
-            logError(message);
-          }
         }
-      }
-      synchronized (connectedInTopologyLock)
-      {
-        // wake up the listen thread if necessary.
-        if (connectedInTopology == false)
+
+        // Notify any threads waiting with domain tickets after each iteration.
+        synchronized (domainTicketLock)
         {
-          connectedInTopologyLock.notify();
-          connectedInTopology = true;
+          domainTicket++;
+          domainTicketLock.notifyAll();
         }
-      }
-      try
-      {
-        synchronized(domainMonitor)
+
+        // Retry each second.
+        final int randomizer = (int) (Math.random() * 100);
+        try
         {
-          domainMonitor.notifyAll();
+          // Releases lock, allows threads to get domain ticket.
+          connectThreadLock.wait(1000 + randomizer);
         }
-        synchronized (this)
+        catch (InterruptedException e)
         {
-          /* check if we are connected every second */
-          int randomizer = (int)(Math.random()*100);
-          wait(1000 + randomizer);
+          // Signalled to shutdown.
+          return;
         }
-      } catch (InterruptedException e)
-      {
-        // ignore error, will try to connect again or shutdown
       }
     }
   }
@@ -721,40 +705,50 @@
 
   private void shutdownECL()
   {
-    WorkflowImpl eclwf =
-      (WorkflowImpl)WorkflowImpl.getWorkflow(externalChangeLogWorkflowID);
+    WorkflowImpl eclwf = (WorkflowImpl) WorkflowImpl
+        .getWorkflow(externalChangeLogWorkflowID);
 
     // do it only if not already done by another RS (unit test case)
     // if (DirectoryServer.getWorkflowElement(externalChangeLogWorkflowID)
-    if (eclwf!=null)
+    if (eclwf != null)
     {
+      // FIXME:ECL should the ECL Workflow be registered in
+      // internalNetworkGroup?
+      NetworkGroup internalNetworkGroup = NetworkGroup
+          .getInternalNetworkGroup();
+      internalNetworkGroup
+          .deregisterWorkflow(externalChangeLogWorkflowID);
 
+      // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
+      NetworkGroup adminNetworkGroup = NetworkGroup
+          .getAdminNetworkGroup();
+      adminNetworkGroup
+          .deregisterWorkflow(externalChangeLogWorkflowID);
 
-    // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
-    NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
-    internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
+      NetworkGroup defaultNetworkGroup = NetworkGroup
+          .getDefaultNetworkGroup();
+      defaultNetworkGroup
+          .deregisterWorkflow(externalChangeLogWorkflowID);
 
-    // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
-    NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
-    adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
-
-    NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
-    defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
-
-    eclwf.deregister();
-    eclwf.finalizeWorkflow();
+      eclwf.deregister();
+      eclwf.finalizeWorkflow();
     }
 
-    eclwe = (ECLWorkflowElement)
-    DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
-    if (eclwe!=null)
+    eclwe = (ECLWorkflowElement) DirectoryServer
+        .getWorkflowElement("EXTERNAL CHANGE LOG");
+    if (eclwe != null)
     {
       DirectoryServer.deregisterWorkflowElement(eclwe);
       eclwe.finalizeWorkflowElement();
     }
 
-    if (draftCNDbHandler != null)
-      draftCNDbHandler.shutdown();
+    synchronized (draftCNLock)
+    {
+      if (draftCNDbHandler != null)
+      {
+        draftCNDbHandler.shutdown();
+      }
+    }
   }
 
   /**
@@ -790,38 +784,60 @@
   public ReplicationServerDomain getReplicationServerDomain(String baseDn,
           boolean create, boolean waitConnections)
   {
-    ReplicationServerDomain replicationServerDomain;
+    ReplicationServerDomain domain;
 
     synchronized (baseDNs)
     {
-      replicationServerDomain = baseDNs.get(baseDn);
-      if ((replicationServerDomain == null) && (create))
+      domain = baseDNs.get(baseDn);
+
+      if (domain != null ||!create) {
+        return domain;
+      }
+
+      domain = new ReplicationServerDomain(baseDn, this);
+      baseDNs.put(baseDn, domain);
+    }
+
+    if (waitConnections)
+    {
+      // Acquire a domain ticket and wait for a complete cycle of the connect
+      // thread.
+      final long myDomainTicket;
+      synchronized (connectThreadLock)
       {
-        replicationServerDomain = new ReplicationServerDomain(baseDn, this);
-        baseDNs.put(baseDn, replicationServerDomain);
-        synchronized (domainMonitor)
+        // Connect thread must be waiting.
+        synchronized (domainTicketLock)
         {
-          if (waitConnections)
+          // Determine the ticket which will be used in the next connect thread
+          // iteration.
+          myDomainTicket = domainTicket + 1;
+        }
+
+        // Wake up connect thread.
+        connectThreadLock.notify();
+      }
+
+      // Wait until the connect thread has processed next connect phase.
+      synchronized (domainTicketLock)
+      {
+        // Condition.
+        while (myDomainTicket > domainTicket && !shutdown)
+        {
+          try
           {
-            synchronized (this)
-            {
-              // kick up the connect thread so that this new domain
-              // gets connected to all the Replication Servers.
-              this.notify();
-            }
-            try
-            {
-              // wait for the connect thread to signal that it finished its job
-              domainMonitor.wait(500);
-            } catch (InterruptedException e)
-            {
-            }
+            // Wait with timeout so that we detect shutdown.
+            domainTicketLock.wait(500);
+          }
+          catch (InterruptedException e)
+          {
+            // Can't do anything with this.
+            Thread.currentThread().interrupt();
           }
         }
       }
     }
 
-    return replicationServerDomain;
+    return domain;
   }
 
   /**
@@ -861,9 +877,9 @@
     }
 
     // shutdown all the ChangelogCaches
-    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+    for (ReplicationServerDomain domain : getReplicationServerDomains())
     {
-      replicationServerDomain.shutdown();
+      domain.shutdown();
     }
 
     shutdownECL();
@@ -894,37 +910,60 @@
     return new DbHandler(id, baseDn, this, dbEnv, queueSize);
   }
 
+
+
   /**
    * Clears the generationId for the replicationServerDomain related to the
    * provided baseDn.
-   * @param  baseDn The baseDn for which to delete the generationId.
-   * @throws DatabaseException When it occurs.
+   *
+   * @param baseDn
+   *          The baseDn for which to delete the generationId.
    */
   public void clearGenerationId(String baseDn)
-  throws DatabaseException
   {
     try
     {
       dbEnv.clearGenerationId(baseDn);
+    }
+    catch (Exception e)
+    {
+      // Ignore.
+      if (debugEnabled())
+      {
+        TRACER.debugCaught(DebugLogLevel.WARNING, e);
+      }
+    }
 
-      if (this.draftCNDbHandler != null)
+    synchronized (draftCNLock)
+    {
+      if (draftCNDbHandler != null)
       {
         try
         {
-          try
+          draftCNDbHandler.clear(baseDn);
+        }
+        catch (Exception e)
+        {
+          // Ignore.
+          if (debugEnabled())
           {
-            draftCNDbHandler.clear(baseDn);
+            TRACER.debugCaught(DebugLogLevel.WARNING, e);
           }
-          catch(Exception e){}
+        }
+
+        try
+        {
           lastGeneratedDraftCN = draftCNDbHandler.getLastKey();
         }
-        catch(Exception e) {}
+        catch (Exception e)
+        {
+          // Ignore.
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.WARNING, e);
+          }
+        }
       }
-
-    }
-    catch(Exception e)
-    {
-      TRACER.debugCaught(LogLevel.ALL, e);
     }
   }
 
@@ -993,7 +1032,7 @@
     {
       purgeDelay = newPurgeDelay;
       // propagate
-      for (ReplicationServerDomain domain : baseDNs.values())
+      for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
         domain.setPurgeDelay(purgeDelay*1000);
       }
@@ -1043,64 +1082,71 @@
 
     // Update threshold value for status analyzers (stop them if requested
     // value is 0)
-    if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
+    if (degradedStatusThreshold != configuration
+        .getDegradedStatusThreshold())
     {
       int oldThresholdValue = degradedStatusThreshold;
-      degradedStatusThreshold = configuration.getDegradedStatusThreshold();
-      for(ReplicationServerDomain rsd : baseDNs.values())
+      degradedStatusThreshold = configuration
+          .getDegradedStatusThreshold();
+      for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
         if (degradedStatusThreshold == 0)
         {
           // Requested to stop analyzers
-          rsd.stopStatusAnalyzer();
-        } else if (rsd.isRunningStatusAnalyzer())
+          domain.stopStatusAnalyzer();
+        }
+        else if (domain.isRunningStatusAnalyzer())
         {
           // Update the threshold value for this running analyzer
-          rsd.updateStatusAnalyzer(degradedStatusThreshold);
-        } else if (oldThresholdValue == 0)
+          domain.updateStatusAnalyzer(degradedStatusThreshold);
+        }
+        else if (oldThresholdValue == 0)
         {
           // Requested to start analyzers with provided threshold value
-          if (rsd.getConnectedDSs().size() > 0)
-            rsd.startStatusAnalyzer();
+          if (domain.getConnectedDSs().size() > 0)
+            domain.startStatusAnalyzer();
         }
       }
     }
 
     // Update period value for monitoring publishers (stop them if requested
     // value is 0)
-    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
+    if (monitoringPublisherPeriod != configuration
+        .getMonitoringPeriod())
     {
       long oldMonitoringPeriod = monitoringPublisherPeriod;
       monitoringPublisherPeriod = configuration.getMonitoringPeriod();
-      for(ReplicationServerDomain rsd : baseDNs.values())
+      for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
         if (monitoringPublisherPeriod == 0L)
         {
           // Requested to stop monitoring publishers
-          rsd.stopMonitoringPublisher();
-        } else if (rsd.isRunningMonitoringPublisher())
+          domain.stopMonitoringPublisher();
+        }
+        else if (domain.isRunningMonitoringPublisher())
         {
           // Update the threshold value for this running monitoring publisher
-          rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
-        } else if (oldMonitoringPeriod == 0L)
+          domain.updateMonitoringPublisher(monitoringPublisherPeriod);
+        }
+        else if (oldMonitoringPeriod == 0L)
         {
           // Requested to start monitoring publishers with provided period value
-          if ( (rsd.getConnectedDSs().size() > 0) ||
-            (rsd.getConnectedRSs().size() > 0) )
-            rsd.startMonitoringPublisher();
+          if ((domain.getConnectedDSs().size() > 0)
+              || (domain.getConnectedRSs().size() > 0))
+            domain.startMonitoringPublisher();
         }
       }
     }
 
     // Changed the group id ?
-    byte newGroupId = (byte)configuration.getGroupId();
+    byte newGroupId = (byte) configuration.getGroupId();
     if (newGroupId != groupId)
     {
       groupId = newGroupId;
       // Have a new group id: Disconnect every servers.
-      for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+      for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
-        replicationServerDomain.stopAllServers(true);
+        domain.stopAllServers(true);
       }
     }
 
@@ -1129,10 +1175,10 @@
    */
   private void broadcastConfigChange()
   {
-    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
+    for (ReplicationServerDomain domain : getReplicationServerDomains())
     {
-      replicationServerDomain.buildAndSendTopoInfoToDSs(null);
-      replicationServerDomain.buildAndSendTopoInfoToRSs();
+      domain.buildAndSendTopoInfoToDSs(null);
+      domain.buildAndSendTopoInfoToRSs();
     }
   }
 
@@ -1364,10 +1410,15 @@
    */
   public Iterator<ReplicationServerDomain> getDomainIterator()
   {
-    if (!baseDNs.isEmpty())
-      return baseDNs.values().iterator();
+    Collection<ReplicationServerDomain> domains = getReplicationServerDomains();
+    if (!domains.isEmpty())
+    {
+      return domains.iterator();
+    }
     else
+    {
       return null;
+    }
   }
 
   /**
@@ -1384,16 +1435,40 @@
         rsd.clearDbs();
       }
     }
-    if (this.draftCNDbHandler != null)
+
+    synchronized (draftCNLock)
     {
-      try
+      if (draftCNDbHandler != null)
       {
-        try { draftCNDbHandler.clear(); } catch(Exception e){}
-        draftCNDbHandler.shutdown();
+        try
+        {
+          draftCNDbHandler.clear();
+        }
+        catch (Exception e)
+        {
+          // Ignore.
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.WARNING, e);
+          }
+        }
+
+        try
+        {
+          draftCNDbHandler.shutdown();
+        }
+        catch (Exception e)
+        {
+          // Ignore.
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.WARNING, e);
+          }
+        }
+
         lastGeneratedDraftCN = 0;
         draftCNDbHandler = null;
       }
-      catch(Exception e) {}
     }
   }
 
@@ -1468,9 +1543,9 @@
     if (serversToDisconnect.isEmpty())
       return;
 
-    for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
+    for (ReplicationServerDomain domain: getReplicationServerDomains())
     {
-      replicationServerDomain.stopReplicationServers(serversToDisconnect);
+      domain.stopReplicationServers(serversToDisconnect);
     }
   }
 
@@ -1494,221 +1569,6 @@
     return replicationPort;
   }
 
-  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
-  private long monitorDataLifeTime = 500;
-
-  /* The date of the last time they have been elaborated */
-  private long monitorDataLastBuildDate = 0;
-
-  /**
-   * This uniquely identifies a server (handler) in the cross-domain topology.
-   * Represents an identifier of a handler (in the whole RS) we have to wait a
-   * monitoring message from before answering to a monitor request.
-   */
-  public static class GlobalServerId {
-
-    private int serverId = -1;
-    private String baseDn = null;
-
-    /**
-     * Constructor for a global server id.
-     * @param baseDn The dn of the RSD owning the handler.
-     * @param serverId The handler id in the matching RSD.
-     */
-    public GlobalServerId(String baseDn, int serverId) {
-      this.baseDn = baseDn;
-      this.serverId = serverId;
-    }
-
-    /**
-     * Get the server handler id.
-     * @return the serverId
-     */
-    public int getServerId()
-    {
-      return serverId;
-    }
-
-    /**
-     * Get the base dn.
-     * @return the baseDn
-     */
-    public String getBaseDn()
-    {
-      return baseDn;
-    }
-
-    /**
-     * Get the hascode.
-     * @return The hashcode.
-     */
-    @Override
-    public int hashCode()
-    {
-      int hash = 7;
-      hash = 43 * hash + this.serverId;
-      hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
-      return hash;
-    }
-
-    /**
-     * Tests if the passed global server handler id represents the same server
-     * handler as this one.
-     * @param obj The object to test.
-     * @return True if both identifiers are the same.
-     */
-    public boolean equals(Object obj) {
-      if ( (obj == null) || (!(obj instanceof GlobalServerId)))
-        return false;
-
-      GlobalServerId globalServerId = (GlobalServerId)obj;
-      return ( globalServerId.baseDn.equals(baseDn) &&
-        (globalServerId.serverId == serverId) );
-    }
-  }
-
-  /**
-   * This gives the list of server handlers we are willing to wait monitoring
-   * message from. Each time a monitoring message is received by a server
-   * handler, the matching server handler id is retired from the list. When the
-   * list is empty, we received all expected monitoring messages.
-   */
-  private List<GlobalServerId> expectedMonitoringMsg = null;
-
-  /**
-   * Trigger the computation of the Global Monitoring Data.
-   * This should be called by all the MonitorProviders that need
-   * the global monitoring data to be updated before they can
-   * publish their information to cn=monitor.
-   *
-   * This method will trigger the update of all the global monitoring
-   * information of all the base-DNs of this replication Server.
-   *
-   * @throws DirectoryException If the computation cannot be achieved.
-   */
-  public synchronized void computeMonitorData() throws DirectoryException
-  {
-    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
-    {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
-      // The current data are still valid. No need to renew them.
-      return;
-    }
-
-    // Initialize the list of server handlers we expect monitoring messages from
-    expectedMonitoringMsg =
-      Collections.synchronizedList(new ArrayList<GlobalServerId>());
-
-    // Copy the list of domains as a new domain may arrive or disappear between
-    // the initializeMonitorData and completeMonitorData calls
-    List<ReplicationServerDomain> rsdList =
-                new ArrayList<ReplicationServerDomain>(baseDNs.values());
-
-    for (ReplicationServerDomain domain : rsdList)
-    {
-      domain.initializeMonitorData(expectedMonitoringMsg);
-    }
-
-    // Wait for responses
-    waitMonitorDataResponses();
-
-    for (ReplicationServerDomain domain : rsdList)
-    {
-      domain.completeMonitorData();
-    }
-  }
-
-  /**
-   * Wait for the expected received MonitorMsg.
-   * @throws DirectoryException When an error occurs.
-   */
-  private void waitMonitorDataResponses()
-    throws DirectoryException
-  {
-    try
-    {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + getMonitorInstanceName() +
-          " waiting for " + expectedMonitoringMsg.size() +
-          " expected monitor messages");
-
-      // Wait up to 5 seconds for every expected monitoring message to come
-      // back.
-      boolean allReceived = false;
-      long startTime = TimeThread.getTime();
-      long curTime = startTime;
-      int maxTime = 5000;
-      while ( (curTime - startTime) < maxTime )
-      {
-        // Have every expected monitoring messages arrived ?
-        if (expectedMonitoringMsg.size() == 0)
-        {
-          // Ok break the loop
-          allReceived = true;
-          break;
-        }
-        Thread.sleep(100);
-        curTime = TimeThread.getTime();
-      }
-
-      monitorDataLastBuildDate = TimeThread.getTime();
-
-      if (!allReceived)
-      {
-        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
-        // let's go on in best effort even with limited data received.
-      } else
-      {
-        if (debugEnabled())
-          TRACER.debugInfo(
-            "In " + getMonitorInstanceName() +
-            " Successfully received all expected monitor messages");
-      }
-    } catch (Exception e)
-    {
-      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
-    }
-  }
-
-
-  /**
-   * This should be called by each ReplicationServerDomain that receives
-   * a response to a monitor request message. This may also be called when a
-   * monitoring message is coming from a RS whose monitoring publisher thread
-   * sent it. As monitoring messages (sent because of monitoring request or
-   * because of monitoring publisher) have the same content, this is also ok
-   * to mark ok the server when the monitoring message coms from a monitoring
-   * publisher thread.
-   * @param globalServerId The server handler that is receiving the
-   * monitoring message.
-   */
-  public void responseReceived(GlobalServerId globalServerId)
-  {
-    expectedMonitoringMsg.remove(globalServerId);
-  }
-
-
-  /**
-   * This should be called when the Monitoring has failed and the
-   * Worker thread that is waiting for the result should be awaken.
-   */
-  public void responseReceivedAll()
-  {
-    expectedMonitoringMsg.clear();
-  }
-
-  /**
-   * Returns the number of domains managed by this replication server.
-   * @return the number of domains managed.
-   */
-  public int getCacheSize()
-  {
-    return baseDNs.size();
-  }
-
   /**
    * Create a new session to get the ECL.
    * @param msg The message that specifies the ECL request.
@@ -1861,32 +1721,37 @@
     return eligibleCN;
   }
 
+
+
   /**
    * Get or create a handler on a Db on DraftCN for external changelog.
+   *
    * @return the handler.
-   * @throws DirectoryException when needed.
+   * @throws DirectoryException
+   *           when needed.
    */
-  public synchronized DraftCNDbHandler getDraftCNDbHandler()
-  throws DirectoryException
+  public DraftCNDbHandler getDraftCNDbHandler()
+      throws DirectoryException
   {
-    try
+    synchronized (draftCNLock)
     {
-      if (draftCNDbHandler == null)
+      try
       {
-        draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
         if (draftCNDbHandler == null)
-          return null;
-        this.lastGeneratedDraftCN = getLastDraftChangeNumber();
+        {
+          draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
+          lastGeneratedDraftCN = getLastDraftChangeNumber();
+        }
+        return draftCNDbHandler;
       }
-      return draftCNDbHandler;
-    }
-    catch (Exception e)
-    {
-      TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
-      throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
-          mb.toMessage(), e);
+      catch (Exception e)
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
+        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
+            mb.toMessage(), e);
+      }
     }
   }
 
@@ -1896,10 +1761,17 @@
    */
   public int getFirstDraftChangeNumber()
   {
-    int first=0;
-    if (draftCNDbHandler != null)
-      first = draftCNDbHandler.getFirstKey();
-    return first;
+    synchronized (draftCNLock)
+    {
+      if (draftCNDbHandler != null)
+      {
+        return draftCNDbHandler.getFirstKey();
+      }
+      else
+      {
+        return 0;
+      }
+    }
   }
 
   /**
@@ -1908,19 +1780,29 @@
    */
   public int getLastDraftChangeNumber()
   {
-    int last=0;
-    if (draftCNDbHandler != null)
-      last = draftCNDbHandler.getLastKey();
-    return last;
+    synchronized (draftCNLock)
+    {
+      if (draftCNDbHandler != null)
+      {
+        return draftCNDbHandler.getLastKey();
+      }
+      else
+      {
+        return 0;
+      }
+    }
   }
 
   /**
    * Generate a new Draft ChangeNumber.
    * @return The generated Draft ChangeNUmber
    */
-  synchronized public int getNewDraftCN()
+  public int getNewDraftCN()
   {
-    return ++lastGeneratedDraftCN;
+    synchronized (draftCNLock)
+    {
+      return ++lastGeneratedDraftCN;
+    }
   }
 
   /**
@@ -2092,4 +1974,14 @@
     return weight;
   }
 
+
+
+  private Collection<ReplicationServerDomain> getReplicationServerDomains()
+  {
+    synchronized (baseDNs)
+    {
+      return new ArrayList<ReplicationServerDomain>(baseDNs.values());
+    }
+  }
+
 }

--
Gitblit v1.10.0