From c7077670daca3b689ed75e4bf71dad0483af8473 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 19 Aug 2013 13:27:40 +0000
Subject: [PATCH] Avoided possible costly thread leaks in ReplicationServerDomain.

---
 opends/src/server/org/opends/server/replication/server/ServerReader.java |  126 +++++++++++++++---------------------------
 1 files changed, 45 insertions(+), 81 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 82dc898..2a33972 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -27,11 +27,6 @@
  */
 package org.opends.server.replication.server;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
 import java.io.IOException;
 
 import org.opends.messages.Message;
@@ -40,6 +35,12 @@
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.*;
 
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
+import static org.opends.server.util.StaticUtils.*;
+
 /**
  * This class implement the part of the replicationServer that is reading
  * the connection from the LDAP servers to get all the updates that
@@ -74,7 +75,7 @@
   public ServerReader(Session session, ServerHandler handler)
   {
     super("Replication server RS(" + handler.getReplicationServerId()
-        + ") reading from " + handler.toString() + " at "
+        + ") reading from " + handler + " at "
         + session.getReadableRemoteAddress());
     this.session = session;
     this.handler = handler;
@@ -90,7 +91,7 @@
     Message errMessage = null;
     if (debugEnabled())
     {
-      TRACER.debugInfo(this.getName() + " starting");
+      TRACER.debugInfo(getName() + " starting");
     }
     /*
      * wait on input stream
@@ -110,13 +111,14 @@
 
           if (msg instanceof AckMsg)
           {
-            AckMsg ack = (AckMsg) msg;
             handler.checkWindow();
-            handler.processAck(ack);
+            handler.processAck((AckMsg) msg);
           } else if (msg instanceof UpdateMsg)
           {
+            UpdateMsg updateMsg = (UpdateMsg) msg;
+
             boolean filtered = false;
-            /* Ignore updates in some cases */
+            // Ignore updates in some cases
             if (handler.isDataServer())
             {
               /**
@@ -133,22 +135,22 @@
                * better performances in normal mode (most of the time).
                */
               ServerStatus dsStatus = handler.getStatus();
-              if ((dsStatus == ServerStatus.BAD_GEN_ID_STATUS) ||
-                (dsStatus == ServerStatus.FULL_UPDATE_STATUS))
+              if (dsStatus == BAD_GEN_ID_STATUS
+                  || dsStatus == FULL_UPDATE_STATUS)
               {
                 long referenceGenerationId = handler.getReferenceGenId();
-                if (dsStatus == ServerStatus.BAD_GEN_ID_STATUS)
+                if (dsStatus == BAD_GEN_ID_STATUS)
                   logError(WARN_IGNORING_UPDATE_FROM_DS_BADGENID.get(
                       handler.getReplicationServerId(),
-                      ((UpdateMsg) msg).getChangeNumber().toString(),
+                      updateMsg.getChangeNumber().toString(),
                       handler.getBaseDN(), handler.getServerId(),
                       session.getReadableRemoteAddress(),
                       handler.getGenerationId(),
                       referenceGenerationId));
-                if (dsStatus == ServerStatus.FULL_UPDATE_STATUS)
+                if (dsStatus == FULL_UPDATE_STATUS)
                   logError(WARN_IGNORING_UPDATE_FROM_DS_FULLUP.get(
                       handler.getReplicationServerId(),
-                      ((UpdateMsg) msg).getChangeNumber().toString(),
+                      updateMsg.getChangeNumber().toString(),
                       handler.getBaseDN(), handler.getServerId(),
                       session.getReadableRemoteAddress()));
                 filtered = true;
@@ -159,14 +161,14 @@
                * Ignore updates from RS with bad gen id
                * (no system managed status for a RS)
                */
-              long referenceGenerationId =handler.getReferenceGenId();
-              if ((referenceGenerationId > 0) &&
-                (referenceGenerationId != handler.getGenerationId()))
+              long referenceGenerationId = handler.getReferenceGenId();
+              if (referenceGenerationId > 0
+                  && referenceGenerationId != handler.getGenerationId())
               {
                 logError(
                     WARN_IGNORING_UPDATE_FROM_RS.get(
                         handler.getReplicationServerId(),
-                        ((UpdateMsg) msg).getChangeNumber().toString(),
+                        updateMsg.getChangeNumber().toString(),
                         handler.getBaseDN(),
                         handler.getServerId(),
                         session.getReadableRemoteAddress(),
@@ -178,53 +180,24 @@
 
             if (!filtered)
             {
-              UpdateMsg update = (UpdateMsg) msg;
-              handler.decAndCheckWindow();
-              handler.put(update);
+              handler.put(updateMsg);
             }
           } else if (msg instanceof WindowMsg)
           {
-            WindowMsg windowMsg = (WindowMsg) msg;
-            handler.updateWindow(windowMsg);
-          } else if (msg instanceof InitializeRequestMsg)
+            handler.updateWindow((WindowMsg) msg);
+          } else if (msg instanceof RoutableMsg)
           {
-            InitializeRequestMsg initializeMsg =
-              (InitializeRequestMsg) msg;
-            handler.process(initializeMsg);
-          } else if (msg instanceof InitializeRcvAckMsg)
-          {
-            InitializeRcvAckMsg initializeRcvAckMsg =
-              (InitializeRcvAckMsg) msg;
-            handler.process(initializeRcvAckMsg);
-          } else if (msg instanceof InitializeTargetMsg)
-          {
-            InitializeTargetMsg initializeMsg = (InitializeTargetMsg) msg;
-            handler.process(initializeMsg);
-          } else if (msg instanceof EntryMsg)
-          {
-            EntryMsg entryMsg = (EntryMsg) msg;
-            handler.process(entryMsg);
-          } else if (msg instanceof DoneMsg)
-          {
-            DoneMsg doneMsg = (DoneMsg) msg;
-            handler.process(doneMsg);
-          } else if (msg instanceof ErrorMsg)
-          {
-            ErrorMsg errorMsg = (ErrorMsg) msg;
-            handler.process(errorMsg);
+            handler.process((RoutableMsg) msg);
           } else if (msg instanceof ResetGenerationIdMsg)
           {
-            ResetGenerationIdMsg genIdMsg = (ResetGenerationIdMsg) msg;
-            handler.processResetGenId(genIdMsg);
+            handler.processResetGenId((ResetGenerationIdMsg) msg);
           } else if (msg instanceof WindowProbeMsg)
           {
-            WindowProbeMsg windowProbeMsg = (WindowProbeMsg) msg;
-            handler.process(windowProbeMsg);
+            handler.replyToWindowProbe();
           } else if (msg instanceof TopologyMsg)
           {
-            TopologyMsg topoMsg = (TopologyMsg) msg;
             ReplicationServerHandler rsh = (ReplicationServerHandler)handler;
-            rsh.receiveTopoInfoFromRS(topoMsg);
+            rsh.receiveTopoInfoFromRS((TopologyMsg) msg);
           } else if (msg instanceof ChangeStatusMsg)
           {
             ChangeStatusMsg csMsg = (ChangeStatusMsg) msg;
@@ -242,28 +215,18 @@
                     csMsg.toString());
               logError(errMessage);
             }
-          } else if (msg instanceof MonitorRequestMsg)
-          {
-            MonitorRequestMsg replServerMonitorRequestMsg =
-              (MonitorRequestMsg) msg;
-            handler.process(replServerMonitorRequestMsg);
-          } else if (msg instanceof MonitorMsg)
-          {
-            MonitorMsg replServerMonitorMsg = (MonitorMsg) msg;
-            handler.process(replServerMonitorMsg);
           } else if (msg instanceof ChangeTimeHeartbeatMsg)
           {
-            ChangeTimeHeartbeatMsg cthbMsg = (ChangeTimeHeartbeatMsg) msg;
-            handler.process(cthbMsg);
+            handler.process((ChangeTimeHeartbeatMsg) msg);
           } else if (msg instanceof StopMsg)
           {
             // Peer server is properly disconnecting: go out of here to
             // properly close the server handler going to finally block.
             if (debugEnabled())
             {
-              TRACER.debugInfo(handler.toString() + " has properly " +
-                "disconnected from this replication server " +
-                Integer.toString(handler.getReplicationServerId()));
+              TRACER.debugInfo(handler
+                  + " has properly disconnected from this replication server "
+                  + handler.getReplicationServerId());
             }
             return;
           } else if (msg == null)
@@ -281,9 +244,7 @@
           // Received a V1 PDU we do not need to support:
           // we just trash the message and log the event for debug purpose,
           // then continue receiving messages.
-          if (debugEnabled())
-            TRACER.debugInfo(
-                "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+          logException(e);
         }
       }
     }
@@ -294,9 +255,7 @@
        * Log a message and exit from this loop
        * So that this handler is stopped.
        */
-      if (debugEnabled())
-        TRACER.debugInfo(
-            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+      logException(e);
       if (!handler.shuttingDown())
       {
         if (handler.isDataServer())
@@ -316,9 +275,7 @@
     }
     catch (Exception e)
     {
-      if (debugEnabled())
-        TRACER.debugInfo(
-          "In " + this.getName() + " " + stackTraceToSingleLineString(e));
+      logException(e);
       /*
        * The remote server has sent an unknown message,
        * close the connection.
@@ -334,14 +291,21 @@
        */
       if (debugEnabled())
       {
-        TRACER.debugInfo("In " + this.getName() + " closing the session");
+        TRACER.debugInfo("In " + getName() + " closing the session");
       }
       session.close();
       handler.doStop();
       if (debugEnabled())
       {
-        TRACER.debugInfo(this.getName() + " stopped " + errMessage);
+        TRACER.debugInfo(getName() + " stopped: " + errMessage);
       }
     }
   }
+
+  private void logException(Exception e)
+  {
+    if (debugEnabled())
+      TRACER.debugInfo(
+          "In " + getName() + " " + stackTraceToSingleLineString(e));
+  }
 }

--
Gitblit v1.10.0