From c63e1f305327734be21f5ce0e21bdd2f7a4d143b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 10:32:47 +0000
Subject: [PATCH] Enforced ReplicationServerDomain responsibilities by increasing encapsulation.
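
Moved lifecycle management of the status analyzer and monitoring
publisher into ReplicationServerDomain: startStatusAnalyzer() and
startMonitoringPublisher() are now no-ops when already running, the
stop*() and isRunning*() methods became private, and the
reconfiguration logic moved from ReplicationServer into the new
updateDegradedStatusThreshold() and updateMonitoringPeriod() methods.
Also renamed checkForDuplicateDS()/checkForDuplicateRS() to
isAlreadyConnectedToDS()/isAlreadyConnectedToRS(), inverting their
return values to match the new names, and replaced getChangelogs()
with a private helper in ReplicationServer.

As an illustrative sketch (not part of the patch itself), callers no
longer orchestrate domain internals such as:

    if (!replicationServerDomain.isRunningStatusAnalyzer())
    {
      replicationServerDomain.startStatusAnalyzer();
    }

but make a single call whose precondition the domain now enforces:

    replicationServerDomain.startStatusAnalyzer();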

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java        |   19 -
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java        |   69 +----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java  |  255 +++++++++++----------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java |   33 -
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java            |  234 ++++++++-----------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java           |   98 +++----
 6 files changed, 309 insertions(+), 399 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 8ec6405..4b25380 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -229,14 +229,6 @@
     return newStatus;
   }
 
-  private void createStatusAnalyzer()
-  {
-    if (!replicationServerDomain.isRunningStatusAnalyzer())
-    {
-      replicationServerDomain.startStatusAnalyzer();
-    }
-  }
-
   /**
    * Retrieves a set of attributes containing monitor data that should be
    * returned to the client if the corresponding monitor entry is requested.
@@ -457,8 +449,7 @@
       localGenerationId = replicationServerDomain.getGenerationId();
       oldGenerationId = localGenerationId;
 
-      // Duplicate server ?
-      if (!replicationServerDomain.checkForDuplicateDS(this))
+      if (replicationServerDomain.isAlreadyConnectedToDS(this))
       {
         abortStart(null);
         return;
@@ -468,7 +459,6 @@
       {
         StartMsg outStartMsg = sendStartToRemote();
 
-        // log
         logStartHandshakeRCVandSND(inServerStartMsg, outStartMsg);
 
         // The session initiator decides whether to use SSL.
@@ -508,11 +498,8 @@
         throw new DirectoryException(ResultCode.OTHER, null, null);
       }
 
-      // Create the status analyzer for the domain if not already started
-      createStatusAnalyzer();
-
-      // Create the monitoring publisher for the domain if not already started
-      createMonitoringPublisher();
+      replicationServerDomain.startStatusAnalyzer();
+      replicationServerDomain.startMonitoringPublisher();
 
       registerIntoDomain();
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index d02fe4a..9a136b5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -399,11 +399,7 @@
         for (ReplicationServerDomain domain : getReplicationServerDomains())
         {
           // Create a normalized set of server URLs.
-          final Set<String> connectedReplServers = new HashSet<String>();
-          for (String url : domain.getChangelogs())
-          {
-            connectedReplServers.add(normalizeServerURL(url));
-          }
+          final Set<String> connectedRSUrls = getConnectedRSUrls(domain);
 
           /*
           * check that all replication servers in the config are in the
@@ -440,7 +436,7 @@
 
             // Don't connect to a server if it is already connected.
             final String normalizedServerURL = normalizeServerURL(aServerURL);
-            if (connectedReplServers.contains(normalizedServerURL))
+            if (connectedRSUrls.contains(normalizedServerURL))
             {
               continue;
             }
@@ -472,6 +468,16 @@
     }
   }
 
+  private Set<String> getConnectedRSUrls(ReplicationServerDomain domain)
+  {
+    Set<String> results = new LinkedHashSet<String>();
+    for (ReplicationServerHandler handler : domain.getConnectedRSs().values())
+    {
+      results.add(normalizeServerURL(handler.getServerAddressURL()));
+    }
+    return results;
+  }
+
   /**
    * Establish a connection to the server with the address and port.
    *
@@ -1039,59 +1045,23 @@
 
     // Update threshold value for status analyzers (stop them if requested
     // value is 0)
-    if (degradedStatusThreshold != configuration
-        .getDegradedStatusThreshold())
+    if (degradedStatusThreshold != configuration.getDegradedStatusThreshold())
     {
-      int oldThresholdValue = degradedStatusThreshold;
-      degradedStatusThreshold = configuration
-          .getDegradedStatusThreshold();
+      degradedStatusThreshold = configuration.getDegradedStatusThreshold();
       for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
-        if (degradedStatusThreshold == 0)
-        {
-          // Requested to stop analyzers
-          domain.stopStatusAnalyzer();
-        }
-        else if (domain.isRunningStatusAnalyzer())
-        {
-          // Update the threshold value for this running analyzer
-          domain.updateStatusAnalyzer(degradedStatusThreshold);
-        }
-        else if (oldThresholdValue == 0)
-        {
-          // Requested to start analyzers with provided threshold value
-          if (domain.getConnectedDSs().size() > 0)
-            domain.startStatusAnalyzer();
-        }
+        domain.updateDegradedStatusThreshold(degradedStatusThreshold);
       }
     }
 
     // Update period value for monitoring publishers (stop them if requested
     // value is 0)
-    if (monitoringPublisherPeriod != configuration
-        .getMonitoringPeriod())
+    if (monitoringPublisherPeriod != configuration.getMonitoringPeriod())
     {
-      long oldMonitoringPeriod = monitoringPublisherPeriod;
       monitoringPublisherPeriod = configuration.getMonitoringPeriod();
       for (ReplicationServerDomain domain : getReplicationServerDomains())
       {
-        if (monitoringPublisherPeriod == 0L)
-        {
-          // Requested to stop monitoring publishers
-          domain.stopMonitoringPublisher();
-        }
-        else if (domain.isRunningMonitoringPublisher())
-        {
-          // Update the threshold value for this running monitoring publisher
-          domain.updateMonitoringPublisher(monitoringPublisherPeriod);
-        }
-        else if (oldMonitoringPeriod == 0L)
-        {
-          // Requested to start monitoring publishers with provided period value
-          if ((domain.getConnectedDSs().size() > 0)
-              || (domain.getConnectedRSs().size() > 0))
-            domain.startMonitoringPublisher();
-        }
+        domain.updateMonitoringPeriod(monitoringPublisherPeriod);
       }
     }
 
@@ -1126,7 +1096,7 @@
     return new ConfigChangeResult(ResultCode.SUCCESS, false);
   }
 
-  /*
+  /**
    * Try and set a sensible URL for this replication server. Since we are
    * listening on all addresses there are a couple of potential candidates: 1) a
    * matching server url in the replication server's configuration, 2) hostname
@@ -1666,8 +1636,7 @@
     }
 
     if (debugEnabled())
-      TRACER.debugInfo("In " + this +
-        " getEligibleCN() ends with " +
+      TRACER.debugInfo("In " + this + " getEligibleCN() ends with " +
         " the following domainEligibleCN for each domain :" + debugLog +
         " thus CrossDomainEligibleCN=" + eligibleCN +
         "  ts=" + new Date(eligibleCN.getTime()).toString());
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 31156f3..099f608 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -940,14 +940,14 @@
   /**
    * Stop operations with a list of replication servers.
    *
-   * @param replServers the replication servers for which
-   * we want to stop operations
+   * @param replServerURLs
+   *          the replication servers URLs for which we want to stop operations
    */
-  public void stopReplicationServers(Collection<String> replServers)
+  public void stopReplicationServers(Collection<String> replServerURLs)
   {
     for (ReplicationServerHandler handler : connectedRSs.values())
     {
-      if (replServers.contains(handler.getServerAddressURL()))
+      if (replServerURLs.contains(handler.getServerAddressURL()))
       {
         stopServer(handler, false);
       }
@@ -976,12 +976,13 @@
   }
 
   /**
-   * Checks that a DS is not connected with same id.
+   * Checks whether a DS with the same server id is already connected.
    *
-   * @param handler the DS we want to check
-   * @return true if this is not a duplicate server
+   * @param handler
+   *          the DS we want to check
+   * @return true if a DS with the same server id is already connected
    */
-  public boolean checkForDuplicateDS(DataServerHandler handler)
+  public boolean isAlreadyConnectedToDS(DataServerHandler handler)
   {
     if (connectedDSs.containsKey(handler.getServerId()))
     {
@@ -991,9 +992,9 @@
           connectedDSs.get(handler.getServerId()).toString(),
           handler.toString(), handler.getServerId());
       logError(message);
-      return false;
+      return true;
     }
-    return true;
+    return false;
   }
 
   /**
@@ -1005,6 +1006,7 @@
    */
   public void stopServer(ServerHandler handler, boolean shutdown)
   {
+    // TODO JNR merge with stopServer(MessageHandler)
     if (debugEnabled())
     {
       TRACER.debugInfo("In "
@@ -1055,22 +1057,9 @@
           stopMonitoringPublisher();
         }
 
-        if (handler.isReplicationServer())
+        if (connectedRSs.containsKey(handler.getServerId()))
         {
-          if (connectedRSs.containsKey(handler.getServerId()))
-          {
-            unregisterServerHandler(handler);
-            handler.shutdown();
-
-            // Check if generation id has to be reset
-            mayResetGenerationId();
-            if (!shutdown)
-            {
-              // Warn our DSs that a RS or DS has quit (does not use this
-              // handler as already removed from list)
-              buildAndSendTopoInfoToDSs(null);
-            }
-          }
+          unregisterServerHandler(handler, shutdown, false);
         } else if (connectedDSs.containsKey(handler.getServerId()))
         {
           // If this is the last DS for the domain,
@@ -1086,25 +1075,10 @@
             }
             stopStatusAnalyzer();
           }
-
-          unregisterServerHandler(handler);
-          handler.shutdown();
-
-          // Check if generation id has to be reset
-          mayResetGenerationId();
-          if (!shutdown)
-          {
-            // Update the remote replication servers with our list
-            // of connected LDAP servers
-            buildAndSendTopoInfoToRSs();
-            // Warn our DSs that a RS or DS has quit (does not use this
-            // handler as already removed from list)
-            buildAndSendTopoInfoToDSs(null);
-          }
+          unregisterServerHandler(handler, shutdown, true);
         } else if (otherHandlers.contains(handler))
         {
-          unRegisterHandler(handler);
-          handler.shutdown();
+          unregisterOtherHandler(handler);
         }
       }
       catch(Exception e)
@@ -1122,12 +1096,41 @@
     }
   }
 
+  private void unregisterOtherHandler(MessageHandler handler)
+  {
+    unRegisterHandler(handler);
+    handler.shutdown();
+  }
+
+  private void unregisterServerHandler(ServerHandler handler, boolean shutdown,
+      boolean isDirectoryServer)
+  {
+    unregisterServerHandler(handler);
+    handler.shutdown();
+
+    // Check if generation id has to be reset
+    mayResetGenerationId();
+    if (!shutdown)
+    {
+      if (isDirectoryServer)
+      {
+        // Update the remote replication servers with our list
+        // of connected LDAP servers
+        buildAndSendTopoInfoToRSs();
+      }
+      // Warn our DSs that a RS or DS has quit (does not use this
+      // handler as already removed from list)
+      buildAndSendTopoInfoToDSs(null);
+    }
+  }
+
   /**
    * Stop the handler.
    * @param handler The handler to stop.
    */
   public void stopServer(MessageHandler handler)
   {
+    // TODO JNR merge with stopServer(ServerHandler, boolean)
     if (debugEnabled())
     {
       TRACER.debugInfo("In "
@@ -1163,8 +1166,7 @@
       {
         if (otherHandlers.contains(handler))
         {
-          unRegisterHandler(handler);
-          handler.shutdown();
+          unregisterOtherHandler(handler);
         }
       }
       catch(Exception e)
@@ -1269,39 +1271,40 @@
   }
 
   /**
-   * Checks that a remote RS is not already connected to this hosting RS.
-   * @param handler The handler for the remote RS.
+   * Checks whether a remote RS is already connected to this hosting RS.
+   *
+   * @param handler
+   *          The handler for the remote RS.
    * @return flag specifying whether the remote RS is already connected.
-   * @throws DirectoryException when a problem occurs.
+   * @throws DirectoryException
+   *           when a problem occurs.
    */
-  public boolean checkForDuplicateRS(ReplicationServerHandler handler)
-  throws DirectoryException
+  public boolean isAlreadyConnectedToRS(ReplicationServerHandler handler)
+      throws DirectoryException
   {
     ReplicationServerHandler oldHandler =
-      connectedRSs.get(handler.getServerId());
-    if (oldHandler != null)
+        connectedRSs.get(handler.getServerId());
+    if (oldHandler == null)
     {
-      if (oldHandler.getServerAddressURL().equals(
-        handler.getServerAddressURL()))
-      {
-        // this is the same server, this means that our ServerStart messages
-        // have been sent at about the same time and 2 connections
-        // have been established.
-        // Silently drop this connection.
-        return false;
-      }
-      else
-      {
-        // looks like two replication servers have the same serverId
-        // log an error message and drop this connection.
-        Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
-          localReplicationServer.getMonitorInstanceName(), oldHandler.
-          getServerAddressURL(), handler.getServerAddressURL(),
-          handler.getServerId());
-        throw new DirectoryException(ResultCode.OTHER, message);
-      }
+      return false;
     }
-    return true;
+
+    if (oldHandler.getServerAddressURL().equals(handler.getServerAddressURL()))
+    {
+      // this is the same server, this means that our ServerStart messages
+      // have been sent at about the same time and 2 connections
+      // have been established.
+      // Silently drop this connection.
+      return true;
+    }
+
+    // looks like two replication servers have the same serverId
+    // log an error message and drop this connection.
+    Message message = ERR_DUPLICATE_REPLICATION_SERVER_ID.get(
+        localReplicationServer.getMonitorInstanceName(),
+        oldHandler.getServerAddressURL(), handler.getServerAddressURL(),
+        handler.getServerId());
+    throw new DirectoryException(ResultCode.OTHER, message);
   }
 
   /**
@@ -1327,21 +1330,6 @@
   }
 
   /**
-   * Return a Set of String containing the lists of Replication servers
-   * connected to this server.
-   * @return the set of connected servers
-   */
-  public Set<String> getChangelogs()
-  {
-    Set<String> results = new LinkedHashSet<String>();
-    for (ReplicationServerHandler handler : connectedRSs.values())
-    {
-      results.add(handler.getServerAddressURL());
-    }
-    return results;
-  }
-
-  /**
    * Return a set containing the servers that produced updates and are
    * known by this replicationServer from all over the topology,
    * whether directly connected or connected to another RS.
@@ -2861,11 +2849,11 @@
   }
 
   /**
-   * Starts the status analyzer for the domain.
+   * Starts the status analyzer for the domain if not already started.
    */
   public void startStatusAnalyzer()
   {
-    if (statusAnalyzer == null)
+    if (!isRunningStatusAnalyzer())
     {
       int degradedStatusThreshold =
         localReplicationServer.getDegradedStatusThreshold();
@@ -2880,9 +2868,9 @@
   /**
    * Stops the status analyzer for the domain.
    */
-  public void stopStatusAnalyzer()
+  private void stopStatusAnalyzer()
   {
-    if (statusAnalyzer != null)
+    if (isRunningStatusAnalyzer())
     {
       statusAnalyzer.shutdown();
       statusAnalyzer.waitForShutdown();
@@ -2894,32 +2882,19 @@
    * Tests if the status analyzer for this domain is running.
    * @return True if the status analyzer is running, false otherwise.
    */
-  public boolean isRunningStatusAnalyzer()
+  private boolean isRunningStatusAnalyzer()
   {
     return statusAnalyzer != null;
   }
 
   /**
-   * Update the status analyzer with the new threshold value.
-   * @param degradedStatusThreshold The new threshold value.
-   */
-  public void updateStatusAnalyzer(int degradedStatusThreshold)
-  {
-    if (statusAnalyzer != null)
-    {
-      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
-    }
-  }
-
-  /**
-   * Starts the monitoring publisher for the domain.
+   * Starts the monitoring publisher for the domain if not already started.
    */
   public void startMonitoringPublisher()
   {
-    if (monitoringPublisher == null)
+    if (!isRunningMonitoringPublisher())
     {
-      long period =
-        localReplicationServer.getMonitoringPublisherPeriod();
+      long period = localReplicationServer.getMonitoringPublisherPeriod();
       if (period > 0) // 0 means no monitoring publisher
       {
         monitoringPublisher = new MonitoringPublisher(this, period);
@@ -2931,9 +2906,9 @@
   /**
    * Stops the monitoring publisher for the domain.
    */
-  public void stopMonitoringPublisher()
+  private void stopMonitoringPublisher()
   {
-    if (monitoringPublisher != null)
+    if (isRunningMonitoringPublisher())
     {
       monitoringPublisher.shutdown();
       monitoringPublisher.waitForShutdown();
@@ -2945,24 +2920,12 @@
    * Tests if the monitoring publisher for this domain is running.
    * @return True if the monitoring publisher is running, false otherwise.
    */
-  public boolean isRunningMonitoringPublisher()
+  private boolean isRunningMonitoringPublisher()
   {
     return monitoringPublisher != null;
   }
 
   /**
-   * Update the monitoring publisher with the new period value.
-   * @param period The new period value.
-   */
-  public void updateMonitoringPublisher(long period)
-  {
-    if (monitoringPublisher != null)
-    {
-      monitoringPublisher.setPeriod(period);
-    }
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -3384,4 +3347,52 @@
   {
     return this.localReplicationServer.getServerId();
   }
+
+  /**
+   * Update the degraded status threshold, starting/stopping the analyzer.
+   *
+   * @param degradedStatusThreshold
+   *          The new threshold value.
+   */
+  void updateDegradedStatusThreshold(int degradedStatusThreshold)
+  {
+    if (degradedStatusThreshold == 0)
+    {
+      // Requested to stop analyzers
+      stopStatusAnalyzer();
+    }
+    else if (isRunningStatusAnalyzer())
+    {
+      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
+    }
+    else if (getConnectedDSs().size() > 0)
+    {
+      // Requested to start analyzers with provided threshold value
+      startStatusAnalyzer();
+    }
+  }
+
+  /**
+   * Update the monitoring period, starting/stopping the publisher.
+   *
+   * @param period
+   *          The new period value.
+   */
+  void updateMonitoringPeriod(long period)
+  {
+    if (period == 0)
+    {
+      // Requested to stop monitoring publishers
+      stopMonitoringPublisher();
+    }
+    else if (isRunningMonitoringPublisher())
+    {
+      monitoringPublisher.setPeriod(period);
+    }
+    else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
+    {
+      // Requested to start monitoring publishers with provided period value
+      startMonitoringPublisher();
+    }
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 6f96bda..1753dd6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -160,7 +160,6 @@
     {
       lockDomain(false); // no timeout
 
-      // Send start
       ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
 
       // Wait answer
@@ -174,22 +173,19 @@
           // Remote replication server is probably shutting down or simultaneous
           // cross-connect detected.
           abortStart(null);
-          return;
         }
         else
         {
           Message message = ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE.get(msg
               .getClass().getCanonicalName(), "ReplServerStartMsg");
           abortStart(message);
-          return;
         }
+        return;
       }
 
-      // Process hello from remote.
-      processStartFromRemote((ReplServerStartMsg)msg);
+      processStartFromRemote((ReplServerStartMsg) msg);
 
-      // Duplicate server ?
-      if (!replicationServerDomain.checkForDuplicateRS(this))
+      if (replicationServerDomain.isAlreadyConnectedToRS(this))
       {
         // Simultaneous cross connect.
         abortStart(null);
@@ -207,10 +203,9 @@
             generationId, false);
       }
 
-      // Log
       logStartHandshakeSNDandRCV(outReplServerStartMsg,(ReplServerStartMsg)msg);
 
-      // Until here session is encrypted then it depends on the negociation
+      // Session is encrypted until here; then it depends on the negotiation
       // The session initiator decides whether to use SSL.
       if (!this.sslEncryption)
         session.stopEncryption();
@@ -239,8 +234,7 @@
 
         logTopoHandshakeSNDandRCV(outTopoMsg, inTopoMsg);
 
-        // Create the monitoring publisher for the domain if not already started
-        createMonitoringPublisher();
+        replicationServerDomain.startMonitoringPublisher();
 
         /*
         FIXME: i think this should be done for all protocol version !!
@@ -292,7 +286,6 @@
     }
     finally
     {
-      // Release domain
       if (replicationServerDomain != null &&
           replicationServerDomain.hasLock())
         replicationServerDomain.release();
@@ -316,8 +309,7 @@
       // lock with timeout
       lockDomain(true);
 
-      // Duplicate server ?
-      if (!replicationServerDomain.checkForDuplicateRS(this))
+      if (replicationServerDomain.isAlreadyConnectedToRS(this))
       {
         abortStart(null);
         return;
@@ -326,7 +318,6 @@
       this.localGenerationId = replicationServerDomain.getGenerationId();
       ReplServerStartMsg outReplServerStartMsg = sendStartToRemote();
 
-      // log
       logStartHandshakeRCVandSND(inReplServerStartMsg, outReplServerStartMsg);
 
       /*
@@ -358,7 +349,6 @@
             .createTopologyMsgForRS();
         sendTopoInfo(outTopoMsg);
 
-        // log
         logTopoHandshakeRCVandSND(inTopoMsg, outTopoMsg);
       }
       else
@@ -390,9 +380,7 @@
         */
       }
 
-
-      // Create the monitoring publisher for the domain if not already started
-      createMonitoringPublisher();
+      replicationServerDomain.startMonitoringPublisher();
 
       registerIntoDomain();
 
@@ -616,9 +604,7 @@
   public void shutdown()
   {
     super.shutdown();
-    /*
-     * Stop the remote LSHandler
-     */
+    // Stop the remote LSHandler
     synchronized (remoteDirectoryServers)
     {
       for (LightweightServerHandler lsh : remoteDirectoryServers.values())
@@ -755,7 +741,7 @@
     attributes.add(Attributes.create("missing-changes",
         String.valueOf(md.getMissingChangesRS(serverId))));
 
-    /* get the Server State */
+    // get the Server State
     AttributeBuilder builder = new AttributeBuilder("server-state");
     ServerState state = md.getRSStates(serverId);
     if (state != null)
@@ -769,6 +755,7 @@
 
     return attributes;
   }
+
   /**
    * {@inheritDoc}
    */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index c377e5f..339ce80 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -78,8 +78,7 @@
       if (debugEnabled())
         TRACER.debugInfo("In " +
           ((handler != null) ? handler.toString() : "Replication Server") +
-          " closing session with err=" +
-          providedMsg.toString());
+          " closing session with err=" + providedMsg);
       logError(providedMsg);
     }
 
@@ -125,7 +124,7 @@
    */
   protected AtomicInteger assuredSrSentUpdatesTimeout = new AtomicInteger();
   /**
-  // Number of updates received from the server in assured safe data mode.
+   * Number of updates received from the server in assured safe data mode.
    */
   protected int assuredSdReceivedUpdates = 0;
   /**
@@ -169,7 +168,7 @@
   /**
    * The initial size of the sending window.
    */
-  int sendWindowSize;
+  protected int sendWindowSize;
   /**
    * remote generation id.
    */
@@ -185,7 +184,7 @@
   /**
    * Group id of this remote server.
    */
-  protected byte groupId = (byte) -1;
+  protected byte groupId = -1;
   /**
    * The SSL encryption after the negotiation with the peer.
    */
@@ -254,8 +253,7 @@
       closeSession(localSession, reason, this);
     }
 
-    if ((replicationServerDomain != null) &&
-        replicationServerDomain.hasLock())
+    if (replicationServerDomain != null && replicationServerDomain.hasLock())
       replicationServerDomain.release();
 
     // If generation id of domain was changed, set it back to old value
@@ -263,10 +261,9 @@
     // peer server and the last topo message sent may have failed being
     // sent: in that case retrieve old value of generation id for
     // replication server domain
-    if (oldGenerationId != -100)
+    if (oldGenerationId != -100 && replicationServerDomain != null)
     {
-      if (replicationServerDomain!=null)
-        replicationServerDomain.changeGenerationId(oldGenerationId, false);
+      replicationServerDomain.changeGenerationId(oldGenerationId, false);
     }
   }
 
@@ -304,7 +301,6 @@
   @Override
   public boolean engageShutdown()
   {
-    // Use thread safe boolean
     return shuttingDown.getAndSet(true);
   }
 
@@ -340,13 +336,11 @@
       // sendWindow MUST be created before starting the writer
       sendWindow = new Semaphore(sendWindowSize);
 
-      writer = new ServerWriter(session, this,
-          replicationServerDomain);
+      writer = new ServerWriter(session, this, replicationServerDomain);
       reader = new ServerReader(session, this);
 
-      session.setName("Replication server RS("
-          + this.getReplicationServerId()
-          + ") session thread to " + this.toString() + " at "
+      session.setName("Replication server RS(" + getReplicationServerId()
+          + ") session thread to " + this + " at "
           + session.getReadableRemoteAddress());
       session.start();
       try
@@ -366,9 +360,8 @@
       // Create a thread to send heartbeat messages.
       if (heartbeatInterval > 0)
       {
-        String threadName = "Replication server RS("
-            + this.getReplicationServerId()
-            + ") heartbeat publisher to " + this.toString() + " at "
+        String threadName = "Replication server RS(" + getReplicationServerId()
+            + ") heartbeat publisher to " + this + " at "
             + session.getReadableRemoteAddress();
         heartbeatThread = new HeartbeatThread(threadName, session,
             heartbeatInterval / 3);
@@ -788,7 +781,7 @@
    */
   public boolean isReplicationServer()
   {
-    return (!this.isDataServer());
+    return !this.isDataServer();
   }
 
 
@@ -827,62 +820,58 @@
     // it will be created and locked later in the method
     if (!timedout)
     {
-      // !timedout
       if (!replicationServerDomain.hasLock())
         replicationServerDomain.lock();
+      return;
     }
-    else
+
+    /**
+     * Take the lock on the domain.
+     * WARNING: Here we try to acquire the lock with a timeout. This
+     * is for preventing a deadlock that may happen if there are cross
+     * connection attempts (for same domain) from this replication
+     * server and from a peer one:
+     * Here is the scenario:
+     * - RS1 connect thread takes the domain lock and starts
+     * connection to RS2
+     * - at the same time RS2 connect thread takes its domain lock and
+     * starts connection to RS1
+     * - RS2 listen thread starts processing received
+     * ReplServerStartMsg from RS1 and wants to acquire the lock on
+     * the domain (here) but cannot as RS2 connect thread already has
+     * it
+     * - RS1 listen thread starts processing received
+     * ReplServerStartMsg from RS2 and wants to acquire the lock on
+     * the domain (here) but cannot as RS1 connect thread already has
+     * it
+     * => Deadlock: 4 threads are locked.
+     * So to prevent such a situation, the listen threads here
+     * will both timeout trying to acquire the lock. The random
+     * time for the timeout should allow one connection attempt
+     * to be aborted while the other one has enough time to
+     * finish.
+     * Warning: the minimum time (3s) should be big enough to let
+     * connections terminate in normal situations. The added
+     * random time should span a big enough range so that one
+     * listen thread is likely to time out well before the peer
+     * one. When the first listen thread times out, the remote
+     * connect thread should release the lock and allow the peer
+     * listen thread to take the lock it was waiting for and process
+     * the connection attempt.
+     */
+    Random random = new Random();
+    int randomTime = random.nextInt(6); // Random from 0 to 5
+    // Wait at least 3 seconds + (0 to 5 seconds)
+    long timeout = 3000 + (randomTime * 1000);
+    boolean lockAcquired = replicationServerDomain.tryLock(timeout);
+    if (!lockAcquired)
     {
-      // timedout
-      /**
-       * Take the lock on the domain.
-       * WARNING: Here we try to acquire the lock with a timeout. This
-       * is for preventing a deadlock that may happen if there are cross
-       * connection attempts (for same domain) from this replication
-       * server and from a peer one:
-       * Here is the scenario:
-       * - RS1 connect thread takes the domain lock and starts
-       * connection to RS2
-       * - at the same time RS2 connect thread takes his domain lock and
-       * start connection to RS2
-       * - RS2 listen thread starts processing received
-       * ReplServerStartMsg from RS1 and wants to acquire the lock on
-       * the domain (here) but cannot as RS2 connect thread already has
-       * it
-       * - RS1 listen thread starts processing received
-       * ReplServerStartMsg from RS2 and wants to acquire the lock on
-       * the domain (here) but cannot as RS1 connect thread already has
-       * it
-       * => Deadlock: 4 threads are locked.
-       * So to prevent that in such situation, the listen threads here
-       * will both timeout trying to acquire the lock. The random time
-       * for the timeout should allow on connection attempt to be
-       * aborted whereas the other one should have time to finish in the
-       * same time.
-       * Warning: the minimum time (3s) should be big enough to allow
-       * normal situation connections to terminate. The added random
-       * time should represent a big enough range so that the chance to
-       * have one listen thread timing out a lot before the peer one is
-       * great. When the first listen thread times out, the remote
-       * connect thread should release the lock and allow the peer
-       * listen thread to take the lock it was waiting for and process
-       * the connection attempt.
-       */
-      Random random = new Random();
-      int randomTime = random.nextInt(6); // Random from 0 to 5
-      // Wait at least 3 seconds + (0 to 5 seconds)
-      long timeout = 3000 + (randomTime * 1000);
-      boolean noTimeout = replicationServerDomain.tryLock(timeout);
-      if (!noTimeout)
-      {
-        // Timeout
-        Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
-            getBaseDN(),
-            serverId,
-            session.getReadableRemoteAddress(),
-            getReplicationServerId());
-        throw new DirectoryException(ResultCode.OTHER, message);
-      }
+      Message message = WARN_TIMEOUT_WHEN_CROSS_CONNECTION.get(
+          getBaseDN(),
+          serverId,
+          session.getReadableRemoteAddress(),
+          getReplicationServerId());
+      throw new DirectoryException(ResultCode.OTHER, message);
     }
   }
 
@@ -1011,9 +1000,7 @@
       session.close();
     }
 
-    /*
-     * Stop the heartbeat thread.
-     */
+    // Stop the heartbeat thread.
     if (heartbeatThread != null)
     {
       heartbeatThread.shutdown();
@@ -1028,12 +1015,11 @@
      */
     try
     {
-      if ((writer != null) && (!(Thread.currentThread().equals(writer))))
+      if (writer != null && !Thread.currentThread().equals(writer))
       {
-
         writer.join(SHUTDOWN_JOIN_TIMEOUT);
       }
-      if ((reader != null) && (!(Thread.currentThread().equals(reader))))
+      if (reader != null && !Thread.currentThread().equals(reader))
       {
         reader.join(SHUTDOWN_JOIN_TIMEOUT);
       }
@@ -1068,7 +1054,7 @@
       {
         // loop until not interrupted
       }
-    } while (((interrupted) || (!acquired)) && (!shutdownWriter));
+    } while ((interrupted || !acquired) && !shutdownWriter);
     if (msg != null)
     {
       incrementOutCount();
@@ -1078,10 +1064,9 @@
         if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
         {
           incrementAssuredSrSentUpdates();
-        } else
+        } else if (!isDataServer())
         {
-          if (!isDataServer())
-            incrementAssuredSdSentUpdates();
+          incrementAssuredSdSentUpdates();
         }
       }
     }
@@ -1094,22 +1079,10 @@
    */
   public RSInfo toRSInfo()
   {
-
-    return new RSInfo(serverId, serverURL, generationId, groupId,
-      weight);
+    return new RSInfo(serverId, serverURL, generationId, groupId, weight);
   }
 
   /**
-   * Starts the monitoring publisher for the domain if not already started.
-   */
-  protected void createMonitoringPublisher()
-  {
-    if (!replicationServerDomain.isRunningMonitoringPublisher())
-    {
-      replicationServerDomain.startMonitoringPublisher();
-    }
-  }
-  /**
    * Update the send window size based on the credit specified in the
    * given window message.
    *
@@ -1132,11 +1105,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-        this.replicationServer.getMonitorInstanceName() + ", " +
-        this.getClass().getSimpleName() + " " + this + ":" +
-        "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg.toString()+
-        "\nAND REPLIED:\n" + outStartMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH START HANDSHAKE RECEIVED:\n" + inStartMsg
+          + "\nAND REPLIED:\n" + outStartMsg);
     }
   }
 
@@ -1151,12 +1123,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-        this.replicationServer.getMonitorInstanceName() + ", " +
-        this.getClass().getSimpleName() + " " + this + ":" +
-        "\nSH START HANDSHAKE SENT("+ this +
-        "):\n" + outStartMsg.toString()+
-        "\nAND RECEIVED:\n" + inStartMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH START HANDSHAKE SENT:\n" + outStartMsg + "\nAND RECEIVED:\n"
+          + inStartMsg);
     }
   }
 
@@ -1171,11 +1141,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + ":" +
-          "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg.toString() +
-          "\nAND REPLIED:\n" + outTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH TOPO HANDSHAKE RECEIVED:\n" + inTopoMsg + "\nAND REPLIED:\n"
+          + outTopoMsg);
     }
   }
 
@@ -1190,11 +1159,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + ":" +
-          "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg.toString() +
-          "\nAND RECEIVED:\n" + inTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + ":"
+          + "\nSH TOPO HANDSHAKE SENT:\n" + outTopoMsg + "\nAND RECEIVED:\n"
+          + inTopoMsg);
     }
   }
 
@@ -1209,11 +1177,10 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + " :" +
-          "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg.toString() +
-          "\nAND REPLIED:\n" + outTopoMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartSessionMsg
+          + "\nAND REPLIED:\n" + outTopoMsg);
     }
   }
 
@@ -1224,10 +1191,9 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + " :" +
-          "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED A STOP MESSAGE");
     }
   }
 
@@ -1240,11 +1206,9 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("In " +
-          this.replicationServer.getMonitorInstanceName() + ", " +
-          this.getClass().getSimpleName() + " " + this + " :" +
-          "\nSH SESSION HANDSHAKE RECEIVED:\n" +
-          inStartECLSessionMsg.toString());
+      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
+          + ", " + getClass().getSimpleName() + " " + this + " :"
+          + "\nSH SESSION HANDSHAKE RECEIVED:\n" + inStartECLSessionMsg);
     }
   }
 
@@ -1264,10 +1228,9 @@
    */
   public long getReferenceGenId()
   {
-    long refgenid = -1;
-    if (replicationServerDomain!=null)
-      refgenid = replicationServerDomain.getGenerationId();
-    return refgenid;
+    if (replicationServerDomain != null)
+      return replicationServerDomain.getGenerationId();
+    return -1;
   }
 
   /**
@@ -1285,8 +1248,7 @@
    * @param update the update message received.
    * @throws IOException when it occurs.
    */
-  public void put(UpdateMsg update)
-  throws IOException
+  public void put(UpdateMsg update) throws IOException
   {
     if (replicationServerDomain!=null)
       replicationServerDomain.put(update, this);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
index 3b8daa7..222b8c7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/StatusAnalyzer.java
@@ -29,11 +29,12 @@
 
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachineEvent;
 import org.opends.server.types.DebugLogLevel;
 
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.replication.common.StatusMachineEvent.*;
 
 /**
  * This thread is in charge of periodically determining if the connected
@@ -85,7 +86,6 @@
   }
 
   /**
-   * Run method for the StatusAnalyzer.
    * Analyzes whether servers are late and changes their status accordingly.
    */
   @Override
@@ -93,13 +93,12 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("Directory server status analyzer starting for dn " +
-        replicationServerDomain.getBaseDn());
+      TRACER.debugInfo(
+          getMessage("Directory server status analyzer starting."));
     }
 
-    final int localRsId = replicationServerDomain.getLocalRSServerId();
     boolean interrupted = false;
     while (!shutdown && !interrupted)
     {
       synchronized (shutdownLock)
       {
@@ -126,22 +125,21 @@
       // for it and change status accordingly if threshold value is
       // crossed/uncrossed
       for (DataServerHandler serverHandler :
-        replicationServerDomain.getConnectedDSs(). values())
+        replicationServerDomain.getConnectedDSs().values())
       {
         // Get number of pending changes for this server
         int nChanges = serverHandler.getRcvMsgQueueSize();
         if (debugEnabled())
         {
-          TRACER.debugInfo("Status analyzer for dn "
-              + replicationServerDomain.getBaseDn() + " DS "
+          TRACER.debugInfo(getMessage("Status analyzer: DS "
               + serverHandler.getServerId() + " has " + nChanges
-              + " message(s) in writer queue. This is in RS " + localRsId);
+              + " message(s) in writer queue."));
         }
 
         // Check status to know if it is relevant to change the status. Do not
         // take RSD lock to test. If we attempt to change the status whereas
-        // we are in a status that do not allows that, this will be noticed by
-        // the changeStatusFromStatusAnalyzer method. This allows to take the
+        // the current status does not allow it, this will be noticed by
+        // the changeStatusFromStatusAnalyzer() method. This allows taking the
         // lock roughly only when needed versus every sleep time timeout.
         if (degradedStatusThreshold > 0)
           // Threshold value = 0 means no status analyzer (no degrading system)
@@ -151,39 +149,20 @@
         {
           if (nChanges >= degradedStatusThreshold)
           {
-            if (serverHandler.getStatus() == ServerStatus.NORMAL_STATUS)
+            if (serverHandler.getStatus() == NORMAL_STATUS
+                && isInterrupted(serverHandler, TO_DEGRADED_STATUS_EVENT))
             {
-              interrupted =
-                replicationServerDomain.changeStatusFromStatusAnalyzer(
-                serverHandler,
-                StatusMachineEvent.TO_DEGRADED_STATUS_EVENT);
-              if (interrupted)
-              {
-                // Finish job and let thread die
-                TRACER.debugInfo("Status analyzer for dn "
-                    + replicationServerDomain.getBaseDn()
-                    + " has been interrupted and will die. This is in RS "
-                    + localRsId);
-                break;
-              }
+              interrupted = true;
+              break;
             }
-          } else
+          }
+          else
           {
-            if (serverHandler.getStatus() == ServerStatus.DEGRADED_STATUS)
+            if (serverHandler.getStatus() == DEGRADED_STATUS
+                && isInterrupted(serverHandler, TO_NORMAL_STATUS_EVENT))
             {
-              interrupted =
-                replicationServerDomain.changeStatusFromStatusAnalyzer(
-                serverHandler,
-                StatusMachineEvent.TO_NORMAL_STATUS_EVENT);
-              if (interrupted)
-              {
-                // Finish job and let thread die
-                TRACER.debugInfo("Status analyzer for dn "
-                    + replicationServerDomain.getBaseDn()
-                    + " has been interrupted and will die. This is in RS "
-                    + localRsId);
-                break;
-              }
+              interrupted = true;
+              break;
             }
           }
         }
@@ -191,9 +170,28 @@
     }
 
     done = true;
-    TRACER.debugInfo("Status analyzer for dn "
-        + replicationServerDomain.getBaseDn() + " is terminated."
-        + " This is in RS " + localRsId);
+    TRACER.debugInfo(getMessage("Status analyzer is terminated."));
+  }
+
+  private String getMessage(String message)
+  {
+    return "In RS " + replicationServerDomain.getLocalRSServerId()
+        + ", for base dn " + replicationServerDomain.getBaseDn() + ": "
+        + message;
+  }
+
+  private boolean isInterrupted(DataServerHandler serverHandler,
+      StatusMachineEvent event)
+  {
+    if (replicationServerDomain.changeStatusFromStatusAnalyzer(serverHandler,
+        event))
+    {
+      // Finish job and let thread die
+      TRACER.debugInfo(
+          getMessage("Status analyzer has been interrupted and will die."));
+      return true;
+    }
+    return false;
   }
 
   /**
@@ -208,9 +206,7 @@
 
       if (debugEnabled())
       {
-        TRACER.debugInfo("Shutting down status analyzer for dn "
-            + replicationServerDomain.getBaseDn()
-            + " in RS " + replicationServerDomain.getLocalRSServerId());
+        TRACER.debugInfo(getMessage("Shutting down status analyzer."));
       }
     }
   }
@@ -231,9 +227,7 @@
         n++;
         if (n >= FACTOR)
         {
-          TRACER.debugInfo("Interrupting status analyzer for dn " +
-              replicationServerDomain.getBaseDn() + " in RS " +
-              replicationServerDomain.getLocalRSServerId());
+          TRACER.debugInfo(getMessage("Interrupting status analyzer."));
           interrupt();
         }
       }
@@ -251,9 +245,9 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("Directory server status analyzer for dn " +
-        replicationServerDomain.getBaseDn() + " changing threshold value to " +
-        degradedStatusThreshold);
+      TRACER.debugInfo(getMessage(
+          "Directory server status analyzer changing threshold value to "
+              + degradedStatusThreshold));
     }
 
     this.degradedStatusThreshold = degradedStatusThreshold;

--
Gitblit v1.10.0