mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Ludovic Poitou
20.02.2013 9e1f377c4f21b899d16f4c62450c68691f4b42a8
Fix for OPENDJ-846, Intermittent Replication failure.
The issue was triggered by the mix of AssuredReplication and bad network conditions, which resulted in a deadlock between 2 RS, as both were blocked on writing to the TCP socket and not reading (because waiting on the write lock).
The solution (more of a workaround) is to have another thread for sending data to the socket and have the reader and writer posting data to send to a queue that this new thread is polling.
There are still potential deadlocks but they will occur much later, if the sendQueue gets full. The code needs more work post 2.6 to be fully non blocking, but the changes are enough for now to resolve the customer deadlock case.
1 files deleted
1 files renamed
23 files modified
748 ■■■■■ changed files
opends/src/messages/messages/replication.properties 1 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 18 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java 170 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/Session.java 214 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/package-info.java 5 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 4 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 31 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerHandler.java 28 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerReader.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ServerWriter.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/package-info.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 20 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 6 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 55 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java 66 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java 30 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 20 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -543,3 +543,4 @@
 synchronization history value
SEVERE_ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD_215=Replication server RS(%d) \
 failed to parse change record with changenumber %s from the database. Error: %s
SEVERE_ERR_SESSION_STARTUP_INTERRUPTED_216=%s was interrupted in the startup phase
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -86,19 +86,7 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.AddContext;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteContext;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyContext;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.OperationContext;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.replication.service.ReplicationMonitor;
@@ -4581,7 +4569,7 @@
      ServerStatus initStatus,
      ServerState replicationServerState,
      long generationID,
      ProtocolSession session)
      Session session)
  {
    // Check domain fractional configuration consistency with local
    // configuration variables
@@ -4876,7 +4864,7 @@
  @Override
  public boolean processUpdate(UpdateMsg updateMsg)
  {
    // Ignore message if fractional configuration is inconcsistent and
    // Ignore message if fractional configuration is inconsistent and
    // we have been passed into bad data set status
    if (forceBadDataSet)
    {
opends/src/server/org/opends/server/replication/protocol/HeartbeatThread.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.protocol;
@@ -57,7 +57,7 @@
  /**
   * The session on which heartbeats are to be sent.
   */
  private final ProtocolSession session;
  private final Session session;
  /**
@@ -80,7 +80,7 @@
   * @param heartbeatInterval The desired interval between heartbeats in
   * milliseconds.
   */
  public HeartbeatThread(String threadName, ProtocolSession session,
  public HeartbeatThread(String threadName, Session session,
                  long heartbeatInterval)
  {
    super(threadName);
opends/src/server/org/opends/server/replication/protocol/ProtocolSession.java
File was deleted
opends/src/server/org/opends/server/replication/protocol/ReplSessionSecurity.java
@@ -161,7 +161,7 @@
   *           If the protocol session could not be established for some other
   *           reason.
   */
  public ProtocolSession createClientSession(final Socket socket,
  public Session createClientSession(final Socket socket,
      final int soTimeout) throws ConfigException, IOException
  {
    boolean hasCompleted = false;
@@ -197,7 +197,7 @@
      // Force TLS negotiation now.
      secureSocket.startHandshake();
      hasCompleted = true;
      return new TLSSocketSession(socket, secureSocket);
      return new Session(socket, secureSocket);
    }
    finally
    {
@@ -244,7 +244,7 @@
   *           If the protocol session could not be established for some other
   *           reason.
   */
  public ProtocolSession createServerSession(final Socket socket,
  public Session createServerSession(final Socket socket,
      final int soTimeout) throws ConfigException, IOException
  {
    boolean hasCompleted = false;
@@ -281,7 +281,7 @@
      // Force TLS negotiation now.
      secureSocket.startHandshake();
      hasCompleted = true;
      return new TLSSocketSession(socket, secureSocket);
      return new Session(socket, secureSocket);
    }
    catch (final SSLException e)
    {
opends/src/server/org/opends/server/replication/protocol/Session.java
File was renamed from opends/src/server/org/opends/server/replication/protocol/TLSSocketSession.java
@@ -40,21 +40,25 @@
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.DataFormatException;
import javax.net.ssl.SSLSocket;
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.util.StaticUtils;
/**
 * This class implements a protocol session using TLS.
 * This class defines a replication session using TLS.
 */
public final class TLSSocketSession implements ProtocolSession
public final class Session extends DirectoryThread
{
  /**
   * The tracer object for the debug logger.
@@ -86,7 +90,7 @@
   * connections.
   */
  private final Object stateLock = new Object();
  private boolean closeInitiated = false;
  private volatile boolean closeInitiated = false;
  private Throwable sessionError = null;
  /*
@@ -113,10 +117,13 @@
   */
  private BufferedOutputStream output;
  private final LinkedBlockingQueue<byte[]> sendQueue =
      new LinkedBlockingQueue<byte[]>(4000);
  private AtomicBoolean isRunning = new AtomicBoolean(false);
  private final CountDownLatch latch = new CountDownLatch(1);
  /**
   * Creates a new TLSSocketSession.
   * Creates a new Session.
   *
   * @param socket
   *          The regular Socket on which the SocketSession will be based.
@@ -125,13 +132,15 @@
   * @throws IOException
   *           When an IException happens on the socket.
   */
  public TLSSocketSession(final Socket socket,
  public Session(final Socket socket,
      final SSLSocket secureSocket) throws IOException
  {
    super("Replication Session from "+ socket.getLocalSocketAddress() +
        " to " + socket.getRemoteSocketAddress());
    if (debugEnabled())
    {
      TRACER.debugInfo(
          "Creating TLSSocketSession from %s to %s in %s",
          "Creating Session from %s to %s in %s",
          socket.getLocalSocketAddress(),
          socket.getRemoteSocketAddress(),
          stackTraceToSingleLineString(new Exception()));
@@ -153,9 +162,9 @@
  /**
   * {@inheritDoc}
   * This method is called when the session with the remote must be closed.
   * This object won't be used anymore after this method is called.
   */
  @Override
  public void close()
  {
    Throwable localSessionError;
@@ -171,13 +180,21 @@
      closeInitiated = true;
    }
    try {
      this.interrupt();
      this.join();
    }
    catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    // Perform close outside of critical section.
    if (debugEnabled())
    {
      if (localSessionError == null)
      {
        TRACER.debugInfo(
            "Closing TLSSocketSession from %s to %s in %s",
            "Closing Session from %s to %s in %s",
            plainSocket.getLocalSocketAddress(),
            plainSocket.getRemoteSocketAddress(),
            stackTraceToSingleLineString(new Exception()));
@@ -185,7 +202,7 @@
      else
      {
        TRACER.debugInfo(
            "Aborting TLSSocketSession from %s to %s in %s due to the "
            "Aborting Session from %s to %s in %s due to the "
                + "following error: %s",
            plainSocket.getLocalSocketAddress(),
            plainSocket.getRemoteSocketAddress(),
@@ -199,8 +216,6 @@
    {
      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        if (publishLock.tryLock())
        {
          try
          {
            publish(new StopMsg());
@@ -209,11 +224,6 @@
          {
            // Ignore errors on close.
          }
          finally
          {
            publishLock.unlock();
          }
        }
      }
    }
@@ -223,9 +233,12 @@
  /**
   * {@inheritDoc}
   * This methods allows to determine if the session close was initiated
   * on this Session.
   *
   * @return A boolean allowing to determine if the session close was initiated
   * on this Session.
   */
  @Override
  public boolean closeInitiated()
  {
    synchronized (stateLock)
@@ -237,9 +250,10 @@
  /**
   * {@inheritDoc}
   * Gets the time the last replication message was published on this
   * session.
   * @return The timestamp in milliseconds of the last message published.
   */
  @Override
  public long getLastPublishTime()
  {
    return lastPublishTime;
@@ -248,9 +262,10 @@
  /**
   * {@inheritDoc}
   * Gets the time the last replication message was received on this
   * session.
   * @return The timestamp in milliseconds of the last message received.
   */
  @Override
  public long getLastReceiveTime()
  {
    if (lastReceiveTime == 0)
@@ -263,9 +278,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the local URL in the form host:port.
   *
   * @return The local URL.
   */
  @Override
  public String getLocalUrl()
  {
    return localUrl;
@@ -274,9 +290,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the human readable address of the remote server.
   *
   * @return The human readable address of the remote server.
   */
  @Override
  public String getReadableRemoteAddress()
  {
    return readableRemoteAddress;
@@ -285,9 +302,10 @@
  /**
   * {@inheritDoc}
   * Retrieve the IP address of the remote server.
   *
   * @return The IP address of the remote server.
   */
  @Override
  public String getRemoteAddress()
  {
    return remoteAddress;
@@ -296,9 +314,9 @@
  /**
   * {@inheritDoc}
   * Determine whether the session is using a security layer.
   * @return true if the connection is encrypted, false otherwise.
   */
  @Override
  public boolean isEncrypted()
  {
    return isEncrypted;
@@ -307,12 +325,38 @@
  /**
   * {@inheritDoc}
   * Sends a replication message to the remote peer.
   *
   * @param msg
   *          The message to be sent.
   * @throws IOException
   *           If an IO error occurred.
   */
  @Override
  public void publish(final ReplicationMsg msg) throws IOException
  {
    final byte[] buffer = msg.getBytes(protocolVersion);
    if (isRunning.get())
    {
      try {
        sendQueue.put(buffer);
      }
      catch (final InterruptedException e) {
        setSessionError(e);
        throw new IOException(e.getMessage());
      }
    } else {
      send(buffer);
    }
  }
  /** Sends a replication message already encoded to the socket.
   *
   * @param buffer
   *          the encoded buffer
   * @throws IOException if the message could not be sent
   */
  private void send(final byte[] buffer) throws IOException
  {
    final String str = String.format("%08x", buffer.length);
    final byte[] sendLengthBuf = str.getBytes();
@@ -326,9 +370,7 @@
      output.write(sendLengthBuf);
      output.write(buffer);
      output.flush();
    }
    catch (final IOException e)
    {
    } catch (final IOException e) {
      setSessionError(e);
      throw e;
    }
@@ -343,9 +385,20 @@
  /**
   * {@inheritDoc}
   * Attempt to receive a ReplicationMsg.
   * This method should block the calling thread until a
   * ReplicationMsg is available or until an error condition.
   *
   * This method can only be called by a single thread and therefore does not
   * need to implement any replication.
   *
   * @return The ReplicationMsg that was received.
   * @throws IOException When error happened during IO process.
   * @throws DataFormatException When the data received is not formatted as a
   *         ReplicationMsg.
   * @throws NotSupportedOldVersionPDUException If the received PDU is part of
   * an old protocol version and we do not support it.
   */
  @Override
  public ReplicationMsg receive() throws IOException,
      DataFormatException, NotSupportedOldVersionPDUException
  {
@@ -432,22 +485,23 @@
  }
  /**
   * {@inheritDoc}
   * This method is called at the establishment of the session and can
   * be used to record the version of the protocol that is currently used.
   *
   * @param version The version of the protocol that is currently used.
   */
  @Override
  public void setProtocolVersion(final short version)
  {
    protocolVersion = version;
  }
  /**
   * {@inheritDoc}
   * Returns the version of the protocol that is currently used.
   *
   * @return The version of the protocol that is currently used.
   */
  @Override
  public short getProtocolVersion()
  {
    return protocolVersion;
@@ -456,9 +510,16 @@
  /**
   * {@inheritDoc}
   * Set a timeout value.
   * With this option set to a non-zero value, calls to the receive() method
   * block for only this amount of time after which a
   * java.net.SocketTimeoutException is raised.
   * The Broker is valid and usable even after such an Exception is raised.
   *
   * @param timeout the specified timeout, in milliseconds.
   * @throws SocketException if there is an error in the underlying protocol,
   *         such as a TCP error.
   */
  @Override
  public void setSoTimeout(final int timeout) throws SocketException
  {
    plainSocket.setSoTimeout(timeout);
@@ -467,10 +528,8 @@
  /**
   * {@inheritDoc}
   * Stop using the security layer, if there is any.
   */
  @SuppressWarnings("unused")
  @Override
  public void stopEncryption()
  {
    /*
@@ -500,4 +559,59 @@
      }
    }
  }
  /**
   * Run method for the Session.
   * Loops waiting for buffers from the queue and sends them when available.
   */
  public void run()
  {
    isRunning.set(true);
    latch.countDown();
    if (debugEnabled())
    {
      TRACER.debugInfo(this.getName() + " starting.");
    }
    boolean needClosing = false;
    while (!closeInitiated)
    {
      byte[] buffer;
      try
      {
        buffer = sendQueue.take();
      }
      catch (InterruptedException ie)
      {
        break;
      }
      try
      {
        send(buffer);
      }
      catch (IOException e)
      {
        setSessionError(e);
        needClosing = true;
      }
    }
    isRunning.set(false);
    if (needClosing)
    {
      close();
    }
    if (debugEnabled())
    {
      TRACER.debugInfo(this.getName() + " stopped.");
    }
  }
  /**
   * This method can be called to wait until the session thread is
   * properly started.
   * @throws InterruptedException when interrupted
   */
  public void waitForStartup() throws InterruptedException
  {
    latch.await();
  }
}
opends/src/server/org/opends/server/replication/protocol/package-info.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 */
@@ -35,8 +36,8 @@
 * The main classes of this packages are :
 * <br>
 * <ul>
 * <li><A HREF="SocketSession.html"><B>SocketSession</B></A>
 * implements the ProtocolSession interface that is
 * <li><A HREF="Session.html"><B>Session</B></A>
 * implements the session and protocol that is
 * used by the replication server and the directory server to communicate.
 * This is done by using the innate encoding/decoding capabilities of the
 * ReplicationMessages objects. This class is used by both the
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -90,7 +90,7 @@
   * @param rcvWindowSize The receiving window size.
   */
  public DataServerHandler(
      ProtocolSession session,
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -391,7 +391,7 @@
   * @param rcvWindowSize The receiving window size.
   */
  public ECLServerHandler(
      ProtocolSession session,
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
opends/src/server/org/opends/server/replication/server/ECLServerWriter.java
@@ -41,7 +41,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.DoneMsg;
import org.opends.server.replication.protocol.ECLUpdateMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
@@ -61,7 +61,7 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private ProtocolSession session;
  private Session session;
  private ECLServerHandler handler;
  private ReplicationServerDomain replicationServerDomain;
  private boolean suspended;
@@ -71,12 +71,12 @@
  /**
   * Create a ServerWriter.
   *
   * @param session     the ProtocolSession that will be used to send updates.
   * @param session     the Session that will be used to send updates.
   * @param handler     ECL handler for which the ServerWriter is created.
   * @param replicationServerDomain the ReplicationServerDomain of this
   *                    ServerWriter.
   */
  public ECLServerWriter(ProtocolSession session, ECLServerHandler handler,
  public ECLServerWriter(Session session, ECLServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
  {
    super(session, handler, replicationServerDomain);
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -304,7 +304,7 @@
      try
      {
        ProtocolSession session;
        Session session;
        Socket newSocket = null;
        try
        {
@@ -485,7 +485,7 @@
               " connects to " + remoteServerURL);
    Socket socket = new Socket();
    ProtocolSession session = null;
    Session session = null;
    try
    {
      InetSocketAddress ServerAddr = new InetSocketAddress(
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -187,7 +187,7 @@
  // The timer used to run the timeout code (timer tasks) for the assured update
  // messages we are waiting acks for.
  private Timer assuredTimeoutTimer = null;
  // Counter used to purge the timer tasks referemces in assuredTimeoutTimer,
  // Counter used to purge the timer tasks references in assuredTimeoutTimer,
  // every n number of treated assured messages
  private int assuredTimeoutTimerPurgeCounter = 0;
@@ -588,9 +588,9 @@
            if (serverStatus == ServerStatus.DEGRADED_STATUS)
            {
              wrongStatusServers.add(handler.getServerId());
            } else
            {
            }
              /**
             * else
               * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
               * We do not want this to be reported as an error to the update
               * maker -> no pollution or potential misunderstanding when
@@ -602,7 +602,6 @@
          }
        }
      }
    }
    // Return computed structures
    PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
@@ -685,19 +684,12 @@
          }
        } else
        { // A RS sent us the safe data message, for sure no further ack to wait
          if (safeDataLevel == (byte) 1)
          {
            /**
             * The original level was 1 so the RS that sent us this message
             * should have already sent his ack to the sender DS. Level 1 has
             * already been reached so no further acks to wait.
             * This should not happen in theory as the sender RS server should
             * have sent us a matching not assured message so we should not come
             * to here.
           * Level 1 has already been reached so no further acks to wait.
           * Just deal with level > 1
             */
          } else
          if (safeDataLevel > (byte) 1)
          {
            // level > 1, so Ack this message to originator RS
            sourceHandler.send(new AckMsg(cn));
          }
        }
@@ -815,11 +807,10 @@
          expectedAcksInfo.completed();
        }
      }
    } else
    {
      // The timeout occurred for the update matching this change number and the
      // ack with timeout error has probably already been sent.
    }
    /* Else the timeout occurred for the update matching this change number
     * and the ack with timeout error has probably already been sent.
     */
  }
  /**
@@ -934,10 +925,8 @@
                  expectedServerInTimeout.
                    incrementAssuredSdSentUpdatesTimeout();
                }
              } else
              {
                // Server disappeared ? Let's forget about it.
              }
              /* else server disappeared ? Let's forget about it. */
            }
          }
          // Mark the ack info object as completed to prevent potential
opends/src/server/org/opends/server/replication/server/ReplicationServerHandler.java
@@ -136,7 +136,7 @@
   * @param rcvWindowSize The receiving window size.
   */
  public ReplicationServerHandler(
      ProtocolSession session,
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -50,11 +50,11 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.HeartbeatThread;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ResetGenerationIdMsg;
import org.opends.server.replication.protocol.RoutableMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.StartECLSessionMsg;
import org.opends.server.replication.protocol.StartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
@@ -88,7 +88,7 @@
   * @param providedMsg     The provided error message.
   * @param handler         The handler that manages that session.
   */
  static protected void closeSession(ProtocolSession providedSession,
  static protected void closeSession(Session providedSession,
      Message providedMsg, ServerHandler handler)
  {
    if (providedMsg != null)
@@ -118,7 +118,7 @@
  /**
   * The session opened with the remote server.
   */
  protected ProtocolSession session;
  protected Session session;
  /**
   * The serverURL of the remote server.
@@ -237,7 +237,7 @@
  /**
   * Creates a new server handler instance with the provided socket.
   *
   * @param session The ProtocolSession used by the ServerHandler to
   * @param session The Session used by the ServerHandler to
   *                 communicate with the remote entity.
   * @param queueSize The maximum number of update that will be kept
   *                  in memory by this ServerHandler.
@@ -247,7 +247,7 @@
   * @param rcvWindowSize The window size to receive from the remote server.
   */
  public ServerHandler(
      ProtocolSession session,
      Session session,
      int queueSize,
      String replicationServerURL,
      int replicationServerId,
@@ -271,7 +271,7 @@
    // We did not recognize the message, close session as what
    // can happen after is undetermined and we do not want the server to
    // be disturbed
    ProtocolSession localSession = session;
    Session localSession = session;
    if (localSession != null)
    {
      closeSession(localSession, reason, this);
@@ -366,6 +366,22 @@
          replicationServerDomain);
      reader = new ServerReader(session, this);
      session.setName("Replication server RS("
          + this.getReplicationServerId()
          + ") session thread to " + this.toString() + " at "
          + session.getReadableRemoteAddress());
      session.start();
      try
      {
        session.waitForStartup();
      }
      catch (InterruptedException e)
      {
        final Message message =
            ERR_SESSION_STARTUP_INTERRUPTED.get(session.getName());
        throw new DirectoryException(ResultCode.OTHER,
            message, e);
      }
      reader.start();
      writer.start();
opends/src/server/org/opends/server/replication/server/ServerReader.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -58,7 +58,7 @@
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();
  private final ProtocolSession session;
  private final Session session;
  private final ServerHandler handler;
  private final String remoteAddress;
@@ -68,11 +68,11 @@
   * Constructor for the LDAP server reader part of the replicationServer.
   *
   * @param session
   *          The ProtocolSession from which to read the data.
   *          The Session from which to read the data.
   * @param handler
   *          The server handler for this server reader.
   */
  public ServerReader(ProtocolSession session, ServerHandler handler)
  public ServerReader(Session session, ServerHandler handler)
  {
    super("Replication server RS(" + handler.getReplicationServerId()
        + ") reading from " + handler.toString() + " at "
@@ -314,18 +314,6 @@
        logError(errMessage);
      }
    }
    catch (ClassNotFoundException e)
    {
      if (debugEnabled())
        TRACER.debugInfo(
            "In " + this.getName() + " " + stackTraceToSingleLineString(e));
      /*
       * The remote server has sent an unknown message,
       * close the connection.
       */
      errMessage = ERR_UNKNOWN_MESSAGE.get(handler.toString());
      logError(errMessage);
    }
    catch (Exception e)
    {
      if (debugEnabled())
opends/src/server/org/opends/server/replication/server/ServerWriter.java
@@ -40,7 +40,7 @@
import org.opends.server.api.DirectoryThread;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -55,7 +55,7 @@
   */
  private static final DebugTracer TRACER = getTracer();
  private final ProtocolSession session;
  private final Session session;
  private final ServerHandler handler;
  private final ReplicationServerDomain replicationServerDomain;
@@ -66,13 +66,13 @@
   * for new updates and forward them to the server
   *
   * @param session
   *          the ProtocolSession that will be used to send updates.
   *          the Session that will be used to send updates.
   * @param handler
   *          handler for which the ServerWriter is created.
   * @param replicationServerDomain
   *          The ReplicationServerDomain of this ServerWriter.
   */
  public ServerWriter(ProtocolSession session, ServerHandler handler,
  public ServerWriter(Session session, ServerHandler handler,
      ReplicationServerDomain replicationServerDomain)
  {
    // Session may be null for ECLServerWriter.
opends/src/server/org/opends/server/replication/server/package-info.java
@@ -23,6 +23,7 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 */
@@ -46,13 +47,6 @@
 * The main classes of this packages are :
 * <br>
 * <ul>
 * <li><A HREF="SocketSession.html"><B>SocketSession</B></A>
 * implements the ProtocolSession interface that is
 * used by the replication server and the directory server to communicate.
 * This is done by using the innate encoding/decoding capabilities of the
 * ReplicationMessages objects. This class is used by both the
 * replicationServer and the replication package.
 * </li>
 * <li><A HREF="ReplicationServerDomain.html"><B>ReplicationServerDomain</B></A>
 * implements the multiplexing part of the replication
 * server. It contains method for forwarding all the received messages to
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -34,7 +34,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.TimeThread;
@@ -55,7 +55,7 @@
  /**
   * The session on which heartbeats are to be sent.
   */
  private final ProtocolSession session;
  private final Session session;
  /**
   * The time in milliseconds between heartbeats.
@@ -77,7 +77,7 @@
   *                          (in milliseconds).
   * @param serverId2 The serverId of the sender domain.
   */
  public CTHeartbeatPublisherThread(String threadName, ProtocolSession session,
  public CTHeartbeatPublisherThread(String threadName, Session session,
                  long heartbeatInterval, int serverId2)
  {
    super(threadName);
opends/src/server/org/opends/server/replication/service/HeartbeatMonitor.java
@@ -23,7 +23,7 @@
 *
 *
 *      Copyright 2007-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2012 ForgeRock AS
 *      Portions Copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -33,7 +33,7 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.api.DirectoryThread;
@@ -54,7 +54,7 @@
  /**
   * The session on which heartbeats are to be monitored.
   */
  private final ProtocolSession session;
  private final Session session;
  /**
@@ -93,7 +93,7 @@
   *          milliseconds).
   */
  HeartbeatMonitor(int serverID, int replicationServerID,
      String baseDN, ProtocolSession session, long heartbeatInterval)
      String baseDN, Session session, long heartbeatInterval)
  {
    super("Replica DS("
      + serverID + ") heartbeat monitor for domain \""
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -95,7 +95,7 @@
   */
  public final static String NO_CONNECTED_SERVER = "Not connected";
  private volatile String replicationServer = NO_CONNECTED_SERVER;
  private volatile ProtocolSession session = null;
  private volatile Session session = null;
  private final ServerState state;
  private final String baseDn;
  private final int serverId;
@@ -1230,7 +1230,7 @@
    String port = server.substring(separator + 1);
    String hostname = server.substring(0, separator);
    ProtocolSession localSession = null;
    Session localSession = null;
    Socket socket = null;
    boolean hasConnected = false;
    Message errorMessage = null;
@@ -2180,7 +2180,7 @@
   * @param failingSession the socket which failed
   * @param infiniteTry the socket which failed
   */
  public void reStart(ProtocolSession failingSession, boolean infiniteTry)
  public void reStart(Session failingSession, boolean infiniteTry)
  {
    if (failingSession != null)
    {
@@ -2308,7 +2308,7 @@
      try
      {
        boolean credit;
        ProtocolSession current_session;
        Session current_session;
        Semaphore currentWindowSemaphore;
        /*
@@ -2465,7 +2465,7 @@
      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      final ProtocolSession savedSession = session;
      final Session savedSession = session;
      if (savedSession == null)
      {
        // Must be shutting down.
@@ -2612,7 +2612,7 @@
        if (!shutdown)
        {
          final ProtocolSession tmpSession = session;
          final Session tmpSession = session;
          if (tmpSession == null || !tmpSession.closeInitiated())
          {
            /*
@@ -2879,7 +2879,7 @@
   */
  public boolean isSessionEncrypted()
  {
    final ProtocolSession tmp = session;
    final Session tmp = session;
    return tmp != null ? tmp.isEncrypted() : false;
  }
@@ -3127,7 +3127,7 @@
   */
  String getLocalUrl()
  {
    final ProtocolSession tmp = session;
    final Session tmp = session;
    return tmp != null ? tmp.getLocalUrl() : "";
  }
@@ -3142,12 +3142,12 @@
    return monitor;
  }
  private void setSession(final ProtocolSession newSession)
  private void setSession(final Session newSession)
  {
    // De-register the monitor with the old name.
    deregisterReplicationMonitor();
    final ProtocolSession oldSession = session;
    final Session oldSession = session;
    if (oldSession != null)
    {
      oldSession.close();
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -68,7 +68,7 @@
import org.opends.server.replication.protocol.InitializeRcvAckMsg;
import org.opends.server.replication.protocol.InitializeRequestMsg;
import org.opends.server.replication.protocol.InitializeTargetMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -401,13 +401,13 @@
   * @param generationID            The current generationID of the
   *                                ReplicationServer with which the session
   *                                was established.
   * @param session                 The ProtocolSession that is currently used.
   * @param session                 The Session that is currently used.
   */
  public void sessionInitiated(
      ServerStatus initStatus,
      ServerState replicationServerState,
      long generationID,
      ProtocolSession session)
      Session session)
  {
    // Sanity check: is it a valid initial status?
    if (!isValidInitialStatus(initStatus))
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -27,25 +27,6 @@
 */
package org.opends.server.replication;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_COMPLETION_TIME;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_LOG_MESSAGES;
import static org.opends.server.config.ConfigConstants.ATTR_TASK_STATE;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.io.File;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -66,34 +47,34 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.PersistentServerState;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.schema.DirectoryStringSyntax;
import org.opends.server.schema.IntegerSyntax;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeType;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.AttributeValues;
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.LockManager;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.types.*;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.io.File;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.testng.Assert.*;
/**
 * An abstract class that all Replication unit test should extend.
 */
@@ -900,7 +881,7 @@
   * Add a task to the configuration of the current running DS.
   * @param taskEntry The task to add.
   * @param expectedResult The expected result code for the ADD.
   * @param errorMessageID The expected error messageID when the expected
   * @param errorMessage The expected error message when the expected
   * result code is not SUCCESS
   */
  protected void addTask(Entry taskEntry, ResultCode expectedResult,
@@ -1168,7 +1149,7 @@
   * @param msgType Class of the message we are waiting for.
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ProtocolSession session, String msgType) {
  protected static ReplicationMsg waitForSpecificMsg(Session session, String msgType) {
    ReplicationMsg replMsg = null;
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -52,39 +52,17 @@
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.StopMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
import org.opends.server.types.Operation;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.types.*;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.TestCaseUtils.TEST_ROOT_DN_STRING;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.testng.Assert.*;
/**
@@ -279,7 +257,7 @@
    private ServerSocket listenSocket;
    private boolean shutdown = false;
    private ProtocolSession session = null;
    private Session session = null;
    // Parameters given at constructor time
    private final int port;
@@ -626,8 +604,7 @@
        session.publish(addMsg);
        // Read and return matching ack
        AckMsg ackMsg = (AckMsg)session.receive();
        return ackMsg;
        return (AckMsg)session.receive();
      } catch(SocketTimeoutException e)
      {
@@ -889,7 +866,7 @@
        replicationServer.setAssured(false);
      replicationServer.start(TIMEOUT_SCENARIO);
      long startTime = System.currentTimeMillis();
      long startTime;
      // Create a safe data assured domain
      if (rsGroupId == (byte)1)
      {
@@ -998,7 +975,7 @@
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeReadModeTimeout" + rsGroupId;;
    String testcase = "testSafeReadModeTimeout" + rsGroupId;
    try
    {
      // Create and start a RS expecting clients in safe read assured mode
@@ -1008,7 +985,7 @@
        replicationServer.setAssured(false);
      replicationServer.start(TIMEOUT_SCENARIO);
      long startTime = 0;
      long startTime;
      // Create a safe data assured domain
      if (rsGroupId == (byte)1)
@@ -1363,10 +1340,9 @@
        "objectClass: organizationalUnit\n";
      Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
      String parentUid = getEntryUUID(DN.decode(SAFE_READ_DN));
      AckMsg ackMsg = null;
      try {
        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
        AckMsg ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
         if (rsGroupId == (byte)2)
           fail("Should only go here for RS with same group id as DS");
@@ -1461,7 +1437,7 @@
      Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
      String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN));
      AckMsg ackMsg = null;
      AckMsg ackMsg;
      try
      {
        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
@@ -1853,7 +1829,7 @@
    /*
     * Find the multi valued attribute matching the requested assured mode
     */
    String assuredAttr = null;
    String assuredAttr;
    switch(assuredMode)
    {
      case SAFE_READ_MODE:
@@ -1873,19 +1849,15 @@
      return resultMap; // Empty map
    Attribute attr = attrs.get(0);
    Iterator<AttributeValue> attValIt = attr.iterator();
    // Parse and store values
    while (attValIt.hasNext())
    {
      String srvStr = attValIt.next().toString();
    for (AttributeValue val : attr) {
      String srvStr = val.toString();
      StringTokenizer strtok = new StringTokenizer(srvStr, ":");
      String token = strtok.nextToken();
      if (token != null)
      {
      if (token != null) {
        int serverId = Integer.valueOf(token);
        token = strtok.nextToken();
        if (token != null)
        {
        if (token != null) {
          Integer nerrors = Integer.valueOf(token);
          resultMap.put(serverId, nerrors);
        }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -62,7 +62,7 @@
import org.opends.server.replication.protocol.AckMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ErrorMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.Session;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
@@ -673,7 +673,7 @@
      ServerStatus initStatus,
      ServerState replicationServerState,
      long generationId,
      ProtocolSession session)
      Session session)
    {
      super.sessionInitiated(initStatus, replicationServerState, generationId, session);
    }
@@ -813,7 +813,7 @@
  {
    private boolean shutdown = false;
    private ProtocolSession session = null;
    private Session session = null;
    /** Parameters given at constructor time */
    private int port;
@@ -1942,7 +1942,7 @@
        } else
        {
          // Already errors for this server, increment the value
          int newVal = prevInt.intValue() + 1;
          int newVal = prevInt + 1;
          prevServerErrors.put(serverId, newVal);
        }
      }
@@ -2018,11 +2018,8 @@
   */
  private boolean areGroupAndGenerationIdOk(int fakeRsGid, long fakeRsGenId)
  {
    if ((fakeRsGid != -1) && (fakeRsGenId != -1L))
    {
      return ( (fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID) );
    }
    return false;
    return (fakeRsGid != -1) && (fakeRsGenId != -1L) &&
        ((fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID));
  }
  /**
@@ -2030,7 +2027,10 @@
   * data assured update and that are expected to effectively ack the update. If
   * -1 is used, the server is out of scope
   */
  private List<Integer> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
  private List<Integer> computeExpectedServersSafeData(
      int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen,
      int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
      int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
  {
    List<Integer> exptectedServers = new ArrayList<Integer>();
    if (areGroupAndGenerationIdOk(fakeRs1Gid, fakeRs1GenId))
@@ -2650,7 +2650,7 @@
       */
      fakeRd3 = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
        otherFakeDsGenId, ((otherFakeDsGid == DEFAULT_GID) ? true : false),
        otherFakeDsGenId, (otherFakeDsGid == DEFAULT_GID),
        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        otherFakeDsScen);
      assertNotNull(fakeRd3);
@@ -2669,7 +2669,7 @@
       */
      fakeRs2 = createFakeReplicationServer(FRS2_ID, otherFakeRsGid, RS1_ID,
        otherFakeRsGenId, ((otherFakeRsGid == DEFAULT_GID) ? true : false),
        otherFakeRsGenId, (otherFakeRsGid == DEFAULT_GID),
        AssuredMode.SAFE_READ_MODE, 1, new ServerState(), otherFakeRsScen);
      assertNotNull(fakeRs2);
@@ -2684,13 +2684,14 @@
      long sendUpdateTime = System.currentTimeMillis() - startTime;
      // Compute some thing that will help determine what to check according to
      // the current test configurarion: compute if DS and RS subject to conf
      // the current test configuration: compute if DS and RS subject to conf
      // change are eligible and expected for safe read assured
      // eligible: the server should receive the ack request
      // expected: the server should send back an ack (with or without error)
      boolean dsIsEligible = areGroupAndGenerationIdOk(otherFakeDsGid, otherFakeDsGenId);
      boolean rsIsEligible = areGroupAndGenerationIdOk(otherFakeRsGid, otherFakeRsGenId);
      boolean dsIsExpected = false;
      boolean rsIsExpected = false;
      // Booleans to tell if we expect to see the timeout, wrong status and replay error flags
      boolean shouldSeeTimeout = false;
      boolean shouldSeeWrongStatus = false;
@@ -2723,6 +2724,7 @@
        switch (otherFakeRsScen)
        {
          case REPLY_OK_RS_SCENARIO:
            rsIsExpected = true;
            break;
          case TIMEOUT_RS_SCENARIO:
            shouldSeeRsIdInError = true;
@@ -3443,7 +3445,7 @@
      // DS 2 connected to RS 2
      fakeRd2 = createFakeReplicationDomain(FDS2_ID, fakeDsGid, RS2_ID,
        fakeDsGenId, (fakeDsGid == DEFAULT_GID ? true : false),
        fakeDsGenId, (fakeDsGid == DEFAULT_GID),
        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT, fakeDsScen);
      assertNotNull(fakeRd2);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -60,28 +60,14 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ChangeNumberGenerator;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.plugin.ReplicationServerListener;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplServerStartDSMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.ServerStartMsg;
import org.opends.server.replication.protocol.StartSessionMsg;
import org.opends.server.replication.protocol.TopologyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.WindowMsg;
import org.opends.server.replication.protocol.WindowProbeMsg;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.tools.LDAPModify;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.*;
@@ -975,7 +961,7 @@
    int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
    socket.connect(ServerAddr, timeoutMS);
    ReplSessionSecurity replSessionSecurity = getReplSessionSecurity();
    ProtocolSession session = replSessionSecurity.createClientSession(socket,
    Session session = replSessionSecurity.createClientSession(socket,
        timeoutMS);
    boolean sslEncryption =