From f9fc57cba81bcf9a8259a8ea699eb999c0415397 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 21 Aug 2013 13:26:47 +0000
Subject: [PATCH] Extracted ReplicationDomainMonitor class out of ReplicationServerDomain to increase its cohesion.

---
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |  472 +++++++---------------------------------------------------
 1 files changed, 61 insertions(+), 411 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3a10c23..2bedd43 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -32,7 +32,6 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
@@ -51,7 +50,6 @@
 import org.opends.server.replication.server.changelog.api.ReplicationIterator;
 import org.opends.server.replication.server.changelog.je.DbHandler;
 import org.opends.server.types.*;
-import org.opends.server.util.TimeThread;
 
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
@@ -92,6 +90,11 @@
    */
   private AtomicReference<MonitoringPublisher> monitoringPublisher =
       new AtomicReference<MonitoringPublisher>();
+  /**
+   * Maintains monitor data for the current domain; assigned exactly once.
+   */
+  private final ReplicationDomainMonitor domainMonitor =
+      new ReplicationDomainMonitor(this);
 
   /**
    * The following map contains one balanced tree for each replica ID to which
@@ -128,63 +131,6 @@
   /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
 
-  // Monitor data management
-  /**
-   * The monitor data consolidated over the topology.
-   */
-  private volatile MonitorData monitorData = new MonitorData();
-
-  /**
-   * This lock guards against multiple concurrent monitor data recalculation.
-   */
-  private final Object pendingMonitorLock = new Object();
-
-  /** Guarded by pendingMonitorLock. */
-  private long monitorDataLastBuildDate = 0;
-
-  /**
-   * The set of replication servers which are already known to be slow to send
-   * monitor data.
-   * <p>
-   * Guarded by pendingMonitorLock.
-   */
-  private final Set<Integer> monitorDataLateServers = new HashSet<Integer>();
-
-  /** This lock serializes updates to the pending monitor data. */
-  private final Object pendingMonitorDataLock = new Object();
-
-  /**
-   * Monitor data which is currently being calculated. Guarded by
-   * pendingMonitorDataLock.
-   */
-  private MonitorData pendingMonitorData;
-
-  /**
-   * A set containing the IDs of servers from which we are currently expecting
-   * monitor responses. When a response is received from a server we remove the
-   * ID from this table, and count down the latch if the ID was in the table.
-   * <p>
-   * Guarded by pendingMonitorDataLock.
-   */
-  private final Set<Integer> pendingMonitorDataServerIDs =
-    new HashSet<Integer>();
-
-  /**
-   * This latch is non-null and is used in order to count incoming responses as
-   * they arrive. Since incoming response may arrive at any time, even when
-   * there is no pending monitor request, access to the latch must be guarded.
-   * <p>
-   * Guarded by pendingMonitorDataLock.
-   */
-  private CountDownLatch pendingMonitorDataLatch = null;
-
-  /**
-   * TODO: Remote monitor data cache lifetime is 500ms/should be configurable.
-   */
-  private final long monitorDataLifeTime = 500;
-
-
-
   /**
    * The needed info for each received assured update message we are waiting
    * acks for.
@@ -251,11 +197,7 @@
 
     sourceHandler.updateServerState(update);
     sourceHandler.incrementInCount();
-
-    if (generationId < 0)
-    {
-      generationId = sourceHandler.getGenerationId();
-    }
+    setGenerationIdIfUnset(sourceHandler.getGenerationId());
 
     /**
      * If this is an assured message (a message requesting ack), we must
@@ -424,19 +366,17 @@
         {
           if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
           {
-            TRACER.debugInfo("In " + this + " for dn " + baseDn + ", update "
-                + update.getChangeNumber()
+            TRACER.debugInfo(getMessage("update " + update.getChangeNumber()
                 + " will not be sent to directory server "
                 + dsHandler.getServerId() + " with generation id "
                 + dsHandler.getGenerationId() + " different from local "
-                + "generation id " + generationId);
+                + "generation id " + generationId));
           }
           if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
           {
-            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
-                + " for dn " + baseDn + ", update " + update.getChangeNumber()
+            TRACER.debugInfo(getMessage("update " + update.getChangeNumber()
                 + " will not be sent to directory server "
-                + dsHandler.getServerId() + " as it is in full update");
+                + dsHandler.getServerId() + " as it is in full update"));
           }
         }
 
@@ -869,11 +809,9 @@
           ServerHandler origServer = expectedAcksInfo.getRequesterServer();
           if (debugEnabled())
           {
-            TRACER.debugInfo("In RS " + localReplicationServer.getServerId()
-                    + " for "+ baseDn
-                    + ", sending timeout for assured update with change "
-                    + " number " + cn + " to server id "
-                    + origServer.getServerId());
+            TRACER.debugInfo(getMessage(
+                "sending timeout for assured update with change number " + cn
+                + " to server id " + origServer.getServerId()));
           }
           try
           {
@@ -1112,8 +1050,7 @@
     unregisterServerHandler(sHandler);
     sHandler.shutdown();
 
-    // Check if generation id has to be reset
-    mayResetGenerationId();
+    resetGenerationIdIfPossible();
     if (!shutdown)
     {
       if (isDirectoryServer)
@@ -1211,16 +1148,13 @@
    * - traverse replicationServers list and test for each if DS are connected
    * So it strongly relies on the directoryServers list
    */
-  private void mayResetGenerationId()
+  private void resetGenerationIdIfPossible()
   {
-    String prefix =
-        "In RS " + this.localReplicationServer.getMonitorInstanceName()
-            + " for " + baseDn + " ";
-
     if (debugEnabled())
     {
-      TRACER.debugInfo(prefix + "mayResetGenerationId generationIdSavedStatus="
-          + generationIdSavedStatus);
+      TRACER.debugInfo(getMessage(
+          "mayResetGenerationId generationIdSavedStatus="
+          + generationIdSavedStatus));
     }
 
     // If there is no more any LDAP server connected to this domain in the
@@ -1235,9 +1169,9 @@
         {
           if (debugEnabled())
           {
-            TRACER.debugInfo(prefix + "mayResetGenerationId skip RS "
+            TRACER.debugInfo(getMessage("mayResetGenerationId skip RS "
                 + rsHandler.getMonitorInstanceName()
-                + " that has different genId");
+                + " that has different genId"));
           }
         }
         else if (rsHandler.hasRemoteLDAPServers())
@@ -1246,10 +1180,10 @@
 
           if (debugEnabled())
           {
-            TRACER.debugInfo(prefix + "mayResetGenerationId RS "
+            TRACER.debugInfo(getMessage("mayResetGenerationId RS "
                 + rsHandler.getMonitorInstanceName()
                 + " has ldap servers connected to it"
-                + " - will not reset generationId");
+                + " - will not reset generationId"));
           }
           break;
         }
@@ -1261,13 +1195,13 @@
 
       if (debugEnabled())
       {
-        TRACER.debugInfo(prefix + "has ldap servers connected to it"
-            + " - will not reset generationId");
+        TRACER.debugInfo(getMessage("has ldap servers connected to it"
+            + " - will not reset generationId"));
       }
     }
 
     if (!ldapServersConnectedInTheTopology
-        && !this.generationIdSavedStatus
+        && !generationIdSavedStatus
         && generationId != -1)
     {
       changeGenerationId(-1, false);
@@ -1544,7 +1478,8 @@
       } else if (msg instanceof MonitorMsg)
       {
         MonitorMsg monitorMsg = (MonitorMsg) msg;
-        receivesMonitorDataResponse(monitorMsg, msgEmitter.getServerId());
+        domainMonitor.receiveMonitorDataResponse(monitorMsg,
+            msgEmitter.getServerId());
       } else
       {
         replyWithUnroutableMsgType(msgEmitter, msg);
@@ -1573,10 +1508,10 @@
     if (msgEmitter.isDataServer())
     {
       // Monitoring information requested by a DS
-      MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
-          msg.getDestination(), msg.getSenderID(), monitorData);
       try
       {
+        MonitorMsg monitorMsg = createGlobalTopologyMonitorMsg(
+            msg.getDestination(), msg.getSenderID());
         msgEmitter.send(monitorMsg);
       }
       catch (IOException e)
@@ -1711,33 +1646,25 @@
    *          The sender of this message.
    * @param destination
    *          The destination of this message.
-   * @param monitorData
-   *          The domain monitor data which should be used for the message.
    * @return The newly created and filled MonitorMsg. Null if a problem occurred
    *         during message creation.
    */
-  public MonitorMsg createGlobalTopologyMonitorMsg(
-      int sender, int destination, MonitorData monitorData)
+  public MonitorMsg createGlobalTopologyMonitorMsg(int sender, int destination)
   {
     final MonitorMsg returnMsg = new MonitorMsg(sender, destination);
-
     returnMsg.setReplServerDbState(getDbServerState());
 
-    // Add the informations about the Replicas currently in the topology.
-    Iterator<Integer> it = monitorData.ldapIterator();
-    while (it.hasNext())
+    // Add the server state for each DS and RS currently in the topology.
+    final MonitorData monitorData = getDomainMonitorData();
+    for (int replicaId : toIterable(monitorData.ldapIterator()))
     {
-      int replicaId = it.next();
       returnMsg.setServerState(replicaId,
           monitorData.getLDAPServerState(replicaId),
           monitorData.getApproxFirstMissingDate(replicaId), true);
     }
 
-    // Add the information about the RSs currently in the topology.
-    it = monitorData.rsIterator();
-    while (it.hasNext())
+    for (int replicaId : toIterable(monitorData.rsIterator()))
     {
-      int replicaId = it.next();
       returnMsg.setServerState(replicaId,
           monitorData.getRSStates(replicaId),
           monitorData.getRSApproxFirstMissingDate(replicaId), false);
@@ -1774,27 +1701,22 @@
 
     try
     {
-      MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+      final MonitorMsg monitorMsg = new MonitorMsg(sender, destination);
+      monitorMsg.setReplServerDbState(getDbServerState());
 
-      // Populate for each connected LDAP Server
-      // from the states stored in the serverHandler.
-      // - the server state
-      // - the older missing change
+      // Add the server state for each connected DS and RS.
       for (DataServerHandler dsHandler : this.connectedDSs.values())
       {
         monitorMsg.setServerState(dsHandler.getServerId(), dsHandler
             .getServerState(), dsHandler.getApproxFirstMissingDate(), true);
       }
 
-      // Same for the connected RS
       for (ReplicationServerHandler rsHandler : this.connectedRSs.values())
       {
         monitorMsg.setServerState(rsHandler.getServerId(), rsHandler
             .getServerState(), rsHandler.getApproxFirstMissingDate(), false);
       }
 
-      // Populate the RS state in the msg from the DbState
-      monitorMsg.setReplServerDbState(getDbServerState());
       return monitorMsg;
     }
     finally
@@ -2060,7 +1982,6 @@
 
       if (this.generationId != generationId)
       {
-        // we are changing of genId
         clearDbs();
 
         this.generationId = generationId;
@@ -2187,10 +2108,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo(
-          "In RS " + getLocalRSServerId() +
-          " Receiving ChangeStatusMsg from " + senderHandler.getServerId() +
-          " for baseDn " + baseDn + ":\n" + csMsg);
+      TRACER.debugInfo(getMessage("receiving ChangeStatusMsg from "
+          + senderHandler.getServerId() + ":\n" + csMsg));
     }
 
     try
@@ -2421,9 +2340,8 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In RS " + getLocalRSServerId()
-          + " Receiving TopologyMsg from " + rsHandler.getServerId()
-          + " for baseDn " + baseDn + ":\n" + topoMsg);
+      TRACER.debugInfo(getMessage("receiving TopologyMsg from "
+          + rsHandler.getServerId() + ":\n" + topoMsg));
     }
 
     try
@@ -2448,12 +2366,8 @@
       // Handle generation id
       if (allowResetGenId)
       {
-        // Check if generation id has to be reset
-        mayResetGenerationId();
-        if (generationId < 0)
-        {
-          generationId = rsHandler.getGenerationId();
-        }
+        resetGenerationIdIfPossible();
+        setGenerationIdIfUnset(rsHandler.getGenerationId());
       }
 
       if (isDifferentGenerationId(rsHandler.getGenerationId()))
@@ -2487,10 +2401,13 @@
     }
   }
 
-  /* =======================
-   * Monitor Data generation
-   * =======================
-   */
+  private void setGenerationIdIfUnset(long generationId)
+  { // Adopts the given generation id only if none is set locally yet.
+    if (this.generationId < 0) // a negative id is treated as "unset"
+    {
+      this.generationId = generationId;
+    }
+  }
 
   /**
    * Returns the latest monitor data available for this replication server
@@ -2501,274 +2418,7 @@
    */
   MonitorData getDomainMonitorData()
   {
-    return monitorData;
-  }
-
-
-
-  /**
-   * Recomputes the monitor data for this replication server domain.
-   *
-   * @return The recomputed monitor data for this replication server domain.
-   * @throws InterruptedException
-   *           If this thread is interrupted while waiting for a response.
-   */
-  MonitorData computeDomainMonitorData() throws InterruptedException
-  {
-    // Only allow monitor recalculation at a time.
-    synchronized (pendingMonitorLock)
-    {
-      if ((monitorDataLastBuildDate + monitorDataLifeTime) < TimeThread
-          .getTime())
-      {
-        try
-        {
-          // Prevent out of band monitor responses from updating our pending
-          // table until we are ready.
-          synchronized (pendingMonitorDataLock)
-          {
-            // Clear the pending monitor data.
-            pendingMonitorDataServerIDs.clear();
-            pendingMonitorData = new MonitorData();
-
-            // Initialize the monitor data.
-            initializePendingMonitorData();
-
-            // Send the monitor requests to the connected replication servers.
-            for (ReplicationServerHandler rs : connectedRSs.values())
-            {
-              // Add server ID to pending table.
-              int serverId = rs.getServerId();
-
-              MonitorRequestMsg msg = new MonitorRequestMsg(
-                  this.localReplicationServer.getServerId(), serverId);
-              try
-              {
-                rs.send(msg);
-
-                // Only register this server ID if we were able to send the
-                // message.
-                pendingMonitorDataServerIDs.add(serverId);
-              }
-              catch (IOException e)
-              {
-                // Log a message and do a best effort from here.
-                Message message = ERR_SENDING_REMOTE_MONITOR_DATA_REQUEST
-                    .get(baseDn, serverId, e.getMessage());
-                logError(message);
-              }
-            }
-
-            // Create the pending response latch based on the number of expected
-            // monitor responses.
-            pendingMonitorDataLatch = new CountDownLatch(
-                pendingMonitorDataServerIDs.size());
-          }
-
-          // Wait for the responses to come back.
-          pendingMonitorDataLatch.await(5, TimeUnit.SECONDS);
-
-          // Log messages for replication servers that have gone or come back.
-          synchronized (pendingMonitorDataLock)
-          {
-            // Log servers that have come back.
-            for (int serverId : monitorDataLateServers)
-            {
-              // Ensure that we only log once per server: don't fill the
-              // error log with repeated messages.
-              if (!pendingMonitorDataServerIDs.contains(serverId))
-              {
-                logError(NOTE_MONITOR_DATA_RECEIVED.get(baseDn,
-                    serverId));
-              }
-            }
-
-            // Log servers that have gone away.
-            for (int serverId : pendingMonitorDataServerIDs)
-            {
-              // Ensure that we only log once per server: don't fill the
-              // error log with repeated messages.
-              if (!monitorDataLateServers.contains(serverId))
-              {
-                logError(WARN_MISSING_REMOTE_MONITOR_DATA.get(baseDn,
-                    serverId));
-              }
-            }
-
-            // Remember which servers were late this time.
-            monitorDataLateServers.clear();
-            monitorDataLateServers.addAll(pendingMonitorDataServerIDs);
-          }
-
-          // Store the new computed data as the reference
-          synchronized (pendingMonitorDataLock)
-          {
-            // Now we have the expected answers or an error occurred
-            pendingMonitorData.completeComputing();
-            monitorData = pendingMonitorData;
-            monitorDataLastBuildDate = TimeThread.getTime();
-          }
-        }
-        finally
-        {
-          synchronized (pendingMonitorDataLock)
-          {
-            // Clear pending state.
-            pendingMonitorData = null;
-            pendingMonitorDataLatch = null;
-            pendingMonitorDataServerIDs.clear();
-          }
-        }
-      }
-    }
-
-    return monitorData;
-  }
-
-
-
-  /**
-   * Start collecting global monitoring information for this
-   * ReplicationServerDomain.
-   */
-
-  private void initializePendingMonitorData()
-  {
-    // Let's process our directly connected DS
-    // - in the ServerHandler for a given DS1, the stored state contains :
-    // - the max CN produced by DS1
-    // - the last CN consumed by DS1 from DS2..n
-    // - in the RSdomain/dbHandler, the built-in state contains :
-    // - the max CN produced by each server
-    // So for a given DS connected we can take the state and the max from
-    // the DS/state.
-
-    for (ServerHandler ds : connectedDSs.values())
-    {
-      int serverID = ds.getServerId();
-
-      // the state comes from the state stored in the SH
-      ServerState dsState = ds.getServerState().duplicate();
-
-      // the max CN sent by that LS also comes from the SH
-      ChangeNumber maxcn = dsState.getChangeNumber(serverID);
-      if (maxcn == null)
-      {
-        // This directly connected LS has never produced any change
-        maxcn = new ChangeNumber(0, 0, serverID);
-      }
-      pendingMonitorData.setMaxCN(serverID, maxcn);
-      pendingMonitorData.setLDAPServerState(serverID, dsState);
-      pendingMonitorData.setFirstMissingDate(serverID,
-          ds.getApproxFirstMissingDate());
-    }
-
-    // Then initialize the max CN for the LS that produced something
-    // - from our own local db state
-    // - whatever they are directly or indirectly connected
-    ServerState dbServerState = getDbServerState();
-    pendingMonitorData.setRSState(localReplicationServer.getServerId(),
-        dbServerState);
-    for (int serverId : dbServerState) {
-      ChangeNumber storedCN = dbServerState.getChangeNumber(serverId);
-      pendingMonitorData.setMaxCN(serverId, storedCN);
-    }
-  }
-
-
-
-  /**
-   * Processes a Monitor message receives from a remote Replication Server and
-   * stores the data received.
-   *
-   * @param msg
-   *          The message to be processed.
-   * @param serverId
-   *          server handler that is receiving the message.
-   */
-  private void receivesMonitorDataResponse(MonitorMsg msg, int serverId)
-  {
-    synchronized (pendingMonitorDataLock)
-    {
-      if (pendingMonitorData == null)
-      {
-        // This is a response for an earlier request whose computing is
-        // already complete.
-        logError(INFO_IGNORING_REMOTE_MONITOR_DATA.get(baseDn,
-            msg.getSenderID()));
-        return;
-      }
-
-      try
-      {
-        // Here is the RS state : list <serverID, lastChangeNumber>
-        // For each LDAP Server, we keep the max CN across the RSes
-        ServerState replServerState = msg.getReplServerDbState();
-        pendingMonitorData.setMaxCNs(replServerState);
-
-        // store the remote RS states.
-        pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
-
-        // Store the remote LDAP servers states
-        Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
-        while (dsServerIdIterator.hasNext())
-        {
-          int dsServerId = dsServerIdIterator.next();
-          ServerState dsServerState = msg.getLDAPServerState(dsServerId);
-          pendingMonitorData.setMaxCNs(dsServerState);
-          pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
-          pendingMonitorData.setFirstMissingDate(dsServerId,
-              msg.getLDAPApproxFirstMissingDate(dsServerId));
-        }
-
-        // Process the latency reported by the remote RSi on its connections
-        // to the other RSes
-        Iterator<Integer> rsServerIdIterator = msg.rsIterator();
-        while (rsServerIdIterator.hasNext())
-        {
-          int rsServerId = rsServerIdIterator.next();
-          long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
-          if (rsServerId == localReplicationServer.getServerId())
-          {
-            // this is the latency of the remote RSi regarding the current RS
-            // let's update the fmd of my connected LS
-            for (DataServerHandler connectedDS : connectedDSs.values())
-            {
-              int connectedServerId = connectedDS.getServerId();
-              pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
-            }
-          }
-          else
-          {
-            // this is the latency of the remote RSi regarding another RSj
-            // let's update the latency of the LSes connected to RSj
-            ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
-            if (rsjHdr != null)
-            {
-              for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
-              {
-                pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
-              }
-            }
-          }
-        }
-      }
-      catch (RuntimeException e)
-      {
-        // FIXME: do we really expect these???
-        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
-            e.getMessage() + stackTraceToSingleLineString(e)));
-      }
-      finally
-      {
-        // Decreases the number of expected responses and potentially
-        // wakes up the waiting requester thread.
-        if (pendingMonitorDataServerIDs.remove(serverId))
-        {
-          pendingMonitorDataLatch.countDown();
-        }
-      }
-    }
+    return domainMonitor.getMonitorData();
   }
 
   /**
@@ -2791,7 +2441,7 @@
    */
   public Map<Integer, DataServerHandler> getConnectedDSs()
   {
-    return connectedDSs;
+    return Collections.unmodifiableMap(connectedDSs);
   }
 
   /**
@@ -2800,7 +2450,7 @@
    */
   public Map<Integer, ReplicationServerHandler> getConnectedRSs()
   {
-    return connectedRSs;
+    return Collections.unmodifiableMap(connectedRSs);
   }
 
 
@@ -2960,19 +2610,13 @@
         String.valueOf(localReplicationServer.getServerId())));
     attributes.add(Attributes.create("replication-server-port",
         String.valueOf(localReplicationServer.getReplicationPort())));
-
-    // Add all the base DNs that are known by this replication server.
     attributes.add(Attributes.create("domain-name", baseDn));
-
-    // Publish to monitor the generation ID by replicationServerDomain
     attributes.add(Attributes.create("generation-id",
         baseDn + " " + generationId));
 
-    MonitorData md = getDomainMonitorData();
-
     // Missing changes
-    long missingChanges = md.getMissingChangesRS(localReplicationServer
-        .getServerId());
+    long missingChanges = getDomainMonitorData().getMissingChangesRS(
+        localReplicationServer.getServerId());
     attributes.add(Attributes.create("missing-changes",
         String.valueOf(missingChanges)));
 
@@ -3439,4 +3083,10 @@
     // connection): store handler.
     connectedRSs.put(rsHandler.getServerId(), rsHandler);
   }
+
+  private String getMessage(String message)
+  { // Prefixes a debug message with the local RS server id and the base DN.
+    return "In RS " + localReplicationServer.getServerId() + " for " + baseDn
+        + ": " + message;
+  }
 }

--
Gitblit v1.10.0