From 80774bcd0c732d9446cfc09fc9b7c39a3e4003ad Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 23 Mar 2011 22:27:01 +0000
Subject: [PATCH] Fix issue OpenDJ-95: Socket leak and constant disconnect/reconnect when a directory server can no longer reach its connected replication server
---
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java | 113 ---
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java | 5
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java | 451 ++++++++++++-----
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java | 44 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java | 27
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java | 17
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java | 334 ++++++++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java | 36 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java | 24
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java | 5
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java | 29 -
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java | 307 +++++++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java | 35 -
opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java | 7
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java | 27
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 8
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java | 8
18 files changed, 811 insertions(+), 678 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
index 1f43a02..21b4132 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2009 Sun Microsystems, Inc.
+ * Portions copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -91,8 +92,8 @@
/**
* Constructor.
- * @param fractionalConfig
- * @param domain
+ * @param fractionalConfig The fractional configuration.
+ * @param domain The replication domain.
*/
public ImportFractionalContext(FractionalConfig fractionalConfig,
LDAPReplicationDomain domain)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
index 5a0688b..854a7b2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -35,8 +35,6 @@
import org.opends.server.loggers.debug.DebugTracer;
-import java.io.IOException;
-
import org.opends.server.api.DirectoryThread;
/**
@@ -71,27 +69,18 @@
private volatile boolean shutdown = false;
/**
- * Send StopMsg before session closure or not.
- */
- private final boolean sendStopBeforeClose;
-
-
- /**
* Create a heartbeat monitor thread.
* @param threadName The name of the heartbeat thread.
* @param session The session on which heartbeats are to be monitored.
* @param heartbeatInterval The expected interval between heartbeats received
* (in milliseconds).
- * @param sendStopBeforeClose Should we send a StopMsg before closing the
- * session ?
*/
public HeartbeatMonitor(String threadName, ProtocolSession session,
- long heartbeatInterval, boolean sendStopBeforeClose)
+ long heartbeatInterval)
{
super(threadName);
this.session = session;
this.heartbeatInterval = heartbeatInterval;
- this.sendStopBeforeClose = sendStopBeforeClose;
}
/**
@@ -126,18 +115,6 @@
{
// Heartbeat is well overdue so the server is assumed to be dead.
logError(NOTE_HEARTBEAT_FAILURE.get(currentThread().getName()));
- if (sendStopBeforeClose)
- {
- // V4 protocol introduces a StopMsg to properly end communications
- try
- {
- session.publish(new StopMsg());
- }
- catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
session.close();
break;
}
@@ -160,10 +137,6 @@
}
}
}
- catch (IOException e)
- {
- // Hope that's OK.
- }
finally
{
if (debugEnabled())
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
index ee31cb6..e356005 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.protocol;
@@ -44,10 +45,8 @@
/**
* This method is called when the session with the remote must be closed.
* This object won't be used anymore after this method is called.
- *
- * @throws IOException If an error happen during the close process.
*/
- public abstract void close() throws IOException;
+ public abstract void close();
/**
* This method is called when a ReplicationMsg must be sent to
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
index fdfe13a..204b776 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -28,77 +28,104 @@
package org.opends.server.replication.protocol;
-import static org.opends.server.loggers.ErrorLogger.logError;
+
+
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
-import org.opends.messages.Message;
-import org.opends.server.types.DirectoryConfig;
-import org.opends.server.types.CryptoManager;
-import org.opends.server.config.ConfigException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.SortedSet;
+import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
-import java.util.SortedSet;
-import java.net.Socket;
-import java.net.InetAddress;
-import java.io.IOException;
+
+import org.opends.messages.Message;
+import org.opends.server.config.ConfigException;
+import org.opends.server.types.CryptoManager;
+import org.opends.server.types.DirectoryConfig;
+
+
/**
* This class represents the security configuration for replication protocol
* sessions. It contains all the configuration required to use SSL, and it
* determines whether encryption should be enabled for a session to a given
* replication server.
- *
*/
-public class ReplSessionSecurity
+public final class ReplSessionSecurity
{
/**
* Whether replication sessions use SSL encryption.
*/
- private boolean sslEncryption;
+ private final boolean sslEncryption;
/**
* The name of the local certificate to use, or null if none is specified.
*/
- private String sslCertNickname;
+ private final String sslCertNickname;
/**
* The set of enabled SSL protocols, or null for the default set.
*/
- private String sslProtocols[];
+ private final String sslProtocols[];
/**
* The set of enabled SSL cipher suites, or null for the default set.
*/
- private String sslCipherSuites[];
+ private final String sslCipherSuites[];
/**
- * The default soTimeout value to be used at handshake phases.
- * (DS<->RS and RS<->RS)
+ * The default soTimeout value to be used at handshake phases. (DS<->RS and
+ * RS<->RS)
*/
public static final int HANDSHAKE_TIMEOUT = 4000;
+
+
+ /**
+ * Create a ReplSessionSecurity instance from a provided multimaster domain
+ * configuration.
+ *
+ * @throws ConfigException
+ * If the supplied configuration was not valid.
+ */
+ public ReplSessionSecurity() throws ConfigException
+ {
+ // Currently use global settings from the crypto manager.
+ this(DirectoryConfig.getCryptoManager().getSslCertNickname(),
+ DirectoryConfig.getCryptoManager().getSslProtocols(),
+ DirectoryConfig.getCryptoManager().getSslCipherSuites(),
+ DirectoryConfig.getCryptoManager().isSslEncryption());
+ }
+
+
+
/**
* Create a ReplSessionSecurity instance from the supplied configuration
* values.
*
- * @param sslCertNickname The name of the local certificate to use, or null
- * if none is specified.
- * @param sslProtocols The protocols that should be enabled, or null if
- * the default protocols should be used.
- * @param sslCipherSuites The cipher suites that should be enabled, or null
- * if the default cipher suites should be used.
- * @param sslEncryption Whether replication sessions use SSL encryption.
- *
- * @throws ConfigException If the supplied configuration was not valid.
+ * @param sslCertNickname
+ * The name of the local certificate to use, or null if none is
+ * specified.
+ * @param sslProtocols
+ * The protocols that should be enabled, or null if the default
+ * protocols should be used.
+ * @param sslCipherSuites
+ * The cipher suites that should be enabled, or null if the default
+ * cipher suites should be used.
+ * @param sslEncryption
+ * Whether replication sessions use SSL encryption.
+ * @throws ConfigException
+ * If the supplied configuration was not valid.
*/
- public ReplSessionSecurity(String sslCertNickname,
- SortedSet<String> sslProtocols,
- SortedSet<String> sslCipherSuites,
- boolean sslEncryption)
- throws ConfigException
+ public ReplSessionSecurity(final String sslCertNickname,
+ final SortedSet<String> sslProtocols,
+ final SortedSet<String> sslCipherSuites,
+ final boolean sslEncryption) throws ConfigException
{
if (sslProtocols == null || sslProtocols.size() == 0)
{
@@ -124,76 +151,46 @@
this.sslCertNickname = sslCertNickname;
}
- /**
- * Create a ReplSessionSecurity instance from a provided multimaster domain
- * configuration.
- *
- * @throws ConfigException If the supplied configuration was not valid.
- */
- public ReplSessionSecurity()
- throws ConfigException
- {
- // Currently use global settings from the crypto manager.
- this(DirectoryConfig.getCryptoManager().getSslCertNickname(),
- DirectoryConfig.getCryptoManager().getSslProtocols(),
- DirectoryConfig.getCryptoManager().getSslCipherSuites(),
- DirectoryConfig.getCryptoManager().isSslEncryption());
- }
- /**
- * Determine whether a given replication server is listening on a secure
- * port.
- * @param serverURL The replication server URL.
- * @return true if the given replication server is listening on a secure
- * port, or false if it is listening on a non-secure port.
- */
- private boolean isSecurePort(String serverURL)
- {
- // Always true unless changed for test purposes.
- return true;
- }
-
- /**
- * Determine whether sessions to a given replication server should be
- * encrypted.
- * @param serverURL The replication server URL.
- * @return true if sessions to the given replication server should be
- * encrypted, or false if they should not be encrypted.
- */
- public boolean isSslEncryption(String serverURL)
- {
- // Currently use global settings from the crypto manager.
- return sslEncryption;
- }
/**
* Create a new protocol session in the client role on the provided socket.
- * @param serverURL The remote replication server to which the socket is
- * connected.
- * @param socket The connected socket.
- * @param soTimeout The socket timeout option to use for the protocol session.
+ *
+ * @param serverURL
+ * The remote replication server to which the socket is connected.
+ * @param socket
+ * The connected socket.
+ * @param soTimeout
+ * The socket timeout option to use for the protocol session.
* @return The new protocol session.
- * @throws ConfigException If the protocol session could not be established
- * due to a configuration problem.
- * @throws IOException If the protocol session could not be established
- * for some other reason.
+ * @throws ConfigException
+ * If the protocol session could not be established due to a
+ * configuration problem.
+ * @throws IOException
+ * If the protocol session could not be established for some other
+ * reason.
*/
- public ProtocolSession createClientSession(String serverURL, Socket socket,
- int soTimeout)
- throws ConfigException, IOException
+ public ProtocolSession createClientSession(final String serverURL,
+ final Socket socket, final int soTimeout)
+ throws ConfigException, IOException
{
- boolean useSSL = isSecurePort(serverURL);
- if (useSSL)
+ boolean hasCompleted = false;
+ SSLSocket secureSocket = null;
+
+ try
{
// Create a new SSL context every time to make sure we pick up the
// latest contents of the trust store.
- CryptoManager cryptoManager = DirectoryConfig.getCryptoManager();
- SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname);
- SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+ final CryptoManager cryptoManager = DirectoryConfig
+ .getCryptoManager();
+ final SSLContext sslContext = cryptoManager
+ .getSslContext(sslCertNickname);
+ final SSLSocketFactory sslSocketFactory = sslContext
+ .getSocketFactory();
- SSLSocket secureSocket = (SSLSocket) sslSocketFactory.createSocket(socket,
- socket.getInetAddress().getHostName(),
- socket.getPort(), false);
+ secureSocket = (SSLSocket) sslSocketFactory.createSocket(
+ socket, socket.getInetAddress().getHostName(),
+ socket.getPort(), false);
secureSocket.setUseClientMode(true);
secureSocket.setSoTimeout(soTimeout);
@@ -209,39 +206,73 @@
// Force TLS negotiation now.
secureSocket.startHandshake();
-
+ hasCompleted = true;
return new TLSSocketSession(socket, secureSocket);
}
- else
+ finally
{
- return new SocketSession(socket);
+ if (!hasCompleted)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (final Exception ignored)
+ {
+ // Ignore.
+ }
+
+ if (secureSocket != null)
+ {
+ try
+ {
+ secureSocket.close();
+ }
+ catch (final Exception ignored)
+ {
+ // Ignore.
+ }
+ }
+ }
}
}
+
+
/**
* Create a new protocol session in the server role on the provided socket.
- * @param socket The connected socket.
+ *
+ * @param socket
+ * The connected socket.
+ * @param soTimeout
+ * The socket timeout option to use for the protocol session.
* @return The new protocol session.
- * @param soTimeout The socket timeout option to use for the protocol session.
- * @throws ConfigException If the protocol session could not be established
- * due to a configuration problem.
- * @throws IOException If the protocol session could not be established
- * for some other reason.
+ * @throws ConfigException
+ * If the protocol session could not be established due to a
+ * configuration problem.
+ * @throws IOException
+ * If the protocol session could not be established for some other
+ * reason.
*/
- public ProtocolSession createServerSession(Socket socket, int soTimeout)
- throws ConfigException, IOException
+ public ProtocolSession createServerSession(final Socket socket,
+ final int soTimeout) throws ConfigException, IOException
{
+ boolean hasCompleted = false;
+ SSLSocket secureSocket = null;
+
try
{
// Create a new SSL context every time to make sure we pick up the
// latest contents of the trust store.
- CryptoManager cryptoManager = DirectoryConfig.getCryptoManager();
- SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname);
- SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+ final CryptoManager cryptoManager = DirectoryConfig
+ .getCryptoManager();
+ final SSLContext sslContext = cryptoManager
+ .getSslContext(sslCertNickname);
+ final SSLSocketFactory sslSocketFactory = sslContext
+ .getSocketFactory();
- SSLSocket secureSocket = (SSLSocket)
- sslSocketFactory.createSocket(socket,
- socket.getInetAddress().getHostName(),
+ secureSocket = (SSLSocket) sslSocketFactory.createSocket(
+ socket, socket.getInetAddress().getHostName(),
socket.getPort(), false);
secureSocket.setUseClientMode(false);
secureSocket.setNeedClientAuth(true);
@@ -259,23 +290,63 @@
// Force TLS negotiation now.
secureSocket.startHandshake();
-
- // SSLSession sslSession = secureSocket.getSession();
- // System.out.println("Peer = " + sslSession.getPeerHost() + ":" +
- // sslSession.getPeerPort());
- // System.out.println("Principal = " + sslSession.getPeerPrincipal());
-
+ hasCompleted = true;
return new TLSSocketSession(socket, secureSocket);
- } catch (SSLException e)
+ }
+ catch (final SSLException e)
{
// This is probably a connection attempt from an unexpected client
// log that to warn the administrator.
- InetAddress remHost = socket.getInetAddress();
- Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(remHost.
- getHostName(), remHost.getHostAddress(), e.getLocalizedMessage());
+ final InetAddress remHost = socket.getInetAddress();
+ final Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(
+ remHost.getHostName(), remHost.getHostAddress(),
+ e.getLocalizedMessage());
logError(message);
return null;
}
+ finally
+ {
+ if (!hasCompleted)
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (final Exception ignored)
+ {
+ // Ignore.
+ }
+
+ if (secureSocket != null)
+ {
+ try
+ {
+ secureSocket.close();
+ }
+ catch (final Exception ignored)
+ {
+ // Ignore.
+ }
+ }
+ }
+ }
+ }
+
+
+
+ /**
+ * Determine whether sessions to a given replication server should be
+ * encrypted.
+ *
+ * @param serverURL
+ * The replication server URL.
+ * @return true if sessions to the given replication server should be
+ * encrypted, or false if they should not be encrypted.
+ */
+ public boolean isSslEncryption(final String serverURL)
+ {
+ // Currently use global settings from the crypto manager.
+ return sslEncryption;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 9e2a936..7cf1a96 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -27,135 +27,283 @@
*/
package org.opends.server.replication.protocol;
+
+
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+
/**
* This class Implement a protocol session using a basic socket and relying on
- * the innate encoding/decoding capabilities of the ReplicationMsg
- * by using the getBytes() and generateMsg() methods of those classes.
+ * the innate encoding/decoding capabilities of the ReplicationMsg by using the
+ * getBytes() and generateMsg() methods of those classes.
*/
-public class SocketSession implements ProtocolSession
+public final class SocketSession implements ProtocolSession
{
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private Socket socket;
- private InputStream input;
- private OutputStream output;
- byte[] rcvLengthBuf = new byte[8];
+ private final Socket socket;
+ private final InputStream input;
+ private final OutputStream output;
+ private final byte[] rcvLengthBuf = new byte[8];
/**
* The time the last message published to this session.
*/
private volatile long lastPublishTime = 0;
-
/**
* The time the last message was received on this session.
*/
- private long lastReceiveTime = 0;
+ private volatile long lastReceiveTime = 0;
+ // Close guarded by closeLock: use a different lock to publish since
+ // publishing can block, and we don't want to block while closing failed
+ // connections.
+ private final Object closeLock = new Object();
private boolean closeInitiated = false;
+ // Publish guarded by publishLock: use a full lock here so that we can
+ // optionally publish StopMsg during close.
+ private final Lock publishLock = new ReentrantLock();
+
+ // Does not need protecting: updated only during single threaded handshake.
private short protocolVersion = ProtocolVersion.getCurrentVersion();
+
+
/**
* Creates a new SocketSession based on the provided socket.
*
- * @param socket The Socket on which the SocketSession will be based.
- * @throws IOException When an IException happens on the socket.
+ * @param socket
+ * The Socket on which the SocketSession will be based.
+ * @throws IOException
+ * When an IException happens on the socket.
*/
- public SocketSession(Socket socket) throws IOException
+ public SocketSession(final Socket socket) throws IOException
{
- this.socket = socket;
- /*
- * Use a window instead of the TCP flow control.
- * Therefore set a very large value for send and receive buffer sizes.
- */
- input = socket.getInputStream();
- output = socket.getOutputStream();
- }
-
- /**
- * {@inheritDoc}
- */
- public void close() throws IOException
- {
- closeInitiated = true;
-
if (debugEnabled())
{
- TRACER.debugInfo("Closing SocketSession."
- + stackTraceToSingleLineString(new Exception()));
+ TRACER.debugInfo("Creating SocketSession to %s from %s", socket
+ .getRemoteSocketAddress().toString(),
+ stackTraceToSingleLineString(new Exception()));
}
- socket.close();
+
+ this.socket = socket;
+ this.input = socket.getInputStream();
+ this.output = socket.getOutputStream();
}
+
+
/**
* {@inheritDoc}
*/
- public synchronized void publish(ReplicationMsg msg)
- throws IOException
+ @Override
+ public void close()
+ {
+ synchronized (closeLock)
+ {
+ if (closeInitiated)
+ {
+ return;
+ }
+
+ closeInitiated = true;
+ }
+
+ // Perform close outside of critical section.
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("Closing SocketSession to %s from %s", socket
+ .getRemoteSocketAddress().toString(),
+ stackTraceToSingleLineString(new Exception()));
+ }
+
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ // V4 protocol introduces a StopMsg to properly end communications.
+ if (publishLock.tryLock())
+ {
+ try
+ {
+ publish(new StopMsg());
+ }
+ catch (final IOException ignored)
+ {
+ // Ignore errors on close.
+ }
+ finally
+ {
+ publishLock.unlock();
+ }
+ }
+ }
+
+ if (socket != null && !socket.isClosed())
+ {
+ try
+ {
+ socket.close();
+ }
+ catch (final IOException ignored)
+ {
+ // Ignore errors on close.
+ }
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean closeInitiated()
+ {
+ synchronized (closeLock)
+ {
+ return closeInitiated;
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getLastPublishTime()
+ {
+ return lastPublishTime;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public long getLastReceiveTime()
+ {
+ if (lastReceiveTime == 0)
+ {
+ return System.currentTimeMillis();
+ }
+ return lastReceiveTime;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getReadableRemoteAddress()
+ {
+ return socket.getRemoteSocketAddress().toString();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public String getRemoteAddress()
+ {
+ return socket.getInetAddress().getHostAddress();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isEncrypted()
+ {
+ return false;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void publish(final ReplicationMsg msg) throws IOException
{
publish(msg, ProtocolVersion.getCurrentVersion());
}
+
+
/**
* {@inheritDoc}
*/
- public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
- throws IOException
+ @Override
+ public void publish(final ReplicationMsg msg,
+ final short reqProtocolVersion) throws IOException
{
- byte[] buffer = msg.getBytes(reqProtocolVersion);
- String str = String.format("%08x", buffer.length);
+ final byte[] buffer = msg.getBytes(reqProtocolVersion);
+ final String str = String.format("%08x", buffer.length);
+ final byte[] sendLengthBuf = str.getBytes();
- if (debugEnabled())
+ publishLock.lock();
+ try
{
- TRACER.debugInfo("SocketSession publish <" + str + ">");
+ output.write(sendLengthBuf);
+ output.write(buffer);
+ output.flush();
}
-
- byte[] sendLengthBuf = str.getBytes();
-
- output.write(sendLengthBuf);
- output.write(buffer);
- output.flush();
+ finally
+ {
+ publishLock.unlock();
+ }
lastPublishTime = System.currentTimeMillis();
}
+
+
/**
* {@inheritDoc}
*/
+ @Override
public ReplicationMsg receive() throws IOException,
ClassNotFoundException, DataFormatException,
NotSupportedOldVersionPDUException
{
- /* Read the first 8 bytes containing the packet length */
+ // Read the first 8 bytes containing the packet length
int length = 0;
- /* Let's start the stop-watch before waiting on read */
- /* for the heartbeat check to be operationnal */
+ // Let's start the stop-watch before waiting on read for the heartbeat check
+ // to be operational
lastReceiveTime = System.currentTimeMillis();
- while (length<8)
+ while (length < 8)
{
- int read = input.read(rcvLengthBuf, length, 8-length);
+ final int read = input.read(rcvLengthBuf, length, 8 - length);
if (read == -1)
{
- lastReceiveTime=0;
+ lastReceiveTime = 0;
throw new IOException("no more data");
}
else
@@ -164,101 +312,59 @@
}
}
- int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
+ final int totalLength = Integer.parseInt(
+ new String(rcvLengthBuf), 16);
try
{
length = 0;
- byte[] buffer = new byte[totalLength];
+ final byte[] buffer = new byte[totalLength];
while (length < totalLength)
{
length += input.read(buffer, length, totalLength - length);
}
- /* We do not want the heartbeat to close the session when */
- /* we are processing a message even a time consuming one. */
- lastReceiveTime=0;
+ // We do not want the heartbeat to close the session when we are
+ // processing a message even a time consuming one.
+ lastReceiveTime = 0;
return ReplicationMsg.generateMsg(buffer, protocolVersion);
}
- catch (OutOfMemoryError e)
+ catch (final OutOfMemoryError e)
{
throw new IOException("Packet too large, can't allocate "
- + totalLength + " bytes.");
+ + totalLength + " bytes.");
}
}
- /**
- * {@inheritDoc}
- */
- public void stopEncryption()
- {
- // There is no security layer.
- }
+
/**
* {@inheritDoc}
*/
- public boolean isEncrypted()
+ @Override
+ public void setProtocolVersion(final short version)
{
- return false;
+ protocolVersion = version;
}
+
+
/**
* {@inheritDoc}
*/
- public long getLastPublishTime()
- {
- return lastPublishTime;
- }
-
- /**
- * {@inheritDoc}
- */
- public long getLastReceiveTime()
- {
- if (lastReceiveTime==0)
- {
- return System.currentTimeMillis();
- }
- return lastReceiveTime;
- }
-
- /**
- * {@inheritDoc}
- */
- public String getRemoteAddress()
- {
- return socket.getInetAddress().getHostAddress();
- }
-
- /**
- * {@inheritDoc}
- */
- public String getReadableRemoteAddress()
- {
- return socket.getRemoteSocketAddress().toString();
- }
-
- /**
- * {@inheritDoc}
- */
- public void setSoTimeout(int timeout) throws SocketException
+ @Override
+ public void setSoTimeout(final int timeout) throws SocketException
{
socket.setSoTimeout(timeout);
}
- /**
- * {@inheritDoc}
- */
- public boolean closeInitiated()
- {
- return closeInitiated;
- }
+
/**
* {@inheritDoc}
*/
- public void setProtocolVersion(short version)
+ @Override
+ public void stopEncryption()
{
- protocolVersion = version;
+ // There is no security layer.
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index adf16b2..8a54bc2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -27,6 +27,8 @@
*/
package org.opends.server.replication.protocol;
+
+
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -36,241 +38,430 @@
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
+import javax.net.ssl.SSLSocket;
+
import org.opends.server.loggers.debug.DebugTracer;
-import javax.net.ssl.SSLSocket;
+
/**
* This class implements a protocol session using TLS.
*/
-public class TLSSocketSession implements ProtocolSession
+public final class TLSSocketSession implements ProtocolSession
{
/**
* The tracer object for the debug logger.
*/
private static final DebugTracer TRACER = getTracer();
- private Socket plainSocket;
- private SSLSocket secureSocket;
- private InputStream input;
- private OutputStream output;
- private InputStream plainInput;
- private OutputStream plainOutput;
- byte[] rcvLengthBuf = new byte[8];
+ private final Socket plainSocket;
+ private final SSLSocket secureSocket;
+ private final InputStream plainInput;
+ private final OutputStream plainOutput;
+ private final byte[] rcvLengthBuf = new byte[8];
/**
* The time the last message published to this session.
*/
private volatile long lastPublishTime = 0;
-
/**
* The time the last message was received on this session.
*/
- private long lastReceiveTime = 0;
+ private volatile long lastReceiveTime = 0;
+ // Close and error guarded by stateLock: use a different lock to publish since
+ // publishing can block, and we don't want to block while closing failed
+ // connections.
+ private final Object stateLock = new Object();
private boolean closeInitiated = false;
+ private Throwable sessionError = null;
+ // Publish guarded by publishLock: use a full lock here so that we can
+ // optionally publish StopMsg during close.
+ private final Lock publishLock = new ReentrantLock();
+
+ // Does not need protecting: updated only during single threaded handshake.
private short protocolVersion = ProtocolVersion.getCurrentVersion();
+ private InputStream input;
+ private OutputStream output;
+
+
/**
* Creates a new TLSSocketSession.
*
- * @param socket The regular Socket on which the SocketSession will be
- * based.
- * @param secureSocket The secure Socket on which the SocketSession will be
- * based.
- * @throws IOException When an IException happens on the socket.
+ * @param socket
+ * The regular Socket on which the SocketSession will be based.
+ * @param secureSocket
+ * The secure Socket on which the SocketSession will be based.
+ * @throws IOException
+ * When an IException happens on the socket.
*/
- public TLSSocketSession(Socket socket, SSLSocket secureSocket)
- throws IOException
+ public TLSSocketSession(final Socket socket,
+ final SSLSocket secureSocket) throws IOException
{
- plainSocket = socket;
- this.secureSocket = secureSocket;
- plainInput = plainSocket.getInputStream();
- plainOutput = plainSocket.getOutputStream();
- input = secureSocket.getInputStream();
- output = secureSocket.getOutputStream();
- }
-
-
- /**
- * {@inheritDoc}
- */
- public void close() throws IOException
- {
- closeInitiated = true;
if (debugEnabled())
{
- TRACER.debugInfo("Closing SocketSession." +
- stackTraceToSingleLineString(new Exception("Stack:")));
+ TRACER.debugInfo(
+ "Creating TLSSocketSession from %s to %s in %s",
+ socket.getLocalSocketAddress(),
+ socket.getRemoteSocketAddress(),
+ stackTraceToSingleLineString(new Exception()));
}
- if (plainSocket != null && !plainSocket.isClosed())
- {
- plainInput.close();
- plainOutput.close();
- plainSocket.close();
- }
- if (secureSocket != null && !secureSocket.isClosed())
- {
- input.close();
- output.close();
- secureSocket.close();
- }
+
+ this.plainSocket = socket;
+ this.secureSocket = secureSocket;
+ this.plainInput = plainSocket.getInputStream();
+ this.plainOutput = plainSocket.getOutputStream();
+ this.input = secureSocket.getInputStream();
+ this.output = secureSocket.getOutputStream();
}
+
+
/**
* {@inheritDoc}
*/
- public synchronized void publish(ReplicationMsg msg)
- throws IOException
+ @Override
+ public void close()
{
- publish(msg, ProtocolVersion.getCurrentVersion());
- }
+ Throwable localSessionError;
- /**
- * {@inheritDoc}
- */
- public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
- throws IOException
- {
- byte[] buffer = msg.getBytes(reqProtocolVersion);
- String str = String.format("%08x", buffer.length);
- byte[] sendLengthBuf = str.getBytes();
-
- output.write(sendLengthBuf);
- output.write(buffer);
- output.flush();
-
- lastPublishTime = System.currentTimeMillis();
- }
-
- /**
- * {@inheritDoc}
- */
- public ReplicationMsg receive() throws IOException,
- ClassNotFoundException, DataFormatException,
- NotSupportedOldVersionPDUException
- {
- /* Read the first 8 bytes containing the packet length */
- int length = 0;
-
- /* Let's start the stop-watch before waiting on read */
- /* for the heartbeat check to be operationnal */
- lastReceiveTime = System.currentTimeMillis();
-
- while (length<8)
+ synchronized (stateLock)
{
- int read = input.read(rcvLengthBuf, length, 8-length);
- if (read == -1)
+ if (closeInitiated)
{
- lastReceiveTime=0;
- throw new IOException("no more data");
+ return;
+ }
+
+ localSessionError = sessionError;
+ closeInitiated = true;
+ }
+
+ // Perform close outside of critical section.
+ if (debugEnabled())
+ {
+ if (localSessionError == null)
+ {
+ TRACER.debugInfo(
+ "Closing TLSSocketSession from %s to %s in %s",
+ plainSocket.getLocalSocketAddress(),
+ plainSocket.getRemoteSocketAddress(),
+ stackTraceToSingleLineString(new Exception()));
}
else
{
- length += read;
+ TRACER.debugInfo(
+ "Aborting TLSSocketSession from %s to %s in %s due to the "
+ + "following error: %s",
+ plainSocket.getLocalSocketAddress(),
+ plainSocket.getRemoteSocketAddress(),
+ stackTraceToSingleLineString(new Exception()),
+ stackTraceToSingleLineString(localSessionError));
}
}
- int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
+ // V4 protocol introduces a StopMsg to properly end communications.
+ if (localSessionError == null)
+ {
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ if (publishLock.tryLock())
+ {
+ try
+ {
+ publish(new StopMsg());
+ }
+ catch (final IOException ignored)
+ {
+ // Ignore errors on close.
+ }
+ finally
+ {
+ publishLock.unlock();
+ }
+ }
+ }
+ }
try
{
- length = 0;
- byte[] buffer = new byte[totalLength];
- while (length < totalLength)
- {
- length += input.read(buffer, length, totalLength - length);
- }
- /* We do not want the heartbeat to close the session when */
- /* we are processing a message even a time consuming one. */
- lastReceiveTime=0;
- return ReplicationMsg.generateMsg(buffer, protocolVersion);
+ plainSocket.close();
}
- catch (OutOfMemoryError e)
+ catch (final IOException ignored)
{
- throw new IOException("Packet too large, can't allocate "
- + totalLength + " bytes.");
+ // Ignore errors on close.
+ }
+
+ try
+ {
+ secureSocket.close();
+ }
+ catch (final IOException ignored)
+ {
+ // Ignore errors on close.
}
}
- /**
- * {@inheritDoc}
- */
- public void stopEncryption()
- {
- input = plainInput;
- output = plainOutput;
- }
+
/**
* {@inheritDoc}
*/
- public boolean isEncrypted()
+ @Override
+ public boolean closeInitiated()
{
- return !(input == plainInput);
+ synchronized (stateLock)
+ {
+ return closeInitiated;
+ }
}
+
+
/**
* {@inheritDoc}
*/
+ @Override
public long getLastPublishTime()
{
return lastPublishTime;
}
+
+
/**
* {@inheritDoc}
*/
+ @Override
public long getLastReceiveTime()
{
- if (lastReceiveTime==0)
+ if (lastReceiveTime == 0)
{
return System.currentTimeMillis();
}
return lastReceiveTime;
}
- /**
- * {@inheritDoc}
- */
- public String getRemoteAddress()
- {
- return plainSocket.getInetAddress().getHostAddress();
- }
+
/**
* {@inheritDoc}
*/
+ @Override
public String getReadableRemoteAddress()
{
return plainSocket.getRemoteSocketAddress().toString();
}
+
+
/**
* {@inheritDoc}
*/
- public void setSoTimeout(int timeout) throws SocketException
+ @Override
+ public String getRemoteAddress()
+ {
+ return plainSocket.getInetAddress().getHostAddress();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isEncrypted()
+ {
+ return input != plainInput;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void publish(final ReplicationMsg msg) throws IOException
+ {
+ publish(msg, ProtocolVersion.getCurrentVersion());
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void publish(final ReplicationMsg msg,
+ final short reqProtocolVersion) throws IOException
+ {
+ final byte[] buffer = msg.getBytes(reqProtocolVersion);
+ final String str = String.format("%08x", buffer.length);
+ final byte[] sendLengthBuf = str.getBytes();
+
+ publishLock.lock();
+ try
+ {
+ output.write(sendLengthBuf);
+ output.write(buffer);
+ output.flush();
+ }
+ catch (final IOException e)
+ {
+ setSessionError(e);
+ throw e;
+ }
+ finally
+ {
+ publishLock.unlock();
+ }
+
+ lastPublishTime = System.currentTimeMillis();
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ReplicationMsg receive() throws IOException,
+ DataFormatException, NotSupportedOldVersionPDUException
+ {
+ try
+ {
+ // Read the first 8 bytes containing the packet length.
+ int length = 0;
+
+ // Let's start the stop-watch before waiting on read for the heartbeat
+ // check
+ // to be operational.
+ lastReceiveTime = System.currentTimeMillis();
+
+ while (length < 8)
+ {
+ final int read = input.read(rcvLengthBuf, length, 8 - length);
+ if (read == -1)
+ {
+ lastReceiveTime = 0;
+ throw new IOException("no more data");
+ }
+ else
+ {
+ length += read;
+ }
+ }
+
+ final int totalLength = Integer.parseInt(new String(
+ rcvLengthBuf), 16);
+
+ try
+ {
+ length = 0;
+ final byte[] buffer = new byte[totalLength];
+ while (length < totalLength)
+ {
+ final int read = input.read(buffer, length, totalLength
+ - length);
+ if (read == -1)
+ {
+ lastReceiveTime = 0;
+ throw new IOException("no more data");
+ }
+ else
+ {
+ length += read;
+ }
+ }
+ // We do not want the heartbeat to close the session when we are
+ // processing a message even a time consuming one.
+ lastReceiveTime = 0;
+ return ReplicationMsg.generateMsg(buffer, protocolVersion);
+ }
+ catch (final OutOfMemoryError e)
+ {
+ throw new IOException("Packet too large, can't allocate "
+ + totalLength + " bytes.");
+ }
+ }
+ catch (final IOException e)
+ {
+ setSessionError(e);
+ throw e;
+ }
+ catch (final DataFormatException e)
+ {
+ setSessionError(e);
+ throw e;
+ }
+ catch (final NotSupportedOldVersionPDUException e)
+ {
+ setSessionError(e);
+ throw e;
+ }
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setProtocolVersion(final short version)
+ {
+ protocolVersion = version;
+ }
+
+
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void setSoTimeout(final int timeout) throws SocketException
{
plainSocket.setSoTimeout(timeout);
}
- /**
- * {@inheritDoc}
- */
- public boolean closeInitiated()
- {
- return closeInitiated;
- }
+
/**
* {@inheritDoc}
*/
- public void setProtocolVersion(short version)
+ @Override
+ public void stopEncryption()
{
- protocolVersion = version;
+ // The secure socket has been configured not to auto close the underlying
+ // plain socket.
+ try
+ {
+ secureSocket.close();
+ }
+ catch (IOException ignored)
+ {
+ // Ignore.
+ }
+
+ input = plainInput;
+ output = plainOutput;
+ }
+
+
+
+ private void setSessionError(final Exception e)
+ {
+ synchronized (stateLock)
+ {
+ if (sessionError == null)
+ {
+ sessionError = e;
+ }
+ }
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index b70fd4a..fa113b4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -129,12 +129,16 @@
}
catch (LockConflictException e)
{
- if (txn != null)
- txn.abort();
- txn = null;
+ // Try again.
}
finally
{
+ if (txn != null)
+ {
+ // No effect if txn has committed.
+ txn.abort();
+ txn = null;
+ }
dbCloseLock.readLock().unlock();
}
}
@@ -145,10 +149,6 @@
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
logError(mb.toMessage());
- if (txn != null)
- {
- txn.abort();
- }
replicationServer.shutdown();
}
}
@@ -158,16 +158,6 @@
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- if (txn != null)
- {
- try
- {
- txn.abort();
- } catch (DatabaseException e1)
- {
- // can't do much more. The ReplicationServer is shuting down.
- }
- }
replicationServer.shutdown();
}
catch (UnsupportedEncodingException e)
@@ -177,17 +167,6 @@
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
replicationServer.shutdown();
- if (txn != null)
- {
- try
- {
- txn.abort();
- } catch (DatabaseException e1)
- {
- // can't do much more. The ReplicationServer is shuting down.
- }
- }
- replicationServer.shutdown();
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index 4b71c95..567d289 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -386,9 +386,10 @@
else
{
ServerState startState = domain.getStartState();
- // We don't use the endState but it's updating CN as reading
- ServerState endState =
- domain.getEligibleState(crossDomainEligibleCN, false);
+
+ // We don't use the returned endState but it's updating CN as
+ // reading
+ domain.getEligibleState(crossDomainEligibleCN, false);
ChangeNumber fcn = startState.getMaxChangeNumber(
cn.getServerId());
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index b111327..4c53060 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -23,7 +23,7 @@
*
*
* Copyright 2009-2010 Sun Microsystems, Inc.
- * Portions Copyright 2010 ForgeRock AS
+ * Portions Copyright 2010-2011 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -169,6 +169,7 @@
}
/**
* Provide a string representation of this object for debug purpose..
+ * @param buffer Append to this buffer.
*/
public void toString(StringBuilder buffer)
{
@@ -1550,9 +1551,10 @@
// INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
searchPhase = PERSISTENT_PHASE;
- if (writer ==null)
+ final ProtocolSession localSession = session;
+ if (writer ==null && localSession != null)
{
- writer = new ECLServerWriter(session,this,replicationServerDomain);
+ writer = new ECLServerWriter(localSession,this,replicationServerDomain);
writer.start(); // start suspended
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 4f7aaaf..62f8260 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -186,13 +186,7 @@
{
if (session!=null)
{
- try
- {
- session.close();
- } catch (IOException e)
- {
- // Can't do much more : ignore
- }
+ session.close();
}
if (replicationServerDomain!=null)
replicationServerDomain.stopServer(handler, false);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index 1fe4afa..4d631d0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2007-2009 Sun Microsystems, Inc.
+ * Portions copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
import static org.opends.messages.BackendMessages.*;
@@ -36,12 +37,10 @@
import org.opends.server.replication.protocol.LDAPUpdateMsg;
-import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
-import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -1479,19 +1478,7 @@
return writer;
}
- /**
- * Close the writer and get a string reader for the LDIF content.
- *
- * @return Returns the string contents of the writer.
- * @throws Exception
- * If an error occurred closing the writer.
- */
- public BufferedReader getLDIFBufferedReader() throws Exception {
- writer.close();
- String ldif = stream.toString("UTF-8");
- StringReader reader = new StringReader(ldif);
- return new BufferedReader(reader);
- }
+
/**
* Close the writer and get an LDIF reader for the LDIF content.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 8c5ffb3..7330fac 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -252,12 +252,17 @@
}
catch (LockConflictException e)
{
- if (txn != null)
- txn.abort();
- txn = null;
+ // Try again.
}
finally
{
+ if (txn != null)
+ {
+ // No effect if txn has committed.
+ txn.abort();
+ txn = null;
+ }
+
dbCloseLock.readLock().unlock();
}
}
@@ -268,10 +273,6 @@
MessageBuilder mb = new MessageBuilder();
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
logError(mb.toMessage());
- if (txn != null)
- {
- txn.abort();
- }
replicationServer.shutdown();
}
}
@@ -281,16 +282,6 @@
mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
- if (txn != null)
- {
- try
- {
- txn.abort();
- } catch (DatabaseException e1)
- {
- // can't do much more. The ReplicationServer is shuting down.
- }
- }
replicationServer.shutdown();
}
catch (UnsupportedEncodingException e)
@@ -300,17 +291,6 @@
mb.append(stackTraceToSingleLineString(e));
logError(mb.toMessage());
replicationServer.shutdown();
- if (txn != null)
- {
- try
- {
- txn.abort();
- } catch (DatabaseException e1)
- {
- // can't do much more. The ReplicationServer is shuting down.
- }
- }
- replicationServer.shutdown();
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 631d578..4c3f132 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -63,7 +63,6 @@
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
-import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
@@ -107,17 +106,14 @@
providedMsg.toString());
logError(providedMsg);
}
- try
+
+ if (providedSession != null)
{
- if (providedSession != null)
- // This method is only called when aborting a failing handshake and
- // not StopMsg should be sent in such situation. StopMsg are only
- // expected when full handshake has been performed, or at end of
- // handshake phase 1, when DS was just gathering available RS info
- providedSession.close();
- } catch (IOException e)
- {
- // ignore
+ // This method is only called when aborting a failing handshake and
+ // not StopMsg should be sent in such situation. StopMsg are only
+ // expected when full handshake has been performed, or at end of
+ // handshake phase 1, when DS was just gathering available RS info
+ providedSession.close();
}
}
@@ -286,9 +282,10 @@
// We did not recognize the message, close session as what
// can happen after is undetermined and we do not want the server to
// be disturbed
- if (session!=null)
+ ProtocolSession localSession = session;
+ if (localSession != null)
{
- closeSession(session, reason, this);
+ closeSession(localSession, reason, this);
}
if ((replicationServerDomain != null) &&
@@ -1101,26 +1098,7 @@
if (session != null)
{
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // V4 protocol introduces a StopMsg to properly end
- // communications
- try
- {
- session.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
- // Close session to end ServerReader or ServerWriter
- try
- {
- session.close();
- } catch (IOException e)
- {
- // ignore.
- }
+ session.close();
}
/*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index f7b8356..7517b25 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -341,32 +341,15 @@
finally
{
/*
- * The thread only exits the loop above if some error condition
- * happen.
+ * The thread only exits the loop above if some error condition happen.
* Attempt to close the socket and stop the server handler.
*/
- try
+ if (debugEnabled())
{
- if (handler.getProtocolVersion() >=
- ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // V4 protocol introduces a StopMsg to properly end
- // communications
- try
- {
- session.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
- if (debugEnabled())
- TRACER.debugInfo("In " + this.getName() + " closing the session");
- session.close();
- } catch (IOException e)
- {
- // ignore
+ TRACER.debugInfo("In " + this.getName()
+ + " closing the session");
}
+ session.close();
handler.doStop();
if (debugEnabled())
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 4eee050..6457b6a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -33,8 +33,6 @@
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.messages.ReplicationMessages.*;
-
-import java.io.IOException;
import java.net.SocketException;
import java.util.NoSuchElementException;
@@ -42,8 +40,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -229,25 +225,7 @@
logError(errMessage);
}
finally {
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // V4 protocol introduces a StopMsg to properly end
- // communications
- try
- {
- session.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
- try
- {
- session.close();
- } catch (IOException e)
- {
- // Can't do much more : ignore
- }
+ session.close();
replicationServerDomain.stopServer(handler, false);
if (debugEnabled())
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index f9335fb..22148e5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -85,6 +85,7 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
+import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.ServerConstants;
import org.opends.server.replication.server.ReplicationServer;
@@ -673,6 +674,7 @@
this.weight = rsInfo.getWeight();
this.connectedDSs = connectedDSs;
this.connectedDSNumber = connectedDSs.size();
+ this.serverState = new ServerState();
}
/**
@@ -1006,15 +1008,10 @@
{
if (connected == false)
{
- if (session != null)
+ ProtocolSession localSession = session;
+ if (localSession != null)
{
- try
- {
- session.close();
- } catch (IOException e)
- {
- // The session was already closed, just ignore.
- }
+ localSession.close();
session = null;
}
}
@@ -1287,30 +1284,11 @@
{
if (localSession != null)
{
- if (debugEnabled())
+ if (debugEnabled()) {
debugInfo("In RB, closing session after phase 1");
+ }
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // V4 protocol introduces a StopMsg to properly end communications
- if (!error)
- {
- try
- {
- localSession.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
- }
- try
- {
- localSession.close();
- } catch (IOException e)
- {
- // The session was already closed, just ignore.
- }
+ localSession.close();
localSession = null;
}
if (error)
@@ -1459,27 +1437,10 @@
{
if (localSession != null)
{
- if (debugEnabled())
+ if (debugEnabled()) {
debugInfo("In RB, closing session after phase 1");
-
- // V4 protocol introduces a StopMsg to properly end communications
- if (!error)
- {
- try
- {
- localSession.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
}
- try
- {
- localSession.close();
- } catch (IOException e)
- {
- // The session was already closed, just ignore.
- }
+ localSession.close();
localSession = null;
}
if (error)
@@ -1545,13 +1506,7 @@
if (session != null)
{
- try
- {
- session.close();
- } catch (IOException ex)
- {
- // The session was already closed, just ignore.
- }
+ session.close();
session = null;
}
// Be sure to return null.
@@ -1625,13 +1580,7 @@
if (session != null)
{
- try
- {
- session.close();
- } catch (IOException ex)
- {
- // The session was already closed, just ignore.
- }
+ session.close();
session = null;
}
// Be sure to return null.
@@ -2255,8 +2204,7 @@
heartbeatMonitor = new HeartbeatMonitor(
threadName,
session,
- heartbeatInterval,
- (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4));
+ heartbeatInterval);
heartbeatMonitor.start();
}
}
@@ -2293,24 +2241,7 @@
if (failingSession != null)
{
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // V4 protocol introduces a StopMsg to properly end communications
- try
- {
- failingSession.publish(new StopMsg());
- } catch (IOException ioe)
- {
- // Anyway, going to close session, so nothing to do
- }
- }
- try
- {
- failingSession.close();
- } catch (IOException e1)
- {
- // ignore
- }
+ failingSession.close();
numLostConnections++;
}
@@ -2689,6 +2620,11 @@
throw e;
} catch (Exception e)
{
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+
if (shutdown == false)
{
if ((session == null) || (!session.closeInitiated()))
@@ -2790,18 +2726,9 @@
rsGroupId = (byte) -1;
rsServerId = -1;
rsServerUrl = null;
-
- try
+ if (session != null)
{
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // V4 protocol introduces a StopMsg to properly end communications
- session.publish(new StopMsg());
- }
session.close();
- } catch (Exception e)
- {
- // Anyway, going to close session, so nothing to do
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 85ad615..678cd4f 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -23,6 +23,7 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
+ * Portions copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.plugin;
@@ -440,15 +441,9 @@
/*
* Shutdown any current client handling code
*/
- try
+ if (session != null)
{
- if (session != null)
- {
- session.close();
- }
- } catch (IOException e)
- {
- // ignore.
+ session.close();
}
try
@@ -567,13 +562,7 @@
// Handle DS connexion
if (!performHandshake())
{
- try
- {
- session.close();
- } catch (IOException e)
- {
- // nothing to do
- }
+ session.close();
return;
}
// If we come here, assured parameters sent by DS are as expected and
@@ -731,7 +720,7 @@
*/
private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
{
- assertEquals(updateMsg.isAssured(), isAssured,
+ assertEquals(updateMsg.isAssured(), isAssured,
"msg=" + ((updateMsg instanceof AddMsg)?
((AddMsg)updateMsg).getDn():updateMsg.getChangeNumber()));
if (isAssured)
@@ -1052,7 +1041,7 @@
String entry = "dn: ou=assured-sr-timeout-entry" + rsGroupId + "," + NOT_ASSURED_DN + "\n" +
"objectClass: top\n" +
"objectClass: organizationalUnit\n";
- addEntry(TestCaseUtils.entryFromLdifString(entry));
+ addEntry(TestCaseUtils.entryFromLdifString(entry));
}
long endTime = System.currentTimeMillis();
@@ -1919,8 +1908,8 @@
sleep(50);
ii++;
if (ii>10)
- assertEquals(operation.getResultCode(), expectedResult,
- operation.getErrorMessage().toString());
+ assertEquals(operation.getResultCode(), expectedResult,
+ operation.getErrorMessage().toString());
}
}
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index cc9bb17..25235ae 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -23,10 +23,10 @@
*
*
* Copyright 2008-2010 Sun Microsystems, Inc.
+ * Portions copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.server;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
@@ -1123,15 +1123,9 @@
/*
* Shutdown any current client handling code
*/
- try
+ if (session != null)
{
- if (session != null)
- {
- session.close();
- }
- } catch (IOException e)
- {
- // ignore.
+ session.close();
}
try
--
Gitblit v1.10.0