mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
19.31.2013 e13d1429c75364a349dee5d7f8703593fa0adf4f
Second step in simplifying the ReplicationBroker class.


ReplicationBroker.java
Moved session field to ConnectedRS.
Removed field protocolVersion (useless, superseded by ConnectedRS)
Reworked ConnectedRS class: removed connected field.
In ReplicationServerInfo, added setServerURL() + internally stored a RSInfo object + changed connectedDSs from List to Set.
Used
Improved javadocs + removed comments redundant with javadocs.

ComputeBestServerTest.java:
Consequence of the change to ReplicationBroker.ReplicationServerInfo ctor.

UnbindOperationTestCase.java, ReplicationDomain.java:
Code cleanup.
4 files modified
853 ■■■■ changed files
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java 624 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java 35 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/core/UnbindOperationTestCase.java 59 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ComputeBestServerTest.java 135 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -71,44 +71,43 @@
  /**
   * Immutable class containing information about whether the broker is
   * connected to an RS and data associated to this connected RS.
   * <p>
   * Mutable methods return a new version of this object copying the data that
   * did not change.
   */
  // @Immutable
  private static final class ConnectedRS
  {
    private final String replicationServer;
    private static final ConnectedRS NO_CONNECTED_RS = new ConnectedRS(
        NO_CONNECTED_SERVER);
    /** The info of the RS we are connected to. */
    private final ReplicationServerInfo rsInfo;
    private final boolean connected;
    private final Session session;
    private final String replicationServer;
    private ConnectedRS(boolean connected, ReplicationServerInfo rsInfo,
        String replicationServer)
    private ConnectedRS(String replicationServer)
    {
      this.connected = connected;
      this.rsInfo = rsInfo;
      this.rsInfo = null;
      this.session = null;
      this.replicationServer = replicationServer;
    }
    private ConnectedRS(ReplicationServerInfo rsInfo, Session session)
    {
      this.rsInfo = rsInfo;
      this.session = session;
      this.replicationServer = session != null ?
          session.getReadableRemoteAddress()
          : NO_CONNECTED_SERVER;
    }
    private static ConnectedRS stopped()
    {
      return new ConnectedRS(false, null, "stopped");
      return new ConnectedRS("stopped");
    }
    private static ConnectedRS noConnectedRS()
    {
      return new ConnectedRS(false, null, NO_CONNECTED_SERVER);
    }
    /**
     * Returns a new version of the current object with the connected status set
     * to true.
     */
    private ConnectedRS setConnected()
    {
      return new ConnectedRS(true, rsInfo, replicationServer);
      return NO_CONNECTED_RS;
    }
    public int getServerId()
@@ -121,6 +120,11 @@
      return rsInfo != null ? rsInfo.getGroupId() : -1;
    }
    private boolean isConnected()
    {
      return session != null;
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
@@ -132,8 +136,8 @@
    public void toString(StringBuilder sb)
    {
      sb.append("connected=").append(connected).append(", ");
      if (rsInfo == null) // this is a null object
      sb.append("connected=").append(isConnected()).append(", ");
      if (!isConnected())
      {
        sb.append("no connected RS");
      }
@@ -145,6 +149,7 @@
          .append(")");
      }
    }
  }
  /**
@@ -158,18 +163,27 @@
   * String reported under CSN=monitor when there is no connected RS.
   */
  public final static String NO_CONNECTED_SERVER = "Not connected";
  private volatile Session session;
  private final ServerState state;
  private Semaphore sendWindow;
  private int maxSendWindow;
  private int rcvWindow = 100;
  private int halfRcvWindow = rcvWindow / 2;
  private int timeout = 0;
  private short protocolVersion;
  private ReplSessionSecurity replSessionSecurity;
  /**
   * The RS this DS is currently connected to.
   * <p>
   * Always use {@link #setConnectedRS(ConnectedRS)} to set a new
   * connected RS.
   */
  // @NotNull // for the reference
  private final AtomicReference<ConnectedRS> connectedRS =
      new AtomicReference<ConnectedRS>(ConnectedRS.noConnectedRS());
  /** Our replication domain. */
  /**
   * Our replication domain.
   * <p>
   * Can be null for unit test purpose.
   */
  private ReplicationDomain domain;
  /**
   * This object is used as a conditional event to be notified about
@@ -242,11 +256,12 @@
  private int mustRunBestServerCheckingAlgorithm = 0;
  /**
   * The monitor provider for this replication domain. The name of the monitor
   * includes the local address and must therefore be re-registered every time
   * the session is re-established or destroyed. The monitor provider can only
   * be created (i.e. non-null) if there is a replication domain, which is not
   * the case in unit tests.
   * The monitor provider for this replication domain.
   * <p>
   * The name of the monitor includes the local address and must therefore be
   * re-registered every time the session is re-established or destroyed. The
   * monitor provider can only be created (i.e. non-null) if there is a
   * replication domain, which is not the case in unit tests.
   */
  private final ReplicationMonitor monitor;
@@ -268,7 +283,6 @@
    this.domain = replicationDomain;
    this.state = state;
    this.config = config;
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.replSessionSecurity = replSessionSecurity;
    this.generationID = generationId;
    this.rcvWindow = getMaxRcvWindow();
@@ -292,7 +306,7 @@
    {
      shutdown = false;
      this.rcvWindow = getMaxRcvWindow();
      connect(connectedRS.get());
      connect();
    }
  }
@@ -385,7 +399,7 @@
      {
        // This RS is locally configured, mark this
        rsInfo.setLocallyConfigured(true);
        rsInfo.serverURL = serverUrl;
        rsInfo.setServerURL(serverUrl);
        return;
      }
    }
@@ -425,22 +439,16 @@
   */
  public static class ReplicationServerInfo
  {
    private RSInfo rsInfo;
    private short protocolVersion;
    private long generationId;
    private byte groupId = -1;
    private int serverId;
    /** Received server URL. */
    private String serverURL;
    private DN baseDN;
    private int windowSize;
    private ServerState serverState;
    private boolean sslEncryption;
    private int degradedStatusThreshold = -1;
    /** Keeps the 1 value if created with a ReplServerStartMsg. */
    private int weight = 1;
    private final int degradedStatusThreshold;
    /** Keeps the 0 value if created with a ReplServerStartMsg. */
    private int connectedDSNumber = 0;
    private List<Integer> connectedDSs;
    private Set<Integer> connectedDSs;
    /**
     * Is this RS locally configured? (the RS is recognized as a usable server).
     */
@@ -458,8 +466,8 @@
    public static ReplicationServerInfo newInstance(
      ReplicationMsg msg, String newServerURL) throws IllegalArgumentException
    {
      ReplicationServerInfo rsInfo = newInstance(msg);
      rsInfo.serverURL = newServerURL;
      final ReplicationServerInfo rsInfo = newInstance(msg);
      rsInfo.setServerURL(newServerURL);
      return rsInfo;
    }
@@ -471,70 +479,62 @@
     * @throws IllegalArgumentException If the passed message has an unexpected
     *                                  type.
     */
    public static ReplicationServerInfo newInstance(
      ReplicationMsg msg) throws IllegalArgumentException
    public static ReplicationServerInfo newInstance(ReplicationMsg msg)
        throws IllegalArgumentException
    {
      if (msg instanceof ReplServerStartMsg)
      {
        // This is a ReplServerStartMsg (RS uses protocol V3 or under)
        ReplServerStartMsg replServerStartMsg = (ReplServerStartMsg) msg;
        return new ReplicationServerInfo(replServerStartMsg);
      } else if (msg instanceof ReplServerStartDSMsg)
        // RS uses protocol V3 or lower
        return new ReplicationServerInfo((ReplServerStartMsg) msg);
      }
      else if (msg instanceof ReplServerStartDSMsg)
      {
        // This is a ReplServerStartDSMsg (RS uses protocol V4 or higher)
        ReplServerStartDSMsg replServerStartDSMsg = (ReplServerStartDSMsg) msg;
        return new ReplicationServerInfo(replServerStartDSMsg);
        // RS uses protocol V4 or higher
        return new ReplicationServerInfo((ReplServerStartDSMsg) msg);
      }
      // Unsupported message type: should not happen
      throw new IllegalArgumentException("Unexpected PDU type: " +
        msg.getClass().getName() + " :\n" + msg);
      throw new IllegalArgumentException("Unexpected PDU type: "
          + msg.getClass().getName() + " :\n" + msg);
    }
    /**
     * Constructs a ReplicationServerInfo object wrapping a
     * {@link ReplServerStartMsg}.
     *
     * @param replServerStartMsg
     * @param msg
     *          The {@link ReplServerStartMsg} this object will wrap.
     */
    private ReplicationServerInfo(ReplServerStartMsg replServerStartMsg)
    private ReplicationServerInfo(ReplServerStartMsg msg)
    {
      this.protocolVersion = replServerStartMsg.getVersion();
      this.generationId = replServerStartMsg.getGenerationId();
      this.groupId = replServerStartMsg.getGroupId();
      this.serverId = replServerStartMsg.getServerId();
      this.serverURL = replServerStartMsg.getServerURL();
      this.baseDN = replServerStartMsg.getBaseDN();
      this.windowSize = replServerStartMsg.getWindowSize();
      this.serverState = replServerStartMsg.getServerState();
      this.sslEncryption = replServerStartMsg.getSSLEncryption();
      this.degradedStatusThreshold =
        replServerStartMsg.getDegradedStatusThreshold();
      this.protocolVersion = msg.getVersion();
      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
          msg.getGenerationId(), msg.getGroupId(), 1);
      this.baseDN = msg.getBaseDN();
      this.windowSize = msg.getWindowSize();
      this.serverState = msg.getServerState();
      this.sslEncryption = msg.getSSLEncryption();
      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
    }
    /**
     * Constructs a ReplicationServerInfo object wrapping a
     * {@link ReplServerStartDSMsg}.
     *
     * @param replServerStartDSMsg
     * @param msg
     *          The {@link ReplServerStartDSMsg} this object will wrap.
     */
    private ReplicationServerInfo(ReplServerStartDSMsg replServerStartDSMsg)
    private ReplicationServerInfo(ReplServerStartDSMsg msg)
    {
      this.protocolVersion = replServerStartDSMsg.getVersion();
      this.generationId = replServerStartDSMsg.getGenerationId();
      this.groupId = replServerStartDSMsg.getGroupId();
      this.serverId = replServerStartDSMsg.getServerId();
      this.serverURL = replServerStartDSMsg.getServerURL();
      this.baseDN = replServerStartDSMsg.getBaseDN();
      this.windowSize = replServerStartDSMsg.getWindowSize();
      this.serverState = replServerStartDSMsg.getServerState();
      this.sslEncryption = replServerStartDSMsg.getSSLEncryption();
      this.degradedStatusThreshold =
        replServerStartDSMsg.getDegradedStatusThreshold();
      this.weight = replServerStartDSMsg.getWeight();
      this.connectedDSNumber = replServerStartDSMsg.getConnectedDSNumber();
      this.rsInfo = new RSInfo(msg.getServerId(), msg.getServerURL(),
          msg.getGenerationId(), msg.getGroupId(), msg.getWeight());
      this.protocolVersion = msg.getVersion();
      this.baseDN = msg.getBaseDN();
      this.windowSize = msg.getWindowSize();
      this.serverState = msg.getServerState();
      this.sslEncryption = msg.getSSLEncryption();
      this.degradedStatusThreshold = msg.getDegradedStatusThreshold();
      this.connectedDSNumber = msg.getConnectedDSNumber();
    }
    /**
@@ -552,7 +552,7 @@
     */
    public byte getGroupId()
    {
      return groupId;
      return rsInfo.getGroupId();
    }
    /**
@@ -570,7 +570,7 @@
     */
    public long getGenerationId()
    {
      return generationId;
      return rsInfo.getGenerationId();
    }
    /**
@@ -579,7 +579,7 @@
     */
    public int getServerId()
    {
      return serverId;
      return rsInfo.getId();
    }
    /**
@@ -588,7 +588,7 @@
     */
    public String getServerURL()
    {
      return serverURL;
      return rsInfo.getServerUrl();
    }
    /**
@@ -635,7 +635,7 @@
     */
    public int getWeight()
    {
      return weight;
      return rsInfo.getWeight();
    }
    /**
@@ -654,15 +654,13 @@
     * @param rsInfo The RSinfo to use for the update
     * @param connectedDSs The new connected DSs
     */
    public ReplicationServerInfo(RSInfo rsInfo, List<Integer> connectedDSs)
    public ReplicationServerInfo(RSInfo rsInfo, Set<Integer> connectedDSs)
    {
      this.serverId = rsInfo.getId();
      this.serverURL = rsInfo.getServerUrl();
      this.generationId = rsInfo.getGenerationId();
      this.groupId = rsInfo.getGroupId();
      this.weight = rsInfo.getWeight();
      this.rsInfo = new RSInfo(rsInfo.getId(), rsInfo.getServerUrl(),
          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
      this.degradedStatusThreshold = -1;
      this.serverState = new ServerState();
    }
@@ -672,7 +670,7 @@
     */
    public RSInfo toRSInfo()
    {
      return new RSInfo(serverId, serverURL, generationId, groupId, weight);
      return rsInfo;
    }
    /**
@@ -681,15 +679,20 @@
     * @param rsInfo The RSinfo to use for the update
     * @param connectedDSs The new connected DSs
     */
    public void update(RSInfo rsInfo, List<Integer> connectedDSs)
    public void update(RSInfo rsInfo, Set<Integer> connectedDSs)
    {
      this.generationId = rsInfo.getGenerationId();
      this.groupId = rsInfo.getGroupId();
      this.weight = rsInfo.getWeight();
      this.rsInfo = new RSInfo(this.rsInfo.getId(), this.rsInfo.getServerUrl(),
          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
      this.connectedDSs = connectedDSs;
      this.connectedDSNumber = connectedDSs.size();
    }
    private void setServerURL(String newServerURL)
    {
      rsInfo = new RSInfo(rsInfo.getId(), newServerURL,
          rsInfo.getGenerationId(), rsInfo.getGroupId(), rsInfo.getWeight());
    }
    /**
     * Updates replication server info with the passed server state.
     * @param serverState The ServerState to use for the update
@@ -699,7 +702,8 @@
      if (this.serverState != null)
      {
        this.serverState.update(serverState);
      } else
      }
      else
      {
        this.serverState = serverState;
      }
@@ -709,7 +713,7 @@
     * Get the getConnectedDSs.
     * @return the getConnectedDSs
     */
    public List<Integer> getConnectedDSs()
    public Set<Integer> getConnectedDSs()
    {
      return connectedDSs;
    }
@@ -739,17 +743,17 @@
    @Override
    public String toString()
    {
      return "Url:" + this.serverURL + " ServerId:" + this.serverId
          + " GroupId:" + this.groupId;
      return "Url:" + getServerURL() + " ServerId:" + getServerId()
          + " GroupId:" + getGroupId();
    }
  }
  private void connect(ConnectedRS rs)
  private void connect()
  {
    if (getBaseDN().toNormalizedString().equalsIgnoreCase(
        ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
    {
      connectAsECL(rs);
      connectAsECL();
    }
    else
    {
@@ -770,8 +774,8 @@
    for (String serverUrl : getReplicationServerUrls())
    {
      // Connect to server + get and store info about it
      ReplicationServerInfo rsInfo =
          performPhaseOneHandshake(serverUrl, false, false);
      final ConnectedRS rs = performPhaseOneHandshake(serverUrl, false, false);
      final ReplicationServerInfo rsInfo = rs.rsInfo;
      if (rsInfo != null)
      {
        rsInfos.put(rsInfo.getServerId(), rsInfo);
@@ -799,13 +803,14 @@
   * </li>
   * </ul>
   */
  private void connectAsECL(ConnectedRS rs)
  private void connectAsECL()
  {
    // FIXME:ECL List of RS to connect is for now limited to one RS only
    final String bestServer = getReplicationServerUrls().iterator().next();
    if (performPhaseOneHandshake(bestServer, true, true) != null)
    final String bestServerURL = getReplicationServerUrls().iterator().next();
    final ConnectedRS rs = performPhaseOneHandshake(bestServerURL, true, true);
    if (rs.isConnected())
    {
      performECLPhaseTwoHandshake(bestServer, rs);
      performECLPhaseTwoHandshake(bestServerURL, rs);
    }
  }
@@ -836,10 +841,6 @@
   */
  private void connectAsDataServer()
  {
    /*
    May have created a broker with null replication domain for
    unit test purpose.
    */
    if (domain != null)
    {
      /*
@@ -876,22 +877,22 @@
      if (replicationServerInfos.isEmpty())
      {
        connectedRS.set(ConnectedRS.noConnectedRS());
        setConnectedRS(ConnectedRS.noConnectedRS());
      }
      else
      {
        // At least one server answered, find the best one.
        RSEvaluations evals = computeBestReplicationServer(true, -1, state,
            replicationServerInfos, serverId, getGroupId(), getGenerationID());
        ReplicationServerInfo electedRsInfo = evals.getBestRS();
        // Best found, now initialize connection to this one (handshake phase 1)
        if (debugEnabled())
          debugInfo("phase 2 : will perform PhaseOneH with the preferred RS="
              + electedRsInfo);
        electedRsInfo = performPhaseOneHandshake(
          electedRsInfo.getServerURL(), true, false);
              + evals.getBestRS());
        final ConnectedRS electedRS = performPhaseOneHandshake(
            evals.getBestRS().getServerURL(), true, false);
        final ReplicationServerInfo electedRsInfo = electedRS.rsInfo;
        if (electedRsInfo != null)
        {
          /*
@@ -904,25 +905,24 @@
          // Handshake phase 1 exchange went well
          // Compute in which status we are starting the session to tell the RS
          ServerStatus initStatus =
            computeInitialServerStatus(electedRsInfo.getGenerationId(),
            electedRsInfo.getServerState(),
            electedRsInfo.getDegradedStatusThreshold(),
            getGenerationID());
          final ServerStatus initStatus = computeInitialServerStatus(
              electedRsInfo.getGenerationId(), electedRsInfo.getServerState(),
              electedRsInfo.getDegradedStatusThreshold(), getGenerationID());
          // Perform session start (handshake phase 2)
          TopologyMsg topologyMsg = performPhaseTwoHandshake(
            electedRsInfo.getServerURL(), initStatus);
          final TopologyMsg topologyMsg =
              performPhaseTwoHandshake(electedRS, initStatus);
          if (topologyMsg != null) // Handshake phase 2 exchange went well
          {
            connectToReplicationServer(electedRsInfo, initStatus, topologyMsg);
            connectToReplicationServer(electedRS, initStatus, topologyMsg);
          } // Could perform handshake phase 2 with best
        } // Could perform handshake phase 1 with best
      }
      // connectedRS has been updated by calls above, reload it
      final ConnectedRS rs = connectedRS.get();
      if (rs.connected)
      if (rs.isConnected())
      {
        connectPhaseLock.notify();
@@ -932,13 +932,13 @@
        {
          logError(NOTE_NOW_FOUND_SAME_GENERATION_CHANGELOG.get(
              serverId, rsServerId, baseDN.toNormalizedString(),
              session.getReadableRemoteAddress(), getGenerationID()));
              rs.replicationServer, getGenerationID()));
        }
        else
        {
          logError(WARN_NOW_FOUND_BAD_GENERATION_CHANGELOG.get(
              serverId, rsServerId, baseDN.toNormalizedString(),
              session.getReadableRemoteAddress(), getGenerationID(), rsGenId));
              rs.replicationServer, getGenerationID(), rsGenId));
        }
      }
      else
@@ -969,29 +969,28 @@
  /**
   * Connects to a replication server.
   *
   * @param rsInfo
   * @param rs
   *          the Replication Server to connect to
   * @param initStatus
   *          The status to enter the state machine with
   * @param topologyMsg
   *          the message containing the topology information
   */
  private void connectToReplicationServer(ReplicationServerInfo rsInfo,
  private void connectToReplicationServer(ConnectedRS rs,
      ServerStatus initStatus, TopologyMsg topologyMsg)
  {
    final int serverId = getServerId();
    final DN baseDN = getBaseDN();
    final ReplicationServerInfo rsInfo = rs.rsInfo;
    ConnectedRS rs = null;
    boolean connectSuccessful = false;
    try
    {
      maxSendWindow = rsInfo.getWindowSize();
      receiveTopo(topologyMsg);
      receiveTopo(topologyMsg, rs.getServerId());
      /*
      Log a message to let the administrator know that the failure
      was resolved.
      Log a message to let the administrator know that the failure was resolved.
      Wake up all the thread that were waiting on the window
      on the previous connection.
      */
@@ -1018,17 +1017,11 @@
      }
      sendWindow = new Semaphore(maxSendWindow);
      rcvWindow = getMaxRcvWindow();
      rs = new ConnectedRS(true, rsInfo, session.getReadableRemoteAddress());
      connectedRS.set(rs);
      /*
      May have created a broker with null replication domain for
      unit test purpose.
      */
      if (domain != null)
      {
        domain.sessionInitiated(initStatus, rsInfo.getServerState(), rsInfo
            .getGenerationId(), session);
        domain.sessionInitiated(initStatus, rsInfo.getServerState(),
            rsInfo.getGenerationId(), rs.session);
      }
      final byte groupId = getGroupId();
@@ -1042,28 +1035,28 @@
        logError(WARN_CONNECTED_TO_SERVER_WITH_WRONG_GROUP_ID.get(
            Byte.toString(groupId), Integer.toString(rs.getServerId()),
            rsInfo.getServerURL(), Byte.toString(rs.getGroupId()),
            baseDN.toNormalizedString(), Integer.toString(serverId)));
            baseDN.toNormalizedString(), Integer.toString(getServerId())));
      }
      startRSHeartBeatMonitoring();
      startRSHeartBeatMonitoring(rs);
      if (rsInfo.getProtocolVersion() >=
        ProtocolVersion.REPLICATION_PROTOCOL_V3)
      {
        startChangeTimeHeartBeatPublishing();
        startChangeTimeHeartBeatPublishing(rs);
      }
      setConnectedRS(rs);
      connectSuccessful = true;
    }
    catch (Exception e)
    {
      Message message = ERR_COMPUTING_FAKE_OPS.get(
      logError(ERR_COMPUTING_FAKE_OPS.get(
          baseDN.toNormalizedString(), rsInfo.getServerURL(),
          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e));
      logError(message);
          e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e)));
    }
    finally
    {
      if (rs == null)
      if (!connectSuccessful)
      {
        connectedRS.set(ConnectedRS.noConnectedRS());
        setSession(null);
        setConnectedRS(ConnectedRS.noConnectedRS());
      }
    }
  }
@@ -1133,9 +1126,9 @@
   * messages exchange) and return the reply message from the replication
   * server, wrapped in a ReplicationServerInfo object.
   *
   * @param server
   * @param serverURL
   *          Server to connect to.
   * @param keepConnection
   * @param keepSession
   *          Do we keep session opened or not after handshake. Use true if want
   *          to perform handshake phase 2 with the same session and keep the
   *          session to create as the current one.
@@ -1143,10 +1136,10 @@
   *          Indicates whether or not the an ECL handshake is to be performed.
   * @return The answer from the server . Null if could not get an answer.
   */
  private ReplicationServerInfo performPhaseOneHandshake(
      String server, boolean keepConnection, boolean isECL)
  private ConnectedRS performPhaseOneHandshake(String serverURL,
      boolean keepSession, boolean isECL)
  {
    Session localSession = null;
    Session newSession = null;
    Socket socket = null;
    boolean hasConnected = false;
    Message errorMessage = null;
@@ -1158,15 +1151,16 @@
      socket.setReceiveBufferSize(1000000);
      socket.setTcpNoDelay(true);
      int timeoutMS = MultimasterReplication.getConnectionTimeoutMS();
      socket.connect(HostPort.valueOf(server).toInetSocketAddress(), timeoutMS);
      localSession = replSessionSecurity.createClientSession(socket, timeoutMS);
      socket.connect(HostPort.valueOf(serverURL).toInetSocketAddress(),
          timeoutMS);
      newSession = replSessionSecurity.createClientSession(socket, timeoutMS);
      boolean isSslEncryption = replSessionSecurity.isSslEncryption();
      // Send our ServerStartMsg.
      final HostPort hp = new HostPort(
          socket.getLocalAddress().getHostName(), socket.getLocalPort());
      String url = hp.toString();
      StartMsg serverStartMsg;
      final String url = hp.toString();
      final StartMsg serverStartMsg;
      if (!isECL)
      {
        serverStartMsg = new ServerStartMsg(getServerId(), url, getBaseDN(),
@@ -1179,11 +1173,11 @@
            getMaxRcvWindow(), config.getHeartbeatInterval(), state,
            getGenerationID(), isSslEncryption, getGroupId());
      }
      localSession.publish(serverStartMsg);
      newSession.publish(serverStartMsg);
      // Read the ReplServerStartMsg or ReplServerStartDSMsg that should
      // come back.
      ReplicationMsg msg = localSession.receive();
      ReplicationMsg msg = newSession.receive();
      if (debugEnabled())
      {
        debugInfo("RB HANDSHAKE SENT:\n" + serverStartMsg + "\nAND RECEIVED:\n"
@@ -1192,7 +1186,7 @@
      // Wrap received message in a server info object
      final ReplicationServerInfo replServerInfo =
          ReplicationServerInfo.newInstance(msg, server);
          ReplicationServerInfo.newInstance(msg, serverURL);
      // Sanity check
      final DN repDN = replServerInfo.getBaseDN();
@@ -1200,7 +1194,7 @@
      {
        errorMessage = ERR_DS_DN_DOES_NOT_MATCH.get(
            repDN.toNormalizedString(), getBaseDN().toNormalizedString());
        return null;
        return setConnectedRS(ConnectedRS.noConnectedRS());
      }
      /*
@@ -1208,64 +1202,53 @@
       * replication server will use the same one (or an older one if it is an
       * old replication server).
       */
      final short localProtocolVersion = getCompatibleVersion(replServerInfo
          .getProtocolVersion());
      if (keepConnection)
      {
        protocolVersion = localProtocolVersion;
      }
      localSession.setProtocolVersion(localProtocolVersion);
      newSession.setProtocolVersion(
          getCompatibleVersion(replServerInfo.getProtocolVersion()));
      if (!isSslEncryption)
      {
        localSession.stopEncryption();
        newSession.stopEncryption();
      }
      hasConnected = true;
      // If this connection is the one to use for sending and receiving
      // updates, store it.
      if (keepConnection)
      if (keepSession)
      {
        setSession(localSession);
        // cannot store it yet,
        // only store after a successful phase two handshake
        return new ConnectedRS(replServerInfo, newSession);
      }
      return replServerInfo;
      return new ConnectedRS(replServerInfo, null);
    }
    catch (ConnectException e)
    {
      errorMessage = WARN_NO_CHANGELOG_SERVER_LISTENING.get(getServerId(),
          server, getBaseDN().toNormalizedString());
          serverURL, getBaseDN().toNormalizedString());
    }
    catch (SocketTimeoutException e)
    {
      errorMessage = WARN_TIMEOUT_CONNECTING_TO_RS.get(getServerId(),
          server, getBaseDN().toNormalizedString());
          serverURL, getBaseDN().toNormalizedString());
    }
    catch (Exception e)
    {
      errorMessage = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(getServerId(),
          server, getBaseDN().toNormalizedString(),
          serverURL, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
    }
    finally
    {
      if (!hasConnected || !keepConnection)
      if (!hasConnected || !keepSession)
      {
        close(localSession);
        close(newSession);
        close(socket);
      }
      if (keepConnection && !hasConnected)
      {
        connectedRS.set(ConnectedRS.noConnectedRS());
      }
      if (!hasConnected && errorMessage != null && !connectionError)
      {
        // There was no server waiting on this host:port
        // Log a notice and will try the next replicationServer in the list
        if (keepConnection) // Log error message only for final connection
        if (keepSession) // Log error message only for final connection
        {
          // log the error message only once to avoid overflowing the error log
          logError(errorMessage);
@@ -1277,7 +1260,7 @@
        }
      }
    }
    return null;
    return setConnectedRS(ConnectedRS.noConnectedRS());
  }
@@ -1294,10 +1277,9 @@
    try
    {
      // Send our Start Session
      StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
      final StartECLSessionMsg startECLSessionMsg = new StartECLSessionMsg();
      startECLSessionMsg.setOperationId("-1");
      final Session localSession = session;
      localSession.publish(startECLSessionMsg);
      rs.session.publish(startECLSessionMsg);
      // FIXME ECL In the handshake phase two, should RS send back a topo msg ?
      if (debugEnabled())
@@ -1306,18 +1288,17 @@
      }
      // Alright set the timeout to the desired value
      localSession.setSoTimeout(timeout);
      connectedRS.set(rs.setConnected());
      rs.session.setSoTimeout(timeout);
      setConnectedRS(rs);
    }
    catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
      logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      logError(message);
          stackTraceToSingleLineString(e)));
      connectedRS.set(ConnectedRS.noConnectedRS());
      setSession(null);
      rs.session.close();
      setConnectedRS(ConnectedRS.noConnectedRS());
    }
  }
@@ -1326,22 +1307,18 @@
   * TopologyMsg messages exchange) and return the reply message from the
   * replication server.
   *
   * @param server Server we are connecting with.
   * @param electedRS Server we are connecting with.
   * @param initStatus The status we are starting with
   * @return The ReplServerStartMsg the server replied. Null if could not
   *         get an answer.
   */
  private TopologyMsg performPhaseTwoHandshake(String server,
  private TopologyMsg performPhaseTwoHandshake(ConnectedRS electedRS,
    ServerStatus initStatus)
  {
    try
    {
      /*
       * Send our StartSessionMsg.
       */
      StartSessionMsg startSessionMsg;
      // May have created a broker with null replication domain for
      // unit test purpose.
      // Send our StartSessionMsg.
      final StartSessionMsg startSessionMsg;
      if (domain != null)
      {
        startSessionMsg = new StartSessionMsg(
@@ -1359,11 +1336,11 @@
        startSessionMsg =
          new StartSessionMsg(initStatus, new ArrayList<String>());
      }
      final Session localSession = session;
      localSession.publish(startSessionMsg);
      final Session session = electedRS.session;
      session.publish(startSessionMsg);
      // Read the TopologyMsg that should come back.
      final TopologyMsg topologyMsg = (TopologyMsg) localSession.receive();
      final TopologyMsg topologyMsg = (TopologyMsg) session.receive();
      if (debugEnabled())
      {
@@ -1372,20 +1349,16 @@
      }
      // Alright set the timeout to the desired value
      localSession.setSoTimeout(timeout);
      session.setSoTimeout(timeout);
      return topologyMsg;
    }
    catch (Exception e)
    {
      Message message = WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), server, getBaseDN().toNormalizedString(),
          stackTraceToSingleLineString(e));
      logError(message);
      logError(WARN_EXCEPTION_STARTING_SESSION_PHASE.get(
          getServerId(), electedRS.rsInfo.getServerURL(),
          getBaseDN().toNormalizedString(), stackTraceToSingleLineString(e)));
      connectedRS.set(ConnectedRS.noConnectedRS());
      setSession(null);
      // Be sure to return null.
      setConnectedRS(ConnectedRS.noConnectedRS());
      return null;
    }
  }
@@ -2272,14 +2245,13 @@
  /**
   * Start the heartbeat monitor thread.
   */
  private void startRSHeartBeatMonitoring()
  private void startRSHeartBeatMonitoring(ConnectedRS rs)
  {
    // Start a heartbeat monitor thread.
    final long heartbeatInterval = config.getHeartbeatInterval();
    if (heartbeatInterval > 0)
    {
      heartbeatMonitor = new HeartbeatMonitor(getServerId(), getRsServerId(),
          getBaseDN().toNormalizedString(), session, heartbeatInterval);
      heartbeatMonitor = new HeartbeatMonitor(getServerId(), rs.getServerId(),
          getBaseDN().toNormalizedString(), rs.session, heartbeatInterval);
      heartbeatMonitor.start();
    }
  }
@@ -2302,7 +2274,7 @@
   */
  public void reStart(boolean infiniteTry)
  {
    reStart(session, infiniteTry);
    reStart(connectedRS.get().session, infiniteTry);
  }
  /**
@@ -2319,14 +2291,10 @@
      numLostConnections++;
    }
    ConnectedRS rs;
    if (failingSession == session)
    ConnectedRS rs = connectedRS.get();
    if (failingSession == rs.session && !rs.equals(ConnectedRS.noConnectedRS()))
    {
      rs = ConnectedRS.noConnectedRS();
      connectedRS.set(rs);
      setSession(null);
    } else {
      rs = connectedRS.get();
      rs = setConnectedRS(ConnectedRS.noConnectedRS());
    }
    while (true)
@@ -2334,14 +2302,14 @@
      // Synchronize inside the loop in order to allow shutdown.
      synchronized (startStopLock)
      {
        if (rs.connected || shutdown)
        if (rs.isConnected() || shutdown)
        {
          break;
        }
        try
        {
          connect(rs);
          connect();
          rs = connectedRS.get();
        }
        catch (Exception e)
@@ -2353,7 +2321,7 @@
          logError(mb.toMessage());
        }
        if (rs.connected || !infiniteTry)
        if (rs.isConnected() || !infiniteTry)
        {
          break;
        }
@@ -2362,7 +2330,7 @@
      {
          Thread.sleep(500);
      }
      catch (InterruptedException e)
      catch (InterruptedException ignored)
      {
        // ignore
      }
@@ -2370,7 +2338,7 @@
    if (debugEnabled())
    {
      debugInfo("end restart : connected=" + rs.connected + " with RS("
      debugInfo("end restart : connected=" + rs.isConnected() + " with RS("
          + rs.getServerId() + ") genId=" + generationID);
    }
  }
@@ -2451,7 +2419,7 @@
        Semaphore currentWindowSemaphore;
        synchronized (connectPhaseLock)
        {
          currentSession = session;
          currentSession = connectedRS.get().session;
          currentWindowSemaphore = sendWindow;
        }
@@ -2491,10 +2459,10 @@
            Check the session. If it has changed, some disconnection or
            reconnection happened and we need to restart from scratch.
            */
            final Session localSession = session;
            if (localSession != null && session == currentSession)
            final Session session = connectedRS.get().session;
            if (session != null && session == currentSession)
            {
              localSession.publish(msg);
              session.publish(msg);
              done = true;
            }
          }
@@ -2509,17 +2477,20 @@
            window update message was lost somehow...
            then loop to check again if connection was closed.
            */
            Session localSession = session;
            if (localSession != null)
            Session session = connectedRS.get().session;
            if (session != null)
            {
              localSession.publish(new WindowProbeMsg());
              session.publish(new WindowProbeMsg());
            }
          }
        }
      } catch (IOException e)
      }
      catch (IOException e)
      {
        if (!retryOnFailure)
        {
          return false;
        }
        // The receive threads should handle reconnection or
        // mark this broker in error. Just retry.
@@ -2590,17 +2561,17 @@
  {
    while (!shutdown)
    {
      final ConnectedRS rs = connectedRS.get();
      if (reconnectOnFailure && !rs.connected)
      ConnectedRS rs = connectedRS.get();
      if (reconnectOnFailure && !rs.isConnected())
      {
        // infinite try to reconnect
        reStart(null, true);
        continue;
      }
      // Save session information for later in case we need it for log messages
      // after the session has been closed and/or failed.
      final Session localSession = session;
      if (localSession == null)
      if (rs.session == null)
      {
        // Must be shutting down.
        break;
@@ -2611,7 +2582,7 @@
      final int previousRsServerID = rs.getServerId();
      try
      {
        ReplicationMsg msg = localSession.receive();
        ReplicationMsg msg = rs.session.receive();
        if (msg instanceof UpdateMsg)
        {
          synchronized (this)
@@ -2621,13 +2592,13 @@
        }
        if (msg instanceof WindowMsg)
        {
          WindowMsg windowMsg = (WindowMsg) msg;
          final WindowMsg windowMsg = (WindowMsg) msg;
          sendWindow.release(windowMsg.getNumAck());
        }
        else if (msg instanceof TopologyMsg)
        {
          TopologyMsg topoMsg = (TopologyMsg) msg;
          receiveTopo(topoMsg);
          final TopologyMsg topoMsg = (TopologyMsg) msg;
          receiveTopo(topoMsg, getRsServerId());
          if (reconnectToTheBestRS)
          {
            // Reset wait time before next computation of best server
@@ -2636,19 +2607,20 @@
          // Caller wants to check what's changed
          if (returnOnTopoChange)
          {
            return msg;
          }
        }
        else if (msg instanceof StopMsg)
        {
          // RS performs a proper disconnection
          Message message = WARN_REPLICATION_SERVER_PROPERLY_DISCONNECTED.get(
              previousRsServerID, localSession.getReadableRemoteAddress(),
              previousRsServerID, rs.replicationServer,
              serverId, baseDN.toNormalizedString());
          logError(message);
          // Try to find a suitable RS
          reStart(localSession, true);
          reStart(rs.session, true);
        }
        else if (msg instanceof MonitorMsg)
        {
@@ -2709,16 +2681,14 @@
                if (bestServerInfo == null)
                {
                  message = NOTE_LOAD_BALANCE_REPLICATION_SERVER.get(
                      serverId, previousRsServerID,
                      localSession.getReadableRemoteAddress(),
                      serverId, previousRsServerID, rs.replicationServer,
                      baseDN.toNormalizedString());
                }
                else
                {
                  final int bestRsServerId = bestServerInfo.getServerId();
                  message = NOTE_NEW_BEST_REPLICATION_SERVER.get(
                      serverId, previousRsServerID,
                      localSession.getReadableRemoteAddress(),
                      serverId, previousRsServerID, rs.replicationServer,
                      bestRsServerId,
                      baseDN.toNormalizedString(),
                      evals.getEvaluation(previousRsServerID).toString(),
@@ -2754,24 +2724,20 @@
        if (!shutdown)
        {
          final Session tmpSession = session;
          if (tmpSession == null || !tmpSession.closeInitiated())
          if (rs.session == null || !rs.session.closeInitiated())
          {
            // We did not initiate the close on our side, log an error message.
            Message message = WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
            logError(WARN_REPLICATION_SERVER_BADLY_DISCONNECTED.get(
                serverId, baseDN.toNormalizedString(), previousRsServerID,
                localSession.getReadableRemoteAddress());
            logError(message);
                rs.replicationServer));
          }
          if (reconnectOnFailure)
          {
            reStart(localSession, true);
          }
          else
          if (!reconnectOnFailure)
          {
            break; // does not seem necessary to explicitly disconnect ..
          }
          reStart(rs.session, true);
        }
      }
    } // while !shutdown
@@ -2824,10 +2790,10 @@
    try
    {
      updateDoneCount++;
      final Session localSession = session;
      if (updateDoneCount >= halfRcvWindow && localSession != null)
      final Session session = connectedRS.get().session;
      if (updateDoneCount >= halfRcvWindow && session != null)
      {
        localSession.publish(new WindowMsg(updateDoneCount));
        session.publish(new WindowMsg(updateDoneCount));
        rcvWindow += updateDoneCount;
        updateDoneCount = 0;
      }
@@ -2850,10 +2816,9 @@
    synchronized (startStopLock)
    {
      shutdown = true;
      setConnectedRS(ConnectedRS.stopped());
      stopRSHeartBeatMonitoring();
      stopChangeTimeHeartBeatPublishing();
      connectedRS.set(ConnectedRS.stopped());
      setSession(null);
      deregisterReplicationMonitor();
    }
  }
@@ -2872,10 +2837,10 @@
  public void setSoTimeout(int timeout) throws SocketException
  {
    this.timeout = timeout;
    final Session localSession = session;
    if (localSession != null)
    final Session session = connectedRS.get().session;
    if (session != null)
    {
      localSession.setSoTimeout(timeout);
      session.setSoTimeout(timeout);
    }
  }
@@ -2977,7 +2942,12 @@
   */
  public short getProtocolVersion()
  {
    return protocolVersion;
    final Session session = connectedRS.get().session;
    if (session != null)
    {
      return session.getProtocolVersion();
    }
    return ProtocolVersion.getCurrentVersion();
  }
  /**
@@ -2988,7 +2958,7 @@
   */
  public boolean isConnected()
  {
    return connectedRS.get().connected;
    return connectedRS.get().isConnected();
  }
  /**
@@ -2997,8 +2967,8 @@
   */
  public boolean isSessionEncrypted()
  {
    final Session tmp = session;
    return tmp != null ? tmp.isEncrypted() : false;
    final Session session = connectedRS.get().session;
    return session != null ? session.isEncrypted() : false;
  }
  /**
@@ -3009,9 +2979,8 @@
  {
    try
    {
      ChangeStatusMsg csMsg = new ChangeStatusMsg(ServerStatus.INVALID_STATUS,
        newStatus);
      session.publish(csMsg);
      connectedRS.get().session.publish(
          new ChangeStatusMsg(ServerStatus.INVALID_STATUS, newStatus));
    } catch (IOException ex)
    {
      Message message = ERR_EXCEPTION_SENDING_CS.get(
@@ -3051,13 +3020,14 @@
   * Computes the list of DSs connected to a particular RS.
   * @param rsId The RS id of the server one wants to know the connected DSs
   * @param dsList The list of DSinfo from which to compute things
   * @param rsServerId the serverId to use for the connectedDS
   * @return The list of connected DSs to the server rsId
   */
  private List<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList)
  private Set<Integer> computeConnectedDSs(int rsId, List<DSInfo> dsList,
      int rsServerId)
  {
    List<Integer> connectedDSs = new ArrayList<Integer>();
    if (getRsServerId() == rsId)
    final Set<Integer> connectedDSs = new HashSet<Integer>();
    if (rsServerId == rsId)
    {
      /*
      If we are computing connected DSs for the RS we are connected
@@ -3071,8 +3041,10 @@
    for (DSInfo dsInfo : dsList)
    {
      if (dsInfo.getRsId() == rsId)
      {
        connectedDSs.add(dsInfo.getDsId());
    }
    }
    return connectedDSs;
  }
@@ -3081,9 +3053,12 @@
   * Processes an incoming TopologyMsg.
   * Updates the structures for the local view of the topology.
   *
   * @param topoMsg The topology information received from RS.
   * @param topoMsg
   *          The topology information received from RS.
   * @param rsServerId
   *          the serverId to use for the connectedDS
   */
  public void receiveTopo(TopologyMsg topoMsg)
  private void receiveTopo(TopologyMsg topoMsg, int rsServerId)
  {
    if (debugEnabled())
      debugInfo("receive TopologyMsg=" + topoMsg);
@@ -3096,9 +3071,9 @@
    final Set<Integer> rssToKeep = new HashSet<Integer>();
    for (RSInfo rsInfo : topoMsg.getRsList())
    {
      int rsId = rsInfo.getId();
      final int rsId = rsInfo.getId();
      rssToKeep.add(rsId); // Mark this server as still existing
      final List<Integer> connectedDSs = computeConnectedDSs(rsId, dsList);
      Set<Integer> connectedDSs = computeConnectedDSs(rsId, dsList, rsServerId);
      ReplicationServerInfo rsInfo2 = replicationServerInfos.get(rsId);
      if (rsInfo2 == null)
      {
@@ -3106,7 +3081,8 @@
        rsInfo2 = new ReplicationServerInfo(rsInfo, connectedDSs);
        setLocallyConfiguredFlag(rsInfo2);
        replicationServerInfos.put(rsId, rsInfo2);
      } else
      }
      else
      {
        // Update the existing info for the replication server
        rsInfo2.update(rsInfo, connectedDSs);
@@ -3140,20 +3116,18 @@
  /**
   * Starts publishing to the RS the current timestamp used in this server.
   */
  private void startChangeTimeHeartBeatPublishing()
  private void startChangeTimeHeartBeatPublishing(ConnectedRS rs)
  {
    // Start a CSN heartbeat thread.
    long changeTimeHeartbeatInterval = config.getChangetimeHeartbeatInterval();
    if (changeTimeHeartbeatInterval > 0)
    {
      final Session localSession = session;
      final String threadName = "Replica DS(" + getServerId()
          + ") change time heartbeat publisher for domain \""
          + getBaseDN() + "\" to RS(" + getRsServerId()
          + ") at " + localSession.getReadableRemoteAddress();
              + ") change time heartbeat publisher for domain \"" + getBaseDN()
              + "\" to RS(" + rs.getServerId() + ") at " + rs.replicationServer;
      ctHeartbeatPublisherThread = new CTHeartbeatPublisherThread(
          threadName, localSession, changeTimeHeartbeatInterval, getServerId());
          threadName, rs.session, changeTimeHeartbeatInterval, getServerId());
      ctHeartbeatPublisherThread.start();
    }
    else
@@ -3206,8 +3180,8 @@
   */
  String getLocalUrl()
  {
    final Session tmp = session;
    return tmp != null ? tmp.getLocalUrl() : "";
    final Session session = connectedRS.get().session;
    return session != null ? session.getLocalUrl() : "";
  }
  /**
@@ -3221,28 +3195,30 @@
    return monitor.getMonitorInstanceName();
  }
  private void setSession(final Session newSession)
  private ConnectedRS setConnectedRS(final ConnectedRS newRS)
  {
    // De-register the monitor with the old name.
    final ConnectedRS oldRS = connectedRS.getAndSet(newRS);
    if (!oldRS.equals(newRS) && oldRS.session != null)
    {
      // monitor name is changing, deregister before registering again
    deregisterReplicationMonitor();
    final Session oldSession = session;
    if (oldSession != null)
    {
      oldSession.close();
    }
    session = newSession;
    // Re-register the monitor with the new name.
      oldRS.session.close();
    registerReplicationMonitor();
  }
    return newRS;
  }
  /**
   * Must be invoked each time the session changes because, the monitor name is
   * dynamically created with the session name, while monitor registration is
   * static.
   *
   * @see #monitor
   */
  private void registerReplicationMonitor()
  {
    /*
     * The monitor should not be registered if this is a unit test because the
     * replication domain is null.
     */
    // The monitor should not be registered if this is a unit test
    // because the replication domain is null.
    if (monitor != null)
    {
      DirectoryServer.registerMonitorProvider(monitor);
@@ -3251,10 +3227,8 @@
  private void deregisterReplicationMonitor()
  {
    /*
     * The monitor should not be deregistered if this is a unit test because the
     * replication domain is null.
     */
    // The monitor should not be deregistered if this is a unit test
    // because the replication domain is null.
    if (monitor != null)
    {
      DirectoryServer.deregisterMonitorProvider(monitor);
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -452,16 +452,13 @@
  {
    switch (status)
    {
      case NOT_CONNECTED_STATUS:
        break;
      case NORMAL_STATUS:
        break;
      case DEGRADED_STATUS:
        break;
      case FULL_UPDATE_STATUS:
        // Signal RS we just entered the full update status
        broker.signalStatusChange(status);
        break;
      case NOT_CONNECTED_STATUS:
      case NORMAL_STATUS:
      case DEGRADED_STATUS:
      case BAD_GEN_ID_STATUS:
        break;
      default:
@@ -1221,9 +1218,7 @@
      }
    }
    /**
     * {@inheritDoc}
     */
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
@@ -1562,19 +1557,17 @@
    // Don't forget to release IEcontext acquired at beginning.
    releaseIEContext();
    String cause = exportRootException != null ? exportRootException
        .getLocalizedMessage() : "";
    final String cause = exportRootException == null ? ""
        : exportRootException.getLocalizedMessage();
    if (serverToInitialize == RoutableMsg.ALL_SERVERS)
    {
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
          getBaseDNString(), serverID, cause);
      logError(msg);
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END_ALL.get(
          getBaseDNString(), serverID, cause));
    }
    else
    {
      Message msg = NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
          getBaseDNString(), serverID, serverToInitialize, cause);
      logError(msg);
      logError(NOTE_FULL_UPDATE_ENGAGED_FOR_REMOTE_END.get(
          getBaseDNString(), serverID, serverToInitialize, cause));
    }
@@ -2388,8 +2381,8 @@
      {
        Message msg = NOTE_FULL_UPDATE_ENGAGED_FROM_REMOTE_END.get(
            getBaseDNString(), initTargetMsgReceived.getSenderID(), serverID,
            (ieContext.getException() != null ? ieContext
                .getException().getLocalizedMessage() : ""));
            (ieContext.getException() == null ? ""
                : ieContext.getException().getLocalizedMessage()));
        logError(msg);
        releaseIEContext();
      } // finally
@@ -3084,7 +3077,7 @@
   *                             be produced.
   * @throws DirectoryException  When needed.
   */
  abstract protected void exportBackend(OutputStream output)
  protected abstract void exportBackend(OutputStream output)
           throws DirectoryException;
  /**
@@ -3095,7 +3088,7 @@
   *
   * @throws DirectoryException  When needed.
   */
  abstract protected void importBackend(InputStream input)
  protected abstract void importBackend(InputStream input)
           throws DirectoryException;
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/core/UnbindOperationTestCase.java
@@ -26,8 +26,6 @@
 */
package org.opends.server.core;
import static org.testng.Assert.*;
import java.util.ArrayList;
import org.opends.messages.Message;
@@ -39,33 +37,28 @@
import org.opends.server.types.ResultCode;
import org.testng.annotations.Test;
import static org.opends.server.protocols.internal.InternalClientConnection.*;
import static org.testng.Assert.*;
/**
 * A set of test cases for unbind operations
 */
public class UnbindOperationTestCase
       extends OperationTestCase
public class UnbindOperationTestCase extends OperationTestCase
{
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override()
  protected Operation[] createTestOperations()
         throws Exception
  {
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    final InternalClientConnection conn = getRootConnection();
    return new Operation[]
    {
      new UnbindOperationBasis(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(),
                          null),
      new UnbindOperationBasis(conn, InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(),
                          new ArrayList<Control>())
      new UnbindOperationBasis(conn, nextOperationID(), nextMessageID(), null),
      new UnbindOperationBasis(conn, nextOperationID(), nextMessageID(), new ArrayList<Control>())
    };
  }
  /**
   * Invokes a number of operation methods on the provided unbind operation for
   * which all processing has been completed.
@@ -79,8 +72,6 @@
    assertTrue(unbindOperation.getProcessingTime() >= 0);
  }
  /**
   * Attempts an internal unbind operation.  This won't actually do anything,
   * since there's nothing to disconnect with an internal connection, but it
@@ -90,13 +81,10 @@
  public void testUnbindInternal()
  {
    InvocationCounterPlugin.resetAllCounters();
    final InternalClientConnection conn = getRootConnection();
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    UnbindOperationBasis unbindOperation =
         new UnbindOperationBasis(conn, InternalClientConnection.nextOperationID(),
                             InternalClientConnection.nextMessageID(), new ArrayList<Control>());
    UnbindOperationBasis unbindOperation = new UnbindOperationBasis(
        conn, nextOperationID(), nextMessageID(), new ArrayList<Control>());
    unbindOperation.run();
    examineCompletedOperation(unbindOperation);
@@ -104,8 +92,6 @@
//    assertTrue(InvocationCounterPlugin.getPostOperationCount() > 0);
  }
  /**
   * Tests the <CODE>cancel</CODE> method to ensure that it indicates that the
   * operation cannot be cancelled.
@@ -113,21 +99,16 @@
  @Test()
  public void testCancel()
  {
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    final InternalClientConnection conn = getRootConnection();
    CancelRequest cancelRequest =
         new CancelRequest(false, Message.raw("Test Unbind Cancel"));
    UnbindOperationBasis unbindOperation =
         new UnbindOperationBasis(conn, InternalClientConnection.nextOperationID(),
                             InternalClientConnection.nextMessageID(), new ArrayList<Control>());
    UnbindOperationBasis unbindOperation = new UnbindOperationBasis(
        conn, nextOperationID(), nextMessageID(), new ArrayList<Control>());
    assertEquals(unbindOperation.cancel(cancelRequest).getResultCode(),
                 ResultCode.CANNOT_CANCEL);
  }
  /**
   * Tests the <CODE>getCancelRequest</CODE> method to ensure that it always
   * returns <CODE>null</CODE>.
@@ -135,20 +116,16 @@
  @Test
  public void testGetCancelRequest()
  {
    InternalClientConnection conn =
         InternalClientConnection.getRootConnection();
    final InternalClientConnection conn = getRootConnection();
    CancelRequest cancelRequest =
         new CancelRequest(false, Message.raw("Test Unbind Cancel"));
    UnbindOperationBasis unbindOperation =
         new UnbindOperationBasis(conn, InternalClientConnection.nextOperationID(),
                             InternalClientConnection.nextMessageID(), new ArrayList<Control>());
    UnbindOperationBasis unbindOperation = new UnbindOperationBasis(
        conn, nextOperationID(), nextMessageID(), new ArrayList<Control>());
    assertNull(unbindOperation.getCancelRequest());
    assertEquals(unbindOperation.cancel(cancelRequest).getResultCode(),
                 ResultCode.CANNOT_CANCEL);
    assertNull(unbindOperation.getCancelRequest());
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/service/ComputeBestServerTest.java
@@ -51,6 +51,7 @@
import static org.assertj.core.data.MapEntry.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.service.ReplicationBroker.*;
@@ -683,7 +684,7 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "AwinnerHost:123", 0L, (byte)1, 1),
        EMPTY_LIST);
        EMPTY_SET);
    testData[idx++] = new Object[] {
      rsInfos,
@@ -700,10 +701,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "BwinnerHost:123", 0L, (byte)1, 1),
        EMPTY_LIST);
        EMPTY_SET);
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        EMPTY_LIST);
        EMPTY_SET);
    testData[idx++] = new Object[] {
      rsInfos,
@@ -723,10 +724,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1));
        newSet(1));
    put(rsInfos,
        new RSInfo(12, "CwinnerHost:456", 0L, (byte)1, 1),
        EMPTY_LIST);
        EMPTY_SET);
    testData[idx++] = new Object[] {
      rsInfos,
@@ -747,10 +748,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "DwinnerHost:123", 0L, (byte)1, 1),
        Arrays.asList(1));
        newSet(1));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        Arrays.asList(101));
        newSet(101));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -770,10 +771,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "EwinnerHost:456", 0L, (byte)1, 1),
        Arrays.asList(101));
        newSet(101));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -793,10 +794,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1));
        newSet(1));
    put(rsInfos,
        new RSInfo(12, "FwinnerHost:456", 0L, (byte)1, 2),
        Arrays.asList(101));
        newSet(101));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -817,10 +818,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1));
        newSet(1));
    put(rsInfos,
        new RSInfo(12, "GwinnerHost:456", 0L, (byte)1, 2),
        Arrays.asList(101, 102));
        newSet(101, 102));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -841,10 +842,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "HwinnerHost:456", 0L, (byte)1, 2),
        Arrays.asList(101, 102, 103, 104));
        newSet(101, 102, 103, 104));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -865,13 +866,13 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1));
        newSet(1));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 2),
        Arrays.asList(101, 102));
        newSet(101, 102));
    put(rsInfos,
        new RSInfo(13, "IwinnerHost:789", 0L, (byte)1, 3),
        Arrays.asList(201, 202, 203));
        newSet(201, 202, 203));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -891,13 +892,13 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "JwinnerHost:123", 0L, (byte)1, 5),
        Arrays.asList(1, 2, 3));
        newSet(1, 2, 3));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 3),
        Arrays.asList(101, 102, 103, 104, 105));
        newSet(101, 102, 103, 104, 105));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 2),
        Arrays.asList(201));
        newSet(201));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -921,10 +922,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1));
        newSet(1));
    put(rsInfos,
        new RSInfo(12, "KwinnerHost:456", 0L, (byte)1, 1),
        Arrays.asList(101));
        newSet(101));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -944,10 +945,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "LwinnerHost:456", 0L, (byte)1, 1),
        EMPTY_LIST);
        EMPTY_SET);
    testData[idx++] = new Object[] {
      rsInfos,
@@ -968,10 +969,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "MwinnerHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        EMPTY_LIST);
        EMPTY_SET);
    testData[idx++] = new Object[] {
      rsInfos,
@@ -992,16 +993,16 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 3),
        Arrays.asList(1, 2, 3, 4, 5, 6));
        newSet(1, 2, 3, 4, 5, 6));
    put(rsInfos,
        new RSInfo(12, "NwinnerHost:456", 0L, (byte)1, 4),
        Arrays.asList(101, 102, 103, 104, 105, 106, 107, 108));
        newSet(101, 102, 103, 104, 105, 106, 107, 108));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 1),
        Arrays.asList(201, 202));
        newSet(201, 202));
    put(rsInfos,
        new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2),
        Arrays.asList(301, 302, 303, 304));
        newSet(301, 302, 303, 304));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1025,16 +1026,16 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 3),
        Arrays.asList(1, 2, 3, 4));
        newSet(1, 2, 3, 4));
    put(rsInfos,
        new RSInfo(12, "OwinnerHost:456", 0L, (byte)1, 4),
        Arrays.asList(101, 102, 103, 104, 105, 106, 107, 108));
        newSet(101, 102, 103, 104, 105, 106, 107, 108));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 1),
        Arrays.asList(201, 202));
        newSet(201, 202));
    put(rsInfos,
        new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2),
        Arrays.asList(301, 302, 303, 304, 305, 306));
        newSet(301, 302, 303, 304, 305, 306));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1056,16 +1057,16 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "PwinnerHost:123", 0L, (byte)1, 3),
        Arrays.asList(1, 2, 3, 4));
        newSet(1, 2, 3, 4));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 4),
        Arrays.asList(101, 102, 103, 104, 105, 106, 107, 108));
        newSet(101, 102, 103, 104, 105, 106, 107, 108));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 1),
        Arrays.asList(201, 202));
        newSet(201, 202));
    put(rsInfos,
        new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2),
        Arrays.asList(306, 305, 304, 303, 302, 301));
        newSet(306, 305, 304, 303, 302, 301));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1087,16 +1088,16 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 3),
        Arrays.asList(1, 2, 3, 4));
        newSet(1, 2, 3, 4));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 4),
        Arrays.asList(101, 102, 103, 104, 105, 106, 107, 108));
        newSet(101, 102, 103, 104, 105, 106, 107, 108));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 1),
        Arrays.asList(201, 202));
        newSet(201, 202));
    put(rsInfos,
        new RSInfo(14, "QwinnerHost:1011", 0L, (byte)1, 2),
        Arrays.asList(306, 305, 304, 303, 302, 301));
        newSet(306, 305, 304, 303, 302, 301));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1120,16 +1121,16 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte) 1, 3),
        Arrays.asList(1, 2, 3, 4));
        newSet(1, 2, 3, 4));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 4),
        Arrays.asList(113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101));
        newSet(113, 112, 111, 110, 109, 108, 107, 106, 105, 104, 103, 102, 101));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 1),
        Arrays.asList(201, 202));
        newSet(201, 202));
    put(rsInfos,
        new RSInfo(14, "looserHost:1011", 0L, (byte)1, 2),
        Arrays.asList(301));
        newSet(301));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1153,10 +1154,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "RwinnerHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        Arrays.asList(3));
        newSet(3));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1181,10 +1182,10 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "SwinnerHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        Arrays.asList(3));
        newSet(3));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1207,13 +1208,13 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "TwinnerHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        Arrays.asList(3));
        newSet(3));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 1),
        Arrays.asList(4));
        newSet(4));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1236,13 +1237,13 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "UwinnerHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2, 3));
        newSet(1, 2, 3));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        Arrays.asList(4, 5));
        newSet(4, 5));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 1),
        Arrays.asList(6, 7));
        newSet(6, 7));
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1263,13 +1264,13 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "looserHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        Arrays.asList(3));
        newSet(3));
    put(rsInfos,
        new RSInfo(13, "VwinnerHost:789", 0L, (byte)1, 1),
        EMPTY_LIST);
        EMPTY_SET);
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1290,13 +1291,13 @@
    rsInfos = new HashMap<Integer, ReplicationServerInfo>();
    put(rsInfos,
        new RSInfo(11, "WwinnerHost:123", 0L, (byte)1, 1),
        Arrays.asList(1, 2));
        newSet(1, 2));
    put(rsInfos,
        new RSInfo(12, "looserHost:456", 0L, (byte)1, 1),
        Arrays.asList(3));
        newSet(3));
    put(rsInfos,
        new RSInfo(13, "looserHost:789", 0L, (byte)1, 1),
        EMPTY_LIST);
        EMPTY_SET);
    testData[idx++] = new Object[] {
      rsInfos,
@@ -1309,10 +1310,9 @@
  }
  private void put(Map<Integer, ReplicationServerInfo> rsInfos, RSInfo rsInfo,
      List<Integer> connectedDSs)
      Set<Integer> connectedDSs)
  {
    ReplicationServerInfo info =
        new ReplicationServerInfo(rsInfo, connectedDSs);
    ReplicationServerInfo info = new ReplicationServerInfo(rsInfo, connectedDSs);
    rsInfos.put(info.getServerId(), info);
  }
@@ -1341,7 +1341,8 @@
        url = bestServer.getServerURL();
      }
      assertNull(bestServer, "The best server should be null but is: " + url);
    } else
    }
    else
    {
      assertNotNull(bestServer, "The best server should not be null");
      assertEquals(bestServer.getServerURL(),