From 0e53b0680f39190c20027a489b0f862150f6d80a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 19 Dec 2013 09:24:04 +0000
Subject: [PATCH] First step towards simplifying the ReplicationBroker class.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  370 ++++++++++++++++++++++++++++------------------------
 1 files changed, 200 insertions(+), 170 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index bc298fa..adc5046 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -36,9 +36,11 @@
 import java.net.SocketTimeoutException;
 import java.util.*;
 import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
@@ -67,18 +69,95 @@
 {
 
   /**
+   * Immutable class containing information about whether the broker is
+   * connected to an RS and data associated to this connected RS.
+   * <p>
+   * Mutable methods return a new version of this object copying the data that
+   * did not change.
+   */
+  // @Immutable
+  private static final class ConnectedRS
+  {
+
+    private final String replicationServer;
+    /** The info of the RS we are connected to. */
+    private final ReplicationServerInfo rsInfo;
+    private final boolean connected;
+
+    private ConnectedRS(boolean connected, ReplicationServerInfo rsInfo,
+        String replicationServer)
+    {
+      this.connected = connected;
+      this.rsInfo = rsInfo;
+      this.replicationServer = replicationServer;
+    }
+
+    private static ConnectedRS stopped()
+    {
+      return new ConnectedRS(false, null, "stopped");
+    }
+
+    private static ConnectedRS noConnectedRS()
+    {
+      return new ConnectedRS(false, null, NO_CONNECTED_SERVER);
+    }
+
+    /**
+     * Returns a new version of the current object with the connected status set
+     * to true.
+     */
+    private ConnectedRS setConnected()
+    {
+      return new ConnectedRS(true, rsInfo, replicationServer);
+    }
+
+    public int getServerId()
+    {
+      return rsInfo != null ? rsInfo.getServerId() : -1;
+    }
+
+    private byte getGroupId()
+    {
+      return rsInfo != null ? rsInfo.getGroupId() : -1;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      final StringBuilder sb = new StringBuilder();
+      toString(sb);
+      return sb.toString();
+    }
+
+    public void toString(StringBuilder sb)
+    {
+      sb.append("connected=").append(connected).append(", ");
+      if (rsInfo == null) // this is a null object
+      {
+        sb.append("no connected RS");
+      }
+      else
+      {
+        sb.append("connected RS(serverId=").append(rsInfo.getServerId())
+          .append(", serverUrl=").append(rsInfo.getServerURL())
+          .append(", groupId=").append(rsInfo.getGroupId())
+          .append(")");
+      }
+    }
+  }
+
+  /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
   private volatile boolean shutdown = false;
   private final Object startStopLock = new Object();
   private volatile ReplicationDomainCfg config;
-  private volatile boolean connected = false;
   /**
    * String reported under CSN=monitor when there is no connected RS.
    */
   public final static String NO_CONNECTED_SERVER = "Not connected";
-  private volatile String replicationServer = NO_CONNECTED_SERVER;
   private volatile Session session;
   private final ServerState state;
   private Semaphore sendWindow;
@@ -88,19 +167,15 @@
   private int timeout = 0;
   private short protocolVersion;
   private ReplSessionSecurity replSessionSecurity;
-  /** The group id of the RS we are connected to. */
-  private byte rsGroupId = -1;
-  /** The server id of the RS we are connected to. */
-  private int rsServerId = -1;
-  /** The server URL of the RS we are connected to. */
-  private String rsServerUrl;
+  private final AtomicReference<ConnectedRS> connectedRS =
+      new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
   /** Our replication domain. */
   private ReplicationDomain domain;
   /**
    * This object is used as a conditional event to be notified about
    * the reception of monitor information from the Replication Server.
    */
-  private final MutableBoolean monitorResponse = new MutableBoolean(false);
+  private final AtomicBoolean monitorResponse = new AtomicBoolean(false);
   /**
    * A Map containing the ServerStates of all the replicas in the topology
    * as seen by the ReplicationServer the last time it was polled or the last
@@ -217,7 +292,7 @@
     {
       shutdown = false;
       this.rcvWindow = getMaxRcvWindow();
-      connect();
+      connect(connectedRS.get());
     }
   }
 
@@ -227,7 +302,7 @@
    */
   public byte getRsGroupId()
   {
-    return rsGroupId;
+    return connectedRS.get().getGroupId();
   }
 
   /**
@@ -236,7 +311,7 @@
    */
   public int getRsServerId()
   {
-    return rsServerId;
+    return connectedRS.get().getServerId();
   }
 
   /**
@@ -287,20 +362,11 @@
   }
 
   /**
-   * Gets the server url of the RS we are connected to.
-   * @return The server url of the RS we are connected to
-   */
-  public String getRsServerUrl()
-  {
-    return rsServerUrl;
-  }
-
-  /**
    * Sets the locally configured flag for the passed ReplicationServerInfo
    * object, analyzing the local configuration.
    * @param rsInfo the Replication server to check and update
    */
-  private void updateRSInfoLocallyConfiguredStatus(ReplicationServerInfo rsInfo)
+  private void setLocallyConfiguredFlag(ReplicationServerInfo rsInfo)
   {
     // Determine if the passed ReplicationServerInfo has a URL that is present
     // in the locally configured replication servers
@@ -678,13 +744,14 @@
     }
   }
 
-  private void connect()
+  private void connect(ConnectedRS rs)
   {
     if (getBaseDN().toNormalizedString().equalsIgnoreCase(
         ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
     {
-      connectAsECL();
-    } else
+      connectAsECL(rs);
+    }
+    else
     {
       connectAsDataServer();
     }
@@ -697,8 +764,8 @@
    */
   private Map<Integer, ReplicationServerInfo> collectReplicationServersInfo()
   {
-    Map<Integer, ReplicationServerInfo> rsInfos =
-      new ConcurrentHashMap<Integer, ReplicationServerInfo>();
+    final Map<Integer, ReplicationServerInfo> rsInfos =
+        new ConcurrentSkipListMap<Integer, ReplicationServerInfo>();
 
     for (String serverUrl : getReplicationServerUrls())
     {
@@ -732,14 +799,13 @@
    * </li>
    * </ul>
    */
-  private void connectAsECL()
+  private void connectAsECL(ConnectedRS rs)
   {
     // FIXME:ECL List of RS to connect is for now limited to one RS only
-    String bestServer = getReplicationServerUrls().iterator().next();
-
+    final String bestServer = getReplicationServerUrls().iterator().next();
     if (performPhaseOneHandshake(bestServer, true, true) != null)
     {
-      performECLPhaseTwoHandshake(bestServer);
+      performECLPhaseTwoHandshake(bestServer, rs);
     }
   }
 
@@ -808,14 +874,16 @@
       // Get info from every available replication servers
       replicationServerInfos = collectReplicationServersInfo();
 
-      ReplicationServerInfo electedRsInfo = null;
-
-      if (replicationServerInfos.size() > 0)
+      if (replicationServerInfos.isEmpty())
+      {
+        connectedRS.set(ConnectedRS.noConnectedRS());
+      }
+      else
       {
         // At least one server answered, find the best one.
         RSEvaluations evals = computeBestReplicationServer(true, -1, state,
             replicationServerInfos, serverId, getGroupId(), getGenerationID());
-        electedRsInfo = evals.getBestRS();
+        ReplicationServerInfo electedRsInfo = evals.getBestRS();
 
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
@@ -850,43 +918,33 @@
           {
             connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
           } // Could perform handshake phase 2 with best
-
         } // Could perform handshake phase 1 with best
+      }
 
-      } // Reached some servers
-
-      // connected is set by connectToReplicationServer()
-      // and electedRsInfo isn't null then. Check anyway
-      if (electedRsInfo != null && connected)
+      final ConnectedRS rs = connectedRS.get();
+      if (rs.connected)
       {
         connectPhaseLock.notify();
 
-        if ((electedRsInfo.getGenerationId() == getGenerationID())
-            || (electedRsInfo.getGenerationId() == -1))
+        final long rsGenId = rs.rsInfo.getGenerationId();
+        final int rsServerId = rs.rsInfo.getServerId();
+        if (rsGenId == getGenerationID() || rsGenId == -1)
         {
-          Message message = NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG
-              .get(serverId, rsServerId, baseDN.toNormalizedString(),
-                  session.getReadableRemoteAddress(),
-                  getGenerationID());
-          logError(message);
-        } else
-        {
-          Message message = WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG
-              .get(serverId, rsServerId, baseDN.toNormalizedString(),
-                  session.getReadableRemoteAddress(),
-                  getGenerationID(),
-                  electedRsInfo.getGenerationId());
-          logError(message);
+          logError(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
+              serverId, rsServerId, baseDN.toNormalizedString(),
+              session.getReadableRemoteAddress(), getGenerationID()));
         }
-      } else
+        else
+        {
+          logError(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
+              serverId, rsServerId, baseDN.toNormalizedString(),
+              session.getReadableRemoteAddress(), getGenerationID(), rsGenId));
+        }
+      }
+      else
       {
-        /*
-         * This server could not find any replicationServer. It's going to start
-         * in degraded mode. Log a message.
-         */
-        connected = false;
-        replicationServer = NO_CONNECTED_SERVER;
-
+         // This server could not find any replicationServer.
+         // It's going to start in degraded mode. Log a message.
         if (!connectionError)
         {
           connectionError = true;
@@ -894,16 +952,14 @@
 
           if (replicationServerInfos.size() > 0)
           {
-            Message message = WARN_COULD_NOT_FIND_CHANGELOG.get(
+            logError(WARN_COULD_NOT_FIND_CHANGELOG.get(
                 serverId, baseDN.toNormalizedString(),
-                collectionToString(replicationServerInfos.keySet(), ", "));
-            logError(message);
+                collectionToString(replicationServerInfos.keySet(), ", ")));
           }
           else
           {
-            Message message = WARN_NO_AVAILABLE_CHANGELOGS.get(
-                serverId, baseDN.toNormalizedString());
-            logError(message);
+            logError(WARN_NO_AVAILABLE_CHANGELOGS.get(
+                serverId, baseDN.toNormalizedString()));
           }
         }
       }
@@ -925,13 +981,11 @@
   {
     final int serverId = getServerId();
     final DN baseDN = getBaseDN();
+
+    ConnectedRS rs = null;
     try
     {
-      replicationServer = session.getReadableRemoteAddress();
       maxSendWindow = rsInfo.getWindowSize();
-      rsGroupId = rsInfo.getGroupId();
-      rsServerId = rsInfo.getServerId();
-      rsServerUrl = rsInfo.getServerURL();
 
       receiveTopo(topologyMsg);
 
@@ -964,7 +1018,8 @@
       }
       sendWindow = new Semaphore(maxSendWindow);
       rcvWindow = getMaxRcvWindow();
-      connected = true;
+      rs = new ConnectedRS(true, rsInfo, session.getReadableRemoteAddress());
+      connectedRS.set(rs);
 
       /*
       May have created a broker with null replication domain for
@@ -977,18 +1032,17 @@
       }
 
       final byte groupId = getGroupId();
-      if (getRsGroupId() != groupId)
+      if (rs.getGroupId() != groupId)
       {
         /*
         Connected to replication server with wrong group id:
         warn user and start heartbeat monitor to recover when a server
         with the right group id shows up.
         */
-        Message message = WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
-                Byte.toString(groupId), Integer.toString(rsServerId),
-                rsInfo.getServerURL(), Byte.toString(getRsGroupId()),
-                baseDN.toNormalizedString(), Integer.toString(serverId));
-        logError(message);
+        logError(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
+            Byte.toString(groupId), Integer.toString(rs.getServerId()),
+            rsInfo.getServerURL(), Byte.toString(rs.getGroupId()),
+            baseDN.toNormalizedString(), Integer.toString(serverId)));
       }
       startRSHeartBeatMonitoring();
       if (rsInfo.getProtocolVersion() >=
@@ -1006,8 +1060,9 @@
     }
     finally
     {
-      if (!connected)
+      if (rs == null)
       {
+        connectedRS.set(ConnectedRS.noConnectedRS());
         setSession(null);
       }
     }
@@ -1108,8 +1163,9 @@
       boolean isSslEncryption = replSessionSecurity.isSslEncryption();
 
       // Send our ServerStartMsg.
-      String url = socket.getLocalAddress().getHostName() + ":"
-          + socket.getLocalPort();
+      final HostPort hp = new HostPort(
+          socket.getLocalAddress().getHostName(), socket.getLocalPort());
+      String url = hp.toString();
       StartMsg serverStartMsg;
       if (!isECL)
       {
@@ -1135,11 +1191,11 @@
       }
 
       // Wrap received message in a server info object
-      ReplicationServerInfo replServerInfo = ReplicationServerInfo
-          .newInstance(msg, server);
+      final ReplicationServerInfo replServerInfo =
+          ReplicationServerInfo.newInstance(msg, server);
 
       // Sanity check
-      DN repDN = replServerInfo.getBaseDN();
+      final DN repDN = replServerInfo.getBaseDN();
       if (!getBaseDN().equals(repDN))
       {
         errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
@@ -1167,7 +1223,7 @@
 
       hasConnected = true;
 
-      // If this connection as the one to use for sending and receiving
+      // If this connection is the one to use for sending and receiving
       // updates, store it.
       if (keepConnection)
       {
@@ -1180,20 +1236,17 @@
     {
       errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
           server, getBaseDN().toNormalizedString());
-      return null;
     }
     catch (SocketTimeoutException e)
     {
       errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
           server, getBaseDN().toNormalizedString());
-      return null;
     }
     catch (Exception e)
     {
       errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
           server, getBaseDN().toNormalizedString(),
           stackTraceToSingleLineString(e));
-      return null;
     }
     finally
     {
@@ -1203,26 +1256,28 @@
         close(socket);
       }
 
-      if (!hasConnected && errorMessage != null)
+      if (keepConnection && !hasConnected)
       {
-        // There was no server waiting on this host:port Log a notice and try
-        // the next replicationServer in the list
-        if (!connectionError)
-        {
-          if (keepConnection) // Log error message only for final connection
-          {
-            // the error message is only logged once to avoid overflowing
-            // the error log
-            logError(errorMessage);
-          }
+        connectedRS.set(ConnectedRS.noConnectedRS());
+      }
 
-          if (debugEnabled())
-          {
-            TRACER.debugInfo(errorMessage.toString());
-          }
+      if (!hasConnected && errorMessage != null && !connectionError)
+      {
+        // There was no server waiting on this host:port
+        // Log a notice and will try the next replicationServer in the list
+        if (keepConnection) // Log error message only for final connection
+        {
+          // log the error message only once to avoid overflowing the error log
+          logError(errorMessage);
+        }
+
+        if (debugEnabled())
+        {
+          TRACER.debugInfo(errorMessage.toString());
         }
       }
     }
+    return null;
   }
 
 
@@ -1234,7 +1289,7 @@
    *
    * @param server Server we are connecting with.
    */
-  private void performECLPhaseTwoHandshake(String server)
+  private void performECLPhaseTwoHandshake(String server, ConnectedRS rs)
   {
     try
     {
@@ -1252,14 +1307,16 @@
 
       // Alright set the timeout to the desired value
       localSession.setSoTimeout(timeout);
-      connected = true;
-    } catch (Exception e)
+      connectedRS.set(rs.setConnected());
+    }
+    catch (Exception e)
     {
       Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
           getServerId(), server, getBaseDN().toNormalizedString(),
           stackTraceToSingleLineString(e));
       logError(message);
 
+      connectedRS.set(ConnectedRS.noConnectedRS());
       setSession(null);
     }
   }
@@ -1287,8 +1344,7 @@
       // unit test purpose.
       if (domain != null)
       {
-        startSessionMsg =
-          new StartSessionMsg(
+        startSessionMsg = new StartSessionMsg(
           initStatus,
           domain.getRefUrls(),
           domain.isAssured(),
@@ -1306,9 +1362,7 @@
       final Session localSession = session;
       localSession.publish(startSessionMsg);
 
-      /*
-       * Read the TopologyMsg that should come back.
-       */
+      // Read the TopologyMsg that should come back.
       final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
 
       if (debugEnabled())
@@ -1320,13 +1374,15 @@
       // Alright set the timeout to the desired value
       localSession.setSoTimeout(timeout);
       return topologyMsg;
-    } catch (Exception e)
+    }
+    catch (Exception e)
     {
       Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
           getServerId(), server, getBaseDN().toNormalizedString(),
           stackTraceToSingleLineString(e));
       logError(message);
 
+      connectedRS.set(ConnectedRS.noConnectedRS());
       setSession(null);
 
       // Be sure to return null.
@@ -2263,13 +2319,14 @@
       numLostConnections++;
     }
 
+    ConnectedRS rs;
     if (failingSession == session)
     {
-      connected = false;
-      rsGroupId = -1;
-      rsServerId = -1;
-      rsServerUrl = null;
+      rs = ConnectedRS.noConnectedRS();
+      connectedRS.set(rs);
       setSession(null);
+    } else {
+      rs = connectedRS.get();
     }
 
     while (true)
@@ -2277,14 +2334,15 @@
       // Synchronize inside the loop in order to allow shutdown.
       synchronized (startStopLock)
       {
-        if (connected || shutdown)
+        if (rs.connected || shutdown)
         {
           break;
         }
 
         try
         {
-          connect();
+          connect(rs);
+          rs = connectedRS.get();
         }
         catch (Exception e)
         {
@@ -2295,11 +2353,10 @@
           logError(mb.toMessage());
         }
 
-        if (connected || !infiniteTry)
+        if (rs.connected || !infiniteTry)
         {
           break;
         }
-
       }
       try
       {
@@ -2313,8 +2370,8 @@
 
     if (debugEnabled())
     {
-      debugInfo("end restart : connected=" + connected + " with RS("
-          + getRsServerId() + ") genId=" + this.generationID);
+      debugInfo("end restart : connected=" + rs.connected + " with RS("
+          + rs.getServerId() + ") genId=" + generationID);
     }
   }
 
@@ -2533,7 +2590,8 @@
   {
     while (!shutdown)
     {
-      if (reconnectOnFailure && !connected)
+      final ConnectedRS rs = connectedRS.get();
+      if (reconnectOnFailure && !rs.connected)
       {
         // infinite try to reconnect
         reStart(null, true);
@@ -2550,7 +2608,7 @@
 
       final int serverId = getServerId();
       final DN baseDN = getBaseDN();
-      final int previousRsServerID = rsServerId;
+      final int previousRsServerID = rs.getServerId();
       try
       {
         ReplicationMsg msg = localSession.receive();
@@ -2786,19 +2844,15 @@
   public void stop()
   {
     if (debugEnabled())
-      debugInfo("is stopping and will close the connection to"
-          + " replication server " + rsServerId);
+      debugInfo("is stopping and will close the connection to RS("
+          + getRsServerId() + ")");
 
     synchronized (startStopLock)
     {
       shutdown = true;
-      connected = false;
       stopRSHeartBeatMonitoring();
       stopChangeTimeHeartBeatPublishing();
-      replicationServer = "stopped";
-      rsGroupId = -1;
-      rsServerId = -1;
-      rsServerUrl = null;
+      connectedRS.set(ConnectedRS.stopped());
       setSession(null);
       deregisterReplicationMonitor();
     }
@@ -2834,7 +2888,7 @@
    */
   public String getReplicationServer()
   {
-    return replicationServer;
+    return connectedRS.get().replicationServer;
   }
 
   /**
@@ -2874,7 +2928,7 @@
    */
   public int getCurrentSendWindow()
   {
-    if (connected)
+    if (isConnected())
     {
       return sendWindow.availablePermits();
     }
@@ -2934,7 +2988,7 @@
    */
   public boolean isConnected()
   {
-    return connected;
+    return connectedRS.get().connected;
   }
 
   /**
@@ -3003,7 +3057,7 @@
   {
     List<Integer> connectedDSs = new ArrayList<Integer>();
 
-    if (rsServerId == rsId)
+    if (getRsServerId() == rsId)
     {
       /*
       If we are computing connected DSs for the RS we are connected
@@ -3044,15 +3098,13 @@
     {
       int rsId = rsInfo.getId();
       rssToKeep.add(rsId); // Mark this server as still existing
-      List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
+      final List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
       ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
       if (rsInfo2 == null)
       {
         // New replication server, create info for it add it to the list
         rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
-        // Set the locally configured flag for this new RS only if it is
-        // configured
-        updateRSInfoLocallyConfiguredStatus(rsInfo2);
+        setLocallyConfiguredFlag(rsInfo2);
         replicationServerInfos.put(rsId, rsInfo2);
       } else
       {
@@ -3061,21 +3113,9 @@
       }
     }
 
-    /**
-     * Now remove any replication server that may have disappeared from the
-     * topology.
-     */
-    Iterator<Integer> rsInfoIt = replicationServerInfos.keySet().iterator();
-    while (rsInfoIt.hasNext())
-    {
-      final Integer rsId = rsInfoIt.next();
-      if (!rssToKeep.contains(rsId))
-      {
-        // This replication server has quit the topology, remove it from the
-        // list
-        rsInfoIt.remove();
-      }
-    }
+    // Remove any replication server that may have disappeared from the topology
+    replicationServerInfos.keySet().retainAll(rssToKeep);
+
     if (domain != null)
     {
       for (DSInfo info : dsList)
@@ -3231,18 +3271,8 @@
       .append(getServerId()).append("\",")
       .append(" groupId=").append(getGroupId())
       .append(", genId=").append(generationID)
-      .append(", connected=").append(connected).append(", ");
-    if (rsServerId == -1)
-    {
-      sb.append("no RS");
-    }
-    else
-    {
-      sb.append("bestRS(serverId=").append(rsServerId)
-        .append(", serverUrl=").append(rsServerUrl)
-        .append(", groupId=").append(rsGroupId)
-        .append(")");
-    }
+      .append(", ");
+    connectedRS.get().toString(sb);
     return sb.toString();
   }
 

--
Gitblit v1.10.0