From 4bb3d3af3d032a98f2ca318c81be5c4f81034b8f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 11 Oct 2013 12:27:16 +0000
Subject: [PATCH] ReplicationBroker.java: In computeInitialServerStatus(), used early exits. In computeBestServerForWeight(), extracted methods computeBestServerWhenNotConnected() and computeBestServerWhenConnected(). Changed replicationServerUrls from Collection<String> to Set<String>. Removed useless field initialization to null. Renamed _publish() to publish() + reduced local variables scope. In receive(), renamed local variable replicationServerID to previousRsServerID and used this one more rather than the field. In changeConfig(), used Set.equals(). Changed getReplicationMonitor() to getReplicationMonitorInstanceName().

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  564 +++++++++++++++++++++++++++----------------------------
 1 files changed, 278 insertions(+), 286 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index d486e8d..8bd15cb 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -31,7 +31,10 @@
 import java.math.BigDecimal;
 import java.math.MathContext;
 import java.math.RoundingMode;
-import java.net.*;
+import java.net.ConnectException;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
@@ -72,7 +75,7 @@
   /**
    * Replication server URLs under this format: "<code>hostname:port</code>".
    */
-  private volatile Collection<String> replicationServerUrls;
+  private volatile Set<String> replicationServerUrls;
   private volatile boolean connected = false;
   /**
    * String reported under CSN=monitor when there is no connected RS.
@@ -98,9 +101,9 @@
   /** The server id of the RS we are connected to. */
   private Integer rsServerId = -1;
   /** The server URL of the RS we are connected to. */
-  private String rsServerUrl = null;
+  private String rsServerUrl;
   /** Our replication domain. */
-  private ReplicationDomain domain = null;
+  private ReplicationDomain domain;
   /**
    * This object is used as a conditional event to be notified about
    * the reception of monitor information from the Replication Server.
@@ -121,7 +124,7 @@
   /**
    * A thread to monitor heartbeats on the session.
    */
-  private HeartbeatMonitor heartbeatMonitor = null;
+  private HeartbeatMonitor heartbeatMonitor;
   /**
    * The number of times the connection was lost.
    */
@@ -140,7 +143,7 @@
    * The thread that publishes messages to the RS containing the current
    * change time of this DS.
    */
-  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread = null;
+  private CTHeartbeatPublisherThread ctHeartbeatPublisherThread;
   /**
    * The expected period in milliseconds between these messages are sent
    * to the replication server. Zero means heartbeats are off.
@@ -149,8 +152,11 @@
   /*
    * Properties for the last topology info received from the network.
    */
-  // Info for other DSs.
-  // Warning: does not contain info for us (for our server id)
+  /**
+   * Info for other DSs.
+   * <p>
+   * Warning: does not contain info for us (for our server id)
+   */
   private volatile List<DSInfo> dsList = new ArrayList<DSInfo>();
   private volatile long generationID;
   private volatile int updateDoneCount = 0;
@@ -162,8 +168,7 @@
    * replication server one wants to connect. Key: replication server id Value:
    * replication server info for the matching replication server id
    */
-  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos
-    = null;
+  private volatile Map<Integer, ReplicationServerInfo> replicationServerInfos;
 
   /**
    * This integer defines when the best replication server checking algorithm
@@ -256,7 +261,7 @@
    *
    * @param replicationServers list of servers used
    */
-  public void start(Collection<String> replicationServers)
+  public void start(Set<String> replicationServers)
   {
     synchronized (startStopLock)
     {
@@ -1068,60 +1073,45 @@
     {
       // RS has no generation id
       return ServerStatus.NORMAL_STATUS;
-    } else
+    }
+    else if (rsGenId != dsGenId)
     {
-      if (rsGenId == dsGenId)
+      // DS and RS do not have same generation id
+      return ServerStatus.BAD_GEN_ID_STATUS;
+    }
+    else
+    {
+      /*
+      DS and RS have same generation id
+
+      Determine if we are late or not to replay changes. RS uses a
+      threshold value for pending changes to be replayed by a DS to
+      determine if the DS is in normal status or in degraded status.
+      Let's compare the local and remote server state using  this threshold
+      value to determine if we are late or not
+      */
+
+      int nChanges = ServerState.diffChanges(rsState, state);
+      if (debugEnabled())
       {
-        /*
-        DS and RS have same generation id
-
-        Determine if we are late or not to replay changes. RS uses a
-        threshold value for pending changes to be replayed by a DS to
-        determine if the DS is in normal status or in degraded status.
-        Let's compare the local and remote server state using  this threshold
-        value to determine if we are late or not
-        */
-
-        ServerStatus initStatus;
-        int nChanges = ServerState.diffChanges(rsState, state);
-
-        if (debugEnabled())
-        {
-          TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
-              + serverId + " computed " + nChanges + " changes late.");
-        }
-
-        /*
-        Check status to know if it is relevant to change the status. Do not
-        take RSD lock to test. If we attempt to change the status whereas
-        we are in a status that do not allows that, this will be noticed by
-        the changeStatusFromStatusAnalyzer method. This allows to take the
-        lock roughly only when needed versus every sleep time timeout.
-        */
-        if (degradedStatusThreshold > 0)
-        {
-          if (nChanges >= degradedStatusThreshold)
-          {
-            initStatus = ServerStatus.DEGRADED_STATUS;
-          } else
-          {
-            initStatus = ServerStatus.NORMAL_STATUS;
-          }
-        } else
-        {
-          /*
-          0 threshold value means no degrading system used (no threshold):
-          force normal status
-          */
-          initStatus = ServerStatus.NORMAL_STATUS;
-        }
-
-        return initStatus;
-      } else
-      {
-        // DS and RS do not have same generation id
-        return ServerStatus.BAD_GEN_ID_STATUS;
+        TRACER.debugInfo("RB for dn " + baseDN + " and with server id "
+            + serverId + " computed " + nChanges + " changes late.");
       }
+
+      /*
+      Check status to know if it is relevant to change the status. Do not
+      take RSD lock to test. If we attempt to change the status whereas
+      we are in a status that do not allows that, this will be noticed by
+      the changeStatusFromStatusAnalyzer method. This allows to take the
+      lock roughly only when needed versus every sleep time timeout.
+      */
+      if (degradedStatusThreshold > 0 && nChanges >= degradedStatusThreshold)
+      {
+        return ServerStatus.DEGRADED_STATUS;
+      }
+      // degradedStatusThreshold value of '0' means no degrading system used
+      // (no threshold): force normal status
+      return ServerStatus.NORMAL_STATUS;
     }
   }
 
@@ -1472,20 +1462,16 @@
       return bestServers.values().iterator().next();
     }
 
-      if (firstConnection)
-      {
-        // We are not connected to a server yet
-        return computeBestServerForWeight(bestServers, -1, -1);
-      } else
-      {
-        /*
-        We are already connected to a RS: compute the best RS as far as the
-        weights is concerned. If this is another one, some DS must
-        disconnect.
-        */
-        return computeBestServerForWeight(bestServers, rsServerId,
-          localServerId);
-      }
+    if (firstConnection)
+    {
+      // We are not connected to a server yet
+      return computeBestServerForWeight(bestServers, -1, -1);
+    }
+    /*
+     * We are already connected to a RS: compute the best RS as far as the
+     * weights is concerned. If this is another one, some DS must disconnect.
+     */
+    return computeBestServerForWeight(bestServers, rsServerId, localServerId);
   }
 
   /**
@@ -1783,11 +1769,12 @@
       sumOfWeights += replicationServerInfo.getWeight();
       sumOfConnectedDSs += replicationServerInfo.getConnectedDSNumber();
     }
+
     // Distance (difference) of the current loads to the load goals of each RS:
     // key:server id, value: distance
     Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
     // Precision for the operations (number of digits after the dot)
-    MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
+    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
     for (Integer rsId : bestServers.keySet())
     {
       ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
@@ -1812,199 +1799,210 @@
 
     if (currentRsServerId == -1)
     {
-      // The local server is not connected yet
+      // The local server is not connected yet, find best server to connect to,
+      // taking the weights into account.
+      return computeBestServerWhenNotConnected(bestServers, loadDistances);
+    }
+    // The local server is currently connected to a RS, let's see if it must
+    // disconnect or not, taking the weights into account.
+    return computeBestServerWhenConnected(bestServers, loadDistances,
+        localServerId, currentRsServerId, sumOfWeights, sumOfConnectedDSs);
+  }
 
+  private static ReplicationServerInfo computeBestServerWhenNotConnected(
+      Map<Integer, ReplicationServerInfo> bestServers,
+      Map<Integer, BigDecimal> loadDistances)
+  {
+    /*
+     * Find the server with the current highest distance to its load goal and
+     * choose it. Make an exception if every server is correctly balanced,
+     * that is every current load distances are equal to 0, in that case,
+     * choose the server with the highest weight
+     */
+    int bestRsId = 0; // If all server equal, return the first one
+    float highestDistance = Float.NEGATIVE_INFINITY;
+    boolean allRsWithZeroDistance = true;
+    int highestWeightRsId = -1;
+    int highestWeight = -1;
+    for (Integer rsId : bestServers.keySet())
+    {
+      float loadDistance = loadDistances.get(rsId).floatValue();
+      if (loadDistance > highestDistance)
+      {
+        // This server is far more from its balance point
+        bestRsId = rsId;
+        highestDistance = loadDistance;
+      }
+      if (loadDistance != 0)
+      {
+        allRsWithZeroDistance = false;
+      }
+      int weight = bestServers.get(rsId).getWeight();
+      if (weight > highestWeight)
+      {
+        // This server has a higher weight
+        highestWeightRsId = rsId;
+        highestWeight = weight;
+      }
+    }
+    // All servers with a 0 distance ?
+    if (allRsWithZeroDistance)
+    {
+      // Choose server with the highest weight
+      bestRsId = highestWeightRsId;
+    }
+    return bestServers.get(bestRsId);
+  }
+
+  private static ReplicationServerInfo computeBestServerWhenConnected(
+      Map<Integer, ReplicationServerInfo> bestServers,
+      Map<Integer, BigDecimal> loadDistances, int localServerId,
+      int currentRsServerId, int sumOfWeights, int sumOfConnectedDSs)
+  {
+    final MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
+    float currentLoadDistance =
+      loadDistances.get(currentRsServerId).floatValue();
+    if (currentLoadDistance < 0)
+    {
       /*
-       * Find the server with the current highest distance to its load goal and
-       * choose it. Make an exception if every server is correctly balanced,
-       * that is every current load distances are equal to 0, in that case,
-       * choose the server with the highest weight
-       */
-      int bestRsId = 0; // If all server equal, return the first one
-      float highestDistance = Float.NEGATIVE_INFINITY;
-      boolean allRsWithZeroDistance = true;
-      int highestWeightRsId = -1;
-      int highestWeight = -1;
+      Too much DSs connected to the current RS, compared with its load
+      goal:
+      Determine the potential number of DSs to disconnect from the current
+      RS and see if the local DS is part of them: the DSs that must
+      disconnect are those with the lowest server id.
+      Compute the sum of the distances of the load goals of the other RSs
+      */
+      BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
       for (Integer rsId : bestServers.keySet())
       {
-        float loadDistance = loadDistances.get(rsId).floatValue();
-        if (loadDistance > highestDistance)
+        if (rsId != currentRsServerId)
         {
-          // This server is far more from its balance point
-          bestRsId = rsId;
-          highestDistance = loadDistance;
-        }
-        if (loadDistance != 0)
-        {
-          allRsWithZeroDistance = false;
-        }
-        int weight = bestServers.get(rsId).getWeight();
-        if (weight > highestWeight)
-        {
-          // This server has a higher weight
-          highestWeightRsId = rsId;
-          highestWeight = weight;
+          sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
+            loadDistances.get(rsId), mathContext);
         }
       }
-      // All servers with a 0 distance ?
-      if (allRsWithZeroDistance)
-      {
-        // Choose server with the highest weight
-        bestRsId = highestWeightRsId;
-      }
-      return bestServers.get(bestRsId);
-    } else
-    {
-      // The local server is currently connected to a RS, let's see if it must
-      // disconnect or not, taking the weights into account.
 
-      float currentLoadDistance =
-        loadDistances.get(currentRsServerId).floatValue();
-      if (currentLoadDistance < 0)
+      if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
       {
         /*
-        Too much DSs connected to the current RS, compared with its load
-        goal:
-        Determine the potential number of DSs to disconnect from the current
-        RS and see if the local DS is part of them: the DSs that must
-        disconnect are those with the lowest server id.
-        Compute the sum of the distances of the load goals of the other RSs
+        The average distance of the other RSs shows a lack of DSs.
+        Compute the number of DSs to disconnect from the current RS,
+        rounding to the nearest integer number. Do only this if there is
+        no risk of yoyo effect: when the exact balance cannot be
+        established due to the current number of DSs connected, do not
+        disconnect a DS. A simple example where the balance cannot be
+        reached is:
+        - RS1 has weight 1 and 2 DSs
+        - RS2 has weight 1 and 1 DS
+        => disconnecting a DS from RS1 to reconnect it to RS2 would have no
+        sense as this would lead to the reverse situation. In that case,
+        the perfect balance cannot be reached and we must stick to the
+        current situation, otherwise the DS would keep move between the 2
+        RSs
         */
-        BigDecimal sumOfLoadDistancesOfOtherRSsBd = BigDecimal.ZERO;
-        for (Integer rsId : bestServers.keySet())
+        float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
+          multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
+              .floatValue();
+        int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
+
+        // Avoid yoyo effect
+        if (overloadingDSsNumber == 1)
         {
-          if (rsId != currentRsServerId)
-          {
-            sumOfLoadDistancesOfOtherRSsBd = sumOfLoadDistancesOfOtherRSsBd.add(
-              loadDistances.get(rsId), mathContext);
-          }
-        }
-
-        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
-        {
-          /*
-          The average distance of the other RSs shows a lack of DSs.
-          Compute the number of DSs to disconnect from the current RS,
-          rounding to the nearest integer number. Do only this if there is
-          no risk of yoyo effect: when the exact balance cannot be
-          established due to the current number of DSs connected, do not
-          disconnect a DS. A simple example where the balance cannot be
-          reached is:
-          - RS1 has weight 1 and 2 DSs
-          - RS2 has weight 1 and 1 DS
-          => disconnecting a DS from RS1 to reconnect it to RS2 would have no
-          sense as this would lead to the reverse situation. In that case,
-          the perfect balance cannot be reached and we must stick to the
-          current situation, otherwise the DS would keep move between the 2
-          RSs
-          */
-          float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
-            multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
-                .floatValue();
-          int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
-
-          // Avoid yoyo effect
-          if (overloadingDSsNumber == 1)
-          {
-            // What would be the new load distance for the current RS if
-            // we disconnect some DSs ?
-            ReplicationServerInfo currentReplicationServerInfo =
-              bestServers.get(currentRsServerId);
-
-            int currentRsWeight = currentReplicationServerInfo.getWeight();
-            BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
-            BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
-            BigDecimal currentRsLoadGoalBd =
-              currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
-            BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
-            if (sumOfConnectedDSs != 0)
-            {
-              int connectedDSs = currentReplicationServerInfo.
-                getConnectedDSNumber();
-              BigDecimal potentialNewConnectedDSsBd =
-                  BigDecimal.valueOf(connectedDSs - 1);
-              BigDecimal sumOfConnectedDSsBd =
-                  BigDecimal.valueOf(sumOfConnectedDSs);
-              potentialCurrentRsNewLoadBd =
-                potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
-                  mathContext);
-            }
-            BigDecimal potentialCurrentRsNewLoadDistanceBd =
-              currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
-                mathContext);
-
-            // What would be the new load distance for the other RSs ?
-            BigDecimal additionalDsLoadBd =
-                BigDecimal.ONE.divide(
-                    BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
-            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
-              sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
-                    mathContext);
-
-            /*
-            Now compare both values: we must no disconnect the DS if this
-            is for going in a situation where the load distance of the other
-            RSs is the opposite of the future load distance of the local RS
-            or we would evaluate that we should disconnect just after being
-            arrived on the new RS. But we should disconnect if we reach the
-            perfect balance (both values are 0).
-            */
-            MathContext roundMc =
-              new MathContext(6, RoundingMode.DOWN);
-            BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
-              potentialCurrentRsNewLoadDistanceBd.round(roundMc);
-            BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
-              potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
-
-            if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
-              BigDecimal.ZERO) != 0)
-              && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
-              potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
-            {
-              // Avoid the yoyo effect, and keep the local DS connected to its
-              // current RS
-              return bestServers.get(currentRsServerId);
-            }
-          }
-
-          // Prepare a sorted list (from lowest to highest) or DS server ids
-          // connected to the current RS
-          ReplicationServerInfo currentRsInfo =
+          // What would be the new load distance for the current RS if
+          // we disconnect some DSs ?
+          ReplicationServerInfo currentReplicationServerInfo =
             bestServers.get(currentRsServerId);
-          List<Integer> serversConnectedToCurrentRS =
-            currentRsInfo.getConnectedDSs();
-          List<Integer> sortedServers = new ArrayList<Integer>(
-            serversConnectedToCurrentRS);
-          Collections.sort(sortedServers);
 
-          // Go through the list of DSs to disconnect and see if the local
-          // server is part of them.
-          int index = 0;
-          while (overloadingDSsNumber > 0)
+          int currentRsWeight = currentReplicationServerInfo.getWeight();
+          BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
+          BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
+          BigDecimal currentRsLoadGoalBd =
+            currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
+          BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
+          if (sumOfConnectedDSs != 0)
           {
-            int severToDisconnectId = sortedServers.get(index);
-            if (severToDisconnectId == localServerId)
-            {
-              // The local server is part of the DSs to disconnect
-              return null;
-            }
-            overloadingDSsNumber--;
-            index++;
+            int connectedDSs = currentReplicationServerInfo.
+              getConnectedDSNumber();
+            BigDecimal potentialNewConnectedDSsBd =
+                BigDecimal.valueOf(connectedDSs - 1);
+            BigDecimal sumOfConnectedDSsBd =
+                BigDecimal.valueOf(sumOfConnectedDSs);
+            potentialCurrentRsNewLoadBd =
+              potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
+                mathContext);
           }
+          BigDecimal potentialCurrentRsNewLoadDistanceBd =
+            currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
+              mathContext);
 
-          // The local server is not part of the servers to disconnect from the
-          // current RS.
-        } else {
-          // The average distance of the other RSs does not show a lack of DSs:
-          // no need to disconnect any DS from the current RS.
+          // What would be the new load distance for the other RSs ?
+          BigDecimal additionalDsLoadBd =
+              BigDecimal.ONE.divide(
+                  BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
+          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
+            sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
+                  mathContext);
+
+          /*
+          Now compare both values: we must no disconnect the DS if this
+          is for going in a situation where the load distance of the other
+          RSs is the opposite of the future load distance of the local RS
+          or we would evaluate that we should disconnect just after being
+          arrived on the new RS. But we should disconnect if we reach the
+          perfect balance (both values are 0).
+          */
+          MathContext roundMc = new MathContext(6, RoundingMode.DOWN);
+          BigDecimal potentialCurrentRsNewLoadDistanceBdRounded =
+            potentialCurrentRsNewLoadDistanceBd.round(roundMc);
+          BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBdRounded =
+            potentialNewSumOfLoadDistancesOfOtherRSsBd.round(roundMc);
+
+          if ((potentialCurrentRsNewLoadDistanceBdRounded.compareTo(
+            BigDecimal.ZERO) != 0)
+            && (potentialCurrentRsNewLoadDistanceBdRounded.equals(
+            potentialNewSumOfLoadDistancesOfOtherRSsBdRounded.negate())))
+          {
+            // Avoid the yoyo effect, and keep the local DS connected to its
+            // current RS
+            return bestServers.get(currentRsServerId);
+          }
         }
+
+        // Prepare a sorted list (from lowest to highest) or DS server ids
+        // connected to the current RS
+        ReplicationServerInfo currentRsInfo =
+          bestServers.get(currentRsServerId);
+        List<Integer> serversConnectedToCurrentRS =
+            new ArrayList<Integer>(currentRsInfo.getConnectedDSs());
+        Collections.sort(serversConnectedToCurrentRS);
+
+        // Go through the list of DSs to disconnect and see if the local
+        // server is part of them.
+        int index = 0;
+        while (overloadingDSsNumber > 0)
+        {
+          int serverIdToDisconnect = serversConnectedToCurrentRS.get(index);
+          if (serverIdToDisconnect == localServerId)
+          {
+            // The local server is part of the DSs to disconnect
+            return null;
+          }
+          overloadingDSsNumber--;
+          index++;
+        }
+
+        // The local server is not part of the servers to disconnect from the
+        // current RS.
       } else {
-        // The RS load goal is reached or there are not enough DSs connected to
-        // it to reach it: do not disconnect from this RS and return rsInfo for
-        // this RS
+        // The average distance of the other RSs does not show a lack of DSs:
+        // no need to disconnect any DS from the current RS.
       }
-      return bestServers.get(currentRsServerId);
+    } else {
+      // The RS load goal is reached or there are not enough DSs connected to
+      // it to reach it: do not disconnect from this RS and return rsInfo for
+      // this RS
     }
+    return bestServers.get(currentRsServerId);
   }
 
   /**
@@ -2117,7 +2115,7 @@
    */
   public void publish(ReplicationMsg msg)
   {
-    _publish(msg, false, true);
+    publish(msg, false, true);
   }
 
   /**
@@ -2128,7 +2126,7 @@
    */
   public boolean publish(ReplicationMsg msg, boolean retryOnFailure)
   {
-    return _publish(msg, false, retryOnFailure);
+    return publish(msg, false, retryOnFailure);
   }
 
   /**
@@ -2137,7 +2135,7 @@
    */
   public void publishRecovery(ReplicationMsg msg)
   {
-    _publish(msg, true, true);
+    publish(msg, true, true);
   }
 
   /**
@@ -2147,7 +2145,7 @@
    * @param retryOnFailure whether retry should be done on failure
    * @return whether the message was successfully sent.
    */
-  boolean _publish(ReplicationMsg msg, boolean recoveryMsg,
+  private boolean publish(ReplicationMsg msg, boolean recoveryMsg,
       boolean retryOnFailure)
   {
     boolean done = false;
@@ -2175,10 +2173,6 @@
 
       try
       {
-        boolean credit;
-        Session current_session;
-        Semaphore currentWindowSemaphore;
-
         /*
         save the session at the time when we acquire the
         sendwindow credit so that we can make sure later
@@ -2187,9 +2181,11 @@
         on a session with a credit that was acquired from a previous
         session.
         */
+        Session currentSession;
+        Semaphore currentWindowSemaphore;
         synchronized (connectPhaseLock)
         {
-          current_session = session;
+          currentSession = session;
           currentWindowSemaphore = sendWindow;
         }
 
@@ -2204,6 +2200,7 @@
           return false;
         }
 
+        boolean credit;
         if (msg instanceof UpdateMsg)
         {
           /*
@@ -2217,6 +2214,7 @@
         {
           credit = true;
         }
+
         if (credit)
         {
           synchronized (connectPhaseLock)
@@ -2228,8 +2226,7 @@
             reconnection happened and we need to restart from scratch.
             */
 
-            if ((session != null) &&
-                (session == current_session))
+            if (session != null && session == currentSession)
             {
               session.publish(msg);
               done = true;
@@ -2340,7 +2337,7 @@
         break;
       }
 
-      final int replicationServerID = rsServerId;
+      final int previousRsServerID = rsServerId;
       try
       {
         ReplicationMsg msg = savedSession.receive();
@@ -2375,7 +2372,7 @@
         {
           // RS performs a proper disconnection
           Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
-              replicationServerID, savedSession.getReadableRemoteAddress(),
+              previousRsServerID, savedSession.getReadableRemoteAddress(),
               serverId, baseDN.toNormalizedString());
           logError(message);
 
@@ -2425,13 +2422,12 @@
             {
               // Stable topology (no topo msg since few seconds): proceed with
               // best server checking.
-              ReplicationServerInfo bestServerInfo =
-                computeBestReplicationServer(false, rsServerId, state,
-                replicationServerInfos, serverId, groupId,
-                generationID);
-
-              if ((rsServerId != -1) && ((bestServerInfo == null) ||
-                (bestServerInfo.getServerId() != rsServerId)))
+              final ReplicationServerInfo bestServerInfo =
+                  computeBestReplicationServer(false, previousRsServerID, state,
+                      replicationServerInfos, serverId, groupId, generationID);
+              if (previousRsServerID != -1
+                  && (bestServerInfo == null
+                      || bestServerInfo.getServerId() != previousRsServerID))
               {
                 // The best replication server is no more the one we are
                 // currently using. Disconnect properly then reconnect.
@@ -2439,14 +2435,14 @@
                 if (bestServerInfo == null)
                 {
                   message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
-                      serverId, replicationServerID,
+                      serverId, previousRsServerID,
                       savedSession.getReadableRemoteAddress(),
                       baseDN.toNormalizedString());
                 }
                 else
                 {
                   message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
-                      serverId, replicationServerID,
+                      serverId, previousRsServerID,
                       savedSession.getReadableRemoteAddress(),
                       bestServerInfo.getServerId(),
                       baseDN.toNormalizedString());
@@ -2483,7 +2479,7 @@
           {
             // We did not initiate the close on our side, log an error message.
             Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
-                serverId, baseDN.toNormalizedString(), replicationServerID,
+                serverId, baseDN.toNormalizedString(), previousRsServerID,
                 savedSession.getReadableRemoteAddress());
             logError(message);
           }
@@ -2684,9 +2680,8 @@
    *                            requires to restart the service.
    * @param groupId            The new group id to use
    */
-  public boolean changeConfig(
-    Collection<String> replicationServers, int window, long heartbeatInterval,
-    byte groupId)
+  public boolean changeConfig(Set<String> replicationServers, int window,
+      long heartbeatInterval, byte groupId)
   {
     // These parameters needs to be renegotiated with the ReplicationServer
     // so if they have changed, that requires restarting the session with
@@ -2695,8 +2690,7 @@
     // the connection is modified
     boolean needToRestartSession =
         this.replicationServerUrls == null
-        || replicationServers.size() != this.replicationServerUrls.size()
-        || !replicationServers.containsAll(this.replicationServerUrls)
+        || !replicationServers.equals(this.replicationServerUrls)
         || window != this.maxRcvWindow
         || heartbeatInterval != this.heartbeatInterval
         || groupId != this.groupId;
@@ -2788,12 +2782,10 @@
    */
   public List<RSInfo> getRsList()
   {
-    List<RSInfo> result = new ArrayList<RSInfo>();
-
-    for (ReplicationServerInfo replicationServerInfo :
-      replicationServerInfos.values())
+    final List<RSInfo> result = new ArrayList<RSInfo>();
+    for (ReplicationServerInfo rsInfo : replicationServerInfos.values())
     {
-      result.add(replicationServerInfo.toRSInfo());
+      result.add(rsInfo.toRSInfo());
     }
     return result;
   }
@@ -2989,14 +2981,14 @@
   }
 
   /**
-   * Returns the replication monitor associated with this broker.
+   * Returns the replication monitor instance name associated with this broker.
    *
-   * @return The replication monitor.
+   * @return The replication monitor instance name.
    */
-  ReplicationMonitor getReplicationMonitor()
+  String getReplicationMonitorInstanceName()
   {
     // Only invoked by replication domain so always non-null.
-    return monitor;
+    return monitor.getMonitorInstanceName();
   }
 
   private void setSession(final Session newSession)

--
Gitblit v1.10.0