mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
30.21.2011 4d0faf5b8ad46e978a72d35a8f736f83fb61fd2d
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -41,15 +41,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
@@ -70,7 +62,6 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
import org.opends.server.core.networkgroups.NetworkGroup;
import org.opends.server.loggers.LogLevel;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.ProtocolSession;
@@ -100,7 +91,6 @@
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import com.sleepycat.je.DatabaseException;
import java.util.Collections;
/**
 * ReplicationServer Listener.
@@ -113,7 +103,7 @@
 * It is responsible for creating the replication server replicationServerDomain
 * and managing it
 */
public class ReplicationServer
public final class ReplicationServer
  implements ConfigurationChangeListener<ReplicationServerCfg>,
             BackupTaskListener, RestoreTaskListener, ImportTaskListener,
             ExportTaskListener
@@ -131,8 +121,8 @@
  /* This table is used to store the list of dn for which we are currently
   * handling servers.
   */
  private ConcurrentHashMap<String, ReplicationServerDomain> baseDNs =
          new ConcurrentHashMap<String, ReplicationServerDomain>();
  private final Map<String, ReplicationServerDomain> baseDNs =
          new HashMap<String, ReplicationServerDomain>();
  private String localURL = "null";
  private volatile boolean shutdown = false;
@@ -155,11 +145,6 @@
  // ID of the backend
  private static final String backendId = "replicationChanges";
  // At startup, the listen thread wait on this flag for the connect
  // thread to look for other servers in the topology.
  private boolean connectedInTopology = false;
  private final Object connectedInTopologyLock = new Object();
  /*
   * Assured mode properties
   */
@@ -180,10 +165,20 @@
  // The handler of the draft change numbers database, the database used to
  // store the relation between a draft change number ('seqnum') and the
  // associated cookie.
  //
  // Guarded by draftCNLock
  //
  private DraftCNDbHandler draftCNDbHandler;
  // The last value generated of the draft change number.
  //
  // Guarded by draftCNLock
  //
  private int lastGeneratedDraftCN = 0;
  // Used for protecting draft CN related state.
  private final Object draftCNLock = new Object();
  /**
   * The tracer object for the debug logger.
   */
@@ -191,13 +186,15 @@
  private static String externalChangeLogWorkflowID =
    "External Changelog Workflow ID";
  ECLWorkflowElement eclwe;
  WorkflowImpl externalChangeLogWorkflowImpl = null;
  private ECLWorkflowElement eclwe;
  private WorkflowImpl externalChangeLogWorkflowImpl = null;
  private static HashSet<Integer> localPorts = new HashSet<Integer>();
  // used to synchronize the domain creation with the connect thread.
  final private Object domainMonitor = new Object();
  // Monitors for synchronizing domain creation with the connect thread.
  private final Object domainTicketLock = new Object();
  private final Object connectThreadLock = new Object();
  private long domainTicket = 0L;
  // ServiceIDs excluded for ECL
  private  ArrayList<String> excludedServiceIDs = new ArrayList<String>();
@@ -314,22 +311,6 @@
  void runListen()
  {
    // wait for the connect thread to find other replication
    // servers in the topology before starting to accept connections
    // from the ldap servers.
    synchronized (connectedInTopologyLock)
    {
      if (connectedInTopology == false)
      {
        try
        {
          connectedInTopologyLock.wait(1000);
        } catch (InterruptedException e)
        {
        }
      }
    }
    while ((shutdown == false) && (stopListen  == false))
    {
      // Wait on the replicationServer port.
@@ -419,88 +400,91 @@
   */
  void runConnect()
  {
    while (shutdown == false)
    synchronized (connectThreadLock)
    {
      /*
       * periodically check that we are connected to all other
       * replication servers and if not establish the connection
       */
      for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
      while (!shutdown)
      {
        Set<String> connectedReplServers =
                replicationServerDomain.getChangelogs();
        /*
         * check that all replication server in the config are in the connected
         * Set. If not create the connection
         * periodically check that we are connected to all other replication
         * servers and if not establish the connection
         */
        for (String serverURL : replicationServers)
        for (ReplicationServerDomain domain : getReplicationServerDomains())
        {
          int separator = serverURL.lastIndexOf(':');
          String port = serverURL.substring(separator + 1);
          String hostname = serverURL.substring(0, separator);
          Set<String> connectedReplServers = domain.getChangelogs();
          try
          /*
           * check that all replication server in the config are in the
           * connected Set. If not create the connection
           */
          for (String serverURL : replicationServers)
          {
            InetAddress inetAddress = InetAddress.getByName(hostname);
            String serverAddress = inetAddress.getHostAddress() + ":" + port;
            String alternServerAddress = null;
            if (hostname.equalsIgnoreCase("localhost"))
            int separator = serverURL.lastIndexOf(':');
            String port = serverURL.substring(separator + 1);
            String hostname = serverURL.substring(0, separator);
            try
            {
              // if "localhost" was used as the hostname in the configuration
              // also check is the connection is already opened with the
              // local address.
              alternServerAddress =
                InetAddress.getLocalHost().getHostAddress() + ":" + port;
            }
            if (inetAddress.equals(InetAddress.getLocalHost()))
            {
              // if the host address is the local one, also check
              // if the connection is already opened with the "localhost"
              // address
              alternServerAddress = "127.0.0.1" + ":" + port;
            }
              InetAddress inetAddress = InetAddress
                  .getByName(hostname);
              String serverAddress = inetAddress.getHostAddress()
                  + ":" + port;
              String alternServerAddress = null;
            if ((serverAddress.compareTo("127.0.0.1:" + replicationPort) != 0)
                && (serverAddress.compareTo(this.localURL) != 0)
                && (!connectedReplServers.contains(serverAddress)
                && ((alternServerAddress == null)
                    || !connectedReplServers.contains(alternServerAddress))))
              if (hostname.equalsIgnoreCase("localhost"))
              {
                // if "localhost" was used as the hostname in the configuration
                // also check is the connection is already opened with the
                // local address.
                alternServerAddress = InetAddress.getLocalHost()
                    .getHostAddress() + ":" + port;
              }
              if (inetAddress.equals(InetAddress.getLocalHost()))
              {
                // if the host address is the local one, also check
                // if the connection is already opened with the "localhost"
                // address
                alternServerAddress = "127.0.0.1" + ":" + port;
              }
              if ((serverAddress.compareTo("127.0.0.1:"
                  + replicationPort) != 0)
                  && (serverAddress.compareTo(this.localURL) != 0)
                  && (!connectedReplServers.contains(serverAddress)
                      && ((alternServerAddress == null) || !connectedReplServers
                      .contains(alternServerAddress))))
              {
                connect(serverURL, domain.getBaseDn());
              }
            }
            catch (IOException e)
            {
              this.connect(serverURL, replicationServerDomain.getBaseDn());
              Message message = ERR_COULD_NOT_SOLVE_HOSTNAME
                  .get(hostname);
              logError(message);
            }
          }
          catch (IOException e)
          {
            Message message = ERR_COULD_NOT_SOLVE_HOSTNAME.get(hostname);
            logError(message);
          }
        }
      }
      synchronized (connectedInTopologyLock)
      {
        // wake up the listen thread if necessary.
        if (connectedInTopology == false)
        // Notify any threads waiting with domain tickets after each iteration.
        synchronized (domainTicketLock)
        {
          connectedInTopologyLock.notify();
          connectedInTopology = true;
          domainTicket++;
          domainTicketLock.notifyAll();
        }
      }
      try
      {
        synchronized(domainMonitor)
        // Retry each second.
        final int randomizer = (int) (Math.random() * 100);
        try
        {
          domainMonitor.notifyAll();
          // Releases lock, allows threads to get domain ticket.
          connectThreadLock.wait(1000 + randomizer);
        }
        synchronized (this)
        catch (InterruptedException e)
        {
          /* check if we are connected every second */
          int randomizer = (int)(Math.random()*100);
          wait(1000 + randomizer);
          // Signalled to shutdown.
          return;
        }
      } catch (InterruptedException e)
      {
        // ignore error, will try to connect again or shutdown
      }
    }
  }
@@ -721,40 +705,50 @@
  private void shutdownECL()
  {
    WorkflowImpl eclwf =
      (WorkflowImpl)WorkflowImpl.getWorkflow(externalChangeLogWorkflowID);
    WorkflowImpl eclwf = (WorkflowImpl) WorkflowImpl
        .getWorkflow(externalChangeLogWorkflowID);
    // do it only if not already done by another RS (unit test case)
    // if (DirectoryServer.getWorkflowElement(externalChangeLogWorkflowID)
    if (eclwf!=null)
    if (eclwf != null)
    {
      // FIXME:ECL should the ECL Workflow be registered in
      // internalNetworkGroup?
      NetworkGroup internalNetworkGroup = NetworkGroup
          .getInternalNetworkGroup();
      internalNetworkGroup
          .deregisterWorkflow(externalChangeLogWorkflowID);
      // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
      NetworkGroup adminNetworkGroup = NetworkGroup
          .getAdminNetworkGroup();
      adminNetworkGroup
          .deregisterWorkflow(externalChangeLogWorkflowID);
    // FIXME:ECL should the ECL Workflow be registered in internalNetworkGroup?
    NetworkGroup internalNetworkGroup = NetworkGroup.getInternalNetworkGroup();
    internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
      NetworkGroup defaultNetworkGroup = NetworkGroup
          .getDefaultNetworkGroup();
      defaultNetworkGroup
          .deregisterWorkflow(externalChangeLogWorkflowID);
    // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
    NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
    adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
    NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
    defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
    eclwf.deregister();
    eclwf.finalizeWorkflow();
      eclwf.deregister();
      eclwf.finalizeWorkflow();
    }
    eclwe = (ECLWorkflowElement)
    DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
    if (eclwe!=null)
    eclwe = (ECLWorkflowElement) DirectoryServer
        .getWorkflowElement("EXTERNAL CHANGE LOG");
    if (eclwe != null)
    {
      DirectoryServer.deregisterWorkflowElement(eclwe);
      eclwe.finalizeWorkflowElement();
    }
    if (draftCNDbHandler != null)
      draftCNDbHandler.shutdown();
    synchronized (draftCNLock)
    {
      if (draftCNDbHandler != null)
      {
        draftCNDbHandler.shutdown();
      }
    }
  }
  /**
@@ -790,38 +784,60 @@
  public ReplicationServerDomain getReplicationServerDomain(String baseDn,
          boolean create, boolean waitConnections)
  {
    ReplicationServerDomain replicationServerDomain;
    ReplicationServerDomain domain;
    synchronized (baseDNs)
    {
      replicationServerDomain = baseDNs.get(baseDn);
      if ((replicationServerDomain == null) && (create))
      domain = baseDNs.get(baseDn);
      if (domain != null ||!create) {
        return domain;
      }
      domain = new ReplicationServerDomain(baseDn, this);
      baseDNs.put(baseDn, domain);
    }
    if (waitConnections)
    {
      // Acquire a domain ticket and wait for a complete cycle of the connect
      // thread.
      final long myDomainTicket;
      synchronized (connectThreadLock)
      {
        replicationServerDomain = new ReplicationServerDomain(baseDn, this);
        baseDNs.put(baseDn, replicationServerDomain);
        synchronized (domainMonitor)
        // Connect thread must be waiting.
        synchronized (domainTicketLock)
        {
          if (waitConnections)
          // Determine the ticket which will be used in the next connect thread
          // iteration.
          myDomainTicket = domainTicket + 1;
        }
        // Wake up connect thread.
        connectThreadLock.notify();
      }
      // Wait until the connect thread has processed next connect phase.
      synchronized (domainTicketLock)
      {
        // Condition.
        while (myDomainTicket > domainTicket && !shutdown)
        {
          try
          {
            synchronized (this)
            {
              // kick up the connect thread so that this new domain
              // gets connected to all the Replication Servers.
              this.notify();
            }
            try
            {
              // wait for the connect thread to signal that it finished its job
              domainMonitor.wait(500);
            } catch (InterruptedException e)
            {
            }
            // Wait with timeout so that we detect shutdown.
            domainTicketLock.wait(500);
          }
          catch (InterruptedException e)
          {
            // Can't do anything with this.
            Thread.currentThread().interrupt();
          }
        }
      }
    }
    return replicationServerDomain;
    return domain;
  }
  /**
@@ -861,9 +877,9 @@
    }
    // shutdown all the ChangelogCaches
    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
    for (ReplicationServerDomain domain : getReplicationServerDomains())
    {
      replicationServerDomain.shutdown();
      domain.shutdown();
    }
    shutdownECL();
@@ -894,37 +910,60 @@
    return new DbHandler(id, baseDn, this, dbEnv, queueSize);
  }
  /**
   * Clears the generationId for the replicationServerDomain related to the
   * provided baseDn.
   * @param  baseDn The baseDn for which to delete the generationId.
   * @throws DatabaseException When it occurs.
   *
   * @param baseDn
   *          The baseDn for which to delete the generationId.
   */
  public void clearGenerationId(String baseDn)
  throws DatabaseException
  {
    try
    {
      dbEnv.clearGenerationId(baseDn);
    }
    catch (Exception e)
    {
      // Ignore.
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.WARNING, e);
      }
    }
      if (this.draftCNDbHandler != null)
    synchronized (draftCNLock)
    {
      if (draftCNDbHandler != null)
      {
        try
        {
          try
          draftCNDbHandler.clear(baseDn);
        }
        catch (Exception e)
        {
          // Ignore.
          if (debugEnabled())
          {
            draftCNDbHandler.clear(baseDn);
            TRACER.debugCaught(DebugLogLevel.WARNING, e);
          }
          catch(Exception e){}
        }
        try
        {
          lastGeneratedDraftCN = draftCNDbHandler.getLastKey();
        }
        catch(Exception e) {}
        catch (Exception e)
        {
          // Ignore.
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, e);
          }
        }
      }
    }
    catch(Exception e)
    {
      TRACER.debugCaught(LogLevel.ALL, e);
    }
  }
@@ -993,7 +1032,7 @@
    {
      purgeDelay = newPurgeDelay;
      // propagate
      for (ReplicationServerDomain domain : baseDNs.values())
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        domain.setPurgeDelay(purgeDelay*1000);
      }
@@ -1043,64 +1082,71 @@
    // Update threshold value for status analyzers (stop them if requested
    // value is 0)
    if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
    if (degradedStatusThreshold != configuration
        .getDegradedStatusThreshold())
    {
      int oldThresholdValue = degradedStatusThreshold;
      degradedStatusThreshold = configuration.getDegradedStatusThreshold();
      for(ReplicationServerDomain rsd : baseDNs.values())
      degradedStatusThreshold = configuration
          .getDegradedStatusThreshold();
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        if (degradedStatusThreshold == 0)
        {
          // Requested to stop analyzers
          rsd.stopStatusAnalyzer();
        } else if (rsd.isRunningStatusAnalyzer())
          domain.stopStatusAnalyzer();
        }
        else if (domain.isRunningStatusAnalyzer())
        {
          // Update the threshold value for this running analyzer
          rsd.updateStatusAnalyzer(degradedStatusThreshold);
        } else if (oldThresholdValue == 0)
          domain.updateStatusAnalyzer(degradedStatusThreshold);
        }
        else if (oldThresholdValue == 0)
        {
          // Requested to start analyzers with provided threshold value
          if (rsd.getConnectedDSs().size() > 0)
            rsd.startStatusAnalyzer();
          if (domain.getConnectedDSs().size() > 0)
            domain.startStatusAnalyzer();
        }
      }
    }
    // Update period value for monitoring publishers (stop them if requested
    // value is 0)
    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
    if (monitoringPublisherPeriod != configuration
        .getMonitoringPeriod())
    {
      long oldMonitoringPeriod = monitoringPublisherPeriod;
      monitoringPublisherPeriod = configuration.getMonitoringPeriod();
      for(ReplicationServerDomain rsd : baseDNs.values())
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        if (monitoringPublisherPeriod == 0L)
        {
          // Requested to stop monitoring publishers
          rsd.stopMonitoringPublisher();
        } else if (rsd.isRunningMonitoringPublisher())
          domain.stopMonitoringPublisher();
        }
        else if (domain.isRunningMonitoringPublisher())
        {
          // Update the threshold value for this running monitoring publisher
          rsd.updateMonitoringPublisher(monitoringPublisherPeriod);
        } else if (oldMonitoringPeriod == 0L)
          domain.updateMonitoringPublisher(monitoringPublisherPeriod);
        }
        else if (oldMonitoringPeriod == 0L)
        {
          // Requested to start monitoring publishers with provided period value
          if ( (rsd.getConnectedDSs().size() > 0) ||
            (rsd.getConnectedRSs().size() > 0) )
            rsd.startMonitoringPublisher();
          if ((domain.getConnectedDSs().size() > 0)
              || (domain.getConnectedRSs().size() > 0))
            domain.startMonitoringPublisher();
        }
      }
    }
    // Changed the group id ?
    byte newGroupId = (byte)configuration.getGroupId();
    byte newGroupId = (byte) configuration.getGroupId();
    if (newGroupId != groupId)
    {
      groupId = newGroupId;
      // Have a new group id: Disconnect every servers.
      for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
      for (ReplicationServerDomain domain : getReplicationServerDomains())
      {
        replicationServerDomain.stopAllServers(true);
        domain.stopAllServers(true);
      }
    }
@@ -1129,10 +1175,10 @@
   */
  private void broadcastConfigChange()
  {
    for (ReplicationServerDomain replicationServerDomain : baseDNs.values())
    for (ReplicationServerDomain domain : getReplicationServerDomains())
    {
      replicationServerDomain.buildAndSendTopoInfoToDSs(null);
      replicationServerDomain.buildAndSendTopoInfoToRSs();
      domain.buildAndSendTopoInfoToDSs(null);
      domain.buildAndSendTopoInfoToRSs();
    }
  }
@@ -1364,10 +1410,15 @@
   */
  public Iterator<ReplicationServerDomain> getDomainIterator()
  {
    if (!baseDNs.isEmpty())
      return baseDNs.values().iterator();
    Collection<ReplicationServerDomain> domains = getReplicationServerDomains();
    if (!domains.isEmpty())
    {
      return domains.iterator();
    }
    else
    {
      return null;
    }
  }
  /**
@@ -1384,16 +1435,40 @@
        rsd.clearDbs();
      }
    }
    if (this.draftCNDbHandler != null)
    synchronized (draftCNLock)
    {
      try
      if (draftCNDbHandler != null)
      {
        try { draftCNDbHandler.clear(); } catch(Exception e){}
        draftCNDbHandler.shutdown();
        try
        {
          draftCNDbHandler.clear();
        }
        catch (Exception e)
        {
          // Ignore.
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, e);
          }
        }
        try
        {
          draftCNDbHandler.shutdown();
        }
        catch (Exception e)
        {
          // Ignore.
          if (debugEnabled())
          {
            TRACER.debugCaught(DebugLogLevel.WARNING, e);
          }
        }
        lastGeneratedDraftCN = 0;
        draftCNDbHandler = null;
      }
      catch(Exception e) {}
    }
  }
@@ -1468,9 +1543,9 @@
    if (serversToDisconnect.isEmpty())
      return;
    for (ReplicationServerDomain replicationServerDomain: baseDNs.values())
    for (ReplicationServerDomain domain: getReplicationServerDomains())
    {
      replicationServerDomain.stopReplicationServers(serversToDisconnect);
      domain.stopReplicationServers(serversToDisconnect);
    }
  }
@@ -1494,221 +1569,6 @@
    return replicationPort;
  }
  // TODO: Remote monitor data cache lifetime is 500ms/should be configurable
  private long monitorDataLifeTime = 500;
  /* The date of the last time they have been elaborated */
  private long monitorDataLastBuildDate = 0;
  /**
   * This uniquely identifies a server (handler) in the cross-domain topology.
   * Represents an identifier of a handler (in the whole RS) we have to wait a
   * monitoring message from before answering to a monitor request.
   */
  public static class GlobalServerId {
    private int serverId = -1;
    private String baseDn = null;
    /**
     * Constructor for a global server id.
     * @param baseDn The dn of the RSD owning the handler.
     * @param serverId The handler id in the matching RSD.
     */
    public GlobalServerId(String baseDn, int serverId) {
      this.baseDn = baseDn;
      this.serverId = serverId;
    }
    /**
     * Get the server handler id.
     * @return the serverId
     */
    public int getServerId()
    {
      return serverId;
    }
    /**
     * Get the base dn.
     * @return the baseDn
     */
    public String getBaseDn()
    {
      return baseDn;
    }
    /**
     * Get the hascode.
     * @return The hashcode.
     */
    @Override
    public int hashCode()
    {
      int hash = 7;
      hash = 43 * hash + this.serverId;
      hash = 43 * hash + (this.baseDn != null ? this.baseDn.hashCode() : 0);
      return hash;
    }
    /**
     * Tests if the passed global server handler id represents the same server
     * handler as this one.
     * @param obj The object to test.
     * @return True if both identifiers are the same.
     */
    public boolean equals(Object obj) {
      if ( (obj == null) || (!(obj instanceof GlobalServerId)))
        return false;
      GlobalServerId globalServerId = (GlobalServerId)obj;
      return ( globalServerId.baseDn.equals(baseDn) &&
        (globalServerId.serverId == serverId) );
    }
  }
  /**
   * This gives the list of server handlers we are willing to wait monitoring
   * message from. Each time a monitoring message is received by a server
   * handler, the matching server handler id is retired from the list. When the
   * list is empty, we received all expected monitoring messages.
   */
  private List<GlobalServerId> expectedMonitoringMsg = null;
  /**
   * Trigger the computation of the Global Monitoring Data.
   * This should be called by all the MonitorProviders that need
   * the global monitoring data to be updated before they can
   * publish their information to cn=monitor.
   *
   * This method will trigger the update of all the global monitoring
   * information of all the base-DNs of this replication Server.
   *
   * @throws DirectoryException If the computation cannot be achieved.
   */
  public synchronized void computeMonitorData() throws DirectoryException
  {
    if (monitorDataLastBuildDate + monitorDataLifeTime > TimeThread.getTime())
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() + " getRemoteMonitorData in cache");
      // The current data are still valid. No need to renew them.
      return;
    }
    // Initialize the list of server handlers we expect monitoring messages from
    expectedMonitoringMsg =
      Collections.synchronizedList(new ArrayList<GlobalServerId>());
    // Copy the list of domains as a new domain may arrive or disappear between
    // the initializeMonitorData and completeMonitorData calls
    List<ReplicationServerDomain> rsdList =
                new ArrayList<ReplicationServerDomain>(baseDNs.values());
    for (ReplicationServerDomain domain : rsdList)
    {
      domain.initializeMonitorData(expectedMonitoringMsg);
    }
    // Wait for responses
    waitMonitorDataResponses();
    for (ReplicationServerDomain domain : rsdList)
    {
      domain.completeMonitorData();
    }
  }
  /**
   * Wait for the expected received MonitorMsg.
   * @throws DirectoryException When an error occurs.
   */
  private void waitMonitorDataResponses()
    throws DirectoryException
  {
    try
    {
      if (debugEnabled())
        TRACER.debugInfo(
          "In " + getMonitorInstanceName() +
          " waiting for " + expectedMonitoringMsg.size() +
          " expected monitor messages");
      // Wait up to 5 seconds for every expected monitoring message to come
      // back.
      boolean allReceived = false;
      long startTime = TimeThread.getTime();
      long curTime = startTime;
      int maxTime = 5000;
      while ( (curTime - startTime) < maxTime )
      {
        // Have every expected monitoring messages arrived ?
        if (expectedMonitoringMsg.size() == 0)
        {
          // Ok break the loop
          allReceived = true;
          break;
        }
        Thread.sleep(100);
        curTime = TimeThread.getTime();
      }
      monitorDataLastBuildDate = TimeThread.getTime();
      if (!allReceived)
      {
        logError(ERR_MISSING_REMOTE_MONITOR_DATA.get());
        // let's go on in best effort even with limited data received.
      } else
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "In " + getMonitorInstanceName() +
            " Successfully received all expected monitor messages");
      }
    } catch (Exception e)
    {
      logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e.getMessage()));
    }
  }
  /**
   * This should be called by each ReplicationServerDomain that receives
   * a response to a monitor request message. This may also be called when a
   * monitoring message is coming from a RS whose monitoring publisher thread
   * sent it. As monitoring messages (sent because of monitoring request or
   * because of monitoring publisher) have the same content, this is also ok
   * to mark ok the server when the monitoring message coms from a monitoring
   * publisher thread.
   * @param globalServerId The server handler that is receiving the
   * monitoring message.
   */
  public void responseReceived(GlobalServerId globalServerId)
  {
    expectedMonitoringMsg.remove(globalServerId);
  }
  /**
   * This should be called when the Monitoring has failed and the
   * Worker thread that is waiting for the result should be awaken.
   */
  public void responseReceivedAll()
  {
    expectedMonitoringMsg.clear();
  }
  /**
   * Returns the number of domains managed by this replication server.
   * @return the number of domains managed.
   */
  public int getCacheSize()
  {
    return baseDNs.size();
  }
  /**
   * Create a new session to get the ECL.
   * @param msg The message that specifies the ECL request.
@@ -1861,32 +1721,37 @@
    return eligibleCN;
  }
  /**
   * Get or create a handler on a Db on DraftCN for external changelog.
   *
   * @return the handler.
   * @throws DirectoryException when needed.
   * @throws DirectoryException
   *           when needed.
   */
  public synchronized DraftCNDbHandler getDraftCNDbHandler()
  throws DirectoryException
  public DraftCNDbHandler getDraftCNDbHandler()
      throws DirectoryException
  {
    try
    synchronized (draftCNLock)
    {
      if (draftCNDbHandler == null)
      try
      {
        draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
        if (draftCNDbHandler == null)
          return null;
        this.lastGeneratedDraftCN = getLastDraftChangeNumber();
        {
          draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
          lastGeneratedDraftCN = getLastDraftChangeNumber();
        }
        return draftCNDbHandler;
      }
      return draftCNDbHandler;
    }
    catch (Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      MessageBuilder mb = new MessageBuilder();
      mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
      throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
          mb.toMessage(), e);
      catch (Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        MessageBuilder mb = new MessageBuilder();
        mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
            mb.toMessage(), e);
      }
    }
  }
@@ -1896,10 +1761,17 @@
   */
  public int getFirstDraftChangeNumber()
  {
    int first=0;
    if (draftCNDbHandler != null)
      first = draftCNDbHandler.getFirstKey();
    return first;
    synchronized (draftCNLock)
    {
      if (draftCNDbHandler != null)
      {
        return draftCNDbHandler.getFirstKey();
      }
      else
      {
        return 0;
      }
    }
  }
  /**
@@ -1908,19 +1780,29 @@
   */
  public int getLastDraftChangeNumber()
  {
    int last=0;
    if (draftCNDbHandler != null)
      last = draftCNDbHandler.getLastKey();
    return last;
    synchronized (draftCNLock)
    {
      if (draftCNDbHandler != null)
      {
        return draftCNDbHandler.getLastKey();
      }
      else
      {
        return 0;
      }
    }
  }
  /**
   * Generate a new Draft ChangeNumber.
   * @return The generated Draft ChangeNUmber
   */
  synchronized public int getNewDraftCN()
  public int getNewDraftCN()
  {
    return ++lastGeneratedDraftCN;
    synchronized (draftCNLock)
    {
      return ++lastGeneratedDraftCN;
    }
  }
  /**
@@ -2092,4 +1974,14 @@
    return weight;
  }
  private Collection<ReplicationServerDomain> getReplicationServerDomains()
  {
    synchronized (baseDNs)
    {
      return new ArrayList<ReplicationServerDomain>(baseDNs.values());
    }
  }
}