From 42dd3533f278a8d66edae0fa08d4a8abfb5d6835 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 07 Jun 2013 13:02:24 +0000
Subject: [PATCH] Partial fix for OPENDJ-951: Reduce size and frequency of replication MonitorMsg

---
 opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java |  100 +++++++++++++++++++++++++++-----------------------
 1 files changed, 54 insertions(+), 46 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index 805e8ba..112efab 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -33,6 +33,8 @@
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -45,6 +47,7 @@
 import javax.net.ssl.SSLSocket;
 
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.util.StaticUtils;
 
 
 
@@ -77,21 +80,38 @@
    */
   private volatile long lastReceiveTime = 0;
 
-  // Close and error guarded by stateLock: use a different lock to publish since
-  // publishing can block, and we don't want to block while closing failed
-  // connections.
+  /*
+   * Close and error guarded by stateLock: use a different lock to publish since
+   * publishing can block, and we don't want to block while closing failed
+   * connections.
+   */
   private final Object stateLock = new Object();
   private boolean closeInitiated = false;
   private Throwable sessionError = null;
 
-  // Publish guarded by publishLock: use a full lock here so that we can
-  // optionally publish StopMsg during close.
+  /*
+   * Publish guarded by publishLock: use a full lock here so that we can
+   * optionally publish StopMsg during close.
+   */
   private final Lock publishLock = new ReentrantLock();
 
-  // Does not need protecting: updated only during single threaded handshake.
+  /*
+   * These do not need synchronization because they are only modified during the
+   * initial single threaded handshake.
+   */
   private short protocolVersion = ProtocolVersion.getCurrentVersion();
-  private InputStream input;
-  private OutputStream output;
+  private boolean isEncrypted = true; // Initially encrypted.
+
+  /*
+   * Use a buffered input stream to avoid too many system calls.
+   */
+  private BufferedInputStream input;
+
+  /*
+   * Use a buffered output stream in order to combine message length and content
+   * into a single TCP packet if possible.
+   */
+  private BufferedOutputStream output;
 
 
 
@@ -121,8 +141,8 @@
     this.secureSocket = secureSocket;
     this.plainInput = plainSocket.getInputStream();
     this.plainOutput = plainSocket.getOutputStream();
-    this.input = secureSocket.getInputStream();
-    this.output = secureSocket.getOutputStream();
+    this.input = new BufferedInputStream(secureSocket.getInputStream());
+    this.output = new BufferedOutputStream(secureSocket.getOutputStream());
     this.readableRemoteAddress = plainSocket.getRemoteSocketAddress()
         .toString();
     this.remoteAddress = plainSocket.getInetAddress().getHostAddress();
@@ -197,23 +217,7 @@
       }
     }
 
-    try
-    {
-      plainSocket.close();
-    }
-    catch (final IOException ignored)
-    {
-      // Ignore errors on close.
-    }
-
-    try
-    {
-      secureSocket.close();
-    }
-    catch (final IOException ignored)
-    {
-      // Ignore errors on close.
-    }
+    StaticUtils.close(plainSocket, secureSocket);
   }
 
 
@@ -297,7 +301,7 @@
   @Override
   public boolean isEncrypted()
   {
-    return input != plainInput;
+    return isEncrypted;
   }
 
 
@@ -327,6 +331,10 @@
     publishLock.lock();
     try
     {
+      /*
+       * The buffered output stream ensures that the message is usually sent as
+       * a single TCP packet.
+       */
       output.write(sendLengthBuf);
       output.write(buffer);
       output.flush();
@@ -358,9 +366,10 @@
       // Read the first 8 bytes containing the packet length.
       int length = 0;
 
-      // Let's start the stop-watch before waiting on read for the heartbeat
-      // check
-      // to be operational.
+      /*
+       * Let's start the stop-watch before waiting on read for the heartbeat
+       * check to be operational.
+       */
       lastReceiveTime = System.currentTimeMillis();
 
       while (length < 8)
@@ -398,8 +407,11 @@
             length += read;
           }
         }
-        // We do not want the heartbeat to close the session when we are
-        // processing a message even a time consuming one.
+
+        /*
+         * We do not want the heartbeat to close the session when we are
+         * processing a message even a time consuming one.
+         */
         lastReceiveTime = 0;
         return ReplicationMsg.generateMsg(buffer, protocolVersion);
       }
@@ -462,23 +474,19 @@
   @Override
   public void stopEncryption()
   {
-    // The secure socket has been configured not to auto close the underlying
-    // plain socket. We should close it here and properly tear down the SSL
-    // session, but this is not compatible with the existing protocol.
+    /*
+     * The secure socket has been configured not to auto close the underlying
+     * plain socket. We should close it here and properly tear down the SSL
+     * session, but this is not compatible with the existing protocol.
+     */
     if (false)
     {
-      try
-      {
-        secureSocket.close();
-      }
-      catch (IOException ignored)
-      {
-        // Ignore.
-      }
+      StaticUtils.close(secureSocket);
     }
 
-    input = plainInput;
-    output = plainOutput;
+    input = new BufferedInputStream(plainInput);
+    output = new BufferedOutputStream(plainOutput);
+    isEncrypted = false;
   }
 
 

--
Gitblit v1.10.0