From 950cfbfa7d895b728f432500027df274d6b1d6c7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 05 Mar 2013 16:50:35 +0000
Subject: [PATCH] OPENDJ-66 (CR-1365) DS does not failover between replication servers in different groups when configured explicitly for one of the groups 

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  277 ++++++++++++++++++++++++++-----------------------------
 1 files changed, 131 insertions(+), 146 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 1ac917a..487544e 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -23,17 +23,14 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2012 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
-import java.net.UnknownHostException;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.server.util.StaticUtils.collectionToString;
-import static org.opends.server.util.StaticUtils.isLocalAddress;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.server.util.StaticUtils.*;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -45,6 +42,7 @@
 import java.net.Socket;
 import java.net.SocketException;
 import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -70,9 +68,9 @@
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.ServerConstants;
-import org.opends.server.replication.server.ReplicationServer;
 
 /**
  * The broker for Multi-master Replication.
@@ -86,7 +84,10 @@
   private static final DebugTracer TRACER = getTracer();
   private volatile boolean shutdown = false;
   private final Object startStopLock = new Object();
-  private volatile Collection<String> servers;
+  /**
+   * Replication server URLs, each in the format "<code>hostname:port</code>".
+   */
+  private volatile Collection<String> replicationServerUrls;
   private volatile boolean connected = false;
   private volatile String replicationServer = "Not connected";
   private volatile ProtocolSession session = null;
@@ -101,15 +102,15 @@
   private int timeout = 0;
   private short protocolVersion;
   private ReplSessionSecurity replSessionSecurity;
-  // My group id
-  private byte groupId = (byte) -1;
-  // The group id of the RS we are connected to
-  private byte rsGroupId = (byte) -1;
-  // The server id of the RS we are connected to
+  /** My group id. */
+  private byte groupId = -1;
+  /** The group id of the RS we are connected to. */
+  private byte rsGroupId = -1;
+  /** The server id of the RS we are connected to. */
   private Integer rsServerId = -1;
-  // The server URL of the RS we are connected to
+  /** The server URL of the RS we are connected to. */
   private String rsServerUrl = null;
-  // Our replication domain
+  /** Our replication domain. */
   private ReplicationDomain domain = null;
   /**
    * This object is used as a conditional event to be notified about
@@ -182,7 +183,7 @@
    * received, it is incremented. When it reaches 2, we run the checking
    * algorithm to see if we must reconnect to another best replication server.
    * Then we reset the value to 0. But when a topology message is received, the
-   * integer is reseted to 0. This insures that we wait at least one monitoring
+   * integer is reset to 0. This ensures that we wait at least one monitoring
    * publisher period before running the algorithm, but also that we wait at
    * least for a monitoring period after the last received topology message
    * (topology stabilization).
@@ -247,19 +248,17 @@
   /**
    * Start the ReplicationBroker.
    *
-   * @param servers list of servers used
+   * @param replicationServers list of servers used
    */
-  public void start(Collection<String> servers)
+  public void start(Collection<String> replicationServers)
   {
     synchronized (startStopLock)
     {
-      /*
-       * Open Socket to the ReplicationServer Send the Start message
-       */
+      // Open a socket to the ReplicationServer and send the Start message
       shutdown = false;
-      this.servers = servers;
+      this.replicationServerUrls = replicationServers;
 
-      if (servers.size() < 1)
+      if (this.replicationServerUrls.size() < 1)
       {
         Message message = NOTE_NEED_MORE_THAN_ONE_CHANGELOG_SERVER.get();
         logError(message);
@@ -348,7 +347,7 @@
       replicationServerInfo.setLocallyConfigured(false);
       return;
     }
-    for (String serverUrl : servers)
+    for (String serverUrl : replicationServerUrls)
     {
       if (isSameReplicationServerUrl(serverUrl, rsUrl))
       {
@@ -428,12 +427,10 @@
     }
 
     // Now compare addresses, if at least one match, this is the same server
-    for (int i = 0; i < rs1Addresses.length; i++)
+    for (InetAddress inetAddress1 : rs1Addresses)
     {
-      InetAddress inetAddress1 = rs1Addresses[i];
-      for (int j = 0; j < rs2Addresses.length; j++)
+      for (InetAddress inetAddress2 : rs2Addresses)
       {
-        InetAddress inetAddress2 = rs2Addresses[j];
         if (inetAddress2.equals(inetAddress1))
         {
           return true;
@@ -454,7 +451,7 @@
   {
     private short protocolVersion;
     private long generationId;
-    private byte groupId = (byte) -1;
+    private byte groupId = -1;
     private int serverId;
     // Received server URL
     private String serverURL;
@@ -517,8 +514,11 @@
     }
 
     /**
-     * Constructs a ReplicationServerInfo object wrapping a ReplServerStartMsg.
-     * @param replServerStartMsg The ReplServerStartMsg this object will wrap.
+     * Constructs a ReplicationServerInfo object wrapping a
+     * {@link ReplServerStartMsg}.
+     *
+     * @param replServerStartMsg
+     *          The {@link ReplServerStartMsg} this object will wrap.
      */
     private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
     {
@@ -537,9 +537,10 @@
 
     /**
      * Constructs a ReplicationServerInfo object wrapping a
-     * ReplServerStartDSMsg.
-     * @param replServerStartDSMsg The ReplServerStartDSMsg this object will
-     * wrap.
+     * {@link ReplServerStartDSMsg}.
+     *
+     * @param replServerStartDSMsg
+     *          The {@link ReplServerStartDSMsg} this object will wrap.
      */
     private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
     {
@@ -756,9 +757,11 @@
      * Returns a string representation of this object.
      * @return A string representation of this object.
      */
+    @Override
     public String toString()
     {
-      return "Url:"+ this.getServerURL() + " ServerId:" + this.serverId;
+      return "Url:" + this.serverURL + " ServerId:" + this.serverId
+          + " GroupId:" + this.groupId;
     }
   }
 
@@ -781,15 +784,14 @@
    */
   private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
   {
-
     Map<Integer, ReplicationServerInfo> rsInfos =
       new ConcurrentHashMap<Integer, ReplicationServerInfo>();
 
-    for (String server : servers)
+    for (String serverUrl : replicationServerUrls)
     {
       // Connect to server and get info about it
       ReplicationServerInfo replicationServerInfo =
-        performPhaseOneHandshake(server, false, false);
+        performPhaseOneHandshake(serverUrl, false, false);
 
       // Store server info in list
       if (replicationServerInfo != null)
@@ -802,20 +804,27 @@
   }
 
   /**
-   * Special aspects of connecting as ECL compared to connecting as data server
-   * are :
-   * - 1 single RS configured
-   * - so no choice of the preferred RS
-   * - ?? Heartbeat
-   * - Start handshake is :
+   * Special aspects of connecting as ECL (External Change Log) compared to
+   * connecting as data server are :
+   * <ul>
+   * <li>1 single RS configured</li>
+   * <li>so no choice of the preferred RS</li>
+   * <li>?? Heartbeat</li>
+   * <li>Start handshake is :
+   * 
+   * <pre>
    *    Broker ---> StartECLMsg       ---> RS
    *          <---- ReplServerStartMsg ---
    *           ---> StartSessionECLMsg --> RS
+   * </pre>
+   * 
+   * </li>
+   * </ul>
    */
   private void connectAsECL()
   {
     // FIXME:ECL List of RS to connect is for now limited to one RS only
-    String bestServer = this.servers.iterator().next();
+    String bestServer = this.replicationServerUrls.iterator().next();
 
     if (performPhaseOneHandshake(bestServer, true, true) != null)
     {
@@ -1323,17 +1332,7 @@
           localSession.close();
         }
 
-        if (socket != null)
-        {
-          try
-          {
-            socket.close();
-          }
-          catch (IOException e)
-          {
-            // Ignore.
-          }
-        }
+        close(socket);
       }
 
       if (!hasConnected && errorMessage != null)
@@ -1361,9 +1360,9 @@
 
 
   /**
-   * Performs the second phase handshake (send StartSessionMsg and receive
-   * TopologyMsg messages exchange) and return the reply message from the
-   * replication server.
+   * Performs the second phase handshake for External Change Log (send
+   * StartSessionMsg and receive TopologyMsg messages exchange) and returns the
+   * reply message from the replication server.
    *
    * @param server Server we are connecting with.
    * @param initStatus The status we are starting with
@@ -1535,58 +1534,34 @@
      * - replication server in the same VM as local DS one
      */
     Map<Integer, ReplicationServerInfo> bestServers = rsInfos;
-    Map<Integer, ReplicationServerInfo> newBestServers;
     // The list of best replication servers is filtered with each criteria. At
-    // each criteria, the list is replaced with the filtered one if some there
+    // each criteria, the list is replaced with the filtered one if there
     // are some servers from the filtering, otherwise, the list is left as is
     // and the new filtering for the next criteria is applied and so on.
-    for (int filterLevel = 1; filterLevel <= 4; filterLevel++)
+
+
+    // Use only servers locally configured: those are servers declared in
+    // the local configuration. When the current method is called, for
+    // sure, at least one server from the list is locally configured
+    bestServers =
+        keepBest(filterServersLocallyConfigured(bestServers), bestServers);
+    // Some servers with same group id ?
+    bestServers =
+        keepBest(filterServersWithSameGroupId(bestServers, groupId),
+            bestServers);
+    // Some servers with same generation id ?
+    Map<Integer, ReplicationServerInfo> sameGenerationId =
+        filterServersWithSameGenerationId(bestServers, generationId);
+    if (sameGenerationId.size() > 0)
     {
-      newBestServers = null;
-      switch (filterLevel)
-      {
-        case 1:
-          // Use only servers locally configured: those are servers declared in
-          // the local configuration. When the current method is called, for
-          // sure, at least one server from the list is locally configured
-          bestServers = filterServersLocallyConfigured(bestServers);
-          break;
-        case 2:
-          // Some servers with same group id ?
-          newBestServers = filterServersWithSameGroupId(bestServers, groupId);
-          if (newBestServers.size() > 0)
-          {
-            bestServers = newBestServers;
-          }
-          break;
-        case 3:
-          // Some servers with same generation id ?
-          newBestServers = filterServersWithSameGenerationId(bestServers,
-            generationId);
-          if (newBestServers.size() > 0)
-          {
-            // Ok some servers with the right generation id
-            bestServers = newBestServers;
-            // If some servers with the right generation id this is useful to
-            // run the local DS change criteria
-            newBestServers = filterServersWithAllLocalDSChanges(bestServers,
-              myState, localServerId);
-            if (newBestServers.size() > 0)
-            {
-              bestServers = newBestServers;
-            }
-          }
-          break;
-        case 4:
-          // Some servers in the local VM ?
-          newBestServers = filterServersInSameVM(bestServers);
-          if (newBestServers.size() > 0)
-          {
-            bestServers = newBestServers;
-          }
-          break;
-      }
+      // Some servers have the right generation id: it is then useful to
+      // run the local DS change criteria on them
+      bestServers =
+          keepBest(filterServersWithAllLocalDSChanges(sameGenerationId,
+              myState, localServerId), sameGenerationId);
     }
+    // Some servers in the local VM ?
+    bestServers = keepBest(filterServersInSameVM(bestServers), bestServers);
 
     /**
      * Now apply the choice base on the weight to the best servers list
@@ -1612,6 +1587,23 @@
   }
 
   /**
+   * Returns the filtered Map when it is not empty, else the unfiltered one.
+   *
+   * @param filteredMap the result of a filter; returned if not empty
+   * @param unfilteredMap the Map the filter was applied to; the fallback
+   * @return {@code filteredMap} if not empty, else {@code unfilteredMap}
+   */
+  private static <K, V> Map<K, V> keepBest(Map<K, V> filteredMap,
+      Map<K, V> unfilteredMap)
+  {
+    if (!filteredMap.isEmpty())
+    {
+      return filteredMap;
+    }
+    return unfilteredMap;
+  }
+
+  /**
    * Creates a new list that contains only replication servers that are locally
    * configured.
    * @param bestServers The list of replication servers to filter
@@ -1867,24 +1859,22 @@
     // key:server id, value: distance
     Map<Integer, BigDecimal> loadDistances = new HashMap<Integer, BigDecimal>();
     // Precision for the operations (number of digits after the dot)
-    // Default value of rounding method is HALF_UP for
-    // the MathContext
-    MathContext mathContext = new MathContext(32);
+    MathContext mathContext = new MathContext(32, RoundingMode.HALF_UP);
     for (Integer rsId : bestServers.keySet())
     {
       ReplicationServerInfo replicationServerInfo = bestServers.get(rsId);
 
       int rsWeight = replicationServerInfo.getWeight();
       //  load goal = rs weight / sum of weights
-      BigDecimal loadGoalBd = (new BigDecimal(rsWeight)).divide(
-        new BigDecimal(sumOfWeights), mathContext);
+      BigDecimal loadGoalBd = BigDecimal.valueOf(rsWeight).divide(
+        BigDecimal.valueOf(sumOfWeights), mathContext);
       BigDecimal currentLoadBd = BigDecimal.ZERO;
       if (sumOfConnectedDSs != 0)
       {
         // current load = number of connected DSs / total number of DSs
         int connectedDSs = replicationServerInfo.getConnectedDSNumber();
-        currentLoadBd = (new BigDecimal(connectedDSs)).divide(
-        new BigDecimal(sumOfConnectedDSs), mathContext);
+        currentLoadBd = BigDecimal.valueOf(connectedDSs).divide(
+          BigDecimal.valueOf(sumOfConnectedDSs), mathContext);
       }
       // load distance = load goal - current load
       BigDecimal loadDistanceBd =
@@ -1916,7 +1906,7 @@
           bestRsId = rsId;
           highestDistance = loadDistance;
         }
-        if (loadDistance != (float)0)
+        if (loadDistance != 0)
         {
           allRsWithZeroDistance = false;
         }
@@ -1931,7 +1921,7 @@
       // All servers with a 0 distance ?
       if (allRsWithZeroDistance)
       {
-        // Choose server withe the highest weight
+        // Choose server with the highest weight
         bestRsId = highestWeightRsId;
       }
       return bestServers.get(bestRsId);
@@ -1942,7 +1932,7 @@
 
       float currentLoadDistance =
         loadDistances.get(currentRsServerId).floatValue();
-      if (currentLoadDistance < (float) 0)
+      if (currentLoadDistance < 0)
       {
         // Too much DSs connected to the current RS, compared with its load
         // goal:
@@ -1960,7 +1950,7 @@
           }
         }
 
-        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > (float) 0)
+        if (sumOfLoadDistancesOfOtherRSsBd.floatValue() > 0)
         {
           // The average distance of the other RSs shows a lack of DSs.
           // Compute the number of DSs to disconnect from the current RS,
@@ -1977,8 +1967,8 @@
           // current situation, otherwise the DS would keep move between the 2
           // RSs
           float notRoundedOverloadingDSsNumber = sumOfLoadDistancesOfOtherRSsBd.
-            multiply(new BigDecimal(sumOfConnectedDSs), mathContext).
-            floatValue();
+            multiply(BigDecimal.valueOf(sumOfConnectedDSs), mathContext)
+                .floatValue();
           int overloadingDSsNumber = Math.round(notRoundedOverloadingDSsNumber);
 
           // Avoid yoyo effect
@@ -1990,34 +1980,34 @@
               bestServers.get(currentRsServerId);
 
             int currentRsWeight = currentReplicationServerInfo.getWeight();
-            BigDecimal currentRsWeightBd = new BigDecimal(currentRsWeight);
-            BigDecimal sumOfWeightsBd = new BigDecimal(sumOfWeights);
+            BigDecimal currentRsWeightBd = BigDecimal.valueOf(currentRsWeight);
+            BigDecimal sumOfWeightsBd = BigDecimal.valueOf(sumOfWeights);
             BigDecimal currentRsLoadGoalBd =
               currentRsWeightBd.divide(sumOfWeightsBd, mathContext);
-            BigDecimal potentialCurrentRsNewLoadBd = new BigDecimal(0);
+            BigDecimal potentialCurrentRsNewLoadBd = BigDecimal.ZERO;
             if (sumOfConnectedDSs != 0)
             {
               int connectedDSs = currentReplicationServerInfo.
                 getConnectedDSNumber();
               BigDecimal potentialNewConnectedDSsBd =
-                new BigDecimal(connectedDSs - 1);
+                  BigDecimal.valueOf(connectedDSs - 1);
               BigDecimal sumOfConnectedDSsBd =
-                new BigDecimal(sumOfConnectedDSs);
+                  BigDecimal.valueOf(sumOfConnectedDSs);
               potentialCurrentRsNewLoadBd =
                 potentialNewConnectedDSsBd.divide(sumOfConnectedDSsBd,
-                mathContext);
+                  mathContext);
             }
             BigDecimal potentialCurrentRsNewLoadDistanceBd =
               currentRsLoadGoalBd.subtract(potentialCurrentRsNewLoadBd,
-              mathContext);
+                mathContext);
 
             // What would be the new load distance for the other RSs ?
             BigDecimal additionalDsLoadBd =
-              (new BigDecimal(1)).divide(
-              new BigDecimal(sumOfConnectedDSs), mathContext);
+                BigDecimal.ONE.divide(
+                  BigDecimal.valueOf(sumOfConnectedDSs),mathContext);
             BigDecimal potentialNewSumOfLoadDistancesOfOtherRSsBd =
               sumOfLoadDistancesOfOtherRSsBd.subtract(additionalDsLoadBd,
-              mathContext);
+                    mathContext);
 
             // Now compare both values: we must no disconnect the DS if this
             // is for going in a situation where the load distance of the other
@@ -2139,7 +2129,7 @@
     if (failingSession == session)
     {
       connected = false;
-      rsGroupId = (byte) -1;
+      rsGroupId = -1;
       rsServerId = -1;
       rsServerUrl = null;
       session = null;
@@ -2291,8 +2281,7 @@
           // connectPhaseLock because it can be blocking and we don't
           // want to hold off reconnection in case the connection dropped.
           credit =
-            currentWindowSemaphore.tryAcquire(
-            (long) 500, TimeUnit.MILLISECONDS);
+            currentWindowSemaphore.tryAcquire(500, TimeUnit.MILLISECONDS);
         } else
         {
           credit = true;
@@ -2466,14 +2455,12 @@
         {
           // This is the response to a MonitorRequest that was sent earlier or
           // the regular message of the monitoring publisher of the RS.
+          MonitorMsg monitorMsg = (MonitorMsg) msg;
 
           // Extract and store replicas ServerStates
           replicaStates = new HashMap<Integer, ServerState>();
-          MonitorMsg monitorMsg = (MonitorMsg) msg;
-          Iterator<Integer> it = monitorMsg.ldapIterator();
-          while (it.hasNext())
+          for (int srvId : toIterable(monitorMsg.ldapIterator()))
           {
-            int srvId = it.next();
             replicaStates.put(srvId, monitorMsg.getLDAPServerState(srvId));
           }
 
@@ -2485,10 +2472,8 @@
           }
 
           // Update the replication servers ServerStates with new received info
-          it = monitorMsg.rsIterator();
-          while (it.hasNext())
+          for (int srvId : toIterable(monitorMsg.rsIterator()))
           {
-            int srvId = it.next();
             ReplicationServerInfo rsInfo = replicationServerInfos.get(srvId);
             if (rsInfo != null)
             {
@@ -2662,7 +2647,7 @@
       stopRSHeartBeatMonitoring();
       stopChangeTimeHeartBeatPublishing();
       replicationServer = "stopped";
-      rsGroupId = (byte) -1;
+      rsGroupId = -1;
       rsServerId = -1;
       rsServerUrl = null;
       if (session != null)
@@ -2781,17 +2766,17 @@
 
     // A new session is necessary only when information regarding
     // the connection is modified
-    if ((servers == null) ||
-      (!(replicationServers.size() == servers.size() && replicationServers.
-      containsAll(servers))) ||
-      window != this.maxRcvWindow ||
-      heartbeatInterval != this.heartbeatInterval ||
-      (groupId != this.groupId))
+    if (this.replicationServerUrls == null
+        || replicationServers.size() != this.replicationServerUrls.size()
+        || !replicationServers.containsAll(this.replicationServerUrls)
+        || window != this.maxRcvWindow
+        || heartbeatInterval != this.heartbeatInterval
+        || groupId != this.groupId)
     {
       needToRestartSession = true;
     }
 
-    this.servers = replicationServers;
+    this.replicationServerUrls = replicationServers;
     this.rcvWindow = window;
     this.maxRcvWindow = window;
     this.halfRcvWindow = window / 2;

--
Gitblit v1.10.0