From 6ee1440f6f56ac066f97383315b2798287f0821a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 23 Mar 2011 22:27:01 +0000
Subject: [PATCH] Fix issue OpenDJ-95: Socket leak and constant disconnect/reconnect when a directory server can no longer reach its connected replication server

---
 opends/src/server/org/opends/server/replication/protocol/SocketSession.java |  334 ++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 220 insertions(+), 114 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 9e2a936..7cf1a96 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -27,135 +27,283 @@
  */
 package org.opends.server.replication.protocol;
 
+
+
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.zip.DataFormatException;
 
 import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+
 
 /**
  * This class Implement a protocol session using a basic socket and relying on
- * the innate encoding/decoding capabilities of the ReplicationMsg
- * by using the getBytes() and generateMsg() methods of those classes.
+ * the innate encoding/decoding capabilities of the ReplicationMsg by using the
+ * getBytes() and generateMsg() methods of those classes.
  */
-public class SocketSession implements ProtocolSession
+public final class SocketSession implements ProtocolSession
 {
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private Socket socket;
-  private InputStream input;
-  private OutputStream output;
-  byte[] rcvLengthBuf = new byte[8];
+  private final Socket socket;
+  private final InputStream input;
+  private final OutputStream output;
+  private final byte[] rcvLengthBuf = new byte[8];
 
   /**
    * The time the last message published to this session.
    */
   private volatile long lastPublishTime = 0;
 
-
   /**
    * The time the last message was received on this session.
    */
-  private long lastReceiveTime = 0;
+  private volatile long lastReceiveTime = 0;
 
+  // Close guarded by closeLock: use a different lock to publish since
+  // publishing can block, and we don't want to block while closing failed
+  // connections.
+  private final Object closeLock = new Object();
   private boolean closeInitiated = false;
 
+  // Publish guarded by publishLock: use a full lock here so that we can
+  // optionally publish StopMsg during close.
+  private final Lock publishLock = new ReentrantLock();
+
+  // Does not need protecting: updated only during single threaded handshake.
   private short protocolVersion = ProtocolVersion.getCurrentVersion();
 
+
+
   /**
    * Creates a new SocketSession based on the provided socket.
    *
-   * @param socket The Socket on which the SocketSession will be based.
-   * @throws IOException When an IException happens on the socket.
+   * @param socket
+   *          The Socket on which the SocketSession will be based.
+   * @throws IOException
+   *           When an IException happens on the socket.
    */
-  public SocketSession(Socket socket) throws IOException
+  public SocketSession(final Socket socket) throws IOException
   {
-    this.socket = socket;
-    /*
-     * Use a window instead of the TCP flow control.
-     * Therefore set a very large value for send and receive buffer sizes.
-     */
-    input = socket.getInputStream();
-    output = socket.getOutputStream();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public void close() throws IOException
-  {
-    closeInitiated = true;
-
     if (debugEnabled())
     {
-      TRACER.debugInfo("Closing SocketSession."
-          + stackTraceToSingleLineString(new Exception()));
+      TRACER.debugInfo("Creating SocketSession to %s from %s", socket
+          .getRemoteSocketAddress().toString(),
+          stackTraceToSingleLineString(new Exception()));
     }
-    socket.close();
+
+    this.socket = socket;
+    this.input = socket.getInputStream();
+    this.output = socket.getOutputStream();
   }
 
+
+
   /**
    * {@inheritDoc}
    */
-  public synchronized void publish(ReplicationMsg msg)
-         throws IOException
+  @Override
+  public void close()
+  {
+    synchronized (closeLock)
+    {
+      if (closeInitiated)
+      {
+        return;
+      }
+
+      closeInitiated = true;
+    }
+
+    // Perform close outside of critical section.
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("Closing SocketSession to %s from %s", socket
+          .getRemoteSocketAddress().toString(),
+          stackTraceToSingleLineString(new Exception()));
+    }
+
+    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+    {
+      // V4 protocol introduces a StopMsg to properly end communications.
+      if (publishLock.tryLock())
+      {
+        try
+        {
+          publish(new StopMsg());
+        }
+        catch (final IOException ignored)
+        {
+          // Ignore errors on close.
+        }
+        finally
+        {
+          publishLock.unlock();
+        }
+      }
+    }
+
+    if (socket != null && !socket.isClosed())
+    {
+      try
+      {
+        socket.close();
+      }
+      catch (final IOException ignored)
+      {
+        // Ignore errors on close.
+      }
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean closeInitiated()
+  {
+    synchronized (closeLock)
+    {
+      return closeInitiated;
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long getLastPublishTime()
+  {
+    return lastPublishTime;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long getLastReceiveTime()
+  {
+    if (lastReceiveTime == 0)
+    {
+      return System.currentTimeMillis();
+    }
+    return lastReceiveTime;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getReadableRemoteAddress()
+  {
+    return socket.getRemoteSocketAddress().toString();
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getRemoteAddress()
+  {
+    return socket.getInetAddress().getHostAddress();
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isEncrypted()
+  {
+    return false;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void publish(final ReplicationMsg msg) throws IOException
   {
     publish(msg, ProtocolVersion.getCurrentVersion());
   }
 
+
+
   /**
    * {@inheritDoc}
    */
-  public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
-         throws IOException
+  @Override
+  public void publish(final ReplicationMsg msg,
+      final short reqProtocolVersion) throws IOException
   {
-    byte[] buffer = msg.getBytes(reqProtocolVersion);
-    String str = String.format("%08x", buffer.length);
+    final byte[] buffer = msg.getBytes(reqProtocolVersion);
+    final String str = String.format("%08x", buffer.length);
+    final byte[] sendLengthBuf = str.getBytes();
 
-    if (debugEnabled())
+    publishLock.lock();
+    try
     {
-      TRACER.debugInfo("SocketSession publish <" + str + ">");
+      output.write(sendLengthBuf);
+      output.write(buffer);
+      output.flush();
     }
-
-    byte[] sendLengthBuf = str.getBytes();
-
-    output.write(sendLengthBuf);
-    output.write(buffer);
-    output.flush();
+    finally
+    {
+      publishLock.unlock();
+    }
 
     lastPublishTime = System.currentTimeMillis();
   }
 
+
+
   /**
    * {@inheritDoc}
    */
+  @Override
   public ReplicationMsg receive() throws IOException,
       ClassNotFoundException, DataFormatException,
       NotSupportedOldVersionPDUException
   {
-    /* Read the first 8 bytes containing the packet length */
+    // Read the first 8 bytes containing the packet length
     int length = 0;
 
-    /* Let's start the stop-watch before waiting on read */
-    /* for the heartbeat check to be operationnal        */
+    // Let's start the stop-watch before waiting on read for the heartbeat check
+    // to be operational
     lastReceiveTime = System.currentTimeMillis();
 
-    while (length<8)
+    while (length < 8)
     {
-      int read = input.read(rcvLengthBuf, length, 8-length);
+      final int read = input.read(rcvLengthBuf, length, 8 - length);
       if (read == -1)
       {
-        lastReceiveTime=0;
+        lastReceiveTime = 0;
         throw new IOException("no more data");
       }
       else
@@ -164,101 +312,59 @@
       }
     }
 
-    int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
+    final int totalLength = Integer.parseInt(
+        new String(rcvLengthBuf), 16);
 
     try
     {
       length = 0;
-      byte[] buffer = new byte[totalLength];
+      final byte[] buffer = new byte[totalLength];
       while (length < totalLength)
       {
         length += input.read(buffer, length, totalLength - length);
       }
-      /* We do not want the heartbeat to close the session when */
-      /* we are processing a message even a time consuming one. */
-      lastReceiveTime=0;
+      // We do not want the heartbeat to close the session when we are
+      // processing a message even a time consuming one.
+      lastReceiveTime = 0;
       return ReplicationMsg.generateMsg(buffer, protocolVersion);
     }
-    catch (OutOfMemoryError e)
+    catch (final OutOfMemoryError e)
     {
       throw new IOException("Packet too large, can't allocate "
-                            + totalLength + " bytes.");
+          + totalLength + " bytes.");
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public void stopEncryption()
-  {
-    // There is no security layer.
-  }
+
 
   /**
    * {@inheritDoc}
    */
-  public boolean isEncrypted()
+  @Override
+  public void setProtocolVersion(final short version)
   {
-    return false;
+    protocolVersion = version;
   }
 
+
+
   /**
    * {@inheritDoc}
    */
-  public long getLastPublishTime()
-  {
-    return lastPublishTime;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public long getLastReceiveTime()
-  {
-    if (lastReceiveTime==0)
-    {
-      return System.currentTimeMillis();
-    }
-    return lastReceiveTime;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public String getRemoteAddress()
-  {
-    return socket.getInetAddress().getHostAddress();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public String getReadableRemoteAddress()
-  {
-    return socket.getRemoteSocketAddress().toString();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public void setSoTimeout(int timeout) throws SocketException
+  @Override
+  public void setSoTimeout(final int timeout) throws SocketException
   {
     socket.setSoTimeout(timeout);
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public boolean closeInitiated()
-  {
-    return closeInitiated;
-  }
+
 
   /**
    * {@inheritDoc}
    */
-  public void setProtocolVersion(short version)
+  @Override
+  public void stopEncryption()
   {
-    protocolVersion = version;
+    // There is no security layer.
   }
 }

--
Gitblit v1.10.0