From 6ee1440f6f56ac066f97383315b2798287f0821a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 23 Mar 2011 22:27:01 +0000
Subject: [PATCH] Fix issue OpenDJ-95: Socket leak and constant disconnect/reconnect when a directory server can no longer reach its connected replication server
---
opends/src/server/org/opends/server/replication/protocol/SocketSession.java | 334 ++++++++++++++++++++++++++++++++++++-------------------
1 files changed, 220 insertions(+), 114 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 9e2a936..7cf1a96 100644
--- a/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -27,135 +27,283 @@
*/
package org.opends.server.replication.protocol;
+
+
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+
/**
* This class Implement a protocol session using a basic socket and relying on
- * the innate encoding/decoding capabilities of the ReplicationMsg
- * by using the getBytes() and generateMsg() methods of those classes.
+ * the innate encoding/decoding capabilities of the ReplicationMsg by using the
+ * getBytes() and generateMsg() methods of those classes.
*/
-public class SocketSession implements ProtocolSession
+public final class SocketSession implements ProtocolSession
{
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private Socket socket;
- private InputStream input;
- private OutputStream output;
- byte[] rcvLengthBuf = new byte[8];
+ private final Socket socket;
+ private final InputStream input;
+ private final OutputStream output;
+ private final byte[] rcvLengthBuf = new byte[8];
/**
* The time the last message published to this session.
*/
private volatile long lastPublishTime = 0;
-
/**
* The time the last message was received on this session.
*/
- private long lastReceiveTime = 0;
+ private volatile long lastReceiveTime = 0;
+ // Close guarded by closeLock: use a different lock to publish since
+ // publishing can block, and we don't want to block while closing failed
+ // connections.
+ private final Object closeLock = new Object();
private boolean closeInitiated = false;
+ // Publish guarded by publishLock: use a full lock here so that we can
+ // optionally publish StopMsg during close.
+ private final Lock publishLock = new ReentrantLock();
+
+ // Does not need protecting: updated only during single threaded handshake.
private short protocolVersion = ProtocolVersion.getCurrentVersion();
+
+
/**
* Creates a new SocketSession based on the provided socket.
*
- * @param socket The Socket on which the SocketSession will be based.
- * @throws IOException When an IException happens on the socket.
+ * @param socket
+ * The Socket on which the SocketSession will be based.
+ * @throws IOException
+ * When an IException happens on the socket.
*/
- public SocketSession(Socket socket) throws IOException
+ public SocketSession(final Socket socket) throws IOException
{
- this.socket = socket;
- /*
- * Use a window instead of the TCP flow control.
- * Therefore set a very large value for send and receive buffer sizes.
- */
- input = socket.getInputStream();
- output = socket.getOutputStream();
- }
-
- /**
- * {@inheritDoc}
- */
- public void close() throws IOException
- {
- closeInitiated = true;
-
if (debugEnabled())
{
- TRACER.debugInfo("Closing SocketSession."
- + stackTraceToSingleLineString(new Exception()));
+ TRACER.debugInfo("Creating SocketSession to %s from %s", socket
+ .getRemoteSocketAddress().toString(),
+ stackTraceToSingleLineString(new Exception()));
}
- socket.close();
+
+ this.socket = socket;
+ this.input = socket.getInputStream();
+ this.output = socket.getOutputStream();
}
+
+
/**
* {@inheritDoc}
*/
- public synchronized void publish(ReplicationMsg msg)
- throws IOException
+ @Override
+ public void close()
+ {
+ synchronized (closeLock)
+ {
+ if (closeInitiated)
+ {
+ return;
+ }
+
+ closeInitiated = true;
+ }
+
+ // Perform close outside of critical section.
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Closing SocketSession to %s from %s", socket
+ .getRemoteSocketAddress().toString(),
+ stackTraceToSingleLineString(new Exception()));
+ }
+
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications.
+ if (publishLock.tryLock())
+ {
+ try
+ {
+ publish(new StopMsg());
+ }
+ catch (final IOException ignored)
+ {
+ // Ignore errors on close.
+ }
+ finally
+ {
+ publishLock.unlock();
+ }
+ }
+ }
+
+ if (socket != null && !socket.isClosed())
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (final IOException ignored)
+ {
+ // Ignore errors on close.
+ }
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean closeInitiated()
+ {
+ synchronized (closeLock)
+ {
+ return closeInitiated;
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getLastPublishTime()
+ {
+ return lastPublishTime;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getLastReceiveTime()
+ {
+ if (lastReceiveTime == 0)
+ {
+ return System.currentTimeMillis();
+ }
+ return lastReceiveTime;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getReadableRemoteAddress()
+ {
+ return socket.getRemoteSocketAddress().toString();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getRemoteAddress()
+ {
+ return socket.getInetAddress().getHostAddress();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isEncrypted()
+ {
+ return false;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void publish(final ReplicationMsg msg) throws IOException
{
publish(msg, ProtocolVersion.getCurrentVersion());
}
+
+
/**
* {@inheritDoc}
*/
- public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
- throws IOException
+ @Override
+ public void publish(final ReplicationMsg msg,
+ final short reqProtocolVersion) throws IOException
{
- byte[] buffer = msg.getBytes(reqProtocolVersion);
- String str = String.format("%08x", buffer.length);
+ final byte[] buffer = msg.getBytes(reqProtocolVersion);
+ final String str = String.format("%08x", buffer.length);
+ final byte[] sendLengthBuf = str.getBytes();
- if (debugEnabled())
+ publishLock.lock();
+ try
{
- TRACER.debugInfo("SocketSession publish <" + str + ">");
+ output.write(sendLengthBuf);
+ output.write(buffer);
+ output.flush();
}
-
- byte[] sendLengthBuf = str.getBytes();
-
- output.write(sendLengthBuf);
- output.write(buffer);
- output.flush();
+ finally
+ {
+ publishLock.unlock();
+ }
lastPublishTime = System.currentTimeMillis();
}
+
+
/**
* {@inheritDoc}
*/
+ @Override
public ReplicationMsg receive() throws IOException,
ClassNotFoundException, DataFormatException,
NotSupportedOldVersionPDUException
{
- /* Read the first 8 bytes containing the packet length */
+ // Read the first 8 bytes containing the packet length
int length = 0;
- /* Let's start the stop-watch before waiting on read */
- /* for the heartbeat check to be operationnal */
+ // Let's start the stop-watch before waiting on read for the heartbeat check
+ // to be operational
lastReceiveTime = System.currentTimeMillis();
- while (length<8)
+ while (length < 8)
{
- int read = input.read(rcvLengthBuf, length, 8-length);
+ final int read = input.read(rcvLengthBuf, length, 8 - length);
if (read == -1)
{
- lastReceiveTime=0;
+ lastReceiveTime = 0;
throw new IOException("no more data");
}
else
@@ -164,101 +312,59 @@
}
}
- int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
+ final int totalLength = Integer.parseInt(
+ new String(rcvLengthBuf), 16);
try
{
length = 0;
- byte[] buffer = new byte[totalLength];
+ final byte[] buffer = new byte[totalLength];
while (length < totalLength)
{
length += input.read(buffer, length, totalLength - length);
}
- /* We do not want the heartbeat to close the session when */
- /* we are processing a message even a time consuming one. */
- lastReceiveTime=0;
+ // We do not want the heartbeat to close the session when we are
+ // processing a message even a time consuming one.
+ lastReceiveTime = 0;
return ReplicationMsg.generateMsg(buffer, protocolVersion);
}
- catch (OutOfMemoryError e)
+ catch (final OutOfMemoryError e)
{
throw new IOException("Packet too large, can't allocate "
- + totalLength + " bytes.");
+ + totalLength + " bytes.");
}
}
- /**
- * {@inheritDoc}
- */
- public void stopEncryption()
- {
- // There is no security layer.
- }
+
/**
* {@inheritDoc}
*/
- public boolean isEncrypted()
+ @Override
+ public void setProtocolVersion(final short version)
{
- return false;
+ protocolVersion = version;
}
+
+
/**
* {@inheritDoc}
*/
- public long getLastPublishTime()
- {
- return lastPublishTime;
- }
-
- /**
- * {@inheritDoc}
- */
- public long getLastReceiveTime()
- {
- if (lastReceiveTime==0)
- {
- return System.currentTimeMillis();
- }
- return lastReceiveTime;
- }
-
- /**
- * {@inheritDoc}
- */
- public String getRemoteAddress()
- {
- return socket.getInetAddress().getHostAddress();
- }
-
- /**
- * {@inheritDoc}
- */
- public String getReadableRemoteAddress()
- {
- return socket.getRemoteSocketAddress().toString();
- }
-
- /**
- * {@inheritDoc}
- */
- public void setSoTimeout(int timeout) throws SocketException
+ @Override
+ public void setSoTimeout(final int timeout) throws SocketException
{
socket.setSoTimeout(timeout);
}
- /**
- * {@inheritDoc}
- */
- public boolean closeInitiated()
- {
- return closeInitiated;
- }
+
/**
* {@inheritDoc}
*/
- public void setProtocolVersion(short version)
+ @Override
+ public void stopEncryption()
{
- protocolVersion = version;
+ // There is no security layer.
}
}
--
Gitblit v1.10.0