mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
20.02.2013 a32bc83b64f0f82d731980a433a2180fe4f366f3
opendj-sdk/opends/src/messages/messages/replication.properties
@@ -542,4 +542,5 @@
SEVERE_WARN_INVALID_SYNC_HIST_VALUE_214=The attribute value '%s' is not a valid \
 synchronization history value
SEVERE_ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD_215=Replication server RS(%d) \
 failed to parse change record with changenumber %s from the database. Error: %s
 failed to parse change record with changenumber %s from the database. Error: %s
SEVERE_ERR_SESSION_STARTUP_INTERRUPTED_216=%s was interrupted in the startup phase
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -86,19 +86,7 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteContext;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.service.ReplicationMonitor;
@@ -4581,7 +4569,7 @@
      ServerStatus initStatus,
      ServerState replicationServerState,
      long generationID,
      ProtocolSession session)
      Session session)
  {
    // Check domain fractional configuration consistency with local
    // configuration variables
@@ -4876,7 +4864,7 @@
  @Override
  public boolean processUpdate(UpdateMsg updateMsg)
  {
    // Ignore message if fractional configuration is inconcsistent and
    // Ignore message if fractional configuration is inconsistent and
    // we have been passed into bad data set status
    if (forceBadDataSet)
    {
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -57,7 +57,7 @@
  /**
   * The session on which heartbeats are to be sent.
   */
  private final ProtocolSession session;
  private final Session session;
  /**
@@ -80,7 +80,7 @@
   * @param heartbeatInterval The desired interval between heartbeats in
   * milliseconds.
   */
  public HeartbeatThread(String threadName, ProtocolSession session,
  public HeartbeatThread(String threadName, Session session,
                  long heartbeatInterval)
  {
    super(threadName);
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
File was deleted
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -161,7 +161,7 @@
   *           If the protocol session could not be established for some other
   *           reason.
   */
  public ProtocolSession createClientSession(final Socket socket,
  public Session createClientSession(final Socket socket,
      final int soTimeout) throws ConfigException, IOException
  {
    boolean hasCompleted = false;
@@ -197,7 +197,7 @@
      // Force TLS negotiation now.
      secureSocket.startHandshake();
      hasCompleted = true;
      return new TLSSocketSession(socket, secureSocket);
      return new Session(socket, secureSocket);
    }
    finally
    {
@@ -244,7 +244,7 @@
   *           If the protocol session could not be established for some other
   *           reason.
   */
  public ProtocolSession createServerSession(final Socket socket,
  public Session createServerSession(final Socket socket,
      final int soTimeout) throws ConfigException, IOException
  {
    boolean hasCompleted = false;
@@ -281,7 +281,7 @@
      // Force TLS negotiation now.
      secureSocket.startHandshake();
      hasCompleted = true;
      return new TLSSocketSession(socket, secureSocket);
      return new Session(socket, secureSocket);
    }
    catch (final SSLException e)
    {
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/Session.java
File was renamed from opendj-sdk/opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -40,21 +40,25 @@
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import javax.net.ssl.SSLSocket;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.StaticUtils;
/**
 * This class implements a protocol session using TLS.
 * This class defines a replication session using TLS.
 */
public final class TLSSocketSession implements ProtocolSession
public final class Session extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
@@ -86,7 +90,7 @@
   * connections.
   */
  private final Object stateLock = new Object();
  private boolean closeInitiated = false;
  private volatile boolean closeInitiated = false;
  private Throwable sessionError = null;
  /*
@@ -113,10 +117,13 @@
   */
  private BufferedOutputStream output;
  private final LinkedBlockingQueue<byte[]> sendQueue =
      new LinkedBlockingQueue<byte[]>(4000);
  private AtomicBoolean isRunning = new AtomicBoolean(false);
  private final CountDownLatch latch = new CountDownLatch(1);
  /**
   * Creates a new TLSSocketSession.
   * Creates a new Session.
   *
   * @param socket
   *          The regular Socket on which the SocketSession will be based.
@@ -125,13 +132,15 @@
   * @throws IOException
   *           When an IException happens on the socket.
   */
  public TLSSocketSession(final Socket socket,
      final SSLSocket secureSocket) throws IOException
  public Session(final Socket socket,
                 final SSLSocket secureSocket) throws IOException
  {
    super("Replication Session from "+ socket.getLocalSocketAddress() +
        " to " + socket.getRemoteSocketAddress());
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "Creating TLSSocketSession from %s to %s in %s",
          "Creating Session from %s to %s in %s",
          socket.getLocalSocketAddress(),
          socket.getRemoteSocketAddress(),
          stackTraceToSingleLineString(new Exception()));
@@ -153,9 +162,9 @@
  /**
   * {@inheritDoc}
   * This method is called when the session with the remote must be closed.
   * This object won't be used anymore after this method is called.
   */
  @Override
  public void close()
  {
    Throwable localSessionError;
@@ -171,13 +180,21 @@
      closeInitiated = true;
    }
    try {
      this.interrupt();
      this.join();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    // Perform close outside of critical section.
    if (debugEnabled())
    {
      if (localSessionError == null)
      {
        TRACER.debugInfo(
            "Closing TLSSocketSession from %s to %s in %s",
            "Closing Session from %s to %s in %s",
            plainSocket.getLocalSocketAddress(),
            plainSocket.getRemoteSocketAddress(),
            stackTraceToSingleLineString(new Exception()));
@@ -185,7 +202,7 @@
      else
      {
        TRACER.debugInfo(
            "Aborting TLSSocketSession from %s to %s in %s due to the "
            "Aborting Session from %s to %s in %s due to the "
                + "following error: %s",
            plainSocket.getLocalSocketAddress(),
            plainSocket.getRemoteSocketAddress(),
@@ -199,20 +216,13 @@
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        if (publishLock.tryLock())
        try
        {
          try
          {
            publish(new StopMsg());
          }
          catch (final IOException ignored)
          {
            // Ignore errors on close.
          }
          finally
          {
            publishLock.unlock();
          }
          publish(new StopMsg());
        }
        catch (final IOException ignored)
        {
          // Ignore errors on close.
        }
      }
    }
@@ -223,9 +233,12 @@
  /**
   * {@inheritDoc}
   * This methods allows to determine if the session close was initiated
   * on this Session.
   *
   * @return A boolean allowing to determine if the session close was initiated
   * on this Session.
   */
  @Override
  public boolean closeInitiated()
  {
    synchronized (stateLock)
@@ -237,9 +250,10 @@
  /**
   * {@inheritDoc}
   * Gets the time the last replication message was published on this
   * session.
   * @return The timestamp in milliseconds of the last message published.
   */
  @Override
  public long getLastPublishTime()
  {
    return lastPublishTime;
@@ -248,9 +262,10 @@
  /**
   * {@inheritDoc}
   * Gets the time the last replication message was received on this
   * session.
   * @return The timestamp in milliseconds of the last message received.
   */
  @Override
  public long getLastReceiveTime()
  {
    if (lastReceiveTime == 0)
@@ -263,9 +278,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the local URL in the form host:port.
   *
   * @return The local URL.
   */
  @Override
  public String getLocalUrl()
  {
    return localUrl;
@@ -274,9 +290,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the human readable address of the remote server.
   *
   * @return The human readable address of the remote server.
   */
  @Override
  public String getReadableRemoteAddress()
  {
    return readableRemoteAddress;
@@ -285,9 +302,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the IP address of the remote server.
   *
   * @return The IP address of the remote server.
   */
  @Override
  public String getRemoteAddress()
  {
    return remoteAddress;
@@ -296,9 +314,9 @@
  /**
   * {@inheritDoc}
   * Determine whether the session is using a security layer.
   * @return true if the connection is encrypted, false otherwise.
   */
  @Override
  public boolean isEncrypted()
  {
    return isEncrypted;
@@ -307,12 +325,38 @@
  /**
   * {@inheritDoc}
   * Sends a replication message to the remote peer.
   *
   * @param msg
   *          The message to be sent.
   * @throws IOException
   *           If an IO error occurred.
   */
  @Override
  public void publish(final ReplicationMsg msg) throws IOException
  {
    final byte[] buffer = msg.getBytes(protocolVersion);
    if (isRunning.get())
    {
      try {
        sendQueue.put(buffer);
      }
      catch (final InterruptedException e) {
        setSessionError(e);
        throw new IOException(e.getMessage());
      }
    } else {
      send(buffer);
    }
  }
  /** Sends a replication message already encoded to the socket.
   *
   * @param buffer
   *          the encoded buffer
   * @throws IOException if the message could not be sent
   */
  private void send(final byte[] buffer) throws IOException
  {
    final String str = String.format("%08x", buffer.length);
    final byte[] sendLengthBuf = str.getBytes();
@@ -326,9 +370,7 @@
      output.write(sendLengthBuf);
      output.write(buffer);
      output.flush();
    }
    catch (final IOException e)
    {
    } catch (final IOException e) {
      setSessionError(e);
      throw e;
    }
@@ -343,9 +385,20 @@
  /**
   * {@inheritDoc}
   * Attempt to receive a ReplicationMsg.
   * This method should block the calling thread until a
   * ReplicationMsg is available or until an error condition.
   *
   * This method can only be called by a single thread and therefore does not
   * need to implement any replication.
   *
   * @return The ReplicationMsg that was received.
   * @throws IOException When error happened during IO process.
   * @throws DataFormatException When the data received is not formatted as a
   *         ReplicationMsg.
   * @throws NotSupportedOldVersionPDUException If the received PDU is part of
   * an old protocol version and we do not support it.
   */
  @Override
  public ReplicationMsg receive() throws IOException,
      DataFormatException, NotSupportedOldVersionPDUException
  {
@@ -432,22 +485,23 @@
  }
  /**
   * {@inheritDoc}
   * This method is called at the establishment of the session and can
   * be used to record the version of the protocol that is currently used.
   *
   * @param version The version of the protocol that is currently used.
   */
  @Override
  public void setProtocolVersion(final short version)
  {
    protocolVersion = version;
  }
  /**
   * {@inheritDoc}
   * Returns the version of the protocol that is currently used.
   *
   * @return The version of the protocol that is currently used.
   */
  @Override
  public short getProtocolVersion()
  {
    return protocolVersion;
@@ -456,9 +510,16 @@
  /**
   * {@inheritDoc}
   * Set a timeout value.
   * With this option set to a non-zero value, calls to the receive() method
   * block for only this amount of time after which a
   * java.net.SocketTimeoutException is raised.
   * The Broker is valid and usable even after such an Exception is raised.
   *
   * @param timeout the specified timeout, in milliseconds.
   * @throws SocketException if there is an error in the underlying protocol,
   *         such as a TCP error.
   */
  @Override
  public void setSoTimeout(final int timeout) throws SocketException
  {
    plainSocket.setSoTimeout(timeout);
@@ -467,10 +528,8 @@
  /**
   * {@inheritDoc}
   * Stop using the security layer, if there is any.
   */
  @SuppressWarnings("unused")
  @Override
  public void stopEncryption()
  {
    /*
@@ -500,4 +559,59 @@
      }
    }
  }
  /**
   * Run method for the Session.
   * Loops waiting for buffers from the queue and sends them when available.
   */
  public void run()
  {
    isRunning.set(true);
    latch.countDown();
    if (debugEnabled())
    {
      TRACER.debugInfo(this.getName() + " starting.");
    }
    boolean needClosing = false;
    while (!closeInitiated)
    {
      byte[] buffer;
      try
      {
        buffer = sendQueue.take();
      }
      catch (InterruptedException ie)
      {
        break;
      }
      try
      {
        send(buffer);
      }
      catch (IOException e)
      {
        setSessionError(e);
        needClosing = true;
      }
    }
    isRunning.set(false);
    if (needClosing)
    {
      close();
    }
    if (debugEnabled())
    {
      TRACER.debugInfo(this.getName() + " stopped.");
    }
  }
  /**
   * This method can be called to wait until the session thread is
   * properly started.
   * @throws InterruptedException when interrupted
   */
  public void waitForStartup() throws InterruptedException
  {
    latch.await();
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/protocol/package-info.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 */
@@ -35,8 +36,8 @@
 * The main classes of this packages are :
 * <br>
 * <ul>
 * <li><A HREF="SocketSession.html"><B>SocketSession</B></A>
 * implements the ProtocolSession interface that is
 * <li><A HREF="Session.html"><B>Session</B></A>
 * implements the session and protocol that is
 * used by the replication server and the directory server to communicate.
 * This is done by using the innate encoding/decoding capabilities of the
 * ReplicationMessages objects. This class is used by both the
opendj-sdk/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -90,7 +90,7 @@
   * @param rcvWindowSize The receiving window size.
   */
  public DataServerHandler(
      ProtocolSession session,
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -391,7 +391,7 @@
   * @param rcvWindowSize The receiving window size.
   */
  public ECLServerHandler(
      ProtocolSession session,
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -41,7 +41,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.ECLUpdateMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
@@ -61,7 +61,7 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private ProtocolSession session;
  private Session session;
  private ECLServerHandler handler;
  private ReplicationServerDomain replicationServerDomain;
  private boolean suspended;
@@ -71,12 +71,12 @@
  /**
   * Create a ServerWriter.
   *
   * @param session     the ProtocolSession that will be used to send updates.
   * @param session     the Session that will be used to send updates.
   * @param handler     ECL handler for which the ServerWriter is created.
   * @param replicationServerDomain the ReplicationServerDomain of this
   *                    ServerWriter.
   */
  public ECLServerWriter(ProtocolSession session, ECLServerHandler handler,
  public ECLServerWriter(Session session, ECLServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
  {
    super(session, handler, replicationServerDomain);
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -304,7 +304,7 @@
      try
      {
        ProtocolSession session;
        Session session;
        Socket newSocket = null;
        try
        {
@@ -485,7 +485,7 @@
               " connects to " + remoteServerURL);
    Socket socket = new Socket();
    ProtocolSession session = null;
    Session session = null;
    try
    {
      InetSocketAddress ServerAddr = new InetSocketAddress(
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -187,7 +187,7 @@
  // The timer used to run the timeout code (timer tasks) for the assured update
  // messages we are waiting acks for.
  private Timer assuredTimeoutTimer = null;
  // Counter used to purge the timer tasks referemces in assuredTimeoutTimer,
  // Counter used to purge the timer tasks references in assuredTimeoutTimer,
  // every n number of treated assured messages
  private int assuredTimeoutTimerPurgeCounter = 0;
@@ -588,17 +588,16 @@
            if (serverStatus == ServerStatus.DEGRADED_STATUS)
            {
              wrongStatusServers.add(handler.getServerId());
            } else
            {
              /**
               * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
               * We do not want this to be reported as an error to the update
               * maker -> no pollution or potential misunderstanding when
               * reading logs or monitoring and it was just administration (for
               * instance new server is being configured in topo: it goes in bad
               * gen then then full full update).
               */
            }
            /**
             * else
             * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
             * We do not want this to be reported as an error to the update
             * maker -> no pollution or potential misunderstanding when
             * reading logs or monitoring and it was just administration (for
             * instance new server is being configured in topo: it goes in bad
             * gen then then full full update).
             */
          }
        }
      }
@@ -685,19 +684,12 @@
          }
        } else
        { // A RS sent us the safe data message, for sure no further ack to wait
          if (safeDataLevel == (byte) 1)
          /**
           * Level 1 has already been reached so no further acks to wait.
           * Just deal with level > 1
           */
          if (safeDataLevel > (byte) 1)
          {
            /**
             * The original level was 1 so the RS that sent us this message
             * should have already sent his ack to the sender DS. Level 1 has
             * already been reached so no further acks to wait.
             * This should not happen in theory as the sender RS server should
             * have sent us a matching not assured message so we should not come
             * to here.
             */
          } else
          {
            // level > 1, so Ack this message to originator RS
            sourceHandler.send(new AckMsg(cn));
          }
        }
@@ -815,11 +807,10 @@
          expectedAcksInfo.completed();
        }
      }
    } else
    {
      // The timeout occurred for the update matching this change number and the
      // ack with timeout error has probably already been sent.
    }
    /* Else the timeout occurred for the update matching this change number
     * and the ack with timeout error has probably already been sent.
     */
  }
  /**
@@ -934,10 +925,8 @@
                  expectedServerInTimeout.
                    incrementAssuredSdSentUpdatesTimeout();
                }
              } else
              {
                // Server disappeared ? Let's forget about it.
              }
              /* else server disappeared ? Let's forget about it. */
            }
          }
          // Mark the ack info object as completed to prevent potential
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -136,7 +136,7 @@
   * @param rcvWindowSize The receiving window size.
   */
  public ReplicationServerHandler(
      ProtocolSession session,
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,11 +50,11 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
@@ -88,7 +88,7 @@
   * @param providedMsg     The provided error message.
   * @param handler         The handler that manages that session.
   */
  static protected void closeSession(ProtocolSession providedSession,
  static protected void closeSession(Session providedSession,
      Message providedMsg, ServerHandler handler)
  {
    if (providedMsg != null)
@@ -118,7 +118,7 @@
  /**
   * The session opened with the remote server.
   */
  protected ProtocolSession session;
  protected Session session;
  /**
   * The serverURL of the remote server.
@@ -237,7 +237,7 @@
  /**
   * Creates a new server handler instance with the provided socket.
   *
   * @param session The ProtocolSession used by the ServerHandler to
   * @param session The Session used by the ServerHandler to
   *                 communicate with the remote entity.
   * @param queueSize The maximum number of update that will be kept
   *                  in memory by this ServerHandler.
@@ -247,7 +247,7 @@
   * @param rcvWindowSize The window size to receive from the remote server.
   */
  public ServerHandler(
      ProtocolSession session,
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
@@ -271,7 +271,7 @@
    // We did not recognize the message, close session as what
    // can happen after is undetermined and we do not want the server to
    // be disturbed
    ProtocolSession localSession = session;
    Session localSession = session;
    if (localSession != null)
    {
      closeSession(localSession, reason, this);
@@ -366,6 +366,22 @@
          replicationServerDomain);
      reader = new ServerReader(session, this);
      session.setName("Replication server RS("
          + this.getReplicationServerId()
          + ") session thread to " + this.toString() + " at "
          + session.getReadableRemoteAddress());
      session.start();
      try
      {
        session.waitForStartup();
      }
      catch (InterruptedException e)
      {
        final Message message =
            ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
      }
      reader.start();
      writer.start();
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -58,7 +58,7 @@
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private final ProtocolSession session;
  private final Session session;
  private final ServerHandler handler;
  private final String remoteAddress;
@@ -68,11 +68,11 @@
   * Constructor for the LDAP server reader part of the replicationServer.
   *
   * @param session
   *          The ProtocolSession from which to read the data.
   *          The Session from which to read the data.
   * @param handler
   *          The server handler for this server reader.
   */
  public ServerReader(ProtocolSession session, ServerHandler handler)
  public ServerReader(Session session, ServerHandler handler)
  {
    super("Replication server RS(" + handler.getReplicationServerId()
        + ") reading from " + handler.toString() + " at "
@@ -314,18 +314,6 @@
        logError(errMessage);
      }
    }
    catch (ClassNotFoundException e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the connection.
       */
      errMessage = ERR_UNKNOWN_MESSAGE.get(handler.toString());
      logError(errMessage);
    }
    catch (Exception e)
    {
      if (debugEnabled())
opendj-sdk/opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -40,7 +40,7 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -55,7 +55,7 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private final ProtocolSession session;
  private final Session session;
  private final ServerHandler handler;
  private final ReplicationServerDomain replicationServerDomain;
@@ -66,13 +66,13 @@
   * for new updates and forward them to the server
   *
   * @param session
   *          the ProtocolSession that will be used to send updates.
   *          the Session that will be used to send updates.
   * @param handler
   *          handler for which the ServerWriter is created.
   * @param replicationServerDomain
   *          The ReplicationServerDomain of this ServerWriter.
   */
  public ServerWriter(ProtocolSession session, ServerHandler handler,
  public ServerWriter(Session session, ServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
  {
    // Session may be null for ECLServerWriter.
opendj-sdk/opends/src/server/org/opends/server/replication/server/package-info.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 */
@@ -46,13 +47,6 @@
 * The main classes of this packages are :
 * <br>
 * <ul>
 * <li><A HREF="SocketSession.html"><B>SocketSession</B></A>
 * implements the ProtocolSession interface that is
 * used by the replication server and the directory server to communicate.
 * This is done by using the innate encoding/decoding capabilities of the
 * ReplicationMessages objects. This class is used by both the
 * replicationServer and the replication package.
 * </li>
 * <li><A HREF="ReplicationServerDomain.html"><B>ReplicationServerDomain</B></A>
 * implements the multiplexing part of the replication
 * server. It contains method for forwarding all the received messages to
opendj-sdk/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -34,7 +34,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.TimeThread;
@@ -55,7 +55,7 @@
  /**
   * The session on which heartbeats are to be sent.
   */
  private final ProtocolSession session;
  private final Session session;
  /**
   * The time in milliseconds between heartbeats.
@@ -77,7 +77,7 @@
   *                          (in milliseconds).
   * @param serverId2 The serverId of the sender domain.
   */
  public CTHeartbeatPublisherThread(String threadName, ProtocolSession session,
  public CTHeartbeatPublisherThread(String threadName, Session session,
                  long heartbeatInterval, int serverId2)
  {
    super(threadName);
opendj-sdk/opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -33,7 +33,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.api.DirectoryThread;
@@ -54,7 +54,7 @@
  /**
   * The session on which heartbeats are to be monitored.
   */
  private final ProtocolSession session;
  private final Session session;
  /**
@@ -93,7 +93,7 @@
   *          milliseconds).
   */
  HeartbeatMonitor(int serverID, int replicationServerID,
      String baseDN, ProtocolSession session, long heartbeatInterval)
      String baseDN, Session session, long heartbeatInterval)
  {
    super("Replica DS("
      + serverID + ") heartbeat monitor for domain \""
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -95,7 +95,7 @@
   */
  public final static String NO_CONNECTED_SERVER = "Not connected";
  private volatile String replicationServer = NO_CONNECTED_SERVER;
  private volatile ProtocolSession session = null;
  private volatile Session session = null;
  private final ServerState state;
  private final String baseDn;
  private final int serverId;
@@ -1230,7 +1230,7 @@
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    ProtocolSession localSession = null;
    Session localSession = null;
    Socket socket = null;
    boolean hasConnected = false;
    Message errorMessage = null;
@@ -2180,7 +2180,7 @@
   * @param failingSession the socket which failed
   * @param infiniteTry the socket which failed
   */
  public void reStart(ProtocolSession failingSession, boolean infiniteTry)
  public void reStart(Session failingSession, boolean infiniteTry)
  {
    if (failingSession != null)
    {
@@ -2308,7 +2308,7 @@
      try
      {
        boolean credit;
        ProtocolSession current_session;
        Session current_session;
        Semaphore currentWindowSemaphore;
        /*
@@ -2465,7 +2465,7 @@
      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      final ProtocolSession savedSession = session;
      final Session savedSession = session;
      if (savedSession == null)
      {
        // Must be shutting down.
@@ -2612,7 +2612,7 @@
        if (!shutdown)
        {
          final ProtocolSession tmpSession = session;
          final Session tmpSession = session;
          if (tmpSession == null || !tmpSession.closeInitiated())
          {
            /*
@@ -2879,7 +2879,7 @@
   */
  public boolean isSessionEncrypted()
  {
    final ProtocolSession tmp = session;
    final Session tmp = session;
    return tmp != null ? tmp.isEncrypted() : false;
  }
@@ -3127,7 +3127,7 @@
   */
  String getLocalUrl()
  {
    final ProtocolSession tmp = session;
    final Session tmp = session;
    return tmp != null ? tmp.getLocalUrl() : "";
  }
@@ -3142,12 +3142,12 @@
    return monitor;
  }
  private void setSession(final ProtocolSession newSession)
  private void setSession(final Session newSession)
  {
    // De-register the monitor with the old name.
    deregisterReplicationMonitor();
    final ProtocolSession oldSession = session;
    final Session oldSession = session;
    if (oldSession != null)
    {
      oldSession.close();
opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -68,7 +68,7 @@
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -401,13 +401,13 @@
   * @param generationID            The current generationID of the
   *                                ReplicationServer with which the session
   *                                was established.
   * @param session                 The ProtocolSession that is currently used.
   * @param session                 The Session that is currently used.
   */
  public void sessionInitiated(
      ServerStatus initStatus,
      ServerState replicationServerState,
      long generationID,
      ProtocolSession session)
      Session session)
  {
    // Sanity check: is it a valid initial status?
    if (!isValidInitialStatus(initStatus))
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -27,25 +27,6 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.File;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -66,34 +47,34 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.AttributeValues;
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.LockManager;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.types.*;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.*;
/**
 * An abstract class that all Replication unit test should extend.
 */
@@ -900,7 +881,7 @@
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessageID The expected error messageID when the expected
   * @param errorMessage The expected error message when the expected
   * result code is not SUCCESS
   */
  protected void addTask(Entry taskEntry, ResultCode expectedResult,
@@ -1168,7 +1149,7 @@
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
  protected static ReplicationMsg waitForSpecificMsg(Session session, String msgType) {
    ReplicationMsg replMsg = null;
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -52,39 +52,17 @@
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.Operation;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.*;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.testng.Assert.*;
/**
@@ -279,7 +257,7 @@
    private ServerSocket listenSocket;
    private boolean shutdown = false;
    private ProtocolSession session = null;
    private Session session = null;
    // Parameters given at constructor time
    private final int port;
@@ -626,8 +604,7 @@
        session.publish(addMsg);
        // Read and return matching ack
        AckMsg ackMsg = (AckMsg)session.receive();
        return ackMsg;
        return (AckMsg)session.receive();
      } catch(SocketTimeoutException e)
      {
@@ -889,7 +866,7 @@
        replicationServer.setAssured(false);
      replicationServer.start(TIMEOUT_SCENARIO);
      long startTime = System.currentTimeMillis();
      long startTime;
      // Create a safe data assured domain
      if (rsGroupId == (byte)1)
      {
@@ -998,7 +975,7 @@
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeReadModeTimeout" + rsGroupId;;
    String testcase = "testSafeReadModeTimeout" + rsGroupId;
    try
    {
      // Create and start a RS expecting clients in safe read assured mode
@@ -1008,7 +985,7 @@
        replicationServer.setAssured(false);
      replicationServer.start(TIMEOUT_SCENARIO);
      long startTime = 0;
      long startTime;
      // Create a safe data assured domain
      if (rsGroupId == (byte)1)
@@ -1363,10 +1340,9 @@
        "objectClass: organizationalUnit\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
      String parentUid = getEntryUUID(DN.decode(SAFE_READ_DN));
      AckMsg ackMsg = null;
      try {
        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
        AckMsg ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
         if (rsGroupId == (byte)2)
           fail("Should only go here for RS with same group id as DS");
@@ -1461,7 +1437,7 @@
      Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
      String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN));
      AckMsg ackMsg = null;
      AckMsg ackMsg;
      try
      {
        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
@@ -1853,7 +1829,7 @@
    /*
     * Find the multi valued attribute matching the requested assured mode
     */
    String assuredAttr = null;
    String assuredAttr;
    switch(assuredMode)
    {
      case SAFE_READ_MODE:
@@ -1873,19 +1849,15 @@
      return resultMap; // Empty map
    Attribute attr = attrs.get(0);
    Iterator<AttributeValue> attValIt = attr.iterator();
    // Parse and store values
    while (attValIt.hasNext())
    {
      String srvStr = attValIt.next().toString();
    for (AttributeValue val : attr) {
      String srvStr = val.toString();
      StringTokenizer strtok = new StringTokenizer(srvStr, ":");
      String token = strtok.nextToken();
      if (token != null)
      {
      if (token != null) {
        int serverId = Integer.valueOf(token);
        token = strtok.nextToken();
        if (token != null)
        {
        if (token != null) {
          Integer nerrors = Integer.valueOf(token);
          resultMap.put(serverId, nerrors);
        }
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -62,7 +62,7 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -673,7 +673,7 @@
      ServerStatus initStatus,
      ServerState replicationServerState,
      long generationId,
      ProtocolSession session)
      Session session)
    {
      super.sessionInitiated(initStatus, replicationServerState, generationId, session);
    }
@@ -813,7 +813,7 @@
  {
    private boolean shutdown = false;
    private ProtocolSession session = null;
    private Session session = null;
    /** Parameters given at constructor time */
    private int port;
@@ -1942,7 +1942,7 @@
        } else
        {
          // Already errors for this server, increment the value
          int newVal = prevInt.intValue() + 1;
          int newVal = prevInt + 1;
          prevServerErrors.put(serverId, newVal);
        }
      }
@@ -2018,11 +2018,8 @@
   */
  private boolean areGroupAndGenerationIdOk(int fakeRsGid, long fakeRsGenId)
  {
    if ((fakeRsGid != -1) && (fakeRsGenId != -1L))
    {
      return ( (fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID) );
    }
    return false;
    return (fakeRsGid != -1) && (fakeRsGenId != -1L) &&
        ((fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID));
  }
  /**
@@ -2030,7 +2027,10 @@
   * data assured update and that are expected to effectively ack the update. If
   * -1 is used, the server is out of scope
   */
  private List<Integer> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
  private List<Integer> computeExpectedServersSafeData(
      int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen,
      int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
      int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
  {
    List<Integer> exptectedServers = new ArrayList<Integer>();
    if (areGroupAndGenerationIdOk(fakeRs1Gid, fakeRs1GenId))
@@ -2650,7 +2650,7 @@
       */
      fakeRd3 = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
        otherFakeDsGenId, ((otherFakeDsGid == DEFAULT_GID) ? true : false),
        otherFakeDsGenId, (otherFakeDsGid == DEFAULT_GID),
        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        otherFakeDsScen);
      assertNotNull(fakeRd3);
@@ -2669,7 +2669,7 @@
       */
      fakeRs2 = createFakeReplicationServer(FRS2_ID, otherFakeRsGid, RS1_ID,
        otherFakeRsGenId, ((otherFakeRsGid == DEFAULT_GID) ? true : false),
        otherFakeRsGenId, (otherFakeRsGid == DEFAULT_GID),
        AssuredMode.SAFE_READ_MODE, 1, new ServerState(), otherFakeRsScen);
      assertNotNull(fakeRs2);
@@ -2684,13 +2684,14 @@
      long sendUpdateTime = System.currentTimeMillis() - startTime;
      // Compute some thing that will help determine what to check according to
      // the current test configurarion: compute if DS and RS subject to conf
      // the current test configuration: compute if DS and RS subject to conf
      // change are eligible and expected for safe read assured
      // eligible: the server should receive the ack request
      // expected: the server should send back an ack (with or without error)
      boolean dsIsEligible = areGroupAndGenerationIdOk(otherFakeDsGid, otherFakeDsGenId);
      boolean rsIsEligible = areGroupAndGenerationIdOk(otherFakeRsGid, otherFakeRsGenId);
      boolean dsIsExpected = false;
      boolean rsIsExpected = false;
      // Booleans to tell if we expect to see the timeout, wrong status and replay error flags
      boolean shouldSeeTimeout = false;
      boolean shouldSeeWrongStatus = false;
@@ -2723,6 +2724,7 @@
        switch (otherFakeRsScen)
        {
          case REPLY_OK_RS_SCENARIO:
            rsIsExpected = true;
            break;
          case TIMEOUT_RS_SCENARIO:
            shouldSeeRsIdInError = true;
@@ -3443,7 +3445,7 @@
      // DS 2 connected to RS 2
      fakeRd2 = createFakeReplicationDomain(FDS2_ID, fakeDsGid, RS2_ID,
        fakeDsGenId, (fakeDsGid == DEFAULT_GID ? true : false),
        fakeDsGenId, (fakeDsGid == DEFAULT_GID),
        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, fakeDsScen);
      assertNotNull(fakeRd2);
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -60,28 +60,14 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.tools.LDAPModify;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.*;
@@ -975,7 +961,7 @@
    int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
    socket.connect(ServerAddr, timeoutMS);
    ReplSessionSecurity replSessionSecurity = getReplSessionSecurity();
    ProtocolSession session = replSessionSecurity.createClientSession(socket,
    Session session = replSessionSecurity.createClientSession(socket,
        timeoutMS);
    boolean sslEncryption =