From e13d1429c75364a349dee5d7f8703593fa0adf4f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 19 Dec 2013 16:31:41 +0000
Subject: [PATCH] Second step in simplifying the ReplicationBroker class.

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  628 +++++++++++++++++++++++++++-----------------------------
 1 files changed, 301 insertions(+), 327 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index adc5046..3e464a0 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -71,44 +71,43 @@
   /**
    * Immutable class containing information about whether the broker is
    * connected to an RS and data associated to this connected RS.
-   * <p>
-   * Mutable methods return a new version of this object copying the data that
-   * did not change.
    */
   // @Immutable
   private static final class ConnectedRS
   {
 
-    private final String replicationServer;
+    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
+        NO_CONNECTED_SERVER);
+
     /** The info of the RS we are connected to. */
     private final ReplicationServerInfo rsInfo;
-    private final boolean connected;
+    private final Session session;
+    private final String replicationServer;
 
-    private ConnectedRS(boolean connected, ReplicationServerInfo rsInfo,
-        String replicationServer)
+    private ConnectedRS(String replicationServer)
     {
-      this.connected = connected;
-      this.rsInfo = rsInfo;
+      this.rsInfo = null;
+      this.session = null;
       this.replicationServer = replicationServer;
     }
 
+    private ConnectedRS(ReplicationServerInfo rsInfo, Session session)
+    {
+      this.rsInfo = rsInfo;
+      this.session = session;
+      this.replicationServer = session != null ?
+          session.getReadableRemoteAddress()
+          : NO_CONNECTED_SERVER;
+    }
+
     private static ConnectedRS stopped()
     {
-      return new ConnectedRS(false, null, "stopped");
+      return new ConnectedRS("stopped");
     }
 
     private static ConnectedRS noConnectedRS()
     {
-      return new ConnectedRS(false, null, NO_CONNECTED_SERVER);
-    }
-
-    /**
-     * Returns a new version of the current object with the connected status set
-     * to true.
-     */
-    private ConnectedRS setConnected()
-    {
-      return new ConnectedRS(true, rsInfo, replicationServer);
+      return NO_CONNECTED_RS;
     }
 
     public int getServerId()
@@ -121,6 +120,11 @@
       return rsInfo != null ? rsInfo.getGroupId() : -1;
     }
 
+    private boolean isConnected()
+    {
+      return session != null;
+    }
+
     /** {@inheritDoc} */
     @Override
     public String toString()
@@ -132,19 +136,20 @@
 
     public void toString(StringBuilder sb)
     {
-      sb.append("connected=").append(connected).append(", ");
-      if (rsInfo == null) // this is a null object
+      sb.append("connected=").append(isConnected()).append(", ");
+      if (!isConnected())
       {
-        sb.append("no connected RS");
+        sb.append("no connectedRS");
       }
       else
       {
-        sb.append("connected RS(serverId=").append(rsInfo.getServerId())
+        sb.append("connectedRS(serverId=").append(rsInfo.getServerId())
           .append(", serverUrl=").append(rsInfo.getServerURL())
           .append(", groupId=").append(rsInfo.getGroupId())
           .append(")");
       }
     }
+
   }
 
   /**
@@ -158,18 +163,27 @@
    * String reported under CSN=monitor when there is no connected RS.
    */
   public final static String NO_CONNECTED_SERVER = "Not connected";
-  private volatile Session session;
   private final ServerState state;
   private Semaphore sendWindow;
   private int maxSendWindow;
   private int rcvWindow = 100;
   private int halfRcvWindow = rcvWindow / 2;
   private int timeout = 0;
-  private short protocolVersion;
   private ReplSessionSecurity replSessionSecurity;
+  /**
+   * The RS this DS is currently connected to.
+   * <p>
+   * Always use {@link #setConnectedRS(ConnectedRS)} to set a new
+   * connected RS.
+   */
+  // @NotNull // for the reference
   private final AtomicReference<ConnectedRS> connectedRS =
       new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
-  /** Our replication domain. */
+  /**
+   * Our replication domain.
+   * <p>
+   * Can be null for unit test purpose.
+   */
   private ReplicationDomain domain;
   /**
    * This object is used as a conditional event to be notified about
@@ -242,11 +256,12 @@
   private int mustRunBestServerCheckingAlgorithm = 0;
 
   /**
-   * The monitor provider for this replication domain. The name of the monitor
-   * includes the local address and must therefore be re-registered every time
-   * the session is re-established or destroyed. The monitor provider can only
-   * be created (i.e. non-null) if there is a replication domain, which is not
-   * the case in unit tests.
+   * The monitor provider for this replication domain.
+   * <p>
+   * The name of the monitor includes the local address and must therefore be
+   * re-registered every time the session is re-established or destroyed. The
+   * monitor provider can only be created (i.e. non-null) if there is a
+   * replication domain, which is not the case in unit tests.
    */
   private final ReplicationMonitor monitor;
 
@@ -268,7 +283,6 @@
     this.domain = replicationDomain;
     this.state = state;
     this.config = config;
-    this.protocolVersion = ProtocolVersion.getCurrentVersion();
     this.replSessionSecurity = replSessionSecurity;
     this.generationID = generationId;
     this.rcvWindow = getMaxRcvWindow();
@@ -292,7 +306,7 @@
     {
       shutdown = false;
       this.rcvWindow = getMaxRcvWindow();
-      connect(connectedRS.get());
+      connect();
     }
   }
 
@@ -385,7 +399,7 @@
       {
         // This RS is locally configured, mark this
         rsInfo.setLocallyConfigured(true);
-        rsInfo.serverURL = serverUrl;
+        rsInfo.setServerURL(serverUrl);
         return;
       }
     }
@@ -425,22 +439,16 @@
    */
   public static class ReplicationServerInfo
   {
+    private RSInfo rsInfo;
     private short protocolVersion;
-    private long generationId;
-    private byte groupId = -1;
-    private int serverId;
-    /** Received server URL. */
-    private String serverURL;
     private DN baseDN;
     private int windowSize;
     private ServerState serverState;
     private boolean sslEncryption;
-    private int degradedStatusThreshold = -1;
-    /** Keeps the 1 value if created with a ReplServerStartMsg. */
-    private int weight = 1;
+    private final int degradedStatusThreshold;
     /** Keeps the 0 value if created with a ReplServerStartMsg. */
     private int connectedDSNumber = 0;
-    private List<Integer> connectedDSs;
+    private Set<Integer> connectedDSs;
     /**
      * Is this RS locally configured? (the RS is recognized as a usable server).
      */
@@ -458,8 +466,8 @@
     public static ReplicationServerInfo newInstance(
       ReplicationMsg msg, String newServerURL) throws IllegalArgumentException
     {
-      ReplicationServerInfo rsInfo = newInstance(msg);
-      rsInfo.serverURL = newServerURL;
+      final ReplicationServerInfo rsInfo = newInstance(msg);
+      rsInfo.setServerURL(newServerURL);
       return rsInfo;
     }
 
@@ -471,70 +479,62 @@
      * @throws IllegalArgumentException If the passed message has an unexpected
      *                                  type.
      */
-    public static ReplicationServerInfo newInstance(
-      ReplicationMsg msg) throws IllegalArgumentException
+    public static ReplicationServerInfo newInstance(ReplicationMsg msg)
+        throws IllegalArgumentException
     {
       if (msg instanceof ReplServerStartMsg)
       {
-        // This is a ReplServerStartMsg (RS uses protocol V3 or under)
-        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
-        return new ReplicationServerInfo(replServerStartMsg);
-      } else if (msg instanceof ReplServerStartDSMsg)
+        // RS uses protocol V3 or lower
+        return new ReplicationServerInfo((ReplServerStartMsg) msg);
+      }
+      else if (msg instanceof ReplServerStartDSMsg)
       {
-        // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
-        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
-        return new ReplicationServerInfo(replServerStartDSMsg);
+        // RS uses protocol V4 or higher
+        return new ReplicationServerInfo((ReplServerStartDSMsg) msg);
       }
 
       // Unsupported message type: should not happen
-      throw new IllegalArgumentException("Unexpected PDU type: " +
-        msg.getClass().getName() + " :\n" + msg);
+      throw new IllegalArgumentException("Unexpected PDU type: "
+          + msg.getClass().getName() + " :\n" + msg);
     }
 
     /**
      * Constructs a ReplicationServerInfo object wrapping a
      * {@link ReplServerStartMsg}.
      *
-     * @param replServerStartMsg
+     * @param msg
      *          The {@link ReplServerStartMsg} this object will wrap.
      */
-    private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
+    private ReplicationServerInfo(ReplServerStartMsg msg)
     {
-      this.protocolVersion = replServerStartMsg.getVersion();
-      this.generationId = replServerStartMsg.getGenerationId();
-      this.groupId = replServerStartMsg.getGroupId();
-      this.serverId = replServerStartMsg.getServerId();
-      this.serverURL = replServerStartMsg.getServerURL();
-      this.baseDN = replServerStartMsg.getBaseDN();
-      this.windowSize = replServerStartMsg.getWindowSize();
-      this.serverState = replServerStartMsg.getServerState();
-      this.sslEncryption = replServerStartMsg.getSSLEncryption();
-      this.degradedStatusThreshold =
-        replServerStartMsg.getDegradedStatusThreshold();
+      this.protocolVersion = msg.getVersion();
+      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
+          msg.getGenerationId(), msg.getGroupId(), 1);
+      this.baseDN = msg.getBaseDN();
+      this.windowSize = msg.getWindowSize();
+      this.serverState = msg.getServerState();
+      this.sslEncryption = msg.getSSLEncryption();
+      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
     }
 
     /**
      * Constructs a ReplicationServerInfo object wrapping a
      * {@link ReplServerStartDSMsg}.
      *
-     * @param replServerStartDSMsg
+     * @param msg
      *          The {@link ReplServerStartDSMsg} this object will wrap.
      */
-    private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
+    private ReplicationServerInfo(ReplServerStartDSMsg msg)
     {
-      this.protocolVersion = replServerStartDSMsg.getVersion();
-      this.generationId = replServerStartDSMsg.getGenerationId();
-      this.groupId = replServerStartDSMsg.getGroupId();
-      this.serverId = replServerStartDSMsg.getServerId();
-      this.serverURL = replServerStartDSMsg.getServerURL();
-      this.baseDN = replServerStartDSMsg.getBaseDN();
-      this.windowSize = replServerStartDSMsg.getWindowSize();
-      this.serverState = replServerStartDSMsg.getServerState();
-      this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
-      this.degradedStatusThreshold =
-        replServerStartDSMsg.getDegradedStatusThreshold();
-      this.weight = replServerStartDSMsg.getWeight();
-      this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
+      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
+          msg.getGenerationId(), msg.getGroupId(), msg.getWeight());
+      this.protocolVersion = msg.getVersion();
+      this.baseDN = msg.getBaseDN();
+      this.windowSize = msg.getWindowSize();
+      this.serverState = msg.getServerState();
+      this.sslEncryption = msg.getSSLEncryption();
+      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
+      this.connectedDSNumber = msg.getConnectedDSNumber();
     }
 
     /**
@@ -552,7 +552,7 @@
      */
     public byte getGroupId()
     {
-      return groupId;
+      return rsInfo.getGroupId();
     }
 
     /**
@@ -570,7 +570,7 @@
      */
     public long getGenerationId()
     {
-      return generationId;
+      return rsInfo.getGenerationId();
     }
 
     /**
@@ -579,7 +579,7 @@
      */
     public int getServerId()
     {
-      return serverId;
+      return rsInfo.getId();
     }
 
     /**
@@ -588,7 +588,7 @@
      */
     public String getServerURL()
     {
-      return serverURL;
+      return rsInfo.getServerUrl();
     }
 
     /**
@@ -635,7 +635,7 @@
      */
     public int getWeight()
     {
-      return weight;
+      return rsInfo.getWeight();
     }
 
     /**
@@ -654,15 +654,13 @@
      * @param rsInfo The RSinfo to use for the update
      * @param connectedDSs The new connected DSs
      */
-    public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
+    public ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs)
     {
-      this.serverId = rsInfo.getId();
-      this.serverURL = rsInfo.getServerUrl();
-      this.generationId = rsInfo.getGenerationId();
-      this.groupId = rsInfo.getGroupId();
-      this.weight = rsInfo.getWeight();
+      this.rsInfo = new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(),
+          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
       this.connectedDSs = connectedDSs;
       this.connectedDSNumber = connectedDSs.size();
+      this.degradedStatusThreshold = -1;
       this.serverState = new ServerState();
     }
 
@@ -672,7 +670,7 @@
      */
     public RSInfo toRSInfo()
     {
-      return new RSInfo(serverId, serverURL, generationId, groupId, weight);
+      return rsInfo;
     }
 
     /**
@@ -681,15 +679,20 @@
      * @param rsInfo The RSinfo to use for the update
      * @param connectedDSs The new connected DSs
      */
-    public void update(RSInfo rsInfo, List<Integer> connectedDSs)
+    public void update(RSInfo rsInfo, Set<Integer> connectedDSs)
     {
-      this.generationId = rsInfo.getGenerationId();
-      this.groupId = rsInfo.getGroupId();
-      this.weight = rsInfo.getWeight();
+      this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(),
+          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
       this.connectedDSs = connectedDSs;
       this.connectedDSNumber = connectedDSs.size();
     }
 
+    private void setServerURL(String newServerURL)
+    {
+      rsInfo = new RSInfo(rsInfo.getId(), newServerURL,
+          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
+    }
+
     /**
      * Updates replication server info with the passed server state.
      * @param serverState The ServerState to use for the update
@@ -699,7 +702,8 @@
       if (this.serverState != null)
       {
         this.serverState.update(serverState);
-      } else
+      }
+      else
       {
         this.serverState = serverState;
       }
@@ -709,7 +713,7 @@
      * Get the getConnectedDSs.
      * @return the getConnectedDSs
      */
-    public List<Integer> getConnectedDSs()
+    public Set<Integer> getConnectedDSs()
     {
       return connectedDSs;
     }
@@ -739,17 +743,17 @@
     @Override
     public String toString()
     {
-      return "Url:" + this.serverURL + " ServerId:" + this.serverId
-          + " GroupId:" + this.groupId;
+      return "Url:" + getServerURL() + " ServerId:" + getServerId()
+          + " GroupId:" + getGroupId();
     }
   }
 
-  private void connect(ConnectedRS rs)
+  private void connect()
   {
     if (getBaseDN().toNormalizedString().equalsIgnoreCase(
         ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
     {
-      connectAsECL(rs);
+      connectAsECL();
     }
     else
     {
@@ -770,8 +774,8 @@
     for (String serverUrl : getReplicationServerUrls())
     {
       // Connect to server + get and store info about it
-      ReplicationServerInfo rsInfo =
-          performPhaseOneHandshake(serverUrl, false, false);
+      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
+      final ReplicationServerInfo rsInfo = rs.rsInfo;
       if (rsInfo != null)
       {
         rsInfos.put(rsInfo.getServerId(), rsInfo);
@@ -799,13 +803,14 @@
    * </li>
    * </ul>
    */
-  private void connectAsECL(ConnectedRS rs)
+  private void connectAsECL()
   {
     // FIXME:ECL List of RS to connect is for now limited to one RS only
-    final String bestServer = getReplicationServerUrls().iterator().next();
-    if (performPhaseOneHandshake(bestServer, true, true) != null)
+    final String bestServerURL = getReplicationServerUrls().iterator().next();
+    final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
+    if (rs.isConnected())
     {
-      performECLPhaseTwoHandshake(bestServer, rs);
+      performECLPhaseTwoHandshake(bestServerURL, rs);
     }
   }
 
@@ -836,10 +841,6 @@
    */
   private void connectAsDataServer()
   {
-    /*
-    May have created a broker with null replication domain for
-    unit test purpose.
-    */
     if (domain != null)
     {
       /*
@@ -876,22 +877,22 @@
 
       if (replicationServerInfos.isEmpty())
       {
-        connectedRS.set(ConnectedRS.noConnectedRS());
+        setConnectedRS(ConnectedRS.noConnectedRS());
       }
       else
       {
         // At least one server answered, find the best one.
         RSEvaluations evals = computeBestReplicationServer(true, -1, state,
             replicationServerInfos, serverId, getGroupId(), getGenerationID());
-        ReplicationServerInfo electedRsInfo = evals.getBestRS();
 
         // Best found, now initialize connection to this one (handshake phase 1)
         if (debugEnabled())
           debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
-              + electedRsInfo);
-        electedRsInfo = performPhaseOneHandshake(
-          electedRsInfo.getServerURL(), true, false);
+              + evals.getBestRS());
 
+        final ConnectedRS electedRS = performPhaseOneHandshake(
+            evals.getBestRS().getServerURL(), true, false);
+        final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
         if (electedRsInfo != null)
         {
           /*
@@ -904,25 +905,24 @@
           // Handshake phase 1 exchange went well
 
           // Compute in which status we are starting the session to tell the RS
-          ServerStatus initStatus =
-            computeInitialServerStatus(electedRsInfo.getGenerationId(),
-            electedRsInfo.getServerState(),
-            electedRsInfo.getDegradedStatusThreshold(),
-            getGenerationID());
+          final ServerStatus initStatus = computeInitialServerStatus(
+              electedRsInfo.getGenerationId(), electedRsInfo.getServerState(),
+              electedRsInfo.getDegradedStatusThreshold(), getGenerationID());
 
           // Perform session start (handshake phase 2)
-          TopologyMsg topologyMsg = performPhaseTwoHandshake(
-            electedRsInfo.getServerURL(), initStatus);
+          final TopologyMsg topologyMsg =
+              performPhaseTwoHandshake(electedRS, initStatus);
 
           if (topologyMsg != null) // Handshake phase 2 exchange went well
           {
-            connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
+            connectToReplicationServer(electedRS, initStatus, topologyMsg);
           } // Could perform handshake phase 2 with best
         } // Could perform handshake phase 1 with best
       }
 
+      // connectedRS has been updated by calls above, reload it
       final ConnectedRS rs = connectedRS.get();
-      if (rs.connected)
+      if (rs.isConnected())
       {
         connectPhaseLock.notify();
 
@@ -932,13 +932,13 @@
         {
           logError(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
               serverId, rsServerId, baseDN.toNormalizedString(),
-              session.getReadableRemoteAddress(), getGenerationID()));
+              rs.replicationServer, getGenerationID()));
         }
         else
         {
           logError(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
               serverId, rsServerId, baseDN.toNormalizedString(),
-              session.getReadableRemoteAddress(), getGenerationID(), rsGenId));
+              rs.replicationServer, getGenerationID(), rsGenId));
         }
       }
       else
@@ -969,29 +969,28 @@
   /**
    * Connects to a replication server.
    *
-   * @param rsInfo
+   * @param rs
    *          the Replication Server to connect to
    * @param initStatus
    *          The status to enter the state machine with
    * @param topologyMsg
    *          the message containing the topology information
    */
-  private void connectToReplicationServer(ReplicationServerInfo rsInfo,
+  private void connectToReplicationServer(ConnectedRS rs,
       ServerStatus initStatus, TopologyMsg topologyMsg)
   {
-    final int serverId = getServerId();
     final DN baseDN = getBaseDN();
+    final ReplicationServerInfo rsInfo = rs.rsInfo;
 
-    ConnectedRS rs = null;
+    boolean connectSuccessful = false;
     try
     {
       maxSendWindow = rsInfo.getWindowSize();
 
-      receiveTopo(topologyMsg);
+      receiveTopo(topologyMsg, rs.getServerId());
 
       /*
-      Log a message to let the administrator know that the failure
-      was resolved.
+      Log a message to let the administrator know that the failure was resolved.
       Wake up all the thread that were waiting on the window
       on the previous connection.
       */
@@ -1018,17 +1017,11 @@
       }
       sendWindow = new Semaphore(maxSendWindow);
       rcvWindow = getMaxRcvWindow();
-      rs = new ConnectedRS(true, rsInfo, session.getReadableRemoteAddress());
-      connectedRS.set(rs);
 
-      /*
-      May have created a broker with null replication domain for
-      unit test purpose.
-      */
       if (domain != null)
       {
-        domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo
-            .getGenerationId(), session);
+        domain.sessionInitiated(initStatus, rsInfo.getServerState(),
+            rsInfo.getGenerationId(), rs.session);
       }
 
       final byte groupId = getGroupId();
@@ -1042,28 +1035,28 @@
         logError(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
             Byte.toString(groupId), Integer.toString(rs.getServerId()),
             rsInfo.getServerURL(), Byte.toString(rs.getGroupId()),
-            baseDN.toNormalizedString(), Integer.toString(serverId)));
+            baseDN.toNormalizedString(), Integer.toString(getServerId())));
       }
-      startRSHeartBeatMonitoring();
+      startRSHeartBeatMonitoring(rs);
       if (rsInfo.getProtocolVersion() >=
         ProtocolVersion.REPLICATION_PROTOCOL_V3)
       {
-        startChangeTimeHeartBeatPublishing();
+        startChangeTimeHeartBeatPublishing(rs);
       }
+      setConnectedRS(rs);
+      connectSuccessful = true;
     }
     catch (Exception e)
     {
-      Message message = ERR_COMPUTING_FAKE_OPS.get(
+      logError(ERR_COMPUTING_FAKE_OPS.get(
           baseDN.toNormalizedString(), rsInfo.getServerURL(),
-          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
-      logError(message);
+          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)));
     }
     finally
     {
-      if (rs == null)
+      if (!connectSuccessful)
       {
-        connectedRS.set(ConnectedRS.noConnectedRS());
-        setSession(null);
+        setConnectedRS(ConnectedRS.noConnectedRS());
       }
     }
   }
@@ -1133,9 +1126,9 @@
    * messages exchange) and return the reply message from the replication
    * server, wrapped in a ReplicationServerInfo object.
    *
-   * @param server
+   * @param serverURL
    *          Server to connect to.
-   * @param keepConnection
+   * @param keepSession
    *          Do we keep session opened or not after handshake. Use true if want
    *          to perform handshake phase 2 with the same session and keep the
    *          session to create as the current one.
@@ -1143,10 +1136,10 @@
    *          Indicates whether or not the an ECL handshake is to be performed.
    * @return The answer from the server . Null if could not get an answer.
    */
-  private ReplicationServerInfo performPhaseOneHandshake(
-      String server, boolean keepConnection, boolean isECL)
+  private ConnectedRS performPhaseOneHandshake(String serverURL,
+      boolean keepSession, boolean isECL)
   {
-    Session localSession = null;
+    Session newSession = null;
     Socket socket = null;
     boolean hasConnected = false;
     Message errorMessage = null;
@@ -1158,15 +1151,16 @@
       socket.setReceiveBufferSize(1000000);
       socket.setTcpNoDelay(true);
       int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
-      socket.connect(HostPort.valueOf(server).toInetSocketAddress(), timeoutMS);
-      localSession = replSessionSecurity.createClientSession(socket, timeoutMS);
+      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
+          timeoutMS);
+      newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
       boolean isSslEncryption = replSessionSecurity.isSslEncryption();
 
       // Send our ServerStartMsg.
       final HostPort hp = new HostPort(
           socket.getLocalAddress().getHostName(), socket.getLocalPort());
-      String url = hp.toString();
-      StartMsg serverStartMsg;
+      final String url = hp.toString();
+      final StartMsg serverStartMsg;
       if (!isECL)
       {
         serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
@@ -1179,11 +1173,11 @@
             getMaxRcvWindow(), config.getHeartbeatInterval(), state,
             getGenerationID(), isSslEncryption, getGroupId());
       }
-      localSession.publish(serverStartMsg);
+      newSession.publish(serverStartMsg);
 
       // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
       // come back.
-      ReplicationMsg msg = localSession.receive();
+      ReplicationMsg msg = newSession.receive();
       if (debugEnabled())
       {
         debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
@@ -1192,7 +1186,7 @@
 
       // Wrap received message in a server info object
       final ReplicationServerInfo replServerInfo =
-          ReplicationServerInfo.newInstance(msg, server);
+          ReplicationServerInfo.newInstance(msg, serverURL);
 
       // Sanity check
       final DN repDN = replServerInfo.getBaseDN();
@@ -1200,7 +1194,7 @@
       {
         errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
             repDN.toNormalizedString(), getBaseDN().toNormalizedString());
-        return null;
+        return setConnectedRS(ConnectedRS.noConnectedRS());
       }
 
       /*
@@ -1208,64 +1202,53 @@
        * replication server will use the same one (or an older one if it is an
        * old replication server).
        */
-      final short localProtocolVersion = getCompatibleVersion(replServerInfo
-          .getProtocolVersion());
-      if (keepConnection)
-      {
-        protocolVersion = localProtocolVersion;
-      }
-      localSession.setProtocolVersion(localProtocolVersion);
+      newSession.setProtocolVersion(
+          getCompatibleVersion(replServerInfo.getProtocolVersion()));
 
       if (!isSslEncryption)
       {
-        localSession.stopEncryption();
+        newSession.stopEncryption();
       }
 
       hasConnected = true;
 
-      // If this connection is the one to use for sending and receiving
-      // updates, store it.
-      if (keepConnection)
+      if (keepSession)
       {
-        setSession(localSession);
+        // cannot store it yet,
+        // only store after a successful phase two handshake
+        return new ConnectedRS(replServerInfo, newSession);
       }
-
-      return replServerInfo;
+      return new ConnectedRS(replServerInfo, null);
     }
     catch (ConnectException e)
     {
       errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
-          server, getBaseDN().toNormalizedString());
+          serverURL, getBaseDN().toNormalizedString());
     }
     catch (SocketTimeoutException e)
     {
       errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
-          server, getBaseDN().toNormalizedString());
+          serverURL, getBaseDN().toNormalizedString());
     }
     catch (Exception e)
     {
       errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
-          server, getBaseDN().toNormalizedString(),
+          serverURL, getBaseDN().toNormalizedString(),
           stackTraceToSingleLineString(e));
     }
     finally
     {
-      if (!hasConnected || !keepConnection)
+      if (!hasConnected || !keepSession)
       {
-        close(localSession);
+        close(newSession);
         close(socket);
       }
 
-      if (keepConnection && !hasConnected)
-      {
-        connectedRS.set(ConnectedRS.noConnectedRS());
-      }
-
       if (!hasConnected && errorMessage != null && !connectionError)
       {
         // There was no server waiting on this host:port
         // Log a notice and will try the next replicationServer in the list
-        if (keepConnection) // Log error message only for final connection
+        if (keepSession) // Log error message only for final connection
         {
           // log the error message only once to avoid overflowing the error log
           logError(errorMessage);
@@ -1277,7 +1260,7 @@
         }
       }
     }
-    return null;
+    return setConnectedRS(ConnectedRS.noConnectedRS());
   }
 
 
@@ -1294,10 +1277,9 @@
     try
     {
       // Send our Start Session
-      StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
+      final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
       startECLSessionMsg.setOperationId("-1");
-      final Session localSession = session;
-      localSession.publish(startECLSessionMsg);
+      rs.session.publish(startECLSessionMsg);
 
       // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
       if (debugEnabled())
@@ -1306,18 +1288,17 @@
       }
 
       // Alright set the timeout to the desired value
-      localSession.setSoTimeout(timeout);
-      connectedRS.set(rs.setConnected());
+      rs.session.setSoTimeout(timeout);
+      setConnectedRS(rs);
     }
     catch (Exception e)
     {
-      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
+      logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
           getServerId(), server, getBaseDN().toNormalizedString(),
-          stackTraceToSingleLineString(e));
-      logError(message);
+          stackTraceToSingleLineString(e)));
 
-      connectedRS.set(ConnectedRS.noConnectedRS());
-      setSession(null);
+      rs.session.close();
+      setConnectedRS(ConnectedRS.noConnectedRS());
     }
   }
 
@@ -1326,22 +1307,18 @@
    * TopologyMsg messages exchange) and return the reply message from the
    * replication server.
    *
-   * @param server Server we are connecting with.
+   * @param electedRS Server we are connecting with.
    * @param initStatus The status we are starting with
    * @return The ReplServerStartMsg the server replied. Null if could not
    *         get an answer.
    */
-  private TopologyMsg performPhaseTwoHandshake(String server,
+  private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS,
     ServerStatus initStatus)
   {
     try
     {
-      /*
-       * Send our StartSessionMsg.
-       */
-      StartSessionMsg startSessionMsg;
-      // May have created a broker with null replication domain for
-      // unit test purpose.
+      // Send our StartSessionMsg.
+      final StartSessionMsg startSessionMsg;
       if (domain != null)
       {
         startSessionMsg = new StartSessionMsg(
@@ -1359,11 +1336,11 @@
         startSessionMsg =
           new StartSessionMsg(initStatus, new ArrayList<String>());
       }
-      final Session localSession = session;
-      localSession.publish(startSessionMsg);
+      final Session session = electedRS.session;
+      session.publish(startSessionMsg);
 
       // Read the TopologyMsg that should come back.
-      final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
+      final TopologyMsg topologyMsg = (TopologyMsg) session.receive();
 
       if (debugEnabled())
       {
@@ -1372,20 +1349,16 @@
       }
 
       // Alright set the timeout to the desired value
-      localSession.setSoTimeout(timeout);
+      session.setSoTimeout(timeout);
       return topologyMsg;
     }
     catch (Exception e)
     {
-      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
-          getServerId(), server, getBaseDN().toNormalizedString(),
-          stackTraceToSingleLineString(e));
-      logError(message);
+      logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
+          getServerId(), electedRS.rsInfo.getServerURL(),
+          getBaseDN().toNormalizedString(), stackTraceToSingleLineString(e)));
 
-      connectedRS.set(ConnectedRS.noConnectedRS());
-      setSession(null);
-
-      // Be sure to return null.
+      setConnectedRS(ConnectedRS.noConnectedRS());
       return null;
     }
   }
@@ -2272,14 +2245,13 @@
   /**
    * Start the heartbeat monitor thread.
    */
-  private void startRSHeartBeatMonitoring()
+  private void startRSHeartBeatMonitoring(ConnectedRS rs)
   {
-    // Start a heartbeat monitor thread.
     final long heartbeatInterval = config.getHeartbeatInterval();
     if (heartbeatInterval > 0)
     {
-      heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
-          getBaseDN().toNormalizedString(), session, heartbeatInterval);
+      heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(),
+          getBaseDN().toNormalizedString(), rs.session, heartbeatInterval);
       heartbeatMonitor.start();
     }
   }
@@ -2302,7 +2274,7 @@
    */
   public void reStart(boolean infiniteTry)
   {
-    reStart(session, infiniteTry);
+    reStart(connectedRS.get().session, infiniteTry);
   }
 
   /**
@@ -2319,14 +2291,10 @@
       numLostConnections++;
     }
 
-    ConnectedRS rs;
-    if (failingSession == session)
+    ConnectedRS rs = connectedRS.get();
+    if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS()))
     {
-      rs = ConnectedRS.noConnectedRS();
-      connectedRS.set(rs);
-      setSession(null);
-    } else {
-      rs = connectedRS.get();
+      rs = setConnectedRS(ConnectedRS.noConnectedRS());
     }
 
     while (true)
@@ -2334,14 +2302,14 @@
       // Synchronize inside the loop in order to allow shutdown.
       synchronized (startStopLock)
       {
-        if (rs.connected || shutdown)
+        if (rs.isConnected() || shutdown)
         {
           break;
         }
 
         try
         {
-          connect(rs);
+          connect();
           rs = connectedRS.get();
         }
         catch (Exception e)
@@ -2353,7 +2321,7 @@
           logError(mb.toMessage());
         }
 
-        if (rs.connected || !infiniteTry)
+        if (rs.isConnected() || !infiniteTry)
         {
           break;
         }
@@ -2362,7 +2330,7 @@
       {
           Thread.sleep(500);
       }
-      catch (InterruptedException e)
+      catch (InterruptedException ignored)
       {
         // ignore
       }
@@ -2370,7 +2338,7 @@
 
     if (debugEnabled())
     {
-      debugInfo("end restart : connected=" + rs.connected + " with RS("
+      debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
           + rs.getServerId() + ") genId=" + generationID);
     }
   }
@@ -2451,7 +2419,7 @@
         Semaphore currentWindowSemaphore;
         synchronized (connectPhaseLock)
         {
-          currentSession = session;
+          currentSession = connectedRS.get().session;
           currentWindowSemaphore = sendWindow;
         }
 
@@ -2491,10 +2459,10 @@
             Check the session. If it has changed, some disconnection or
             reconnection happened and we need to restart from scratch.
             */
-            final Session localSession = session;
-            if (localSession != null && session == currentSession)
+            final Session session = connectedRS.get().session;
+            if (session != null && session == currentSession)
             {
-              localSession.publish(msg);
+              session.publish(msg);
               done = true;
             }
           }
@@ -2509,17 +2477,20 @@
             window update message was lost somehow...
             then loop to check again if connection was closed.
             */
-            Session localSession = session;
-            if (localSession != null)
+            Session session = connectedRS.get().session;
+            if (session != null)
             {
-              localSession.publish(new WindowProbeMsg());
+              session.publish(new WindowProbeMsg());
             }
           }
         }
-      } catch (IOException e)
+      }
+      catch (IOException e)
       {
         if (!retryOnFailure)
+        {
           return false;
+        }
 
         // The receive threads should handle reconnection or
         // mark this broker in error. Just retry.
@@ -2590,17 +2561,17 @@
   {
     while (!shutdown)
     {
-      final ConnectedRS rs = connectedRS.get();
-      if (reconnectOnFailure && !rs.connected)
+      ConnectedRS rs = connectedRS.get();
+      if (reconnectOnFailure && !rs.isConnected())
       {
         // infinite try to reconnect
         reStart(null, true);
+        continue;
       }
 
       // Save session information for later in case we need it for log messages
       // after the session has been closed and/or failed.
-      final Session localSession = session;
-      if (localSession == null)
+      if (rs.session == null)
       {
         // Must be shutting down.
         break;
@@ -2611,7 +2582,7 @@
       final int previousRsServerID = rs.getServerId();
       try
       {
-        ReplicationMsg msg = localSession.receive();
+        ReplicationMsg msg = rs.session.receive();
         if (msg instanceof UpdateMsg)
         {
           synchronized (this)
@@ -2621,13 +2592,13 @@
         }
         if (msg instanceof WindowMsg)
         {
-          WindowMsg windowMsg = (WindowMsg) msg;
+          final WindowMsg windowMsg = (WindowMsg) msg;
           sendWindow.release(windowMsg.getNumAck());
         }
         else if (msg instanceof TopologyMsg)
         {
-          TopologyMsg topoMsg = (TopologyMsg) msg;
-          receiveTopo(topoMsg);
+          final TopologyMsg topoMsg = (TopologyMsg) msg;
+          receiveTopo(topoMsg, getRsServerId());
           if (reconnectToTheBestRS)
           {
             // Reset wait time before next computation of best server
@@ -2636,19 +2607,20 @@
 
           // Caller wants to check what's changed
           if (returnOnTopoChange)
+          {
             return msg;
-
+          }
         }
         else if (msg instanceof StopMsg)
         {
           // RS performs a proper disconnection
           Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
-              previousRsServerID, localSession.getReadableRemoteAddress(),
+              previousRsServerID, rs.replicationServer,
               serverId, baseDN.toNormalizedString());
           logError(message);
 
           // Try to find a suitable RS
-          reStart(localSession, true);
+          reStart(rs.session, true);
         }
         else if (msg instanceof MonitorMsg)
         {
@@ -2709,16 +2681,14 @@
                 if (bestServerInfo == null)
                 {
                   message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
-                      serverId, previousRsServerID,
-                      localSession.getReadableRemoteAddress(),
+                      serverId, previousRsServerID, rs.replicationServer,
                       baseDN.toNormalizedString());
                 }
                 else
                 {
                   final int bestRsServerId = bestServerInfo.getServerId();
                   message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
-                      serverId, previousRsServerID,
-                      localSession.getReadableRemoteAddress(),
+                      serverId, previousRsServerID, rs.replicationServer,
                       bestRsServerId,
                       baseDN.toNormalizedString(),
                       evals.getEvaluation(previousRsServerID).toString(),
@@ -2754,24 +2724,20 @@
 
         if (!shutdown)
         {
-          final Session tmpSession = session;
-          if (tmpSession == null || !tmpSession.closeInitiated())
+          if (rs.session == null || !rs.session.closeInitiated())
           {
             // We did not initiate the close on our side, log an error message.
-            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
+            logError(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
                 serverId, baseDN.toNormalizedString(), previousRsServerID,
-                localSession.getReadableRemoteAddress());
-            logError(message);
+                rs.replicationServer));
           }
 
-          if (reconnectOnFailure)
-          {
-            reStart(localSession, true);
-          }
-          else
+          if (!reconnectOnFailure)
           {
             break; // does not seem necessary to explicitly disconnect ..
           }
+
+          reStart(rs.session, true);
         }
       }
     } // while !shutdown
@@ -2824,10 +2790,10 @@
     try
     {
       updateDoneCount++;
-      final Session localSession = session;
-      if (updateDoneCount >= halfRcvWindow && localSession != null)
+      final Session session = connectedRS.get().session;
+      if (updateDoneCount >= halfRcvWindow && session != null)
       {
-        localSession.publish(new WindowMsg(updateDoneCount));
+        session.publish(new WindowMsg(updateDoneCount));
         rcvWindow += updateDoneCount;
         updateDoneCount = 0;
       }
@@ -2850,10 +2816,9 @@
     synchronized (startStopLock)
     {
       shutdown = true;
+      setConnectedRS(ConnectedRS.stopped());
       stopRSHeartBeatMonitoring();
       stopChangeTimeHeartBeatPublishing();
-      connectedRS.set(ConnectedRS.stopped());
-      setSession(null);
       deregisterReplicationMonitor();
     }
   }
@@ -2872,10 +2837,10 @@
   public void setSoTimeout(int timeout) throws SocketException
   {
     this.timeout = timeout;
-    final Session localSession = session;
-    if (localSession != null)
+    final Session session = connectedRS.get().session;
+    if (session != null)
     {
-      localSession.setSoTimeout(timeout);
+      session.setSoTimeout(timeout);
     }
   }
 
@@ -2977,7 +2942,12 @@
    */
   public short getProtocolVersion()
   {
-    return protocolVersion;
+    final Session session = connectedRS.get().session;
+    if (session != null)
+    {
+      return session.getProtocolVersion();
+    }
+    return ProtocolVersion.getCurrentVersion();
   }
 
   /**
@@ -2988,7 +2958,7 @@
    */
   public boolean isConnected()
   {
-    return connectedRS.get().connected;
+    return connectedRS.get().isConnected();
   }
 
   /**
@@ -2997,8 +2967,8 @@
    */
   public boolean isSessionEncrypted()
   {
-    final Session tmp = session;
-    return tmp != null ? tmp.isEncrypted() : false;
+    final Session session = connectedRS.get().session;
+    return session != null ? session.isEncrypted() : false;
   }
 
   /**
@@ -3009,9 +2979,8 @@
   {
     try
     {
-      ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS,
-        newStatus);
-      session.publish(csMsg);
+      connectedRS.get().session.publish(
+          new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus));
     } catch (IOException ex)
     {
       Message message = ERR_EXCEPTION_SENDING_CS.get(
@@ -3051,13 +3020,14 @@
    * Computes the list of DSs connected to a particular RS.
    * @param rsId The RS id of the server one wants to know the connected DSs
    * @param dsList The list of DSinfo from which to compute things
+   * @param rsServerId the serverId to use for the connectedDS
    * @return The list of connected DSs to the server rsId
    */
-  private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
+  private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList,
+      int rsServerId)
   {
-    List<Integer> connectedDSs = new ArrayList<Integer>();
-
-    if (getRsServerId() == rsId)
+    final Set<Integer> connectedDSs = new HashSet<Integer>();
+    if (rsServerId == rsId)
     {
       /*
       If we are computing connected DSs for the RS we are connected
@@ -3071,7 +3041,9 @@
     for (DSInfo dsInfo : dsList)
     {
       if (dsInfo.getRsId() == rsId)
+      {
         connectedDSs.add(dsInfo.getDsId());
+      }
     }
 
     return connectedDSs;
@@ -3081,9 +3053,12 @@
    * Processes an incoming TopologyMsg.
    * Updates the structures for the local view of the topology.
    *
-   * @param topoMsg The topology information received from RS.
+   * @param topoMsg
+   *          The topology information received from RS.
+   * @param rsServerId
+   *          the serverId to use for the connectedDS
    */
-  public void receiveTopo(TopologyMsg topoMsg)
+  private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
   {
     if (debugEnabled())
       debugInfo("receive TopologyMsg=" + topoMsg);
@@ -3096,9 +3071,9 @@
     final Set<Integer> rssToKeep = new HashSet<Integer>();
     for (RSInfo rsInfo : topoMsg.getRsList())
     {
-      int rsId = rsInfo.getId();
+      final int rsId = rsInfo.getId();
       rssToKeep.add(rsId); // Mark this server as still existing
-      final List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
+      Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId);
       ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
       if (rsInfo2 == null)
       {
@@ -3106,7 +3081,8 @@
         rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
         setLocallyConfiguredFlag(rsInfo2);
         replicationServerInfos.put(rsId, rsInfo2);
-      } else
+      }
+      else
       {
         // Update the existing info for the replication server
         rsInfo2.update(rsInfo, connectedDSs);
@@ -3140,20 +3116,18 @@
   /**
    * Starts publishing to the RS the current timestamp used in this server.
    */
-  private void startChangeTimeHeartBeatPublishing()
+  private void startChangeTimeHeartBeatPublishing(ConnectedRS rs)
   {
     // Start a CSN heartbeat thread.
     long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
     if (changeTimeHeartbeatInterval > 0)
     {
-      final Session localSession = session;
       final String threadName = "Replica DS(" + getServerId()
-          + ") change time heartbeat publisher for domain \""
-          + getBaseDN() + "\" to RS(" + getRsServerId()
-          + ") at " + localSession.getReadableRemoteAddress();
+              + ") change time heartbeat publisher for domain \"" + getBaseDN()
+              + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer;
 
       ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
-          threadName, localSession, changeTimeHeartbeatInterval, getServerId());
+          threadName, rs.session, changeTimeHeartbeatInterval, getServerId());
       ctHeartbeatPublisherThread.start();
     }
     else
@@ -3206,8 +3180,8 @@
    */
   String getLocalUrl()
   {
-    final Session tmp = session;
-    return tmp != null ? tmp.getLocalUrl() : "";
+    final Session session = connectedRS.get().session;
+    return session != null ? session.getLocalUrl() : "";
   }
 
   /**
@@ -3221,28 +3195,30 @@
     return monitor.getMonitorInstanceName();
   }
 
-  private void setSession(final Session newSession)
+  private ConnectedRS setConnectedRS(final ConnectedRS newRS)
   {
-    // De-register the monitor with the old name.
-    deregisterReplicationMonitor();
-
-    final Session oldSession = session;
-    if (oldSession != null)
+    final ConnectedRS oldRS = connectedRS.getAndSet(newRS);
+    if (!oldRS.equals(newRS) && oldRS.session != null)
     {
-      oldSession.close();
+      // monitor name is changing, deregister before registering again
+      deregisterReplicationMonitor();
+      oldRS.session.close();
+      registerReplicationMonitor();
     }
-    session = newSession;
-
-    // Re-register the monitor with the new name.
-    registerReplicationMonitor();
+    return newRS;
   }
 
+  /**
+   * Must be invoked each time the session changes because, the monitor name is
+   * dynamically created with the session name, while monitor registration is
+   * static.
+   *
+   * @see #monitor
+   */
   private void registerReplicationMonitor()
   {
-    /*
-     * The monitor should not be registered if this is a unit test because the
-     * replication domain is null.
-     */
+    // The monitor should not be registered if this is a unit test
+    // because the replication domain is null.
     if (monitor != null)
     {
       DirectoryServer.registerMonitorProvider(monitor);
@@ -3251,10 +3227,8 @@
 
   private void deregisterReplicationMonitor()
   {
-    /*
-     * The monitor should not be deregistered if this is a unit test because the
-     * replication domain is null.
-     */
+    // The monitor should not be deregistered if this is a unit test
+    // because the replication domain is null.
     if (monitor != null)
     {
       DirectoryServer.deregisterMonitorProvider(monitor);

--
Gitblit v1.10.0