From 80774bcd0c732d9446cfc09fc9b7c39a3e4003ad Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 23 Mar 2011 22:27:01 +0000
Subject: [PATCH] Fix issue OpenDJ-95: Socket leak and constant disconnect/reconnect when a directory server can no longer reach its connected replication server

---
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                   |  113 ---
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java                                    |    5 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |   12 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java                                   |  451 ++++++++++++-----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java                                        |   44 -
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java                                         |   27 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java                                   |   17 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java                                      |  334 ++++++++----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java                                        |   36 -
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java                                         |   24 
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java                           |    5 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java                                   |   29 -
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java                                |  307 +++++++----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java                                            |   35 -
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java                                     |    7 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java |   27 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                                     |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java                                      |    8 
 18 files changed, 811 insertions(+), 678 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
index 1f43a02..21b4132 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/FractionalLDIFImportPlugin.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
+ *      Portions copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 
@@ -91,8 +92,8 @@
 
     /**
      * Constructor.
-     * @param fractionalConfig
-     * @param domain
+     * @param fractionalConfig The fractional configuration.
+     * @param domain The replication domain.
      */
     public ImportFractionalContext(FractionalConfig fractionalConfig,
       LDAPReplicationDomain domain)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
index 5a0688b..854a7b2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatMonitor.java
@@ -35,8 +35,6 @@
 
 import org.opends.server.loggers.debug.DebugTracer;
 
-import java.io.IOException;
-
 import org.opends.server.api.DirectoryThread;
 
 /**
@@ -71,27 +69,18 @@
   private volatile boolean shutdown = false;
 
   /**
-   * Send StopMsg before session closure or not.
-   */
-  private final boolean sendStopBeforeClose;
-
-
-  /**
    * Create a heartbeat monitor thread.
    * @param threadName The name of the heartbeat thread.
    * @param session The session on which heartbeats are to be monitored.
    * @param heartbeatInterval The expected interval between heartbeats received
    * (in milliseconds).
-   * @param sendStopBeforeClose Should we send a StopMsg before closing the
-   *        session ?
    */
   public HeartbeatMonitor(String threadName, ProtocolSession session,
-                          long heartbeatInterval, boolean sendStopBeforeClose)
+                          long heartbeatInterval)
   {
     super(threadName);
     this.session = session;
     this.heartbeatInterval = heartbeatInterval;
-    this.sendStopBeforeClose = sendStopBeforeClose;
   }
 
   /**
@@ -126,18 +115,6 @@
           {
             // Heartbeat is well overdue so the server is assumed to be dead.
             logError(NOTE_HEARTBEAT_FAILURE.get(currentThread().getName()));
-            if (sendStopBeforeClose)
-            {
-              // V4 protocol introduces a StopMsg to properly end communications
-              try
-              {
-                session.publish(new StopMsg());
-              }
-              catch (IOException ioe)
-              {
-                // Anyway, going to close session, so nothing to do
-              }
-            }
             session.close();
             break;
           }
@@ -160,10 +137,6 @@
         }
       }
     }
-    catch (IOException e)
-    {
-      // Hope that's OK.
-    }
     finally
     {
       if (debugEnabled())
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
index ee31cb6..e356005 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Portions copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
 
@@ -44,10 +45,8 @@
   /**
    * This method is called when the session with the remote must be closed.
    * This object won't be used anymore after this method is called.
-   *
-   * @throws IOException If an error happen during the close process.
    */
-  public abstract void close() throws IOException;
+  public abstract void close();
 
   /**
    * This method is called when a ReplicationMsg must be sent to
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
index fdfe13a..204b776 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -28,77 +28,104 @@
 
 package org.opends.server.replication.protocol;
 
-import static org.opends.server.loggers.ErrorLogger.logError;
+
+
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
 
-import org.opends.messages.Message;
-import org.opends.server.types.DirectoryConfig;
-import org.opends.server.types.CryptoManager;
-import org.opends.server.config.ConfigException;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.util.SortedSet;
 
+import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLException;
 import javax.net.ssl.SSLSocket;
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLSocketFactory;
-import java.util.SortedSet;
-import java.net.Socket;
-import java.net.InetAddress;
-import java.io.IOException;
+
+import org.opends.messages.Message;
+import org.opends.server.config.ConfigException;
+import org.opends.server.types.CryptoManager;
+import org.opends.server.types.DirectoryConfig;
+
+
 
 /**
  * This class represents the security configuration for replication protocol
  * sessions. It contains all the configuration required to use SSL, and it
  * determines whether encryption should be enabled for a session to a given
  * replication server.
- *
  */
-public class ReplSessionSecurity
+public final class ReplSessionSecurity
 {
   /**
    * Whether replication sessions use SSL encryption.
    */
-  private boolean sslEncryption;
+  private final boolean sslEncryption;
 
   /**
    * The name of the local certificate to use, or null if none is specified.
    */
-  private String sslCertNickname;
+  private final String sslCertNickname;
 
   /**
    * The set of enabled SSL protocols, or null for the default set.
    */
-  private String sslProtocols[];
+  private final String sslProtocols[];
 
   /**
    * The set of enabled SSL cipher suites, or null for the default set.
    */
-  private String sslCipherSuites[];
+  private final String sslCipherSuites[];
 
   /**
-   * The default soTimeout value to be used at handshake phases.
-   * (DS<->RS and RS<->RS)
+   * The default soTimeout value to be used at handshake phases. (DS<->RS and
+   * RS<->RS)
    */
   public static final int HANDSHAKE_TIMEOUT = 4000;
 
+
+
+  /**
+   * Create a ReplSessionSecurity instance from a provided multimaster domain
+   * configuration.
+   *
+   * @throws ConfigException
+   *           If the supplied configuration was not valid.
+   */
+  public ReplSessionSecurity() throws ConfigException
+  {
+    // Currently use global settings from the crypto manager.
+    this(DirectoryConfig.getCryptoManager().getSslCertNickname(),
+        DirectoryConfig.getCryptoManager().getSslProtocols(),
+        DirectoryConfig.getCryptoManager().getSslCipherSuites(),
+        DirectoryConfig.getCryptoManager().isSslEncryption());
+  }
+
+
+
   /**
    * Create a ReplSessionSecurity instance from the supplied configuration
    * values.
    *
-   * @param sslCertNickname The name of the local certificate to use, or null
-   *                        if none is specified.
-   * @param sslProtocols    The protocols that should be enabled, or null if
-   *                        the default protocols should be used.
-   * @param sslCipherSuites The cipher suites that should be enabled, or null
-   *                        if the default cipher suites should be used.
-   * @param sslEncryption   Whether replication sessions use SSL encryption.
-   *
-   * @throws ConfigException    If the supplied configuration was not valid.
+   * @param sslCertNickname
+   *          The name of the local certificate to use, or null if none is
+   *          specified.
+   * @param sslProtocols
+   *          The protocols that should be enabled, or null if the default
+   *          protocols should be used.
+   * @param sslCipherSuites
+   *          The cipher suites that should be enabled, or null if the default
+   *          cipher suites should be used.
+   * @param sslEncryption
+   *          Whether replication sessions use SSL encryption.
+   * @throws ConfigException
+   *           If the supplied configuration was not valid.
    */
-  public ReplSessionSecurity(String sslCertNickname,
-    SortedSet<String> sslProtocols,
-    SortedSet<String> sslCipherSuites,
-    boolean sslEncryption)
-    throws ConfigException
+  public ReplSessionSecurity(final String sslCertNickname,
+      final SortedSet<String> sslProtocols,
+      final SortedSet<String> sslCipherSuites,
+      final boolean sslEncryption) throws ConfigException
   {
     if (sslProtocols == null || sslProtocols.size() == 0)
     {
@@ -124,76 +151,46 @@
     this.sslCertNickname = sslCertNickname;
   }
 
-  /**
-   * Create a ReplSessionSecurity instance from a provided multimaster domain
-   * configuration.
-   *
-   * @throws ConfigException If the supplied configuration was not valid.
-   */
-  public ReplSessionSecurity()
-    throws ConfigException
-  {
-    // Currently use global settings from the crypto manager.
-    this(DirectoryConfig.getCryptoManager().getSslCertNickname(),
-      DirectoryConfig.getCryptoManager().getSslProtocols(),
-      DirectoryConfig.getCryptoManager().getSslCipherSuites(),
-      DirectoryConfig.getCryptoManager().isSslEncryption());
-  }
 
-  /**
-   * Determine whether a given replication server is listening on a secure
-   * port.
-   * @param serverURL The replication server URL.
-   * @return true if the given replication server is listening on a secure
-   *         port, or false if it is listening on a non-secure port.
-   */
-  private boolean isSecurePort(String serverURL)
-  {
-    // Always true unless changed for test purposes.
-    return true;
-  }
-
-  /**
-   * Determine whether sessions to a given replication server should be
-   * encrypted.
-   * @param serverURL The replication server URL.
-   * @return true if sessions to the given replication server should be
-   *         encrypted, or false if they should not be encrypted.
-   */
-  public boolean isSslEncryption(String serverURL)
-  {
-    // Currently use global settings from the crypto manager.
-    return sslEncryption;
-  }
 
   /**
    * Create a new protocol session in the client role on the provided socket.
-   * @param serverURL The remote replication server to which the socket is
-   *                  connected.
-   * @param socket The connected socket.
-   * @param soTimeout The socket timeout option to use for the protocol session.
+   *
+   * @param serverURL
+   *          The remote replication server to which the socket is connected.
+   * @param socket
+   *          The connected socket.
+   * @param soTimeout
+   *          The socket timeout option to use for the protocol session.
    * @return The new protocol session.
-   * @throws ConfigException If the protocol session could not be established
-   *                         due to a configuration problem.
-   * @throws IOException     If the protocol session could not be established
-   *                         for some other reason.
+   * @throws ConfigException
+   *           If the protocol session could not be established due to a
+   *           configuration problem.
+   * @throws IOException
+   *           If the protocol session could not be established for some other
+   *           reason.
    */
-  public ProtocolSession createClientSession(String serverURL, Socket socket,
-    int soTimeout)
-    throws ConfigException, IOException
+  public ProtocolSession createClientSession(final String serverURL,
+      final Socket socket, final int soTimeout)
+      throws ConfigException, IOException
   {
-    boolean useSSL = isSecurePort(serverURL);
-    if (useSSL)
+    boolean hasCompleted = false;
+    SSLSocket secureSocket = null;
+
+    try
     {
       // Create a new SSL context every time to make sure we pick up the
       // latest contents of the trust store.
-      CryptoManager cryptoManager = DirectoryConfig.getCryptoManager();
-      SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname);
-      SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+      final CryptoManager cryptoManager = DirectoryConfig
+          .getCryptoManager();
+      final SSLContext sslContext = cryptoManager
+          .getSslContext(sslCertNickname);
+      final SSLSocketFactory sslSocketFactory = sslContext
+          .getSocketFactory();
 
-      SSLSocket secureSocket = (SSLSocket) sslSocketFactory.createSocket(socket,
-        socket.getInetAddress().getHostName(),
-        socket.getPort(), false);
+      secureSocket = (SSLSocket) sslSocketFactory.createSocket(
+          socket, socket.getInetAddress().getHostName(),
+          socket.getPort(), false);
       secureSocket.setUseClientMode(true);
       secureSocket.setSoTimeout(soTimeout);
 
@@ -209,39 +206,73 @@
 
       // Force TLS negotiation now.
       secureSocket.startHandshake();
-
+      hasCompleted = true;
       return new TLSSocketSession(socket, secureSocket);
     }
-    else
+    finally
     {
-      return new SocketSession(socket);
+      if (!hasCompleted)
+      {
+        try
+        {
+          socket.close();
+        }
+        catch (final Exception ignored)
+        {
+          // Ignore.
+        }
+
+        if (secureSocket != null)
+        {
+          try
+          {
+            secureSocket.close();
+          }
+          catch (final Exception ignored)
+          {
+            // Ignore.
+          }
+        }
+      }
     }
   }
 
+
+
   /**
    * Create a new protocol session in the server role on the provided socket.
-   * @param socket The connected socket.
+   *
+   * @param socket
+   *          The connected socket.
+   * @param soTimeout
+   *          The socket timeout option to use for the protocol session.
    * @return The new protocol session.
-   * @param soTimeout The socket timeout option to use for the protocol session.
-   * @throws ConfigException If the protocol session could not be established
-   *                         due to a configuration problem.
-   * @throws IOException     If the protocol session could not be established
-   *                         for some other reason.
+   * @throws ConfigException
+   *           If the protocol session could not be established due to a
+   *           configuration problem.
+   * @throws IOException
+   *           If the protocol session could not be established for some other
+   *           reason.
    */
-  public ProtocolSession createServerSession(Socket socket, int soTimeout)
-    throws ConfigException, IOException
+  public ProtocolSession createServerSession(final Socket socket,
+      final int soTimeout) throws ConfigException, IOException
   {
+    boolean hasCompleted = false;
+    SSLSocket secureSocket = null;
+
     try
     {
       // Create a new SSL context every time to make sure we pick up the
       // latest contents of the trust store.
-      CryptoManager cryptoManager = DirectoryConfig.getCryptoManager();
-      SSLContext sslContext = cryptoManager.getSslContext(sslCertNickname);
-      SSLSocketFactory sslSocketFactory = sslContext.getSocketFactory();
+      final CryptoManager cryptoManager = DirectoryConfig
+          .getCryptoManager();
+      final SSLContext sslContext = cryptoManager
+          .getSslContext(sslCertNickname);
+      final SSLSocketFactory sslSocketFactory = sslContext
+          .getSocketFactory();
 
-      SSLSocket secureSocket = (SSLSocket)
-      sslSocketFactory.createSocket(socket,
-          socket.getInetAddress().getHostName(),
+      secureSocket = (SSLSocket) sslSocketFactory.createSocket(
+          socket, socket.getInetAddress().getHostName(),
           socket.getPort(), false);
       secureSocket.setUseClientMode(false);
       secureSocket.setNeedClientAuth(true);
@@ -259,23 +290,63 @@
 
       // Force TLS negotiation now.
       secureSocket.startHandshake();
-
-      // SSLSession sslSession = secureSocket.getSession();
-      // System.out.println("Peer      = " + sslSession.getPeerHost() + ":" +
-      //   sslSession.getPeerPort());
-      // System.out.println("Principal = " + sslSession.getPeerPrincipal());
-
+      hasCompleted = true;
       return new TLSSocketSession(socket, secureSocket);
-    } catch (SSLException e)
+    }
+    catch (final SSLException e)
     {
       // This is probably a connection attempt from an unexpected client
       // log that to warn the administrator.
-      InetAddress remHost = socket.getInetAddress();
-      Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(remHost.
-          getHostName(), remHost.getHostAddress(), e.getLocalizedMessage());
+      final InetAddress remHost = socket.getInetAddress();
+      final Message message = NOTE_SSL_SERVER_CON_ATTEMPT_ERROR.get(
+          remHost.getHostName(), remHost.getHostAddress(),
+          e.getLocalizedMessage());
       logError(message);
       return null;
     }
+    finally
+    {
+      if (!hasCompleted)
+      {
+        try
+        {
+          socket.close();
+        }
+        catch (final Exception ignored)
+        {
+          // Ignore.
+        }
+
+        if (secureSocket != null)
+        {
+          try
+          {
+            secureSocket.close();
+          }
+          catch (final Exception ignored)
+          {
+            // Ignore.
+          }
+        }
+      }
+    }
+  }
+
+
+
+  /**
+   * Determine whether sessions to a given replication server should be
+   * encrypted.
+   *
+   * @param serverURL
+   *          The replication server URL.
+   * @return true if sessions to the given replication server should be
+   *         encrypted, or false if they should not be encrypted.
+   */
+  public boolean isSslEncryption(final String serverURL)
+  {
+    // Currently use global settings from the crypto manager.
+    return sslEncryption;
   }
 
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
index 9e2a936..7cf1a96 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/SocketSession.java
@@ -27,135 +27,283 @@
  */
 package org.opends.server.replication.protocol;
 
+
+
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.zip.DataFormatException;
 
 import org.opends.server.loggers.debug.DebugTracer;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+
+
 
 /**
  * This class Implement a protocol session using a basic socket and relying on
- * the innate encoding/decoding capabilities of the ReplicationMsg
- * by using the getBytes() and generateMsg() methods of those classes.
+ * the innate encoding/decoding capabilities of the ReplicationMsg by using the
+ * getBytes() and generateMsg() methods of those classes.
  */
-public class SocketSession implements ProtocolSession
+public final class SocketSession implements ProtocolSession
 {
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private Socket socket;
-  private InputStream input;
-  private OutputStream output;
-  byte[] rcvLengthBuf = new byte[8];
+  private final Socket socket;
+  private final InputStream input;
+  private final OutputStream output;
+  private final byte[] rcvLengthBuf = new byte[8];
 
   /**
    * The time the last message published to this session.
    */
   private volatile long lastPublishTime = 0;
 
-
   /**
    * The time the last message was received on this session.
    */
-  private long lastReceiveTime = 0;
+  private volatile long lastReceiveTime = 0;
 
+  // Close guarded by closeLock: use a different lock to publish since
+  // publishing can block, and we don't want to block while closing failed
+  // connections.
+  private final Object closeLock = new Object();
   private boolean closeInitiated = false;
 
+  // Publish guarded by publishLock: use a full lock here so that we can
+  // optionally publish StopMsg during close.
+  private final Lock publishLock = new ReentrantLock();
+
+  // Does not need protecting: updated only during single threaded handshake.
   private short protocolVersion = ProtocolVersion.getCurrentVersion();
 
+
+
   /**
    * Creates a new SocketSession based on the provided socket.
    *
-   * @param socket The Socket on which the SocketSession will be based.
-   * @throws IOException When an IException happens on the socket.
+   * @param socket
+   *          The Socket on which the SocketSession will be based.
+   * @throws IOException
+   *           When an IException happens on the socket.
    */
-  public SocketSession(Socket socket) throws IOException
+  public SocketSession(final Socket socket) throws IOException
   {
-    this.socket = socket;
-    /*
-     * Use a window instead of the TCP flow control.
-     * Therefore set a very large value for send and receive buffer sizes.
-     */
-    input = socket.getInputStream();
-    output = socket.getOutputStream();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public void close() throws IOException
-  {
-    closeInitiated = true;
-
     if (debugEnabled())
     {
-      TRACER.debugInfo("Closing SocketSession."
-          + stackTraceToSingleLineString(new Exception()));
+      TRACER.debugInfo("Creating SocketSession to %s from %s", socket
+          .getRemoteSocketAddress().toString(),
+          stackTraceToSingleLineString(new Exception()));
     }
-    socket.close();
+
+    this.socket = socket;
+    this.input = socket.getInputStream();
+    this.output = socket.getOutputStream();
   }
 
+
+
   /**
    * {@inheritDoc}
    */
-  public synchronized void publish(ReplicationMsg msg)
-         throws IOException
+  @Override
+  public void close()
+  {
+    synchronized (closeLock)
+    {
+      if (closeInitiated)
+      {
+        return;
+      }
+
+      closeInitiated = true;
+    }
+
+    // Perform close outside of critical section.
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("Closing SocketSession to %s from %s", socket
+          .getRemoteSocketAddress().toString(),
+          stackTraceToSingleLineString(new Exception()));
+    }
+
+    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+    {
+      // V4 protocol introduces a StopMsg to properly end communications.
+      if (publishLock.tryLock())
+      {
+        try
+        {
+          publish(new StopMsg());
+        }
+        catch (final IOException ignored)
+        {
+          // Ignore errors on close.
+        }
+        finally
+        {
+          publishLock.unlock();
+        }
+      }
+    }
+
+    if (socket != null && !socket.isClosed())
+    {
+      try
+      {
+        socket.close();
+      }
+      catch (final IOException ignored)
+      {
+        // Ignore errors on close.
+      }
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean closeInitiated()
+  {
+    synchronized (closeLock)
+    {
+      return closeInitiated;
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long getLastPublishTime()
+  {
+    return lastPublishTime;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public long getLastReceiveTime()
+  {
+    if (lastReceiveTime == 0)
+    {
+      return System.currentTimeMillis();
+    }
+    return lastReceiveTime;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getReadableRemoteAddress()
+  {
+    return socket.getRemoteSocketAddress().toString();
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String getRemoteAddress()
+  {
+    return socket.getInetAddress().getHostAddress();
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isEncrypted()
+  {
+    return false;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void publish(final ReplicationMsg msg) throws IOException
   {
     publish(msg, ProtocolVersion.getCurrentVersion());
   }
 
+
+
   /**
    * {@inheritDoc}
    */
-  public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
-         throws IOException
+  @Override
+  public void publish(final ReplicationMsg msg,
+      final short reqProtocolVersion) throws IOException
   {
-    byte[] buffer = msg.getBytes(reqProtocolVersion);
-    String str = String.format("%08x", buffer.length);
+    final byte[] buffer = msg.getBytes(reqProtocolVersion);
+    final String str = String.format("%08x", buffer.length);
+    final byte[] sendLengthBuf = str.getBytes();
 
-    if (debugEnabled())
+    publishLock.lock();
+    try
     {
-      TRACER.debugInfo("SocketSession publish <" + str + ">");
+      output.write(sendLengthBuf);
+      output.write(buffer);
+      output.flush();
     }
-
-    byte[] sendLengthBuf = str.getBytes();
-
-    output.write(sendLengthBuf);
-    output.write(buffer);
-    output.flush();
+    finally
+    {
+      publishLock.unlock();
+    }
 
     lastPublishTime = System.currentTimeMillis();
   }
 
+
+
   /**
    * {@inheritDoc}
    */
+  @Override
   public ReplicationMsg receive() throws IOException,
       ClassNotFoundException, DataFormatException,
       NotSupportedOldVersionPDUException
   {
-    /* Read the first 8 bytes containing the packet length */
+    // Read the first 8 bytes containing the packet length
     int length = 0;
 
-    /* Let's start the stop-watch before waiting on read */
-    /* for the heartbeat check to be operationnal        */
+    // Let's start the stop-watch before waiting on read for the heartbeat check
+    // to be operational
     lastReceiveTime = System.currentTimeMillis();
 
-    while (length<8)
+    while (length < 8)
     {
-      int read = input.read(rcvLengthBuf, length, 8-length);
+      final int read = input.read(rcvLengthBuf, length, 8 - length);
       if (read == -1)
       {
-        lastReceiveTime=0;
+        lastReceiveTime = 0;
         throw new IOException("no more data");
       }
       else
@@ -164,101 +312,59 @@
       }
     }
 
-    int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
+    final int totalLength = Integer.parseInt(
+        new String(rcvLengthBuf), 16);
 
     try
     {
       length = 0;
-      byte[] buffer = new byte[totalLength];
+      final byte[] buffer = new byte[totalLength];
       while (length < totalLength)
       {
         length += input.read(buffer, length, totalLength - length);
       }
-      /* We do not want the heartbeat to close the session when */
-      /* we are processing a message even a time consuming one. */
-      lastReceiveTime=0;
+      // We do not want the heartbeat to close the session when we are
+      // processing a message even a time consuming one.
+      lastReceiveTime = 0;
       return ReplicationMsg.generateMsg(buffer, protocolVersion);
     }
-    catch (OutOfMemoryError e)
+    catch (final OutOfMemoryError e)
     {
       throw new IOException("Packet too large, can't allocate "
-                            + totalLength + " bytes.");
+          + totalLength + " bytes.");
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public void stopEncryption()
-  {
-    // There is no security layer.
-  }
+
 
   /**
    * {@inheritDoc}
    */
-  public boolean isEncrypted()
+  @Override
+  public void setProtocolVersion(final short version)
   {
-    return false;
+    protocolVersion = version;
   }
 
+
+
   /**
    * {@inheritDoc}
    */
-  public long getLastPublishTime()
-  {
-    return lastPublishTime;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public long getLastReceiveTime()
-  {
-    if (lastReceiveTime==0)
-    {
-      return System.currentTimeMillis();
-    }
-    return lastReceiveTime;
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public String getRemoteAddress()
-  {
-    return socket.getInetAddress().getHostAddress();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public String getReadableRemoteAddress()
-  {
-    return socket.getRemoteSocketAddress().toString();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public void setSoTimeout(int timeout) throws SocketException
+  @Override
+  public void setSoTimeout(final int timeout) throws SocketException
   {
     socket.setSoTimeout(timeout);
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public boolean closeInitiated()
-  {
-    return closeInitiated;
-  }
+
 
   /**
    * {@inheritDoc}
    */
-  public void setProtocolVersion(short version)
+  @Override
+  public void stopEncryption()
   {
-    protocolVersion = version;
+    // There is no security layer.
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
index adf16b2..8a54bc2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -27,6 +27,8 @@
  */
 package org.opends.server.replication.protocol;
 
+
+
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
@@ -36,241 +38,430 @@
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.zip.DataFormatException;
 
+import javax.net.ssl.SSLSocket;
+
 import org.opends.server.loggers.debug.DebugTracer;
 
-import javax.net.ssl.SSLSocket;
+
 
 /**
  * This class implements a protocol session using TLS.
  */
-public class TLSSocketSession implements ProtocolSession
+public final class TLSSocketSession implements ProtocolSession
 {
   /**
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private Socket plainSocket;
-  private SSLSocket secureSocket;
-  private InputStream input;
-  private OutputStream output;
-  private InputStream plainInput;
-  private OutputStream plainOutput;
-  byte[] rcvLengthBuf = new byte[8];
+  private final Socket plainSocket;
+  private final SSLSocket secureSocket;
+  private final InputStream plainInput;
+  private final OutputStream plainOutput;
+  private final byte[] rcvLengthBuf = new byte[8];
 
   /**
    * The time the last message published to this session.
    */
   private volatile long lastPublishTime = 0;
 
-
   /**
    * The time the last message was received on this session.
    */
-  private long lastReceiveTime = 0;
+  private volatile long lastReceiveTime = 0;
 
+  // Close and error guarded by stateLock: use a different lock to publish since
+  // publishing can block, and we don't want to block while closing failed
+  // connections.
+  private final Object stateLock = new Object();
   private boolean closeInitiated = false;
+  private Throwable sessionError = null;
 
+  // Publish guarded by publishLock: use a full lock here so that we can
+  // optionally publish StopMsg during close.
+  private final Lock publishLock = new ReentrantLock();
+
+  // Does not need protecting: updated only during single threaded handshake.
   private short protocolVersion = ProtocolVersion.getCurrentVersion();
+  private InputStream input;
+  private OutputStream output;
+
+
 
   /**
    * Creates a new TLSSocketSession.
    *
-   * @param socket       The regular Socket on which the SocketSession will be
-   *                     based.
-   * @param secureSocket The secure Socket on which the SocketSession will be
-   *                     based.
-   * @throws IOException When an IException happens on the socket.
+   * @param socket
+   *          The regular Socket on which the SocketSession will be based.
+   * @param secureSocket
+   *          The secure Socket on which the SocketSession will be based.
+   * @throws IOException
+   *           When an IException happens on the socket.
    */
-  public TLSSocketSession(Socket socket, SSLSocket secureSocket)
-       throws IOException
+  public TLSSocketSession(final Socket socket,
+      final SSLSocket secureSocket) throws IOException
   {
-    plainSocket = socket;
-    this.secureSocket = secureSocket;
-    plainInput = plainSocket.getInputStream();
-    plainOutput = plainSocket.getOutputStream();
-    input = secureSocket.getInputStream();
-    output = secureSocket.getOutputStream();
-  }
-
-
-  /**
-   * {@inheritDoc}
-   */
-  public void close() throws IOException
-  {
-    closeInitiated = true;
     if (debugEnabled())
     {
-      TRACER.debugInfo("Closing SocketSession." +
-          stackTraceToSingleLineString(new Exception("Stack:")));
+      TRACER.debugInfo(
+          "Creating TLSSocketSession from %s to %s in %s",
+          socket.getLocalSocketAddress(),
+          socket.getRemoteSocketAddress(),
+          stackTraceToSingleLineString(new Exception()));
     }
-    if (plainSocket != null && !plainSocket.isClosed())
-    {
-      plainInput.close();
-      plainOutput.close();
-      plainSocket.close();
-    }
-    if (secureSocket != null && !secureSocket.isClosed())
-    {
-      input.close();
-      output.close();
-      secureSocket.close();
-    }
+
+    this.plainSocket = socket;
+    this.secureSocket = secureSocket;
+    this.plainInput = plainSocket.getInputStream();
+    this.plainOutput = plainSocket.getOutputStream();
+    this.input = secureSocket.getInputStream();
+    this.output = secureSocket.getOutputStream();
   }
 
+
+
   /**
    * {@inheritDoc}
    */
-  public synchronized void publish(ReplicationMsg msg)
-         throws IOException
+  @Override
+  public void close()
   {
-    publish(msg, ProtocolVersion.getCurrentVersion());
-  }
+    Throwable localSessionError;
 
-  /**
-   * {@inheritDoc}
-   */
-  public synchronized void publish(ReplicationMsg msg, short reqProtocolVersion)
-         throws IOException
-  {
-    byte[] buffer = msg.getBytes(reqProtocolVersion);
-    String str = String.format("%08x", buffer.length);
-    byte[] sendLengthBuf = str.getBytes();
-
-    output.write(sendLengthBuf);
-    output.write(buffer);
-    output.flush();
-
-    lastPublishTime = System.currentTimeMillis();
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public ReplicationMsg receive() throws IOException,
-      ClassNotFoundException, DataFormatException,
-      NotSupportedOldVersionPDUException
-  {
-    /* Read the first 8 bytes containing the packet length */
-    int length = 0;
-
-    /* Let's start the stop-watch before waiting on read */
-    /* for the heartbeat check to be operationnal        */
-    lastReceiveTime = System.currentTimeMillis();
-
-    while (length<8)
+    synchronized (stateLock)
     {
-      int read = input.read(rcvLengthBuf, length, 8-length);
-      if (read == -1)
+      if (closeInitiated)
       {
-        lastReceiveTime=0;
-        throw new IOException("no more data");
+        return;
+      }
+
+      localSessionError = sessionError;
+      closeInitiated = true;
+    }
+
+    // Perform close outside of critical section.
+    if (debugEnabled())
+    {
+      if (localSessionError == null)
+      {
+        TRACER.debugInfo(
+            "Closing TLSSocketSession from %s to %s in %s",
+            plainSocket.getLocalSocketAddress(),
+            plainSocket.getRemoteSocketAddress(),
+            stackTraceToSingleLineString(new Exception()));
       }
       else
       {
-        length += read;
+        TRACER.debugInfo(
+            "Aborting TLSSocketSession from %s to %s in %s due to the "
+                + "following error: %s",
+            plainSocket.getLocalSocketAddress(),
+            plainSocket.getRemoteSocketAddress(),
+            stackTraceToSingleLineString(new Exception()),
+            stackTraceToSingleLineString(localSessionError));
       }
     }
 
-    int totalLength = Integer.parseInt(new String(rcvLengthBuf), 16);
+    // V4 protocol introduces a StopMsg to properly end communications.
+    if (localSessionError == null)
+    {
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+      {
+        if (publishLock.tryLock())
+        {
+          try
+          {
+            publish(new StopMsg());
+          }
+          catch (final IOException ignored)
+          {
+            // Ignore errors on close.
+          }
+          finally
+          {
+            publishLock.unlock();
+          }
+        }
+      }
+    }
 
     try
     {
-      length = 0;
-      byte[] buffer = new byte[totalLength];
-      while (length < totalLength)
-      {
-        length += input.read(buffer, length, totalLength - length);
-      }
-      /* We do not want the heartbeat to close the session when */
-      /* we are processing a message even a time consuming one. */
-      lastReceiveTime=0;
-      return ReplicationMsg.generateMsg(buffer, protocolVersion);
+      plainSocket.close();
     }
-    catch (OutOfMemoryError e)
+    catch (final IOException ignored)
     {
-      throw new IOException("Packet too large, can't allocate "
-                            + totalLength + " bytes.");
+      // Ignore errors on close.
+    }
+
+    try
+    {
+      secureSocket.close();
+    }
+    catch (final IOException ignored)
+    {
+      // Ignore errors on close.
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public void stopEncryption()
-  {
-    input = plainInput;
-    output = plainOutput;
-  }
+
 
   /**
    * {@inheritDoc}
    */
-  public boolean isEncrypted()
+  @Override
+  public boolean closeInitiated()
   {
-    return !(input == plainInput);
+    synchronized (stateLock)
+    {
+      return closeInitiated;
+    }
   }
 
+
+
   /**
    * {@inheritDoc}
    */
+  @Override
   public long getLastPublishTime()
   {
     return lastPublishTime;
   }
 
+
+
   /**
    * {@inheritDoc}
    */
+  @Override
   public long getLastReceiveTime()
   {
-    if (lastReceiveTime==0)
+    if (lastReceiveTime == 0)
     {
       return System.currentTimeMillis();
     }
     return lastReceiveTime;
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public String getRemoteAddress()
-  {
-    return plainSocket.getInetAddress().getHostAddress();
-  }
+
 
   /**
    * {@inheritDoc}
    */
+  @Override
   public String getReadableRemoteAddress()
   {
     return plainSocket.getRemoteSocketAddress().toString();
   }
 
+
+
   /**
    * {@inheritDoc}
    */
-  public void setSoTimeout(int timeout) throws SocketException
+  @Override
+  public String getRemoteAddress()
+  {
+    return plainSocket.getInetAddress().getHostAddress();
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isEncrypted()
+  {
+    return input != plainInput;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void publish(final ReplicationMsg msg) throws IOException
+  {
+    publish(msg, ProtocolVersion.getCurrentVersion());
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void publish(final ReplicationMsg msg,
+      final short reqProtocolVersion) throws IOException
+  {
+    final byte[] buffer = msg.getBytes(reqProtocolVersion);
+    final String str = String.format("%08x", buffer.length);
+    final byte[] sendLengthBuf = str.getBytes();
+
+    publishLock.lock();
+    try
+    {
+      output.write(sendLengthBuf);
+      output.write(buffer);
+      output.flush();
+    }
+    catch (final IOException e)
+    {
+      setSessionError(e);
+      throw e;
+    }
+    finally
+    {
+      publishLock.unlock();
+    }
+
+    lastPublishTime = System.currentTimeMillis();
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ReplicationMsg receive() throws IOException,
+      DataFormatException, NotSupportedOldVersionPDUException
+  {
+    try
+    {
+      // Read the first 8 bytes containing the packet length.
+      int length = 0;
+
+      // Let's start the stop-watch before waiting on read for the heartbeat
+      // check
+      // to be operational.
+      lastReceiveTime = System.currentTimeMillis();
+
+      while (length < 8)
+      {
+        final int read = input.read(rcvLengthBuf, length, 8 - length);
+        if (read == -1)
+        {
+          lastReceiveTime = 0;
+          throw new IOException("no more data");
+        }
+        else
+        {
+          length += read;
+        }
+      }
+
+      final int totalLength = Integer.parseInt(new String(
+          rcvLengthBuf), 16);
+
+      try
+      {
+        length = 0;
+        final byte[] buffer = new byte[totalLength];
+        while (length < totalLength)
+        {
+          final int read = input.read(buffer, length, totalLength
+              - length);
+          if (read == -1)
+          {
+            lastReceiveTime = 0;
+            throw new IOException("no more data");
+          }
+          else
+          {
+            length += read;
+          }
+        }
+        // We do not want the heartbeat to close the session when we are
+        // processing a message even a time consuming one.
+        lastReceiveTime = 0;
+        return ReplicationMsg.generateMsg(buffer, protocolVersion);
+      }
+      catch (final OutOfMemoryError e)
+      {
+        throw new IOException("Packet too large, can't allocate "
+            + totalLength + " bytes.");
+      }
+    }
+    catch (final IOException e)
+    {
+      setSessionError(e);
+      throw e;
+    }
+    catch (final DataFormatException e)
+    {
+      setSessionError(e);
+      throw e;
+    }
+    catch (final NotSupportedOldVersionPDUException e)
+    {
+      setSessionError(e);
+      throw e;
+    }
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setProtocolVersion(final short version)
+  {
+    protocolVersion = version;
+  }
+
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void setSoTimeout(final int timeout) throws SocketException
   {
     plainSocket.setSoTimeout(timeout);
   }
 
-  /**
-   * {@inheritDoc}
-   */
-  public boolean closeInitiated()
-  {
-    return closeInitiated;
-  }
+
 
   /**
    * {@inheritDoc}
    */
-  public void setProtocolVersion(short version)
+  @Override
+  public void stopEncryption()
   {
-    protocolVersion = version;
+    // The secure socket has been configured not to auto close the underlying
+    // plain socket.
+    try
+    {
+      secureSocket.close();
+    }
+    catch (IOException ignored)
+    {
+      // Ignore.
+    }
+
+    input = plainInput;
+    output = plainOutput;
+  }
+
+
+
+  private void setSessionError(final Exception e)
+  {
+    synchronized (stateLock)
+    {
+      if (sessionError == null)
+      {
+        sessionError = e;
+      }
+    }
   }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index b70fd4a..fa113b4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -129,12 +129,16 @@
         }
         catch (LockConflictException e)
         {
-          if (txn != null)
-            txn.abort();
-          txn = null;
+          // Try again.
         }
         finally
         {
+          if (txn != null)
+          {
+            // No effect if txn has committed.
+            txn.abort();
+            txn = null;
+          }
           dbCloseLock.readLock().unlock();
         }
       }
@@ -145,10 +149,6 @@
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
         logError(mb.toMessage());
-        if (txn != null)
-        {
-          txn.abort();
-        }
         replicationServer.shutdown();
       }
     }
@@ -158,16 +158,6 @@
       mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
       mb.append(stackTraceToSingleLineString(e));
       logError(mb.toMessage());
-      if (txn != null)
-      {
-        try
-        {
-          txn.abort();
-        } catch (DatabaseException e1)
-        {
-          // can't do much more. The ReplicationServer is shuting down.
-        }
-      }
       replicationServer.shutdown();
     }
     catch (UnsupportedEncodingException e)
@@ -177,17 +167,6 @@
       mb.append(stackTraceToSingleLineString(e));
       logError(mb.toMessage());
       replicationServer.shutdown();
-      if (txn != null)
-      {
-        try
-        {
-          txn.abort();
-        } catch (DatabaseException e1)
-        {
-          // can't do much more. The ReplicationServer is shuting down.
-        }
-      }
-      replicationServer.shutdown();
     }
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index 4b71c95..567d289 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -386,9 +386,10 @@
             else
             {
               ServerState startState = domain.getStartState();
-              // We don't use the endState but it's updating CN as reading
-              ServerState endState =
-                  domain.getEligibleState(crossDomainEligibleCN, false);
+
+              // We don't use the returned endState but it's updating CN as
+              // reading
+              domain.getEligibleState(crossDomainEligibleCN, false);
 
               ChangeNumber fcn = startState.getMaxChangeNumber(
                   cn.getServerId());
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index b111327..4c53060 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2009-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2010 ForgeRock AS
+ *      Portions Copyright 2010-2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -169,6 +169,7 @@
     }
     /**
      * Provide a string representation of this object for debug purpose..
+     * @param buffer Append to this buffer.
      */
     public void toString(StringBuilder buffer)
     {
@@ -1550,9 +1551,10 @@
       // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
       searchPhase = PERSISTENT_PHASE;
 
-      if (writer ==null)
+      final ProtocolSession localSession = session;
+      if (writer ==null && localSession != null)
       {
-        writer = new ECLServerWriter(session,this,replicationServerDomain);
+        writer = new ECLServerWriter(localSession,this,replicationServerDomain);
         writer.start();  // start suspended
       }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index 4f7aaaf..62f8260 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -186,13 +186,7 @@
     {
       if (session!=null)
       {
-        try
-        {
-          session.close();
-        } catch (IOException e)
-        {
-          // Can't do much more : ignore
-        }
+        session.close();
       }
       if (replicationServerDomain!=null)
         replicationServerDomain.stopServer(handler, false);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
index 1fe4afa..4d631d0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationBackend.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2007-2009 Sun Microsystems, Inc.
+ *      Portions copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 import static org.opends.messages.BackendMessages.*;
@@ -36,12 +37,10 @@
 
 import org.opends.server.replication.protocol.LDAPUpdateMsg;
 
-import java.io.BufferedReader;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
-import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1479,19 +1478,7 @@
       return writer;
     }
 
-    /**
-     * Close the writer and get a string reader for the LDIF content.
-     *
-     * @return Returns the string contents of the writer.
-     * @throws Exception
-     *           If an error occurred closing the writer.
-     */
-    public BufferedReader getLDIFBufferedReader() throws Exception {
-      writer.close();
-      String ldif = stream.toString("UTF-8");
-      StringReader reader = new StringReader(ldif);
-      return new BufferedReader(reader);
-    }
+
 
     /**
      * Close the writer and get an LDIF reader for the LDIF content.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 8c5ffb3..7330fac 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -252,12 +252,17 @@
         }
         catch (LockConflictException e)
         {
-          if (txn != null)
-            txn.abort();
-          txn = null;
+          // Try again.
         }
         finally
         {
+          if (txn != null)
+          {
+            // No effect if txn has committed.
+            txn.abort();
+            txn = null;
+          }
+
           dbCloseLock.readLock().unlock();
         }
       }
@@ -268,10 +273,6 @@
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
         logError(mb.toMessage());
-        if (txn != null)
-        {
-           txn.abort();
-        }
         replicationServer.shutdown();
       }
     }
@@ -281,16 +282,6 @@
       mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
       mb.append(stackTraceToSingleLineString(e));
       logError(mb.toMessage());
-      if (txn != null)
-      {
-        try
-        {
-          txn.abort();
-        } catch (DatabaseException e1)
-        {
-          // can't do much more. The ReplicationServer is shuting down.
-        }
-      }
       replicationServer.shutdown();
     }
     catch (UnsupportedEncodingException e)
@@ -300,17 +291,6 @@
       mb.append(stackTraceToSingleLineString(e));
       logError(mb.toMessage());
       replicationServer.shutdown();
-      if (txn != null)
-      {
-        try
-        {
-          txn.abort();
-        } catch (DatabaseException e1)
-        {
-          // can't do much more. The ReplicationServer is shuting down.
-        }
-      }
-      replicationServer.shutdown();
     }
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 631d578..4c3f132 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -63,7 +63,6 @@
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartMsg;
 import org.opends.server.replication.protocol.StartSessionMsg;
-import org.opends.server.replication.protocol.StopMsg;
 import org.opends.server.replication.protocol.TopologyMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.protocol.WindowMsg;
@@ -107,17 +106,14 @@
           providedMsg.toString());
       logError(providedMsg);
     }
-    try
+
+    if (providedSession != null)
     {
-      if (providedSession != null)
-        // This method is only called when aborting a failing handshake and
-        // not StopMsg should be sent in such situation. StopMsg are only
-        // expected when full handshake has been performed, or at end of
-        // handshake phase 1, when DS was just gathering available RS info
-        providedSession.close();
-    } catch (IOException e)
-    {
-      // ignore
+      // This method is only called when aborting a failing handshake and
+      // not StopMsg should be sent in such situation. StopMsg are only
+      // expected when full handshake has been performed, or at end of
+      // handshake phase 1, when DS was just gathering available RS info
+      providedSession.close();
     }
   }
 
@@ -286,9 +282,10 @@
     // We did not recognize the message, close session as what
     // can happen after is undetermined and we do not want the server to
     // be disturbed
-    if (session!=null)
+    ProtocolSession localSession = session;
+    if (localSession != null)
     {
-      closeSession(session, reason, this);
+      closeSession(localSession, reason, this);
     }
 
     if ((replicationServerDomain != null) &&
@@ -1101,26 +1098,7 @@
 
     if (session != null)
     {
-      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-      {
-        // V4 protocol introduces a StopMsg to properly end
-        // communications
-        try
-        {
-          session.publish(new StopMsg());
-        } catch (IOException ioe)
-        {
-          // Anyway, going to close session, so nothing to do
-        }
-      }
-      // Close session to end ServerReader or ServerWriter
-      try
-      {
-        session.close();
-      } catch (IOException e)
-      {
-        // ignore.
-      }
+      session.close();
     }
 
     /*
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index f7b8356..7517b25 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -341,32 +341,15 @@
     finally
     {
       /*
-       * The thread only exits the loop above if some error condition
-       * happen.
+       * The thread only exits the loop above if some error condition happen.
        * Attempt to close the socket and stop the server handler.
        */
-      try
+      if (debugEnabled())
       {
-        if (handler.getProtocolVersion() >=
-          ProtocolVersion.REPLICATION_PROTOCOL_V4)
-        {
-          // V4 protocol introduces a StopMsg to properly end
-          // communications
-          try
-          {
-            session.publish(new StopMsg());
-          } catch (IOException ioe)
-          {
-            // Anyway, going to close session, so nothing to do
-          }
-        }
-        if (debugEnabled())
-          TRACER.debugInfo("In " + this.getName() + " closing the session");
-        session.close();
-      } catch (IOException e)
-      {
-      // ignore
+        TRACER.debugInfo("In " + this.getName()
+            + " closing the session");
       }
+      session.close();
       handler.doStop();
       if (debugEnabled())
       {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 4eee050..6457b6a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -33,8 +33,6 @@
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 import static org.opends.messages.ReplicationMessages.*;
-
-import java.io.IOException;
 import java.net.SocketException;
 import java.util.NoSuchElementException;
 
@@ -42,8 +40,6 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.ProtocolVersion;
-import org.opends.server.replication.protocol.StopMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 
 
@@ -229,25 +225,7 @@
       logError(errMessage);
     }
     finally {
-      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-      {
-        // V4 protocol introduces a StopMsg to properly end
-        // communications
-        try
-        {
-          session.publish(new StopMsg());
-        } catch (IOException ioe)
-        {
-          // Anyway, going to close session, so nothing to do
-        }
-      }
-      try
-      {
-        session.close();
-      } catch (IOException e)
-      {
-       // Can't do much more : ignore
-      }
+      session.close();
       replicationServerDomain.stopServer(handler, false);
       if (debugEnabled())
       {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index f9335fb..22148e5 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -85,6 +85,7 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.protocol.WindowMsg;
 import org.opends.server.replication.protocol.WindowProbeMsg;
+import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.ServerConstants;
 import org.opends.server.replication.server.ReplicationServer;
 
@@ -673,6 +674,7 @@
       this.weight = rsInfo.getWeight();
       this.connectedDSs = connectedDSs;
       this.connectedDSNumber = connectedDSs.size();
+      this.serverState = new ServerState();
     }
 
     /**
@@ -1006,15 +1008,10 @@
             {
               if (connected == false)
               {
-                if (session != null)
+                ProtocolSession localSession = session;
+                if (localSession != null)
                 {
-                  try
-                  {
-                    session.close();
-                  } catch (IOException e)
-                  {
-                    // The session was already closed, just ignore.
-                  }
+                  localSession.close();
                   session = null;
                 }
               }
@@ -1287,30 +1284,11 @@
     {
       if (localSession != null)
       {
-        if (debugEnabled())
+        if (debugEnabled()) {
           debugInfo("In RB, closing session after phase 1");
+        }
 
-        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-        {
-          // V4 protocol introduces a StopMsg to properly end communications
-          if (!error)
-          {
-            try
-            {
-              localSession.publish(new StopMsg());
-            } catch (IOException ioe)
-            {
-              // Anyway, going to close session, so nothing to do
-            }
-          }
-        }
-        try
-        {
-          localSession.close();
-        } catch (IOException e)
-        {
-          // The session was already closed, just ignore.
-        }
+        localSession.close();
         localSession = null;
       }
       if (error)
@@ -1459,27 +1437,10 @@
     {
       if (localSession != null)
       {
-        if (debugEnabled())
+        if (debugEnabled()) {
           debugInfo("In RB, closing session after phase 1");
-
-        // V4 protocol introduces a StopMsg to properly end communications
-        if (!error)
-        {
-          try
-          {
-            localSession.publish(new StopMsg());
-          } catch (IOException ioe)
-          {
-            // Anyway, going to close session, so nothing to do
-          }
         }
-        try
-        {
-          localSession.close();
-        } catch (IOException e)
-        {
-          // The session was already closed, just ignore.
-        }
+        localSession.close();
         localSession = null;
       }
       if (error)
@@ -1545,13 +1506,7 @@
 
       if (session != null)
       {
-        try
-        {
-          session.close();
-        } catch (IOException ex)
-        {
-          // The session was already closed, just ignore.
-        }
+        session.close();
         session = null;
       }
       // Be sure to return null.
@@ -1625,13 +1580,7 @@
 
       if (session != null)
       {
-        try
-        {
-          session.close();
-        } catch (IOException ex)
-        {
-          // The session was already closed, just ignore.
-        }
+        session.close();
         session = null;
       }
       // Be sure to return null.
@@ -2255,8 +2204,7 @@
       heartbeatMonitor = new HeartbeatMonitor(
           threadName,
           session,
-          heartbeatInterval,
-          (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4));
+          heartbeatInterval);
       heartbeatMonitor.start();
     }
   }
@@ -2293,24 +2241,7 @@
 
     if (failingSession != null)
     {
-      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-      {
-        // V4 protocol introduces a StopMsg to properly end communications
-        try
-        {
-          failingSession.publish(new StopMsg());
-        } catch (IOException ioe)
-        {
-          // Anyway, going to close session, so nothing to do
-        }
-      }
-      try
-      {
-        failingSession.close();
-      } catch (IOException e1)
-      {
-        // ignore
-      }
+      failingSession.close();
       numLostConnections++;
     }
 
@@ -2689,6 +2620,11 @@
         throw e;
       } catch (Exception e)
       {
+        if (debugEnabled())
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+        }
+
         if (shutdown == false)
         {
           if ((session == null) || (!session.closeInitiated()))
@@ -2790,18 +2726,9 @@
     rsGroupId = (byte) -1;
     rsServerId = -1;
     rsServerUrl = null;
-
-    try
+    if (session != null)
     {
-      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-      {
-        // V4 protocol introduces a StopMsg to properly end communications
-          session.publish(new StopMsg());
-      }
       session.close();
-    } catch (Exception e)
-    {
-      // Anyway, going to close session, so nothing to do
     }
   }
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 85ad615..678cd4f 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 
@@ -440,15 +441,9 @@
       /*
        * Shutdown any current client handling code
        */
-      try
+      if (session != null)
       {
-        if (session != null)
-        {
-          session.close();
-        }
-      } catch (IOException e)
-      {
-        // ignore.
+        session.close();
       }
 
       try
@@ -567,13 +562,7 @@
       // Handle DS connexion
       if (!performHandshake())
       {
-        try
-        {
-          session.close();
-        } catch (IOException e)
-        {
-          // nothing to do
-        }
+        session.close();
         return;
       }
       // If we come here, assured parameters sent by DS are as expected and
@@ -731,7 +720,7 @@
      */
     private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
     {
-      assertEquals(updateMsg.isAssured(), isAssured, 
+      assertEquals(updateMsg.isAssured(), isAssured,
           "msg=" + ((updateMsg instanceof AddMsg)?
               ((AddMsg)updateMsg).getDn():updateMsg.getChangeNumber()));
       if (isAssured)
@@ -1052,7 +1041,7 @@
         String entry = "dn: ou=assured-sr-timeout-entry" + rsGroupId + "," + NOT_ASSURED_DN + "\n" +
         "objectClass: top\n" +
         "objectClass: organizationalUnit\n";
-        addEntry(TestCaseUtils.entryFromLdifString(entry));        
+        addEntry(TestCaseUtils.entryFromLdifString(entry));
       }
       long endTime = System.currentTimeMillis();
 
@@ -1919,8 +1908,8 @@
       sleep(50);
       ii++;
       if (ii>10)
-        assertEquals(operation.getResultCode(), expectedResult, 
-            operation.getErrorMessage().toString());                
+        assertEquals(operation.getResultCode(), expectedResult,
+            operation.getErrorMessage().toString());
     }
   }
 }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index cc9bb17..25235ae 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -23,10 +23,10 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
+ *      Portions copyright 2011 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
@@ -1123,15 +1123,9 @@
       /*
        * Shutdown any current client handling code
        */
-      try
+      if (session != null)
       {
-        if (session != null)
-        {
-          session.close();
-        }
-      } catch (IOException e)
-      {
-        // ignore.
+        session.close();
       }
 
       try

--
Gitblit v1.10.0