From 09e3430331f80ba64b3318c5d315c59ed230af1f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 13:27:40 +0000
Subject: [PATCH] Avoided possible costly thread leaks in ReplicationServerDomain.

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java        |    6 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java      |   33 ++---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java            |   31 +++-
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java         |   23 +--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java  |   85 ++++++-------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java             |  126 +++++++-------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java         |    6 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java |    6 
 8 files changed, 134 insertions(+), 182 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
index 91e3679..afddeb9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/WindowProbeMsg.java
@@ -29,17 +29,16 @@
 
 import java.util.zip.DataFormatException;
 
-
 /**
- * This message is used by LDAP or Replication Server that have been
- * out of credit for a while and want to check that the remote servers.
- *
- * A sending entity that is blocked because its send window is closed
- * for a while should create such a message to check that the window
- * closure is valid.
- *
+ * This message is used by LDAP or Replication Server that have been out of
+ * credit for a while and want to check if the remote servers is able to accept
+ * more messages.
+ * <p>
+ * A sending entity that is blocked because its send window is closed for a
+ * while should create such a message to check that the window closure is valid.
+ * <p>
  * An entity that received such a message should respond with a
- * WindowUpdate message indicating the curent credit available.
+ * {@link WindowMsg} indicating the current credit available.
  */
 public class WindowProbeMsg extends ReplicationMsg
 {
@@ -72,10 +71,6 @@
   public byte[] getBytes(short protocolVersion)
   {
     // WindowProbeMsg Message only contains its type.
-
-    byte[] resultByteArray = new byte[1];
-    resultByteArray[0] = MSG_TYPE_WINDOW_PROBE;
-
-    return resultByteArray;
+    return new byte[] { MSG_TYPE_WINDOW_PROBE };
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 4b25380..d956b28 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -542,11 +542,7 @@
     if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
     {
       // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
-      startMsg = new ReplServerStartMsg(getReplicationServerId(),
-          getReplicationServerURL(), getBaseDN(), maxRcvWindow,
-          replicationServerDomain.getDbServerState(),
-          localGenerationId, sslEncryption, getLocalGroupId(),
-          replicationServer.getDegradedStatusThreshold());
+      startMsg = createReplServerStartMsg();
     }
     else
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index eae646a..2399136 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -365,11 +365,7 @@
     if (getProtocolVersion() < ProtocolVersion.REPLICATION_PROTOCOL_V4)
     {
       // Peer DS uses protocol < V4 : send it a ReplServerStartMsg
-       startMsg = new ReplServerStartMsg(getReplicationServerId(),
-           getReplicationServerURL(), getBaseDN(), maxRcvWindow,
-           replicationServerDomain.getDbServerState(),
-           localGenerationId, sslEncryption, getLocalGroupId(),
-           replicationServer.getDegradedStatusThreshold());
+      startMsg = createReplServerStartMsg();
     }
     else
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
index eb678b0..1a5e7bf 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MonitoringPublisher.java
@@ -91,8 +91,7 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("Monitoring publisher starting for dn "
-          + replicationServerDomain.getBaseDn());
+      TRACER.debugInfo(getMessage("Monitoring publisher starting."));
     }
 
     try
@@ -134,16 +133,12 @@
     }
     catch (InterruptedException e)
     {
-      TRACER.debugInfo("Monitoring publisher for dn "
-          + replicationServerDomain.getBaseDn()
-          + " in RS " + replicationServerDomain.getLocalRSServerId()
-          + " has been interrupted while sleeping.");
+      TRACER.debugInfo(getMessage(
+          "Monitoring publisher has been interrupted while sleeping."));
     }
 
     done = true;
-    TRACER.debugInfo("Monitoring publisher for dn "
-        + replicationServerDomain.getBaseDn() + " is terminated."
-        + " This is in RS " + replicationServerDomain.getLocalRSServerId());
+    TRACER.debugInfo(getMessage("Monitoring publisher is terminated."));
   }
 
 
@@ -160,9 +155,7 @@
 
       if (debugEnabled())
       {
-        TRACER.debugInfo("Shutting down monitoring publisher for dn "
-            + replicationServerDomain.getBaseDn()
-            + " in RS " + replicationServerDomain.getLocalRSServerId());
+        TRACER.debugInfo(getMessage("Shutting down monitoring publisher."));
       }
     }
   }
@@ -183,9 +176,7 @@
         n++;
         if (n >= FACTOR)
         {
-          TRACER.debugInfo("Interrupting monitoring publisher for dn " +
-            replicationServerDomain.getBaseDn() + " in RS " +
-            replicationServerDomain.getLocalRSServerId());
+          TRACER.debugInfo(getMessage("Interrupting monitoring publisher."));
           interrupt();
         }
       }
@@ -203,11 +194,17 @@
   {
     if (debugEnabled())
     {
-      TRACER.debugInfo("Monitoring publisher for dn " +
-        replicationServerDomain.getBaseDn() +
-        " changing period value to " + period);
+      TRACER.debugInfo(getMessage(
+          "Monitoring publisher changing period value to " + period));
     }
 
     this.period = period;
   }
+
+  private String getMessage(String message)
+  {
+    return "In RS " + replicationServerDomain.getLocalRSServerId()
+        + ", for base dn " + replicationServerDomain.getBaseDn() + ": "
+        + message;
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 099f608..5b8859a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -34,6 +34,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.opends.messages.Category;
@@ -75,17 +76,22 @@
 public class ReplicationServerDomain extends MonitorProvider<MonitorProviderCfg>
 {
   private final String baseDn;
+
   /**
    * The Status analyzer that periodically verifies whether the connected DSs
-   * are late.
+   * are late. Using an AtomicReference to avoid leaking references to costly
+   * threads.
    */
-  private StatusAnalyzer statusAnalyzer = null;
+  private AtomicReference<StatusAnalyzer> statusAnalyzer =
+      new AtomicReference<StatusAnalyzer>();
 
   /**
    * The monitoring publisher that periodically sends monitoring messages to the
-   * topology.
+   * topology. Using an AtomicReference to avoid leaking references to costly
+   * threads.
    */
-  private MonitoringPublisher monitoringPublisher = null;
+  private AtomicReference<MonitoringPublisher> monitoringPublisher =
+      new AtomicReference<MonitoringPublisher>();
 
   /**
    * The following map contains one balanced tree for each replica ID to which
@@ -2853,14 +2859,15 @@
    */
   public void startStatusAnalyzer()
   {
-    if (!isRunningStatusAnalyzer())
-    {
-      int degradedStatusThreshold =
+    int degradedStatusThreshold =
         localReplicationServer.getDegradedStatusThreshold();
-      if (degradedStatusThreshold > 0) // 0 means no status analyzer
+    if (degradedStatusThreshold > 0) // 0 means no status analyzer
+    {
+      final StatusAnalyzer thread =
+          new StatusAnalyzer(this, degradedStatusThreshold);
+      if (statusAnalyzer.compareAndSet(null, thread))
       {
-        statusAnalyzer = new StatusAnalyzer(this, degradedStatusThreshold);
-        statusAnalyzer.start();
+        thread.start();
       }
     }
   }
@@ -2870,35 +2877,26 @@
    */
   private void stopStatusAnalyzer()
   {
-    if (isRunningStatusAnalyzer())
+    final StatusAnalyzer thread = statusAnalyzer.get();
+    if (statusAnalyzer.compareAndSet(thread, null))
     {
-      statusAnalyzer.shutdown();
-      statusAnalyzer.waitForShutdown();
-      statusAnalyzer = null;
+      thread.shutdown();
+      thread.waitForShutdown();
     }
   }
 
   /**
-   * Tests if the status analyzer for this domain is running.
-   * @return True if the status analyzer is running, false otherwise.
-   */
-  private boolean isRunningStatusAnalyzer()
-  {
-    return statusAnalyzer != null;
-  }
-
-  /**
    * Starts the monitoring publisher for the domain if not already started.
    */
   public void startMonitoringPublisher()
   {
-    if (!isRunningMonitoringPublisher())
+    long period = localReplicationServer.getMonitoringPublisherPeriod();
+    if (period > 0) // 0 means no monitoring publisher
     {
-      long period = localReplicationServer.getMonitoringPublisherPeriod();
-      if (period > 0) // 0 means no monitoring publisher
+      final MonitoringPublisher thread = new MonitoringPublisher(this, period);
+      if (monitoringPublisher.compareAndSet(null, thread))
       {
-        monitoringPublisher = new MonitoringPublisher(this, period);
-        monitoringPublisher.start();
+        thread.start();
       }
     }
   }
@@ -2908,24 +2906,15 @@
    */
   private void stopMonitoringPublisher()
   {
-    if (isRunningMonitoringPublisher())
+    final MonitoringPublisher thread = monitoringPublisher.get();
+    if (monitoringPublisher.compareAndSet(thread, null))
     {
-      monitoringPublisher.shutdown();
-      monitoringPublisher.waitForShutdown();
-      monitoringPublisher = null;
+      thread.shutdown();
+      thread.waitForShutdown();
     }
   }
 
   /**
-   * Tests if the monitoring publisher for this domain is running.
-   * @return True if the monitoring publisher is running, false otherwise.
-   */
-  private boolean isRunningMonitoringPublisher()
-  {
-    return monitoringPublisher != null;
-  }
-
-  /**
    * {@inheritDoc}
    */
   @Override
@@ -3360,10 +3349,13 @@
     {
       // Requested to stop analyzers
       stopStatusAnalyzer();
+      return;
     }
-    else if (isRunningStatusAnalyzer())
+
+    final StatusAnalyzer saThread = statusAnalyzer.get();
+    if (saThread != null) // it is running
     {
-      statusAnalyzer.setDegradedStatusThreshold(degradedStatusThreshold);
+      saThread.setDegradedStatusThreshold(degradedStatusThreshold);
     }
     else if (getConnectedDSs().size() > 0)
     {
@@ -3384,10 +3376,13 @@
     {
       // Requested to stop monitoring publishers
       stopMonitoringPublisher();
+      return;
     }
-    else if (isRunningMonitoringPublisher())
+
+    final MonitoringPublisher mpThread = monitoringPublisher.get();
+    if (mpThread != null) // it is running
     {
-      monitoringPublisher.setPeriod(period);
+      mpThread.setPeriod(period);
     }
     else if (getConnectedDSs().size() > 0 || getConnectedRSs().size() > 0)
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 1753dd6..94334d1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -113,11 +113,7 @@
    */
   private ReplServerStartMsg sendStartToRemote() throws IOException
   {
-    ReplServerStartMsg outReplServerStartMsg = new ReplServerStartMsg(
-        getReplicationServerId(), getReplicationServerURL(), getBaseDN(),
-        maxRcvWindow, replicationServerDomain.getDbServerState(),
-        localGenerationId, sslEncryption,
-        getLocalGroupId(), replicationServer.getDegradedStatusThreshold());
+    ReplServerStartMsg outReplServerStartMsg = createReplServerStartMsg();
     send(outReplServerStartMsg);
     return outReplServerStartMsg;
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 339ce80..18a0593 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -155,12 +155,12 @@
 
   // window
   private int rcvWindow;
-  private int rcvWindowSizeHalf;
+  private final int rcvWindowSizeHalf;
 
   /**
    * The size of the receiving window.
    */
-  protected int maxRcvWindow;
+  protected final int maxRcvWindow;
   /**
    * Semaphore that the writer uses to control the flow to the remote server.
    */
@@ -288,7 +288,7 @@
    *
    * @throws IOException when the session becomes unavailable.
    */
-  public synchronized void decAndCheckWindow() throws IOException
+  private synchronized void decAndCheckWindow() throws IOException
   {
     rcvWindow--;
     checkWindow();
@@ -318,8 +318,7 @@
    * and monitoring system.
    * @throws DirectoryException When an exception is raised.
    */
-  protected void finalizeStart()
-  throws DirectoryException
+  protected void finalizeStart() throws DirectoryException
   {
     // FIXME:ECL We should refactor so that a SH always have a session
     if (session != null)
@@ -906,11 +905,10 @@
   /**
    * Process the reception of a WindowProbeMsg message.
    *
-   * @param  windowProbeMsg The message to process.
-   *
-   * @throws IOException    When the session becomes unavailable.
+   * @throws IOException
+   *           When the session becomes unavailable.
    */
-  public void process(WindowProbeMsg windowProbeMsg) throws IOException
+  public void replyToWindowProbe() throws IOException
   {
     if (rcvWindow > 0)
     {
@@ -1250,6 +1248,7 @@
    */
   public void put(UpdateMsg update) throws IOException
   {
+    decAndCheckWindow();
     if (replicationServerDomain!=null)
       replicationServerDomain.put(update, this);
   }
@@ -1262,4 +1261,18 @@
     if (replicationServerDomain!=null)
       replicationServerDomain.stopServer(this, false);
   }
+
+  /**
+   * Creates a ReplServerStartMsg for the current ServerHandler.
+   *
+   * @return a new ReplServerStartMsg for the current ServerHandler.
+   */
+  protected ReplServerStartMsg createReplServerStartMsg()
+  {
+    return new ReplServerStartMsg(getReplicationServerId(),
+        getReplicationServerURL(), getBaseDN(), maxRcvWindow,
+        replicationServerDomain.getDbServerState(), localGenerationId,
+        sslEncryption, getLocalGroupId(),
+        replicationServer.getDegradedStatusThreshold());
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 82dc898..2a33972 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -27,11 +27,6 @@
  */
 package org.opends.server.replication.server;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
 import java.io.IOException;
 
 import org.opends.messages.Message;
@@ -40,6 +35,12 @@
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.*;
 
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.util.StaticUtils.*;
+
 /**
  * This class implement the part of the replicationServer that is reading
  * the connection from the LDAP servers to get all the updates that
@@ -74,7 +75,7 @@
   public ServerReader(Session session, ServerHandler handler)
   {
     super("Replication server RS(" + handler.getReplicationServerId()
-        + ") reading from " + handler.toString() + " at "
+        + ") reading from " + handler + " at "
         + session.getReadableRemoteAddress());
     this.session = session;
     this.handler = handler;
@@ -90,7 +91,7 @@
     Message errMessage = null;
     if (debugEnabled())
     {
-      TRACER.debugInfo(this.getName() + " starting");
+      TRACER.debugInfo(getName() + " starting");
     }
     /*
      * wait on input stream
@@ -110,13 +111,14 @@
 
           if (msg instanceof AckMsg)
           {
-            AckMsg ack = (AckMsg) msg;
             handler.checkWindow();
-            handler.processAck(ack);
+            handler.processAck((AckMsg) msg);
           } else if (msg instanceof UpdateMsg)
           {
+            UpdateMsg updateMsg = (UpdateMsg) msg;
+
             boolean filtered = false;
-            /* Ignore updates in some cases */
+            // Ignore updates in some cases
             if (handler.isDataServer())
             {
               /**
@@ -133,22 +135,22 @@
                * better performances in normal mode (most of the time).
                */
               ServerStatus dsStatus = handler.getStatus();
-              if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
-                (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+              if (dsStatus == BAD_GEN_ID_STATUS
+                  || dsStatus == FULL_UPDATE_STATUS)
               {
                 long referenceGenerationId = handler.getReferenceGenId();
-                if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+                if (dsStatus == BAD_GEN_ID_STATUS)
                   logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                       handler.getReplicationServerId(),
-                      ((UpdateMsg) msg).getChangeNumber().toString(),
+                      updateMsg.getChangeNumber().toString(),
                       handler.getBaseDN(), handler.getServerId(),
                       session.getReadableRemoteAddress(),
                       handler.getGenerationId(),
                       referenceGenerationId));
-                if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+                if (dsStatus == FULL_UPDATE_STATUS)
                   logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                       handler.getReplicationServerId(),
-                      ((UpdateMsg) msg).getChangeNumber().toString(),
+                      updateMsg.getChangeNumber().toString(),
                       handler.getBaseDN(), handler.getServerId(),
                       session.getReadableRemoteAddress()));
                 filtered = true;
@@ -159,14 +161,14 @@
                * Ignore updates from RS with bad gen id
                * (no system managed status for a RS)
                */
-              long referenceGenerationId =handler.getReferenceGenId();
-              if ((referenceGenerationId > 0) &&
-                (referenceGenerationId != handler.getGenerationId()))
+              long referenceGenerationId = handler.getReferenceGenId();
+              if (referenceGenerationId > 0
+                  && referenceGenerationId != handler.getGenerationId())
               {
                 logError(
                     WARN_IGNORING_UPDATE_FROM_RS.get(
                         handler.getReplicationServerId(),
-                        ((UpdateMsg) msg).getChangeNumber().toString(),
+                        updateMsg.getChangeNumber().toString(),
                         handler.getBaseDN(),
                         handler.getServerId(),
                         session.getReadableRemoteAddress(),
@@ -178,53 +180,24 @@
 
             if (!filtered)
             {
-              UpdateMsg update = (UpdateMsg) msg;
-              handler.decAndCheckWindow();
-              handler.put(update);
+              handler.put(updateMsg);
             }
           } else if (msg instanceof WindowMsg)
           {
-            WindowMsg windowMsg = (WindowMsg) msg;
-            handler.updateWindow(windowMsg);
-          } else if (msg instanceof InitializeRequestMsg)
+            handler.updateWindow((WindowMsg) msg);
+          } else if (msg instanceof RoutableMsg)
           {
-            InitializeRequestMsg initializeMsg =
-              (InitializeRequestMsg) msg;
-            handler.process(initializeMsg);
-          } else if (msg instanceof InitializeRcvAckMsg)
-          {
-            InitializeRcvAckMsg initializeRcvAckMsg =
-              (InitializeRcvAckMsg) msg;
-            handler.process(initializeRcvAckMsg);
-          } else if (msg instanceof InitializeTargetMsg)
-          {
-            InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
-            handler.process(initializeMsg);
-          } else if (msg instanceof EntryMsg)
-          {
-            EntryMsg entryMsg = (EntryMsg) msg;
-            handler.process(entryMsg);
-          } else if (msg instanceof DoneMsg)
-          {
-            DoneMsg doneMsg = (DoneMsg) msg;
-            handler.process(doneMsg);
-          } else if (msg instanceof ErrorMsg)
-          {
-            ErrorMsg errorMsg = (ErrorMsg) msg;
-            handler.process(errorMsg);
+            handler.process((RoutableMsg) msg);
           } else if (msg instanceof ResetGenerationIdMsg)
           {
-            ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
-            handler.processResetGenId(genIdMsg);
+            handler.processResetGenId((ResetGenerationIdMsg) msg);
           } else if (msg instanceof WindowProbeMsg)
           {
-            WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
-            handler.process(windowProbeMsg);
+            handler.replyToWindowProbe();
           } else if (msg instanceof TopologyMsg)
           {
-            TopologyMsg topoMsg = (TopologyMsg) msg;
             ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
-            rsh.receiveTopoInfoFromRS(topoMsg);
+            rsh.receiveTopoInfoFromRS((TopologyMsg) msg);
           } else if (msg instanceof ChangeStatusMsg)
           {
             ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
@@ -242,28 +215,18 @@
                     csMsg.toString());
               logError(errMessage);
             }
-          } else if (msg instanceof MonitorRequestMsg)
-          {
-            MonitorRequestMsg replServerMonitorRequestMsg =
-              (MonitorRequestMsg) msg;
-            handler.process(replServerMonitorRequestMsg);
-          } else if (msg instanceof MonitorMsg)
-          {
-            MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
-            handler.process(replServerMonitorMsg);
           } else if (msg instanceof ChangeTimeHeartbeatMsg)
           {
-            ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
-            handler.process(cthbMsg);
+            handler.process((ChangeTimeHeartbeatMsg) msg);
           } else if (msg instanceof StopMsg)
           {
             // Peer server is properly disconnecting: go out of here to
             // properly close the server handler going to finally block.
             if (debugEnabled())
             {
-              TRACER.debugInfo(handler.toString() + " has properly " +
-                "disconnected from this replication server " +
-                Integer.toString(handler.getReplicationServerId()));
+              TRACER.debugInfo(handler
+                  + " has properly disconnected from this replication server "
+                  + handler.getReplicationServerId());
             }
             return;
           } else if (msg == null)
@@ -281,9 +244,7 @@
           // Received a V1 PDU we do not need to support:
           // we just trash the message and log the event for debug purpose,
           // then continue receiving messages.
-          if (debugEnabled())
-            TRACER.debugInfo(
-                "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+          logException(e);
         }
       }
     }
@@ -294,9 +255,7 @@
        * Log a message and exit from this loop
        * So that this handler is stopped.
        */
-      if (debugEnabled())
-        TRACER.debugInfo(
-            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+      logException(e);
       if (!handler.shuttingDown())
       {
         if (handler.isDataServer())
@@ -316,9 +275,7 @@
     }
     catch (Exception e)
     {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+      logException(e);
       /*
        * The remote server has sent an unknown message,
        * close the connection.
@@ -334,14 +291,21 @@
        */
       if (debugEnabled())
       {
-        TRACER.debugInfo("In " + this.getName() + " closing the session");
+        TRACER.debugInfo("In " + getName() + " closing the session");
       }
       session.close();
       handler.doStop();
       if (debugEnabled())
       {
-        TRACER.debugInfo(this.getName() + " stopped " + errMessage);
+        TRACER.debugInfo(getName() + " stopped: " + errMessage);
       }
     }
   }
+
+  private void logException(Exception e)
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+          "In " + getName() + " " + stackTraceToSingleLineString(e));
+  }
 }

--
Gitblit v1.10.0