From d2db6915a220002a55281ebeb94fc8c590a33853 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Thu, 14 Apr 2011 16:45:52 +0000
Subject: [PATCH] Another fix for issue OpenDJ-95: Socket leak and constant disconnect/reconnect when a directory server can no longer reach its connected replication server

---
 opends/src/server/org/opends/server/replication/service/ReplicationBroker.java |  216 +++++++----------------------------------------------
 1 files changed, 31 insertions(+), 185 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 1e8de0f..9559252 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -67,24 +67,7 @@
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.ChangeStatusMsg;
-import org.opends.server.replication.protocol.MonitorMsg;
-import org.opends.server.replication.protocol.MonitorRequestMsg;
-import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.ReplServerStartDSMsg;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ServerStartECLMsg;
-import org.opends.server.replication.protocol.ServerStartMsg;
-import org.opends.server.replication.protocol.StartECLSessionMsg;
-import org.opends.server.replication.protocol.StartSessionMsg;
-import org.opends.server.replication.protocol.StopMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.protocol.WindowMsg;
-import org.opends.server.replication.protocol.WindowProbeMsg;
+import org.opends.server.replication.protocol.*;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.ServerConstants;
 import org.opends.server.replication.server.ReplicationServer;
@@ -780,7 +763,7 @@
     {
       // Connect to server and get info about it
       ReplicationServerInfo replicationServerInfo =
-        performPhaseOneHandshake(server, false);
+        performPhaseOneHandshake(server, false, false);
 
       // Store server info in list
       if (replicationServerInfo != null)
@@ -808,11 +791,10 @@
     // FIXME:ECL List of RS to connect is for now limited to one RS only
     String bestServer = this.servers.iterator().next();
 
-    ReplServerStartDSMsg inReplServerStartDSMsg = performECLPhaseOneHandshake(
-      bestServer, true);
-
-    if (inReplServerStartDSMsg != null)
+    if (performPhaseOneHandshake(bestServer, true, true) != null)
+    {
       performECLPhaseTwoHandshake(bestServer);
+    }
   }
 
   /**
@@ -887,7 +869,7 @@
             " phase 2 : will perform PhaseOneH with the preferred RS="
               + replicationServerInfo);
         replicationServerInfo = performPhaseOneHandshake(
-          replicationServerInfo.getServerURL(), true);
+          replicationServerInfo.getServerURL(), true, false);
 
         if (replicationServerInfo != null)
         {
@@ -1170,10 +1152,12 @@
    *          Do we keep session opened or not after handshake. Use true if want
    *          to perform handshake phase 2 with the same session and keep the
    *          session to create as the current one.
+   * @param isECL
+   *          Indicates whether or not the an ECL handshake is to be performed.
    * @return The answer from the server . Null if could not get an answer.
    */
   private ReplicationServerInfo performPhaseOneHandshake(
-      String server, boolean keepConnection)
+      String server, boolean keepConnection, boolean isECL)
   {
     int separator = server.lastIndexOf(':');
     String port = server.substring(separator + 1);
@@ -1202,10 +1186,21 @@
           .isSslEncryption(server);
 
       // Send our ServerStartMsg.
-      ServerStartMsg serverStartMsg = new ServerStartMsg(serverId,
-          baseDn, maxRcvWindow, heartbeatInterval, state,
-          ProtocolVersion.getCurrentVersion(),
-          this.getGenerationID(), isSslEncryption, groupId);
+      StartMsg serverStartMsg;
+      if (!isECL)
+      {
+        serverStartMsg = new ServerStartMsg(serverId, baseDn,
+            maxRcvWindow, heartbeatInterval, state,
+            ProtocolVersion.getCurrentVersion(),
+            this.getGenerationID(), isSslEncryption, groupId);
+      }
+      else
+      {
+        serverStartMsg = new ServerStartECLMsg(baseDn, 0, 0, 0, 0,
+            maxRcvWindow, heartbeatInterval, state,
+            ProtocolVersion.getCurrentVersion(),
+            this.getGenerationID(), isSslEncryption, groupId);
+      }
       localSession.publish(serverStartMsg);
 
       // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
@@ -1236,9 +1231,13 @@
        * replication server will use the same one (or an older one if it is an
        * old replication server).
        */
-      protocolVersion = ProtocolVersion.minWithCurrent(replServerInfo
-          .getProtocolVersion());
-      localSession.setProtocolVersion(protocolVersion);
+      final short localProtocolVersion = ProtocolVersion
+          .minWithCurrent(replServerInfo.getProtocolVersion());
+      if (keepConnection)
+      {
+        protocolVersion = localProtocolVersion;
+      }
+      localSession.setProtocolVersion(localProtocolVersion);
 
       if (!isSslEncryption)
       {
@@ -1318,160 +1317,7 @@
     }
   }
 
-  /**
-   * Connect to the provided server performing the first phase handshake
-   * (start messages exchange) and return the reply message from the replication
-   * server.
-   *
-   * @param server Server to connect to.
-   * @param keepConnection Do we keep session opened or not after handshake.
-   *        Use true if want to perform handshake phase 2 with the same session
-   *        and keep the session to create as the current one.
-   * @return The ReplServerStartDSMsg the server replied. Null if could not
-   *         get an answer.
-   */
-  private ReplServerStartDSMsg performECLPhaseOneHandshake(String server,
-    boolean keepConnection)
-  {
-    // FIXME: this should be merged with performPhaseOneHandshake to avoid
-    // code/bug duplication.
-    ReplServerStartDSMsg replServerStartDSMsg = null;
 
-    // Parse server string.
-    int separator = server.lastIndexOf(':');
-    String port = server.substring(separator + 1);
-    String hostname = server.substring(0, separator);
-    ProtocolSession localSession = null;
-
-    boolean error = false;
-    try
-    {
-      /*
-       * Open a socket connection to the next candidate.
-       */
-      int intPort = Integer.parseInt(port);
-      InetSocketAddress serverAddr = new InetSocketAddress(
-        InetAddress.getByName(hostname), intPort);
-      Socket socket = new Socket();
-      socket.setReceiveBufferSize(1000000);
-      socket.setTcpNoDelay(true);
-      socket.connect(serverAddr, 500);
-      localSession = replSessionSecurity.createClientSession(socket,
-        ReplSessionSecurity.HANDSHAKE_TIMEOUT);
-      boolean isSslEncryption =
-        replSessionSecurity.isSslEncryption(server);
-
-      // Send our start msg.
-      ServerStartECLMsg serverStartECLMsg = new ServerStartECLMsg(
-        baseDn, 0, 0, 0, 0,
-        maxRcvWindow, heartbeatInterval, state,
-        ProtocolVersion.getCurrentVersion(), this.getGenerationID(),
-        isSslEncryption,
-        groupId);
-      localSession.publish(serverStartECLMsg);
-
-      // Read the ReplServerStartMsg that should come back.
-      replServerStartDSMsg = (ReplServerStartDSMsg) localSession.receive();
-
-      if (debugEnabled())
-      {
-        debugInfo("In RB for " + baseDn +
-          "\nRB HANDSHAKE SENT:\n" + serverStartECLMsg.toString() +
-          "\nAND RECEIVED:\n" + replServerStartDSMsg.toString());
-      }
-
-      // Sanity check
-      String repDn = replServerStartDSMsg.getBaseDn();
-      if (!(this.baseDn.equals(repDn)))
-      {
-        Message message = ERR_DS_DN_DOES_NOT_MATCH.get(repDn.toString(),
-          this.baseDn);
-        logError(message);
-        error = true;
-      }
-
-      /*
-       * We have sent our own protocol version to the replication server.
-       * The replication server will use the same one (or an older one
-       * if it is an old replication server).
-       */
-      if (keepConnection)
-        protocolVersion = ProtocolVersion.minWithCurrent(
-          replServerStartDSMsg.getVersion());
-      localSession.setProtocolVersion(protocolVersion);
-
-      if (!isSslEncryption)
-      {
-        localSession.stopEncryption();
-      }
-    } catch (ConnectException e)
-    {
-      /*
-       * There was no server waiting on this host:port
-       * Log a notice and try the next replicationServer in the list
-       */
-      if (!connectionError)
-      {
-        Message message = WARN_NO_CHANGELOG_SERVER_LISTENING.get(serverId,
-            server, baseDn);
-
-        if (keepConnection) // Log error message only for final connection
-        {
-          // the error message is only logged once to avoid overflowing
-          // the error log
-          logError(message);
-        } else if (debugEnabled())
-        {
-          debugInfo(message.toString());
-        }
-      }
-      error = true;
-    } catch (Exception e)
-    {
-      if ((e instanceof SocketTimeoutException) && debugEnabled())
-      {
-        debugInfo("Timeout trying to connect to RS " + server +
-          " for dn: " + baseDn);
-      }
-      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(serverId,
-          server, baseDn, stackTraceToSingleLineString(e));
-      if (keepConnection) // Log error message only for final connection
-      {
-        logError(message);
-      } else if (debugEnabled())
-      {
-        debugInfo(message.toString());
-      }
-      error = true;
-    }
-
-    // Close session if requested
-    if (!keepConnection || error)
-    {
-      if (localSession != null)
-      {
-        if (debugEnabled()) {
-          debugInfo("In RB, closing session after phase 1");
-        }
-        localSession.close();
-        localSession = null;
-      }
-      if (error)
-      {
-        replServerStartDSMsg = null;
-      } // Be sure to return null.
-
-    }
-
-    // If this connection as the one to use for sending and receiving updates,
-    // store it.
-    if (keepConnection)
-    {
-      session = localSession;
-    }
-
-    return replServerStartDSMsg;
-  }
 
   /**
    * Performs the second phase handshake (send StartSessionMsg and receive

--
Gitblit v1.10.0