From a32bc83b64f0f82d731980a433a2180fe4f366f3 Mon Sep 17 00:00:00 2001
From: Ludovic Poitou <ludovic.poitou@forgerock.com>
Date: Thu, 20 Jun 2013 15:02:35 +0000
Subject: [PATCH] Fix for OPENDJ-846, Intermittent Replication failure. The issue was triggered by the mix of AssuredReplication and bad network conditions, which resulted in a deadlock between 2 RS, as both were blocked on writing to the TCP socket and not reading (because waiting on the write lock). The solution (more of a workaround) is to have another thread for sending data to the socket and have the reader and writer posting data to send to a queue that this new thread is polling. There are still potential deadlocks but they will occur much later, if the sendQueue gets full.  The code needs more work post 2.6 to be fully non blocking, but the changes are enough for now to resolve the customer deadlock case.

---
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java                 |   55 +--
 opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                |   18 -
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java                                        |   28 +
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                              |   49 +--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java                                         |   20 -
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java                             |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                                    |    4 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/package-info.java                                       |    5 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java                                         |    8 
 opendj-sdk/opends/src/messages/messages/replication.properties                                                              |    3 
 opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java                                    |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java                                    |    6 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java |   66 +---
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                                   |   20 
 opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java                          |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/Session.java                                            |  228 ++++++++++++----
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |   30 +-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java                                         |    8 
 /dev/null                                                                                                                   |  170 ------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java                                    |    2 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java        |   20 -
 opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java                                |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                                     |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java                                      |    8 
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |    6 
 25 files changed, 322 insertions(+), 460 deletions(-)

diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index 066a26f..69af0e6 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -542,4 +542,5 @@
 SEVERE_WARN_INVALID_SYNC_HIST_VALUE_214=The attribute value '%s' is not a valid \
  synchronization history value
 SEVERE_ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD_215=Replication server RS(%d) \
- failed to parse change record with changenumber %s from the database. Error: %s
\ No newline at end of file
+ failed to parse change record with changenumber %s from the database. Error: %s
+SEVERE_ERR_SESSION_STARTUP_INTERRUPTED_216=%s was interrupted in the startup phase
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index e52b41b..4a7af7e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -86,19 +86,7 @@
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.common.StatusMachineEvent;
-import org.opends.server.replication.protocol.AddContext;
-import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.DeleteContext;
-import org.opends.server.replication.protocol.DeleteMsg;
-import org.opends.server.replication.protocol.LDAPUpdateMsg;
-import org.opends.server.replication.protocol.ModifyContext;
-import org.opends.server.replication.protocol.ModifyDNMsg;
-import org.opends.server.replication.protocol.ModifyDnContext;
-import org.opends.server.replication.protocol.ModifyMsg;
-import org.opends.server.replication.protocol.OperationContext;
-import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.RoutableMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.protocol.*;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.replication.service.ReplicationMonitor;
@@ -4581,7 +4569,7 @@
       ServerStatus initStatus,
       ServerState replicationServerState,
       long generationID,
-      ProtocolSession session)
+      Session session)
   {
     // Check domain fractional configuration consistency with local
     // configuration variables
@@ -4876,7 +4864,7 @@
   @Override
   public boolean processUpdate(UpdateMsg updateMsg)
   {
-    // Ignore message if fractional configuration is inconcsistent and
+    // Ignore message if fractional configuration is inconsistent and
     // we have been passed into bad data set status
     if (forceBadDataSet)
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
index 86f9024..6f6cf2f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2008 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2012 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 
 package org.opends.server.replication.protocol;
@@ -57,7 +57,7 @@
   /**
    * The session on which heartbeats are to be sent.
    */
-  private final ProtocolSession session;
+  private final Session session;
 
 
   /**
@@ -80,7 +80,7 @@
    * @param heartbeatInterval The desired interval between heartbeats in
    * milliseconds.
    */
-  public HeartbeatThread(String threadName, ProtocolSession session,
+  public HeartbeatThread(String threadName, Session session,
                   long heartbeatInterval)
   {
     super(threadName);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
deleted file mode 100644
index c37bfff..0000000
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License").  You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE
- * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at
- * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
- * add the following below this CDDL HEADER, with the fields enclosed
- * by brackets "[]" replaced with your own identifying information:
- *      Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *
- *      Copyright 2006-2009 Sun Microsystems, Inc.
- *      Portions copyright 2011-2013 ForgeRock AS
- */
-package org.opends.server.replication.protocol;
-
-import java.io.IOException;
-import java.net.SocketException;
-import java.util.zip.DataFormatException;
-
-/**
- * The ProtocolSession interface should be implemented by a class that
- * implement the send/reception part of the Multi-master replication
- * protocol.
- *
- * This interface is designed to make easy the move from one format
- * of the ReplicationMsg on the wire to another format.
- */
-public interface ProtocolSession
-{
-
-  /**
-   * This method is called when the session with the remote must be closed.
-   * This object won't be used anymore after this method is called.
-   */
-  public abstract void close();
-
-  /**
-   * Sends a replication message to the remote peer.
-   *
-   * @param msg
-   *          The message to be sent.
-   * @throws IOException
-   *           If an IO error occurred.
-   */
-  public abstract void publish(ReplicationMsg msg) throws IOException;
-
-  /**
-   * Attempt to receive a ReplicationMsg.
-   * This method should block the calling thread until a
-   * ReplicationMsg is available or until an error condition.
-   *
-   * This method can only be called by a single thread and therefore does not
-   * need to implement any replication.
-   *
-   * @return The ReplicationMsg that was received.
-   * @throws IOException When error happened during IO process.
-   * @throws ClassNotFoundException When the data received does extend the
-   *         ReplicationMsg class.
-   * @throws DataFormatException When the data received is not formatted as a
-   *         ReplicationMsg.
-   * @throws NotSupportedOldVersionPDUException If the received PDU is part of
-   * an old protocol version and we do not support it.
-   */
-  public abstract ReplicationMsg receive()
-                  throws IOException, ClassNotFoundException,
-                         DataFormatException,
-                         NotSupportedOldVersionPDUException;
-
-  /**
-   * Stop using the security layer, if there is any.
-   */
-  public abstract void stopEncryption();
-
-  /**
-   * Determine whether the session is using a security layer.
-   * @return true if the connection is encrypted, false otherwise.
-   */
-  public abstract boolean isEncrypted();
-
-  /**
-   * Retrieve the local URL in the form host:port.
-   *
-   * @return The local URL.
-   */
-  public abstract String getLocalUrl();
-
-  /**
-   * Retrieve the IP address of the remote server.
-   *
-   * @return The IP address of the remote server.
-   */
-  public abstract String getRemoteAddress();
-
-  /**
-   * Retrieve the human readable address of the remote server.
-   *
-   * @return The human readable address of the remote server.
-   */
-  public abstract String getReadableRemoteAddress();
-
-
-  /**
-  * Set a timeout value.
-  * With this option set to a non-zero value, calls to the receive() method
-  * block for only this amount of time after which a
-  * java.net.SocketTimeoutException is raised.
-  * The Broker is valid and usable even after such an Exception is raised.
-  *
-  * @param timeout the specified timeout, in milliseconds.
-  * @throws SocketException if there is an error in the underlying protocol,
-  *         such as a TCP error.
-  */
-  public abstract void setSoTimeout(int timeout) throws SocketException;
-
-
-
-  /**
-   * Gets the time the last replication message was published on this
-   * session.
-   * @return The timestamp in milliseconds of the last message published.
-   */
-  public abstract long getLastPublishTime();
-
-
-
-  /**
-   * Gets the time the last replication message was received on this
-   * session.
-   * @return The timestamp in milliseconds of the last message received.
-   */
-  public abstract long getLastReceiveTime();
-
-  /**
-   * This methods allows to determine if the session close was initiated
-   * on this ProtocolSession.
-   *
-   * @return A boolean allowing to determine if the session close was initiated
-   * on this ProtocolSession.
-   */
-  public abstract boolean closeInitiated();
-
-  /**
-   * This method is called at the establishment of the session and can
-   * be used to record the version of the protocol that is currently used.
-   *
-   * @param version The version of the protocol that is currently used.
-   */
-  public abstract void setProtocolVersion(short version);
-
-  /**
-   * Returns the version of the protocol that is currently used.
-   *
-   * @return The version of the protocol that is currently used.
-   */
-  public abstract short getProtocolVersion();
-}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
index 3c185a2..7c22efc 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -161,7 +161,7 @@
    *           If the protocol session could not be established for some other
    *           reason.
    */
-  public ProtocolSession createClientSession(final Socket socket,
+  public Session createClientSession(final Socket socket,
       final int soTimeout) throws ConfigException, IOException
   {
     boolean hasCompleted = false;
@@ -197,7 +197,7 @@
       // Force TLS negotiation now.
       secureSocket.startHandshake();
       hasCompleted = true;
-      return new TLSSocketSession(socket, secureSocket);
+      return new Session(socket, secureSocket);
     }
     finally
     {
@@ -244,7 +244,7 @@
    *           If the protocol session could not be established for some other
    *           reason.
    */
-  public ProtocolSession createServerSession(final Socket socket,
+  public Session createServerSession(final Socket socket,
       final int soTimeout) throws ConfigException, IOException
   {
     boolean hasCompleted = false;
@@ -281,7 +281,7 @@
       // Force TLS negotiation now.
       secureSocket.startHandshake();
       hasCompleted = true;
-      return new TLSSocketSession(socket, secureSocket);
+      return new Session(socket, secureSocket);
     }
     catch (final SSLException e)
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/Session.java
similarity index 64%
rename from opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
rename to opendj-sdk/opends/src/server/org/opends/server/replication/protocol/Session.java
index f27c950..6a67b2a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/Session.java
@@ -40,21 +40,25 @@
 import java.io.OutputStream;
 import java.net.Socket;
 import java.net.SocketException;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.zip.DataFormatException;
 
 import javax.net.ssl.SSLSocket;
 
+import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.util.StaticUtils;
 
 
 
 /**
- * This class implements a protocol session using TLS.
+ * This class defines a replication session using TLS.
  */
-public final class TLSSocketSession implements ProtocolSession
+public final class Session extends DirectoryThread
 {
   /**
    * The tracer object for the debug logger.
@@ -86,7 +90,7 @@
    * connections.
    */
   private final Object stateLock = new Object();
-  private boolean closeInitiated = false;
+  private volatile boolean closeInitiated = false;
   private Throwable sessionError = null;
 
   /*
@@ -113,10 +117,13 @@
    */
   private BufferedOutputStream output;
 
-
+  private final LinkedBlockingQueue<byte[]> sendQueue =
+      new LinkedBlockingQueue<byte[]>(4000);
+  private AtomicBoolean isRunning = new AtomicBoolean(false);
+  private final CountDownLatch latch = new CountDownLatch(1);
 
   /**
-   * Creates a new TLSSocketSession.
+   * Creates a new Session.
    *
    * @param socket
    *          The regular Socket on which the SocketSession will be based.
@@ -125,13 +132,15 @@
    * @throws IOException
    *           When an IException happens on the socket.
    */
-  public TLSSocketSession(final Socket socket,
-      final SSLSocket secureSocket) throws IOException
+  public Session(final Socket socket,
+                 final SSLSocket secureSocket) throws IOException
   {
+    super("Replication Session from "+ socket.getLocalSocketAddress() +
+        " to " + socket.getRemoteSocketAddress());
     if (debugEnabled())
     {
       TRACER.debugInfo(
-          "Creating TLSSocketSession from %s to %s in %s",
+          "Creating Session from %s to %s in %s",
           socket.getLocalSocketAddress(),
           socket.getRemoteSocketAddress(),
           stackTraceToSingleLineString(new Exception()));
@@ -153,9 +162,9 @@
 
 
   /**
-   * {@inheritDoc}
+   * This method is called when the session with the remote must be closed.
+   * This object won't be used anymore after this method is called.
    */
-  @Override
   public void close()
   {
     Throwable localSessionError;
@@ -171,13 +180,21 @@
       closeInitiated = true;
     }
 
+    try {
+      this.interrupt();
+      this.join();
+    }
+    catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+
     // Perform close outside of critical section.
     if (debugEnabled())
     {
       if (localSessionError == null)
       {
         TRACER.debugInfo(
-            "Closing TLSSocketSession from %s to %s in %s",
+            "Closing Session from %s to %s in %s",
             plainSocket.getLocalSocketAddress(),
             plainSocket.getRemoteSocketAddress(),
             stackTraceToSingleLineString(new Exception()));
@@ -185,7 +202,7 @@
       else
       {
         TRACER.debugInfo(
-            "Aborting TLSSocketSession from %s to %s in %s due to the "
+            "Aborting Session from %s to %s in %s due to the "
                 + "following error: %s",
             plainSocket.getLocalSocketAddress(),
             plainSocket.getRemoteSocketAddress(),
@@ -199,20 +216,13 @@
     {
       if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
       {
-        if (publishLock.tryLock())
+        try
         {
-          try
-          {
-            publish(new StopMsg());
-          }
-          catch (final IOException ignored)
-          {
-            // Ignore errors on close.
-          }
-          finally
-          {
-            publishLock.unlock();
-          }
+          publish(new StopMsg());
+        }
+        catch (final IOException ignored)
+        {
+          // Ignore errors on close.
         }
       }
     }
@@ -223,9 +233,12 @@
 
 
   /**
-   * {@inheritDoc}
+   * This methods allows to determine if the session close was initiated
+   * on this Session.
+   *
+   * @return A boolean allowing to determine if the session close was initiated
+   * on this Session.
    */
-  @Override
   public boolean closeInitiated()
   {
     synchronized (stateLock)
@@ -237,9 +250,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Gets the time the last replication message was published on this
+   * session.
+   * @return The timestamp in milliseconds of the last message published.
    */
-  @Override
   public long getLastPublishTime()
   {
     return lastPublishTime;
@@ -248,9 +262,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Gets the time the last replication message was received on this
+   * session.
+   * @return The timestamp in milliseconds of the last message received.
    */
-  @Override
   public long getLastReceiveTime()
   {
     if (lastReceiveTime == 0)
@@ -263,9 +278,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Retrieve the local URL in the form host:port.
+   *
+   * @return The local URL.
    */
-  @Override
   public String getLocalUrl()
   {
     return localUrl;
@@ -274,9 +290,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Retrieve the human readable address of the remote server.
+   *
+   * @return The human readable address of the remote server.
    */
-  @Override
   public String getReadableRemoteAddress()
   {
     return readableRemoteAddress;
@@ -285,9 +302,10 @@
 
 
   /**
-   * {@inheritDoc}
+   * Retrieve the IP address of the remote server.
+   *
+   * @return The IP address of the remote server.
    */
-  @Override
   public String getRemoteAddress()
   {
     return remoteAddress;
@@ -296,9 +314,9 @@
 
 
   /**
-   * {@inheritDoc}
+   * Determine whether the session is using a security layer.
+   * @return true if the connection is encrypted, false otherwise.
    */
-  @Override
   public boolean isEncrypted()
   {
     return isEncrypted;
@@ -307,12 +325,38 @@
 
 
   /**
-   * {@inheritDoc}
+   * Sends a replication message to the remote peer.
+   *
+   * @param msg
+   *          The message to be sent.
+   * @throws IOException
+   *           If an IO error occurred.
    */
-  @Override
   public void publish(final ReplicationMsg msg) throws IOException
   {
     final byte[] buffer = msg.getBytes(protocolVersion);
+    if (isRunning.get())
+    {
+      try {
+        sendQueue.put(buffer);
+      }
+      catch (final InterruptedException e) {
+        setSessionError(e);
+        throw new IOException(e.getMessage());
+      }
+    } else {
+      send(buffer);
+    }
+  }
+
+  /** Sends a replication message already encoded to the socket.
+   *
+   * @param buffer
+   *          the encoded buffer
+   * @throws IOException if the message could not be sent
+   */
+  private void send(final byte[] buffer) throws IOException
+  {
     final String str = String.format("%08x", buffer.length);
     final byte[] sendLengthBuf = str.getBytes();
 
@@ -326,9 +370,7 @@
       output.write(sendLengthBuf);
       output.write(buffer);
       output.flush();
-    }
-    catch (final IOException e)
-    {
+    } catch (final IOException e) {
       setSessionError(e);
       throw e;
     }
@@ -343,9 +385,20 @@
 
 
   /**
-   * {@inheritDoc}
+   * Attempt to receive a ReplicationMsg.
+   * This method should block the calling thread until a
+   * ReplicationMsg is available or until an error condition.
+   *
+   * This method can only be called by a single thread and therefore does not
+   * need to implement any replication.
+   *
+   * @return The ReplicationMsg that was received.
+   * @throws IOException When error happened during IO process.
+   * @throws DataFormatException When the data received is not formatted as a
+   *         ReplicationMsg.
+   * @throws NotSupportedOldVersionPDUException If the received PDU is part of
+   * an old protocol version and we do not support it.
    */
-  @Override
   public ReplicationMsg receive() throws IOException,
       DataFormatException, NotSupportedOldVersionPDUException
   {
@@ -432,22 +485,23 @@
   }
 
 
-
   /**
-   * {@inheritDoc}
+   * This method is called at the establishment of the session and can
+   * be used to record the version of the protocol that is currently used.
+   *
+   * @param version The version of the protocol that is currently used.
    */
-  @Override
   public void setProtocolVersion(final short version)
   {
     protocolVersion = version;
   }
 
 
-
   /**
-   * {@inheritDoc}
+   * Returns the version of the protocol that is currently used.
+   *
+   * @return The version of the protocol that is currently used.
    */
-  @Override
   public short getProtocolVersion()
   {
     return protocolVersion;
@@ -456,9 +510,16 @@
 
 
   /**
-   * {@inheritDoc}
+   * Set a timeout value.
+   * With this option set to a non-zero value, calls to the receive() method
+   * block for only this amount of time after which a
+   * java.net.SocketTimeoutException is raised.
+   * The Broker is valid and usable even after such an Exception is raised.
+   *
+   * @param timeout the specified timeout, in milliseconds.
+   * @throws SocketException if there is an error in the underlying protocol,
+   *         such as a TCP error.
    */
-  @Override
   public void setSoTimeout(final int timeout) throws SocketException
   {
     plainSocket.setSoTimeout(timeout);
@@ -467,10 +528,8 @@
 
 
   /**
-   * {@inheritDoc}
+   * Stop using the security layer, if there is any.
    */
-  @SuppressWarnings("unused")
-  @Override
   public void stopEncryption()
   {
     /*
@@ -500,4 +559,59 @@
       }
     }
   }
+
+  /**
+   * Run method for the Session.
+   * Loops waiting for buffers from the queue and sends them when available.
+   */
+  public void run()
+  {
+    isRunning.set(true);
+    latch.countDown();
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(this.getName() + " starting.");
+    }
+    boolean needClosing = false;
+    while (!closeInitiated)
+    {
+      byte[] buffer;
+      try
+      {
+        buffer = sendQueue.take();
+      }
+      catch (InterruptedException ie)
+      {
+        break;
+      }
+      try
+      {
+        send(buffer);
+      }
+      catch (IOException e)
+      {
+        setSessionError(e);
+        needClosing = true;
+      }
+    }
+    isRunning.set(false);
+    if (needClosing)
+    {
+      close();
+    }
+    if (debugEnabled())
+    {
+      TRACER.debugInfo(this.getName() + " stopped.");
+    }
+  }
+
+  /**
+   * This method can be called to wait until the session thread is
+   * properly started.
+   * @throws InterruptedException when interrupted
+   */
+  public void waitForStartup() throws InterruptedException
+  {
+    latch.await();
+  }
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/package-info.java b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/package-info.java
index 3d8fb2f..15c5a1d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/package-info.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/protocol/package-info.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS.
  */
 
 
@@ -35,8 +36,8 @@
  * The main classes of this packages are :
  * <br>
  * <ul>
- * <li><A HREF="SocketSession.html"><B>SocketSession</B></A>
- * implements the ProtocolSession interface that is
+ * <li><A HREF="Session.html"><B>Session</B></A>
+ * implements the session and protocol that is
  * used by the replication server and the directory server to communicate.
  * This is done by using the innate encoding/decoding capabilities of the
  * ReplicationMessages objects. This class is used by both the
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index ef55993..978dbea 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -90,7 +90,7 @@
    * @param rcvWindowSize The receiving window size.
    */
   public DataServerHandler(
-      ProtocolSession session,
+      Session session,
       int queueSize,
       String replicationServerURL,
       int replicationServerId,
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 4e35589..e0c4251 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -391,7 +391,7 @@
    * @param rcvWindowSize The receiving window size.
    */
   public ECLServerHandler(
-      ProtocolSession session,
+      Session session,
       int queueSize,
       String replicationServerURL,
       int replicationServerId,
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
index c718865..c2dd931 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -41,7 +41,7 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.protocol.DoneMsg;
 import org.opends.server.replication.protocol.ECLUpdateMsg;
-import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
@@ -61,7 +61,7 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private ProtocolSession session;
+  private Session session;
   private ECLServerHandler handler;
   private ReplicationServerDomain replicationServerDomain;
   private boolean suspended;
@@ -71,12 +71,12 @@
   /**
    * Create a ServerWriter.
    *
-   * @param session     the ProtocolSession that will be used to send updates.
+   * @param session     the Session that will be used to send updates.
    * @param handler     ECL handler for which the ServerWriter is created.
    * @param replicationServerDomain the ReplicationServerDomain of this
    *                    ServerWriter.
    */
-  public ECLServerWriter(ProtocolSession session, ECLServerHandler handler,
+  public ECLServerWriter(Session session, ECLServerHandler handler,
       ReplicationServerDomain replicationServerDomain)
   {
     super(session, handler, replicationServerDomain);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 3c23700..2cdea38 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -304,7 +304,7 @@
 
       try
       {
-        ProtocolSession session;
+        Session session;
         Socket newSocket = null;
         try
         {
@@ -485,7 +485,7 @@
                " connects to " + remoteServerURL);
 
     Socket socket = new Socket();
-    ProtocolSession session = null;
+    Session session = null;
     try
     {
       InetSocketAddress ServerAddr = new InetSocketAddress(
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index a6ebfc1..47f3d7f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -187,7 +187,7 @@
   // The timer used to run the timeout code (timer tasks) for the assured update
   // messages we are waiting acks for.
   private Timer assuredTimeoutTimer = null;
-  // Counter used to purge the timer tasks referemces in assuredTimeoutTimer,
+  // Counter used to purge the timer tasks references in assuredTimeoutTimer,
   // every n number of treated assured messages
   private int assuredTimeoutTimerPurgeCounter = 0;
 
@@ -588,17 +588,16 @@
             if (serverStatus == ServerStatus.DEGRADED_STATUS)
             {
               wrongStatusServers.add(handler.getServerId());
-            } else
-            {
-              /**
-               * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
-               * We do not want this to be reported as an error to the update
-               * maker -> no pollution or potential misunderstanding when
-               * reading logs or monitoring and it was just administration (for
-               * instance new server is being configured in topo: it goes in bad
-               * gen then then full full update).
-               */
             }
+            /**
+             * else
+             * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
+             * We do not want this to be reported as an error to the update
+             * maker -> no pollution or potential misunderstanding when
+             * reading logs or monitoring and it was just administration (for
+             * instance new server is being configured in topo: it goes in bad
+             * gen then then full full update).
+             */
           }
         }
       }
@@ -685,19 +684,12 @@
           }
         } else
         { // A RS sent us the safe data message, for sure no further ack to wait
-          if (safeDataLevel == (byte) 1)
+          /**
+           * Level 1 has already been reached so no further acks to wait.
+           * Just deal with level > 1
+           */
+          if (safeDataLevel > (byte) 1)
           {
-            /**
-             * The original level was 1 so the RS that sent us this message
-             * should have already sent his ack to the sender DS. Level 1 has
-             * already been reached so no further acks to wait.
-             * This should not happen in theory as the sender RS server should
-             * have sent us a matching not assured message so we should not come
-             * to here.
-             */
-          } else
-          {
-            // level > 1, so Ack this message to originator RS
             sourceHandler.send(new AckMsg(cn));
           }
         }
@@ -815,11 +807,10 @@
           expectedAcksInfo.completed();
         }
       }
-    } else
-    {
-      // The timeout occurred for the update matching this change number and the
-      // ack with timeout error has probably already been sent.
     }
+    /* Else the timeout occurred for the update matching this change number
+     * and the ack with timeout error has probably already been sent.
+     */
   }
 
   /**
@@ -934,10 +925,8 @@
                   expectedServerInTimeout.
                     incrementAssuredSdSentUpdatesTimeout();
                 }
-              } else
-              {
-                // Server disappeared ? Let's forget about it.
               }
+              /* else server disappeared ? Let's forget about it. */
             }
           }
           // Mark the ack info object as completed to prevent potential
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
index 0291ac3..624e4f7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -136,7 +136,7 @@
    * @param rcvWindowSize The receiving window size.
    */
   public ReplicationServerHandler(
-      ProtocolSession session,
+      Session session,
       int queueSize,
       String replicationServerURL,
       int replicationServerId,
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 7be892c..d252635 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,11 +50,11 @@
 import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
 import org.opends.server.replication.protocol.HeartbeatThread;
-import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplicationMsg;
 import org.opends.server.replication.protocol.ResetGenerationIdMsg;
 import org.opends.server.replication.protocol.RoutableMsg;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.replication.protocol.StartECLSessionMsg;
 import org.opends.server.replication.protocol.StartMsg;
 import org.opends.server.replication.protocol.StartSessionMsg;
@@ -88,7 +88,7 @@
    * @param providedMsg     The provided error message.
    * @param handler         The handler that manages that session.
    */
-  static protected void closeSession(ProtocolSession providedSession,
+  static protected void closeSession(Session providedSession,
       Message providedMsg, ServerHandler handler)
   {
     if (providedMsg != null)
@@ -118,7 +118,7 @@
   /**
    * The session opened with the remote server.
    */
-  protected ProtocolSession session;
+  protected Session session;
 
   /**
    * The serverURL of the remote server.
@@ -237,7 +237,7 @@
   /**
    * Creates a new server handler instance with the provided socket.
    *
-   * @param session The ProtocolSession used by the ServerHandler to
+   * @param session The Session used by the ServerHandler to
    *                 communicate with the remote entity.
    * @param queueSize The maximum number of update that will be kept
    *                  in memory by this ServerHandler.
@@ -247,7 +247,7 @@
    * @param rcvWindowSize The window size to receive from the remote server.
    */
   public ServerHandler(
-      ProtocolSession session,
+      Session session,
       int queueSize,
       String replicationServerURL,
       int replicationServerId,
@@ -271,7 +271,7 @@
     // We did not recognize the message, close session as what
     // can happen after is undetermined and we do not want the server to
     // be disturbed
-    ProtocolSession localSession = session;
+    Session localSession = session;
     if (localSession != null)
     {
       closeSession(localSession, reason, this);
@@ -366,6 +366,22 @@
           replicationServerDomain);
       reader = new ServerReader(session, this);
 
+      session.setName("Replication server RS("
+          + this.getReplicationServerId()
+          + ") session thread to " + this.toString() + " at "
+          + session.getReadableRemoteAddress());
+      session.start();
+      try
+      {
+        session.waitForStartup();
+      }
+      catch (InterruptedException e)
+      {
+        final Message message =
+            ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
+        throw new DirectoryException(ResultCode.OTHER,
+            message, e);
+      }
       reader.start();
       writer.start();
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
index 9c5f4bd..c1dfb54 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -58,7 +58,7 @@
    * The tracer object for the debug logger.
    */
   private static final DebugTracer TRACER = getTracer();
-  private final ProtocolSession session;
+  private final Session session;
   private final ServerHandler handler;
   private final String remoteAddress;
 
@@ -68,11 +68,11 @@
    * Constructor for the LDAP server reader part of the replicationServer.
    *
    * @param session
-   *          The ProtocolSession from which to read the data.
+   *          The Session from which to read the data.
    * @param handler
    *          The server handler for this server reader.
    */
-  public ServerReader(ProtocolSession session, ServerHandler handler)
+  public ServerReader(Session session, ServerHandler handler)
   {
     super("Replication server RS(" + handler.getReplicationServerId()
         + ") reading from " + handler.toString() + " at "
@@ -314,18 +314,6 @@
         logError(errMessage);
       }
     }
-    catch (ClassNotFoundException e)
-    {
-      if (debugEnabled())
-        TRACER.debugInfo(
-            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
-      /*
-       * The remote server has sent an unknown message,
-       * close the connection.
-       */
-      errMessage = ERR_UNKNOWN_MESSAGE.get(handler.toString());
-      logError(errMessage);
-    }
     catch (Exception e)
     {
       if (debugEnabled())
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
index 4aae7d7..cd6ed72 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -40,7 +40,7 @@
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.replication.protocol.UpdateMsg;
 
 
@@ -55,7 +55,7 @@
    */
   private static final DebugTracer TRACER = getTracer();
 
-  private final ProtocolSession session;
+  private final Session session;
   private final ServerHandler handler;
   private final ReplicationServerDomain replicationServerDomain;
 
@@ -66,13 +66,13 @@
    * for new updates and forward them to the server
    *
    * @param session
-   *          the ProtocolSession that will be used to send updates.
+   *          the Session that will be used to send updates.
    * @param handler
    *          handler for which the ServerWriter is created.
    * @param replicationServerDomain
    *          The ReplicationServerDomain of this ServerWriter.
    */
-  public ServerWriter(ProtocolSession session, ServerHandler handler,
+  public ServerWriter(Session session, ServerHandler handler,
       ReplicationServerDomain replicationServerDomain)
   {
     // Session may be null for ECLServerWriter.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java
index 2174f2f..68a5f65 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java
@@ -23,6 +23,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2013 ForgeRock AS.
  */
 
 
@@ -46,13 +47,6 @@
  * The main classes of this packages are :
  * <br>
  * <ul>
- * <li><A HREF="SocketSession.html"><B>SocketSession</B></A>
- * implements the ProtocolSession interface that is
- * used by the replication server and the directory server to communicate.
- * This is done by using the innate encoding/decoding capabilities of the
- * ReplicationMessages objects. This class is used by both the
- * replicationServer and the replication package.
- * </li>
  * <li><A HREF="ReplicationServerDomain.html"><B>ReplicationServerDomain</B></A>
  * implements the multiplexing part of the replication
  * server. It contains method for forwarding all the received messages to
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index 03ad1db..e14ab51 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2012 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 
 package org.opends.server.replication.service;
@@ -34,7 +34,7 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
-import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.TimeThread;
 
@@ -55,7 +55,7 @@
   /**
    * The session on which heartbeats are to be sent.
    */
-  private final ProtocolSession session;
+  private final Session session;
 
   /**
    * The time in milliseconds between heartbeats.
@@ -77,7 +77,7 @@
    *                          (in milliseconds).
    * @param serverId2 The serverId of the sender domain.
    */
-  public CTHeartbeatPublisherThread(String threadName, ProtocolSession session,
+  public CTHeartbeatPublisherThread(String threadName, Session session,
                   long heartbeatInterval, int serverId2)
   {
     super(threadName);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
index 6345f48..c532e5e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
@@ -23,7 +23,7 @@
  *
  *
  *      Copyright 2007-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2012 ForgeRock AS
+ *      Portions Copyright 2011-2013 ForgeRock AS
  */
 
 package org.opends.server.replication.service;
@@ -33,7 +33,7 @@
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
 import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.types.DebugLogLevel;
 
 import org.opends.server.api.DirectoryThread;
@@ -54,7 +54,7 @@
   /**
    * The session on which heartbeats are to be monitored.
    */
-  private final ProtocolSession session;
+  private final Session session;
 
 
   /**
@@ -93,7 +93,7 @@
    *          milliseconds).
    */
   HeartbeatMonitor(int serverID, int replicationServerID,
-      String baseDN, ProtocolSession session, long heartbeatInterval)
+      String baseDN, Session session, long heartbeatInterval)
   {
     super("Replica DS("
       + serverID + ") heartbeat monitor for domain \""
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 7ba4c8e..23de638 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -95,7 +95,7 @@
    */
   public final static String NO_CONNECTED_SERVER = "Not connected";
   private volatile String replicationServer = NO_CONNECTED_SERVER;
-  private volatile ProtocolSession session = null;
+  private volatile Session session = null;
   private final ServerState state;
   private final String baseDn;
   private final int serverId;
@@ -1230,7 +1230,7 @@
     String port = server.substring(separator + 1);
     String hostname = server.substring(0, separator);
 
-    ProtocolSession localSession = null;
+    Session localSession = null;
     Socket socket = null;
     boolean hasConnected = false;
     Message errorMessage = null;
@@ -2180,7 +2180,7 @@
    * @param failingSession the socket which failed
    * @param infiniteTry the socket which failed
    */
-  public void reStart(ProtocolSession failingSession, boolean infiniteTry)
+  public void reStart(Session failingSession, boolean infiniteTry)
   {
     if (failingSession != null)
     {
@@ -2308,7 +2308,7 @@
       try
       {
         boolean credit;
-        ProtocolSession current_session;
+        Session current_session;
         Semaphore currentWindowSemaphore;
 
         /*
@@ -2465,7 +2465,7 @@
 
       // Save session information for later in case we need it for log messages
       // after the session has been closed and/or failed.
-      final ProtocolSession savedSession = session;
+      final Session savedSession = session;
       if (savedSession == null)
       {
         // Must be shutting down.
@@ -2612,7 +2612,7 @@
 
         if (!shutdown)
         {
-          final ProtocolSession tmpSession = session;
+          final Session tmpSession = session;
           if (tmpSession == null || !tmpSession.closeInitiated())
           {
             /*
@@ -2879,7 +2879,7 @@
    */
   public boolean isSessionEncrypted()
   {
-    final ProtocolSession tmp = session;
+    final Session tmp = session;
     return tmp != null ? tmp.isEncrypted() : false;
   }
 
@@ -3127,7 +3127,7 @@
    */
   String getLocalUrl()
   {
-    final ProtocolSession tmp = session;
+    final Session tmp = session;
     return tmp != null ? tmp.getLocalUrl() : "";
   }
 
@@ -3142,12 +3142,12 @@
     return monitor;
   }
 
-  private void setSession(final ProtocolSession newSession)
+  private void setSession(final Session newSession)
   {
     // De-register the monitor with the old name.
     deregisterReplicationMonitor();
 
-    final ProtocolSession oldSession = session;
+    final Session oldSession = session;
     if (oldSession != null)
     {
       oldSession.close();
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 4dc31ec..5bcc989 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -68,7 +68,7 @@
 import org.opends.server.replication.protocol.InitializeRcvAckMsg;
 import org.opends.server.replication.protocol.InitializeRequestMsg;
 import org.opends.server.replication.protocol.InitializeTargetMsg;
-import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -401,13 +401,13 @@
    * @param generationID            The current generationID of the
    *                                ReplicationServer with which the session
    *                                was established.
-   * @param session                 The ProtocolSession that is currently used.
+   * @param session                 The Session that is currently used.
    */
   public void sessionInitiated(
       ServerStatus initStatus,
       ServerState replicationServerState,
       long generationID,
-      ProtocolSession session)
+      Session session)
   {
     // Sanity check: is it a valid initial status?
     if (!isValidInitialStatus(initStatus))
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
index df8ff6f..13f15e1 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -27,25 +27,6 @@
  */
 package org.opends.server.replication;
 
-import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
-import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES;
-import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
-import static org.opends.server.loggers.ErrorLogger.logError;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import java.io.File;
-import java.net.SocketException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.concurrent.locks.Lock;
-
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
@@ -66,34 +47,34 @@
 import org.opends.server.replication.plugin.LDAPReplicationDomain;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.plugin.PersistentServerState;
-import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.schema.DirectoryStringSyntax;
 import org.opends.server.schema.IntegerSyntax;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.AttributeValue;
-import org.opends.server.types.AttributeValues;
-import org.opends.server.types.Attributes;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.DN;
-import org.opends.server.types.Entry;
-import org.opends.server.types.LockManager;
-import org.opends.server.types.Modification;
-import org.opends.server.types.ModificationType;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SearchFilter;
-import org.opends.server.types.SearchResultEntry;
-import org.opends.server.types.SearchScope;
+import org.opends.server.types.*;
 import org.opends.server.util.StaticUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import java.io.File;
+import java.net.SocketException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.locks.Lock;
+
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.testng.Assert.*;
+
 /**
  * An abstract class that all Replication unit test should extend.
  */
@@ -900,7 +881,7 @@
    * Add a task to the configuration of the current running DS.
    * @param taskEntry The task to add.
    * @param expectedResult The expected result code for the ADD.
-   * @param errorMessageID The expected error messageID when the expected
+   * @param errorMessage The expected error message when the expected
    * result code is not SUCCESS
    */
   protected void addTask(Entry taskEntry, ResultCode expectedResult,
@@ -1168,7 +1149,7 @@
    * @param msgType Class of the message we are waiting for.
    * @return The expected message if it comes in time or fails (assertion).
    */
-  protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
+  protected static ReplicationMsg waitForSpecificMsg(Session session, String msgType) {
 
     ReplicationMsg replMsg = null;
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 566a878..c2ba0ef 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -52,39 +52,17 @@
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.replication.ReplicationTestCase;
-import org.opends.server.replication.common.AssuredMode;
-import org.opends.server.replication.common.ChangeNumberGenerator;
-import org.opends.server.replication.common.DSInfo;
-import org.opends.server.replication.common.RSInfo;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.common.ServerStatus;
-import org.opends.server.replication.protocol.AckMsg;
-import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.ReplServerStartMsg;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ServerStartMsg;
-import org.opends.server.replication.protocol.StartSessionMsg;
-import org.opends.server.replication.protocol.StopMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.types.Attribute;
-import org.opends.server.types.AttributeValue;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.DN;
-import org.opends.server.types.Entry;
-import org.opends.server.types.Operation;
-import org.opends.server.types.ResultCode;
-import org.opends.server.types.SearchResultEntry;
-import org.opends.server.types.SearchScope;
+import org.opends.server.replication.common.*;
+import org.opends.server.replication.protocol.*;
+import org.opends.server.types.*;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
-import static org.opends.server.TestCaseUtils.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 import static org.testng.Assert.*;
 
 /**
@@ -279,7 +257,7 @@
 
     private ServerSocket listenSocket;
     private boolean shutdown = false;
-    private ProtocolSession session = null;
+    private Session session = null;
 
     // Parameters given at constructor time
     private final int port;
@@ -626,8 +604,7 @@
         session.publish(addMsg);
 
         // Read and return matching ack
-        AckMsg ackMsg = (AckMsg)session.receive();
-        return ackMsg;
+        return (AckMsg)session.receive();
 
       } catch(SocketTimeoutException e)
       {
@@ -889,7 +866,7 @@
         replicationServer.setAssured(false);
       replicationServer.start(TIMEOUT_SCENARIO);
 
-      long startTime = System.currentTimeMillis();
+      long startTime;
       // Create a safe data assured domain
       if (rsGroupId == (byte)1)
       {
@@ -998,7 +975,7 @@
   {
 
     int TIMEOUT = 5000;
-    String testcase = "testSafeReadModeTimeout" + rsGroupId;;
+    String testcase = "testSafeReadModeTimeout" + rsGroupId;
     try
     {
       // Create and start a RS expecting clients in safe read assured mode
@@ -1008,7 +985,7 @@
         replicationServer.setAssured(false);
       replicationServer.start(TIMEOUT_SCENARIO);
 
-      long startTime = 0;
+      long startTime;
 
       // Create a safe data assured domain
       if (rsGroupId == (byte)1)
@@ -1363,10 +1340,9 @@
         "objectClass: organizationalUnit\n";
       Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
       String parentUid = getEntryUUID(DN.decode(SAFE_READ_DN));
-      AckMsg ackMsg = null;
 
       try {
-        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
+        AckMsg ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
 
          if (rsGroupId == (byte)2)
            fail("Should only go here for RS with same group id as DS");
@@ -1461,7 +1437,7 @@
       Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
       String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN));
 
-      AckMsg ackMsg = null;
+      AckMsg ackMsg;
       try
       {
         ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
@@ -1853,7 +1829,7 @@
     /*
      * Find the multi valued attribute matching the requested assured mode
      */
-    String assuredAttr = null;
+    String assuredAttr;
     switch(assuredMode)
     {
       case SAFE_READ_MODE:
@@ -1873,19 +1849,15 @@
       return resultMap; // Empty map
 
     Attribute attr = attrs.get(0);
-    Iterator<AttributeValue> attValIt = attr.iterator();
     // Parse and store values
-    while (attValIt.hasNext())
-    {
-      String srvStr = attValIt.next().toString();
+    for (AttributeValue val : attr) {
+      String srvStr = val.toString();
       StringTokenizer strtok = new StringTokenizer(srvStr, ":");
       String token = strtok.nextToken();
-      if (token != null)
-      {
+      if (token != null) {
         int serverId = Integer.valueOf(token);
         token = strtok.nextToken();
-        if (token != null)
-        {
+        if (token != null) {
           Integer nerrors = Integer.valueOf(token);
           resultMap.put(serverId, nerrors);
         }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index 7c136e8..7b404cb 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -62,7 +62,7 @@
 import org.opends.server.replication.protocol.AckMsg;
 import org.opends.server.replication.protocol.DeleteMsg;
 import org.opends.server.replication.protocol.ErrorMsg;
-import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.Session;
 import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
 import org.opends.server.replication.protocol.ReplicationMsg;
@@ -673,7 +673,7 @@
       ServerStatus initStatus,
       ServerState replicationServerState,
       long generationId,
-      ProtocolSession session)
+      Session session)
     {
       super.sessionInitiated(initStatus, replicationServerState, generationId, session);
     }
@@ -813,7 +813,7 @@
   {
 
     private boolean shutdown = false;
-    private ProtocolSession session = null;
+    private Session session = null;
 
     /** Parameters given at constructor time */
     private int port;
@@ -1942,7 +1942,7 @@
         } else
         {
           // Already errors for this server, increment the value
-          int newVal = prevInt.intValue() + 1;
+          int newVal = prevInt + 1;
           prevServerErrors.put(serverId, newVal);
         }
       }
@@ -2018,11 +2018,8 @@
    */
   private boolean areGroupAndGenerationIdOk(int fakeRsGid, long fakeRsGenId)
   {
-    if ((fakeRsGid != -1) && (fakeRsGenId != -1L))
-    {
-      return ( (fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID) );
-    }
-    return false;
+    return (fakeRsGid != -1) && (fakeRsGenId != -1L) &&
+        ((fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID));
   }
 
   /**
@@ -2030,7 +2027,10 @@
    * data assured update and that are expected to effectively ack the update. If
    * -1 is used, the server is out of scope
    */
-  private List<Integer> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
+  private List<Integer> computeExpectedServersSafeData(
+      int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen,
+      int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
+      int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
   {
     List<Integer> exptectedServers = new ArrayList<Integer>();
     if (areGroupAndGenerationIdOk(fakeRs1Gid, fakeRs1GenId))
@@ -2650,7 +2650,7 @@
        */
 
       fakeRd3 = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
-        otherFakeDsGenId, ((otherFakeDsGid == DEFAULT_GID) ? true : false),
+        otherFakeDsGenId, (otherFakeDsGid == DEFAULT_GID),
         AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
         otherFakeDsScen);
       assertNotNull(fakeRd3);
@@ -2669,7 +2669,7 @@
        */
 
       fakeRs2 = createFakeReplicationServer(FRS2_ID, otherFakeRsGid, RS1_ID,
-        otherFakeRsGenId, ((otherFakeRsGid == DEFAULT_GID) ? true : false),
+        otherFakeRsGenId, (otherFakeRsGid == DEFAULT_GID),
         AssuredMode.SAFE_READ_MODE, 1, new ServerState(), otherFakeRsScen);
       assertNotNull(fakeRs2);
 
@@ -2684,13 +2684,14 @@
       long sendUpdateTime = System.currentTimeMillis() - startTime;
 
       // Compute some thing that will help determine what to check according to
-      // the current test configurarion: compute if DS and RS subject to conf
+      // the current test configuration: compute if DS and RS subject to conf
       // change are eligible and expected for safe read assured
       // eligible: the server should receive the ack request
       // expected: the server should send back an ack (with or without error)
       boolean dsIsEligible = areGroupAndGenerationIdOk(otherFakeDsGid, otherFakeDsGenId);
       boolean rsIsEligible = areGroupAndGenerationIdOk(otherFakeRsGid, otherFakeRsGenId);
       boolean dsIsExpected = false;
+      boolean rsIsExpected = false;
       // Booleans to tell if we expect to see the timeout, wrong status and replay error flags
       boolean shouldSeeTimeout = false;
       boolean shouldSeeWrongStatus = false;
@@ -2723,6 +2724,7 @@
         switch (otherFakeRsScen)
         {
           case REPLY_OK_RS_SCENARIO:
+            rsIsExpected = true;
             break;
           case TIMEOUT_RS_SCENARIO:
             shouldSeeRsIdInError = true;
@@ -3443,7 +3445,7 @@
 
       // DS 2 connected to RS 2
       fakeRd2 = createFakeReplicationDomain(FDS2_ID, fakeDsGid, RS2_ID,
-        fakeDsGenId, (fakeDsGid == DEFAULT_GID ? true : false),
+        fakeDsGenId, (fakeDsGid == DEFAULT_GID),
         AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, fakeDsScen);
       assertNotNull(fakeRd2);
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
index f0ec8f4..aa77ded 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -60,28 +60,14 @@
 import org.opends.server.protocols.ldap.LDAPControl;
 import org.opends.server.protocols.ldap.LDAPFilter;
 import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ChangeNumberGenerator;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.plugin.ReplicationServerListener;
-import org.opends.server.replication.protocol.AddMsg;
-import org.opends.server.replication.protocol.DeleteMsg;
-import org.opends.server.replication.protocol.ModifyDNMsg;
-import org.opends.server.replication.protocol.ModifyDnContext;
-import org.opends.server.replication.protocol.ModifyMsg;
-import org.opends.server.replication.protocol.ProtocolSession;
-import org.opends.server.replication.protocol.ReplServerStartDSMsg;
-import org.opends.server.replication.protocol.ReplSessionSecurity;
-import org.opends.server.replication.protocol.ReplicationMsg;
-import org.opends.server.replication.protocol.ServerStartMsg;
-import org.opends.server.replication.protocol.StartSessionMsg;
-import org.opends.server.replication.protocol.TopologyMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.protocol.WindowMsg;
-import org.opends.server.replication.protocol.WindowProbeMsg;
-import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.tools.LDAPModify;
 import org.opends.server.tools.LDAPSearch;
 import org.opends.server.types.*;
@@ -975,7 +961,7 @@
     int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
     socket.connect(ServerAddr, timeoutMS);
     ReplSessionSecurity replSessionSecurity = getReplSessionSecurity();
-    ProtocolSession session = replSessionSecurity.createClientSession(socket,
+    Session session = replSessionSecurity.createClientSession(socket,
         timeoutMS);
 
     boolean sslEncryption =

--
Gitblit v1.10.0