From 42dd3533f278a8d66edae0fa08d4a8abfb5d6835 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 07 Jun 2013 13:02:24 +0000
Subject: [PATCH] Partial fix for OPENDJ-951: Reduce size and frequency of replication MonitorMsg
---
opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java | 100 +++++++++++++++++++++++++++-----------------------
1 files changed, 54 insertions(+), 46 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index 805e8ba..112efab 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -33,6 +33,8 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -45,6 +47,7 @@
import javax.net.ssl.SSLSocket;
import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.util.StaticUtils;
@@ -77,21 +80,38 @@
*/
private volatile long lastReceiveTime = 0;
- // Close and error guarded by stateLock: use a different lock to publish since
- // publishing can block, and we don't want to block while closing failed
- // connections.
+ /*
+ * Close and error guarded by stateLock: use a different lock to publish since
+ * publishing can block, and we don't want to block while closing failed
+ * connections.
+ */
private final Object stateLock = new Object();
private boolean closeInitiated = false;
private Throwable sessionError = null;
- // Publish guarded by publishLock: use a full lock here so that we can
- // optionally publish StopMsg during close.
+ /*
+ * Publish guarded by publishLock: use a full lock here so that we can
+ * optionally publish StopMsg during close.
+ */
private final Lock publishLock = new ReentrantLock();
- // Does not need protecting: updated only during single threaded handshake.
+ /*
+ * These do not need synchronization because they are only modified during the
+ * initial single threaded handshake.
+ */
private short protocolVersion = ProtocolVersion.getCurrentVersion();
- private InputStream input;
- private OutputStream output;
+ private boolean isEncrypted = true; // Initially encrypted.
+
+ /*
+ * Use a buffered input stream to avoid too many system calls.
+ */
+ private BufferedInputStream input;
+
+ /*
+ * Use a buffered output stream in order to combine message length and content
+ * into a single TCP packet if possible.
+ */
+ private BufferedOutputStream output;
@@ -121,8 +141,8 @@
this.secureSocket = secureSocket;
this.plainInput = plainSocket.getInputStream();
this.plainOutput = plainSocket.getOutputStream();
- this.input = secureSocket.getInputStream();
- this.output = secureSocket.getOutputStream();
+ this.input = new BufferedInputStream(secureSocket.getInputStream());
+ this.output = new BufferedOutputStream(secureSocket.getOutputStream());
this.readableRemoteAddress = plainSocket.getRemoteSocketAddress()
.toString();
this.remoteAddress = plainSocket.getInetAddress().getHostAddress();
@@ -197,23 +217,7 @@
}
}
- try
- {
- plainSocket.close();
- }
- catch (final IOException ignored)
- {
- // Ignore errors on close.
- }
-
- try
- {
- secureSocket.close();
- }
- catch (final IOException ignored)
- {
- // Ignore errors on close.
- }
+ StaticUtils.close(plainSocket, secureSocket);
}
@@ -297,7 +301,7 @@
@Override
public boolean isEncrypted()
{
- return input != plainInput;
+ return isEncrypted;
}
@@ -327,6 +331,10 @@
publishLock.lock();
try
{
+ /*
+ * The buffered output stream ensures that the message is usually sent as
+ * a single TCP packet.
+ */
output.write(sendLengthBuf);
output.write(buffer);
output.flush();
@@ -358,9 +366,10 @@
// Read the first 8 bytes containing the packet length.
int length = 0;
- // Let's start the stop-watch before waiting on read for the heartbeat
- // check
- // to be operational.
+ /*
+ * Let's start the stop-watch before waiting on read for the heartbeat
+ * check to be operational.
+ */
lastReceiveTime = System.currentTimeMillis();
while (length < 8)
@@ -398,8 +407,11 @@
length += read;
}
}
- // We do not want the heartbeat to close the session when we are
- // processing a message even a time consuming one.
+
+ /*
+ * We do not want the heartbeat to close the session when we are
+ * processing a message even a time consuming one.
+ */
lastReceiveTime = 0;
return ReplicationMsg.generateMsg(buffer, protocolVersion);
}
@@ -462,23 +474,19 @@
@Override
public void stopEncryption()
{
- // The secure socket has been configured not to auto close the underlying
- // plain socket. We should close it here and properly tear down the SSL
- // session, but this is not compatible with the existing protocol.
+ /*
+ * The secure socket has been configured not to auto close the underlying
+ * plain socket. We should close it here and properly tear down the SSL
+ * session, but this is not compatible with the existing protocol.
+ */
if (false)
{
- try
- {
- secureSocket.close();
- }
- catch (IOException ignored)
- {
- // Ignore.
- }
+ StaticUtils.close(secureSocket);
}
- input = plainInput;
- output = plainOutput;
+ input = new BufferedInputStream(plainInput);
+ output = new BufferedOutputStream(plainOutput);
+ isEncrypted = false;
}
--
Gitblit v1.10.0