mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
14.22.2013 5d1c7d62d688c64af2627ceb8b1556ef313954ec
ReplicationServerDomain.java
Renamed directoryServers to connectedDSs.
Renamed replicationServers to connectedRSs.
Removed the useless getConnectedLDAPservers(), replaced with getConnectedDSs().
Renamed a few local variables.

DataServerHandler.java:
Consequence of removing to ReplicationServerDomain.getConnectedLDAPservers()
Extracted method changeStatus() from changeStatusFromStatusAnalyzer() and changeStatusForResetGenId().
Extracted method getStatusMachineEvent() from changeStatusForResetGenId().

InitOnLineTest.java
Consequence of removing to ReplicationServerDomain.getConnectedLDAPservers().
Removed useless try / catch / fail.
Extracted methods getCompletionTime(), assertAttributeValue().
Used Assertions.assertThat().
3 files modified
773 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/DataServerHandler.java 179 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 172 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java 422 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -29,7 +29,6 @@
import java.io.IOException;
import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.replication.common.*;
@@ -39,6 +38,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachine.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
@@ -92,76 +92,10 @@
   */
  public void changeStatusForResetGenId(long newGenId) throws IOException
  {
    final int localRsServerId = replicationServer.getServerId();
    StatusMachineEvent event;
    if (newGenId == -1)
    StatusMachineEvent event = getStatusMachineEvent(newGenId);
    if (event == null)
    {
      // The generation id is being made invalid, let's put the DS
      // into BAD_GEN_ID_STATUS
      event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    } else
    {
      if (newGenId == generationId)
      {
        if (status == ServerStatus.BAD_GEN_ID_STATUS)
        {
          // This server has the good new reference generation id.
          // Close connection with him to force his reconnection: DS will
          // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
          if (debugEnabled())
          {
            TRACER.debugInfo(
                "In RS " + localRsServerId +
                ", closing connection to DS " + getServerId() +
                " for baseDn " + getBaseDN() +
                " to force reconnection as new local" +
                " generationId and remote one match and DS is in bad gen id: " +
                newGenId);
          }
          // Connection closure must not be done calling RSD.stopHandler() as it
          // would rewait the RSD lock that we already must have entering this
          // method. This would lead to a reentrant lock which we do not want.
          // So simply close the session, this will make the hang up appear
          // after the reader thread that took the RSD lock releases it.
          if (session != null
              // V4 protocol introduced a StopMsg to properly close the
              // connection between servers
             && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
          {
            try
            {
              session.publish(new StopMsg());
            }
            catch (IOException ioe)
            {
              // Anyway, going to close session, so nothing to do
            }
          }
          // NOT_CONNECTED_STATUS is the last one in RS session life: handler
          // will soon disappear after this method call...
          status = ServerStatus.NOT_CONNECTED_STATUS;
          return;
        } else
        {
          if (debugEnabled())
          {
            TRACER.debugInfo("In RS " + localRsServerId + ". DS "
                + getServerId() + " for baseDn " + getBaseDN()
                + " has already generation id " + newGenId
                + " so no ChangeStatusMsg sent to him.");
          }
          return;
        }
      } else
      {
        // This server has a bad generation id compared to new reference one,
        // let's put it into BAD_GEN_ID_STATUS
        event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
      }
      return;
    }
    if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
@@ -170,7 +104,7 @@
      // Prevent useless error message (full update status cannot lead to bad
      // gen status)
      Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
              Integer.toString(localRsServerId),
              Integer.toString(replicationServer.getServerId()),
              getBaseDN(),
              Integer.toString(serverId),
              Long.toString(generationId),
@@ -179,30 +113,73 @@
      return;
    }
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    changeStatus(event, "for reset gen id");
  }
    if (newStatus == ServerStatus.INVALID_STATUS)
  private StatusMachineEvent getStatusMachineEvent(long newGenId)
  {
    if (newGenId == -1)
    {
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
          Integer.toString(serverId), status.toString(), event.toString());
      logError(msg);
      return;
      // The generation id is being made invalid, let's put the DS
      // into BAD_GEN_ID_STATUS
      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    }
    if (newGenId != generationId)
    {
      // This server has a bad generation id compared to new reference one,
      // let's put it into BAD_GEN_ID_STATUS
      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
        ServerStatus.INVALID_STATUS);
    if (status != ServerStatus.BAD_GEN_ID_STATUS)
    {
      if (debugEnabled())
      {
        TRACER.debugInfo("In RS " + replicationServer.getServerId()
            + ", DS " + getServerId() + " for baseDn " + getBaseDN()
            + " has already generation id " + newGenId
            + " so no ChangeStatusMsg sent to him.");
      }
      return null;
    }
    // This server has the good new reference generation id.
    // Close connection with him to force his reconnection: DS will
    // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS " + localRsServerId
          + " Sending change status for reset gen id to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
      TRACER.debugInfo("In RS " + replicationServer.getServerId()
          + ", closing connection to DS " + getServerId() + " for baseDn "
          + getBaseDN() + " to force reconnection as new local"
          + " generationId and remote one match and DS is in bad gen id: "
          + newGenId);
    }
    session.publish(csMsg);
    // Connection closure must not be done calling RSD.stopHandler() as it
    // would rewait the RSD lock that we already must have entering this
    // method. This would lead to a reentrant lock which we do not want.
    // So simply close the session, this will make the hang up appear
    // after the reader thread that took the RSD lock releases it.
    if (session != null
        // V4 protocol introduced a StopMsg to properly close the
        // connection between servers
        && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      try
      {
        session.publish(new StopMsg());
      }
      catch (IOException ioe)
      {
        // Anyway, going to close session, so nothing to do
      }
    }
    status = newStatus;
    // NOT_CONNECTED_STATUS is the last one in RS session life: handler
    // will soon disappear after this method call...
    status = ServerStatus.NOT_CONNECTED_STATUS;
    return null;
  }
  /**
@@ -215,6 +192,12 @@
  public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
  throws IOException
  {
    return changeStatus(event, "from status analyzer");
  }
  private ServerStatus changeStatus(StatusMachineEvent event, String origin)
      throws IOException
  {
    // Check state machine allows this new status (Sanity check)
    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
    if (newStatus == ServerStatus.INVALID_STATUS)
@@ -222,22 +205,20 @@
      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
          Integer.toString(serverId), status.toString(), event.toString());
      logError(msg);
      // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
      // and vice versa. We may are being trying to change the status while for
      // instance another status has just been entered: e.g a full update has
      // just been engaged. In that case, just ignore attempt to change the
      // status
      // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice
      // versa. We may be trying to change the status while another status has
      // just been entered: e.g a full update has just been engaged.
      // In that case, just ignore attempt to change the status
      return newStatus;
    }
    // Send message requesting to change the DS status
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
        ServerStatus.INVALID_STATUS);
    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS);
    if (debugEnabled())
    {
      TRACER.debugInfo("In RS " + replicationServer.getServerId()
          + " Sending change status from status analyzer to " + getServerId()
          + " Sending change status " + origin + " to " + getServerId()
          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
    }
@@ -589,7 +570,7 @@
          localGenerationId, sslEncryption, getLocalGroupId(),
          replicationServer.getDegradedStatusThreshold(),
          replicationServer.getWeight(),
          replicationServerDomain.getConnectedLDAPservers().size());
          replicationServerDomain.getConnectedDSs().size());
    }
    send(startMsg);
@@ -626,16 +607,10 @@
   * receiving a StopMsg to properly stop the handshake procedure.
   * @return the startSessionMsg received or null DS sent a stop message to
   *         not finish the handshake.
   * @throws DirectoryException
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws DataFormatException
   * @throws NotSupportedOldVersionPDUException
   * @throws Exception
   */
  private StartSessionMsg waitAndProcessStartSessionFromRemoteDS()
  throws DirectoryException, IOException, ClassNotFoundException,
  DataFormatException,
  NotSupportedOldVersionPDUException
      throws Exception
  {
    ReplicationMsg msg = session.receive();
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -92,7 +92,7 @@
   * we are currently publishing the first update in the balanced tree is the
   * next change that we must push to this particular server.
   */
  private final Map<Integer, DataServerHandler> directoryServers =
  private final Map<Integer, DataServerHandler> connectedDSs =
    new ConcurrentHashMap<Integer, DataServerHandler>();
  /**
@@ -101,7 +101,7 @@
   * in the balanced tree is the next change that we must push to this
   * particular server.
   */
  private final Map<Integer, ReplicationServerHandler> replicationServers =
  private final Map<Integer, ReplicationServerHandler> connectedRSs =
    new ConcurrentHashMap<Integer, ReplicationServerHandler>();
  private final Queue<MessageHandler> otherHandlers =
@@ -358,12 +358,10 @@
     */
    NotAssuredUpdateMsg notAssuredUpdate = null;
    /*
     * Push the message to the replication servers
     */
    // Push the message to the replication servers
    if (sourceHandler.isDataServer())
    {
      for (ReplicationServerHandler handler : replicationServers.values())
      for (ReplicationServerHandler handler : connectedRSs.values())
      {
        /**
         * Ignore updates to RS with bad gen id
@@ -392,10 +390,8 @@
      }
    }
    /*
     * Push the message to the LDAP servers
     */
    for (DataServerHandler handler : directoryServers.values())
    // Push the message to the LDAP servers
    for (DataServerHandler handler : connectedDSs.values())
    {
      // Don't forward the change to the server that just sent it
      if (handler == sourceHandler)
@@ -576,7 +572,7 @@
      }
      // Look for DS eligible for assured
      for (DataServerHandler handler : directoryServers.values())
      for (DataServerHandler handler : connectedDSs.values())
      {
        // Don't forward the change to the server that just sent it
        if (handler == sourceHandler)
@@ -735,7 +731,7 @@
  private void collectRSsEligibleForAssuredReplication(byte groupId,
      List<Integer> expectedServers)
  {
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      if (handler.getGroupId() == groupId
      // No ack expected from a RS with different group id
@@ -908,9 +904,8 @@
          List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
          for (Integer serverId : serversInTimeout)
          {
            ServerHandler expectedDSInTimeout = directoryServers.get(serverId);
            ServerHandler expectedRSInTimeout =
                replicationServers.get(serverId);
            ServerHandler expectedDSInTimeout = connectedDSs.get(serverId);
            ServerHandler expectedRSInTimeout = connectedRSs.get(serverId);
            if (expectedDSInTimeout != null)
            {
              if (safeRead)
@@ -950,7 +945,7 @@
   */
  public void stopReplicationServers(Collection<String> replServers)
  {
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      if (replServers.contains(handler.getServerAddressURL()))
      {
@@ -968,13 +963,13 @@
  public void stopAllServers(boolean shutdown)
  {
    // Close session with other replication servers
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    for (ReplicationServerHandler serverHandler : connectedRSs.values())
    {
      stopServer(serverHandler, shutdown);
    }
    // Close session with other LDAP servers
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      stopServer(serverHandler, shutdown);
    }
@@ -988,12 +983,12 @@
   */
  public boolean checkForDuplicateDS(DataServerHandler handler)
  {
    if (directoryServers.containsKey(handler.getServerId()))
    if (connectedDSs.containsKey(handler.getServerId()))
    {
      // looks like two connected LDAP servers have the same serverId
      Message message = ERR_DUPLICATE_SERVER_ID.get(
          localReplicationServer.getMonitorInstanceName(),
          directoryServers.get(handler.getServerId()).toString(),
          connectedDSs.get(handler.getServerId()).toString(),
          handler.toString(), handler.getServerId());
      logError(message);
      return false;
@@ -1047,7 +1042,7 @@
      try
      {
        // Stop useless monitoring publisher if no more RS or DS in domain
        if ( (directoryServers.size() + replicationServers.size() )== 1)
        if ( (connectedDSs.size() + connectedRSs.size() )== 1)
        {
          if (debugEnabled())
          {
@@ -1062,7 +1057,7 @@
        if (handler.isReplicationServer())
        {
          if (replicationServers.containsKey(handler.getServerId()))
          if (connectedRSs.containsKey(handler.getServerId()))
          {
            unregisterServerHandler(handler);
            handler.shutdown();
@@ -1076,11 +1071,11 @@
              buildAndSendTopoInfoToDSs(null);
            }
          }
        } else if (directoryServers.containsKey(handler.getServerId()))
        } else if (connectedDSs.containsKey(handler.getServerId()))
        {
          // If this is the last DS for the domain,
          // shutdown the status analyzer
          if (directoryServers.size() == 1)
          if (connectedDSs.size() == 1)
          {
            if (debugEnabled())
            {
@@ -1193,11 +1188,11 @@
  {
    if (handler.isReplicationServer())
    {
      replicationServers.remove(handler.getServerId());
      connectedRSs.remove(handler.getServerId());
    }
    else
    {
      directoryServers.remove(handler.getServerId());
      connectedDSs.remove(handler.getServerId());
    }
  }
@@ -1224,9 +1219,9 @@
    // topology and the generationId has never been saved, then we can reset
    // it and the next LDAP server to connect will become the new reference.
    boolean lDAPServersConnectedInTheTopology = false;
    if (directoryServers.isEmpty())
    if (connectedDSs.isEmpty())
    {
      for (ReplicationServerHandler rsh : replicationServers.values())
      for (ReplicationServerHandler rsh : connectedRSs.values())
      {
        if (generationId != rsh.getGenerationId())
        {
@@ -1283,7 +1278,7 @@
  throws DirectoryException
  {
    ReplicationServerHandler oldHandler =
      replicationServers.get(handler.getServerId());
      connectedRSs.get(handler.getServerId());
    if (oldHandler != null)
    {
      if (oldHandler.getServerAddressURL().equals(
@@ -1339,7 +1334,7 @@
  public Set<String> getChangelogs()
  {
    Set<String> results = new LinkedHashSet<String>();
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      results.add(handler.getServerAddressURL());
    }
@@ -1358,22 +1353,6 @@
  }
  /**
   * Returns as a set of String the list of LDAP servers connected to us.
   * Each string is the serverID of a connected LDAP server.
   *
   * @return The set of connected LDAP servers
   */
  public List<String> getConnectedLDAPservers()
  {
    List<String> results = new ArrayList<String>(0);
    for (DataServerHandler handler : directoryServers.values())
    {
      results.add(String.valueOf(handler.getServerId()));
    }
    return results;
  }
  /**
   * Creates and returns an iterator.
   * When the iterator is not used anymore, the caller MUST call the
   * ReplicationIterator.releaseCursor() method to free the resources
@@ -1493,7 +1472,7 @@
      {
        // Send to all replication servers with a least one remote
        // server connected
        for (ReplicationServerHandler rsh : replicationServers.values())
        for (ReplicationServerHandler rsh : connectedRSs.values())
        {
          if (rsh.hasRemoteLDAPServers())
          {
@@ -1503,7 +1482,7 @@
      }
      // Sends to all connected LDAP servers
      for (DataServerHandler destinationHandler : directoryServers.values())
      for (DataServerHandler destinationHandler : connectedDSs.values())
      {
        // Don't loop on the sender
        if (destinationHandler == senderHandler)
@@ -1516,7 +1495,7 @@
    {
      // Destination is one server
      DataServerHandler destinationHandler =
        directoryServers.get(msg.getDestination());
        connectedDSs.get(msg.getDestination());
      if (destinationHandler != null)
      {
        servers.add(destinationHandler);
@@ -1527,13 +1506,13 @@
        // have the targeted server connected.
        if (senderHandler.isDataServer())
        {
          for (ReplicationServerHandler h : replicationServers.values())
          for (ReplicationServerHandler rsHandler : connectedRSs.values())
          {
            // Send to all replication servers with a least one remote
            // server connected
            if (h.isRemoteLDAPServer(msg.getDestination()))
            if (rsHandler.isRemoteLDAPServer(msg.getDestination()))
            {
              servers.add(h);
              servers.add(rsHandler);
            }
          }
        }
@@ -1808,14 +1787,14 @@
      // from the states stored in the serverHandler.
      // - the server state
      // - the older missing change
      for (DataServerHandler lsh : this.directoryServers.values())
      for (DataServerHandler lsh : this.connectedDSs.values())
      {
        monitorMsg.setServerState(lsh.getServerId(),
            lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
      }
      // Same for the connected RS
      for (ReplicationServerHandler rsh : this.replicationServers.values())
      for (ReplicationServerHandler rsh : this.connectedRSs.values())
      {
        monitorMsg.setServerState(rsh.getServerId(),
            rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
@@ -1891,7 +1870,7 @@
   */
  public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
  {
    for (DataServerHandler handler : directoryServers.values())
    for (DataServerHandler handler : connectedDSs.values())
    {
      if (notThisOne == null || handler != notThisOne)
        // All except passed one
@@ -1932,7 +1911,7 @@
  public void buildAndSendTopoInfoToRSs()
  {
    TopologyMsg topoMsg = createTopologyMsgForRS();
    for (ReplicationServerHandler handler : replicationServers.values())
    for (ReplicationServerHandler handler : connectedRSs.values())
    {
      for (int i=1; i<=2; i++)
      {
@@ -1976,7 +1955,7 @@
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    // Go through every DSs
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      dsInfos.add(serverHandler.toDSInfo());
    }
@@ -2001,7 +1980,7 @@
  {
    // Go through every DSs (except recipient of msg)
    List<DSInfo> dsInfos = new ArrayList<DSInfo>();
    for (DataServerHandler serverHandler : directoryServers.values())
    for (DataServerHandler serverHandler : connectedDSs.values())
    {
      if (serverHandler.getServerId() == destDsId)
      {
@@ -2017,7 +1996,7 @@
    // Go through every peer RSs (and get their connected DSs), also add info
    // for RSs
    for (ReplicationServerHandler serverHandler : replicationServers.values())
    for (ReplicationServerHandler serverHandler : connectedRSs.values())
    {
      rsInfos.add(serverHandler.toRSInfo());
@@ -2150,7 +2129,7 @@
      // If we are the first replication server warned,
      // then forwards the reset message to the remote replication servers
      for (ServerHandler rsHandler : replicationServers.values())
      for (ServerHandler rsHandler : connectedRSs.values())
      {
        try
        {
@@ -2170,7 +2149,7 @@
      // Change status of the connected DSs according to the requested new
      // reference generation id
      for (DataServerHandler dsHandler : directoryServers.values())
      for (DataServerHandler dsHandler : connectedDSs.values())
      {
        try
        {
@@ -2362,8 +2341,7 @@
          // TODO: i18n
          MessageBuilder mb = new MessageBuilder();
          mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
            e.getMessage() + " " +
            stackTraceToSingleLineString(e)));
              e.getMessage() + " " + stackTraceToSingleLineString(e)));
          logError(mb.toMessage());
        }
      }
@@ -2400,10 +2378,10 @@
          + " given local generation Id=" + this.generationId);
    }
    ServerHandler handler = replicationServers.get(serverId);
    ServerHandler handler = connectedRSs.get(serverId);
    if (handler == null)
    {
      handler = directoryServers.get(serverId);
      handler = connectedDSs.get(serverId);
      if (handler == null)
      {
        return false;
@@ -2549,7 +2527,7 @@
            initializePendingMonitorData();
            // Send the monitor requests to the connected replication servers.
            for (ReplicationServerHandler rs : replicationServers.values())
            for (ReplicationServerHandler rs : connectedRSs.values())
            {
              // Add server ID to pending table.
              int serverId = rs.getServerId();
@@ -2657,13 +2635,12 @@
    // So for a given DS connected we can take the state and the max from
    // the DS/state.
    for (ServerHandler ds : directoryServers.values())
    for (ServerHandler ds : connectedDSs.values())
    {
      int serverID = ds.getServerId();
      // the state comes from the state stored in the SH
      ServerState dsState = ds.getServerState()
          .duplicate();
      ServerState dsState = ds.getServerState().duplicate();
      // the max CN sent by that LS also comes from the SH
      ChangeNumber maxcn = dsState.getChangeNumber(serverID);
@@ -2725,46 +2702,44 @@
        pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
        // Store the remote LDAP servers states
        Iterator<Integer> lsidIterator = msg.ldapIterator();
        while (lsidIterator.hasNext())
        Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
        while (dsServerIdIterator.hasNext())
        {
          int sid = lsidIterator.next();
          ServerState dsServerState = msg.getLDAPServerState(sid);
          int dsServerId = dsServerIdIterator.next();
          ServerState dsServerState = msg.getLDAPServerState(dsServerId);
          pendingMonitorData.setMaxCNs(dsServerState);
          pendingMonitorData.setLDAPServerState(sid, dsServerState);
          pendingMonitorData.setFirstMissingDate(sid,
              msg.getLDAPApproxFirstMissingDate(sid));
          pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
          pendingMonitorData.setFirstMissingDate(dsServerId,
              msg.getLDAPApproxFirstMissingDate(dsServerId));
        }
        // Process the latency reported by the remote RSi on its connections
        // to the other RSes
        Iterator<Integer> rsidIterator = msg.rsIterator();
        while (rsidIterator.hasNext())
        Iterator<Integer> rsServerIdIterator = msg.rsIterator();
        while (rsServerIdIterator.hasNext())
        {
          int rsid = rsidIterator.next();
          if (rsid == localReplicationServer.getServerId())
          int rsServerId = rsServerIdIterator.next();
          long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
          if (rsServerId == localReplicationServer.getServerId())
          {
            // this is the latency of the remote RSi regarding the current RS
            // let's update the fmd of my connected LS
            for (ServerHandler connectedlsh : directoryServers
                .values())
            for (DataServerHandler connectedDS : connectedDSs.values())
            {
              int connectedlsid = connectedlsh.getServerId();
              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
              pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd);
              int connectedServerId = connectedDS.getServerId();
              pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
            }
          }
          else
          {
            // this is the latency of the remote RSi regarding another RSj
            // let's update the latency of the LSes connected to RSj
            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
            ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
            if (rsjHdr != null)
            {
              for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
              for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
              {
                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
                pendingMonitorData.setFirstMissingDate(remotelsid, newfmd);
                pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
              }
            }
          }
@@ -2773,8 +2748,8 @@
      catch (RuntimeException e)
      {
        // FIXME: do we really expect these???
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e
            .getMessage() + stackTraceToSingleLineString(e)));
        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
            e.getMessage() + stackTraceToSingleLineString(e)));
      }
      finally
      {
@@ -2808,7 +2783,7 @@
   */
  public Map<Integer, DataServerHandler> getConnectedDSs()
  {
    return directoryServers;
    return connectedDSs;
  }
  /**
@@ -2817,7 +2792,7 @@
   */
  public Map<Integer, ReplicationServerHandler> getConnectedRSs()
  {
    return replicationServers;
    return connectedRSs;
  }
@@ -3131,9 +3106,8 @@
            result.update(mostRecentDbCN);
          }
        } catch (Exception e) {
          Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
              " " + stackTraceToSingleLineString(e));
          logError(errMessage);
          logError(ERR_WRITER_UNEXPECTED_EXCEPTION
              .get(stackTraceToSingleLineString(e)));
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
@@ -3236,13 +3210,13 @@
  private boolean isServerConnected(int serverId)
  {
    if (directoryServers.containsKey(serverId))
    if (connectedDSs.containsKey(serverId))
    {
      return true;
    }
    // not directly connected
    for (ReplicationServerHandler rsHandler : replicationServers.values())
    for (ReplicationServerHandler rsHandler : connectedRSs.values())
    {
      if (rsHandler.isRemoteLDAPServer(serverId))
      {
@@ -3283,7 +3257,7 @@
      {
        // If we are the first replication server warned,
        // then forwards the message to the remote replication servers
        for (ReplicationServerHandler rsHandler : replicationServers.values())
        for (ReplicationServerHandler rsHandler : connectedRSs.values())
        {
          try
          {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -27,23 +27,17 @@
 */
package org.opends.server.replication;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.TaskMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
import java.io.File;
import java.net.SocketTimeoutException;
import java.util.*;
import org.assertj.core.api.Assertions;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.task.TaskState;
import org.opends.server.core.AddOperation;
import org.opends.server.core.AddOperationBasis;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
@@ -63,6 +57,14 @@
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.TaskMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
 * Tests contained here:
 *
@@ -202,7 +204,7 @@
        "ds-task-initialize-replica-server-id: all");
  }
  // Tests that entries have been written in the db
  /** Tests that entries have been written in the db */
  private void testEntriesInDb()
  {
    log("TestEntriesInDb");
@@ -260,62 +262,19 @@
   * @param expectedDone The expected number of entries to be processed.
   */
  private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
      long expectedLeft, long expectedDone)
      long expectedLeft, long expectedDone) throws Exception
  {
    log("waitTaskCompleted " + taskEntry.toLDIFString());
    try
    {
      // FIXME - Factorize with TasksTestCase
      // Wait until the task completes.
      int timeout = 2000;
      AttributeType completionTimeType = DirectoryServer.getAttributeType(
          ATTR_TASK_COMPLETION_TIME.toLowerCase());
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      String completionTime = null;
      long startMillisecs = System.currentTimeMillis();
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          // FIXME How is this possible?  Must be issue 858.
          fail("Task entry was not returned from the search.");
          continue;
        }
        completionTime =
          resultEntry.getAttributeValue(completionTimeType,
              DirectoryStringSyntax.DECODER);
        if (completionTime == null)
        {
          if (System.currentTimeMillis() - startMillisecs > 1000*timeout)
          {
            break;
          }
          Thread.sleep(100);
        }
      } while (completionTime == null);
      if (completionTime == null)
      {
        fail("The task had not completed after " + timeout + " seconds.");
      }
      Entry resultEntry = getCompletionTime(taskEntry);
      // Check that the task state is as expected.
      AttributeType taskStateType =
        DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
      String stateString =
        resultEntry.getAttributeValue(taskStateType,
            DirectoryStringSyntax.DECODER);
          resultEntry.getAttributeValue(taskStateType,
              DirectoryStringSyntax.DECODER);
      TaskState taskState = TaskState.fromString(stateString);
      assertEquals(taskState, expectedState,
          "The task completed in an unexpected state");
@@ -323,7 +282,7 @@
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      ArrayList<String> logMessages = new ArrayList<String>();
      List<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
@@ -333,88 +292,94 @@
        fail("No log messages were written to the task entry on a failed task");
      }
      try
      {
        // Check that the task state is as expected.
        taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
        stateString =
          resultEntry.getAttributeValue(taskStateType,
              DirectoryStringSyntax.DECODER);
      // Check that the task state is as expected.
      assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_LEFT,
          expectedLeft, "The number of entries to process is not correct.");
        assertEquals(Long.decode(stateString).longValue(),expectedLeft,
            "The number of entries to process is not correct.");
        // Check that the task state is as expected.
        taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
        stateString =
          resultEntry.getAttributeValue(taskStateType,
              DirectoryStringSyntax.DECODER);
        assertEquals(Long.decode(stateString).longValue(),expectedDone,
            "The number of entries processed is not correct.");
      }
      catch(Exception e)
      {
        fail("Exception"+ e.getMessage()+e.getStackTrace());
      }
      // Check that the task state is as expected.
      assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_DONE,
          expectedDone, "The number of entries processed is not correct.");
    }
    catch(Exception e)
  }
  private Entry getCompletionTime(Entry taskEntry) throws Exception
  {
    // FIXME - Factorize with TasksTestCase
    // Wait until the task completes.
    int timeout = 2000;
    AttributeType completionTimeType = DirectoryServer.getAttributeType(
        ATTR_TASK_COMPLETION_TIME.toLowerCase());
    SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
    long startMillisecs = System.currentTimeMillis();
    do
    {
      fail("Exception"+ e.getMessage()+e.getStackTrace());
      InternalSearchOperation searchOperation = connection.processSearch(
          taskEntry.getDN(), SearchScope.BASE_OBJECT, filter);
      Entry resultEntry = searchOperation.getSearchEntries().getFirst();
      String completionTime = resultEntry.getAttributeValue(
          completionTimeType, DirectoryStringSyntax.DECODER);
      if (completionTime != null)
      {
        return resultEntry;
      }
      if (System.currentTimeMillis() - startMillisecs > 1000 * timeout)
      {
        fail("The task had not completed after " + timeout + " seconds.");
      }
      Thread.sleep(100);
    }
    while (true);
  }
  private void assertAttributeValue(Entry resultEntry, String lowerAttrName,
      long expected, String message) throws DirectoryException
  {
    AttributeType type = DirectoryServer.getAttributeType(lowerAttrName, true);
    String value = resultEntry.getAttributeValue(type, DirectoryStringSyntax.DECODER);
    assertEquals(Long.decode(value).longValue(), expected, message);
  }
  /**
   * Add to the current DB the entries necessary to the test.
   */
  private void addTestEntriesToDB()
  private void addTestEntriesToDB() throws Exception
  {
    try
    for (String ldifEntry : updatedEntries)
    {
      for (String ldifEntry : updatedEntries)
      {
        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
        addTestEntryToDB(entry);
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
      }
      log("addTestEntriesToDB : " + updatedEntries.length + " successfully added to DB");
      Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
      addTestEntryToDB(entry);
      // They will be removed at the end of the test
      entryList.addLast(entry.getDN());
    }
    catch(Exception e)
    {
      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
    log("addTestEntriesToDB : " + updatedEntries.length
        + " successfully added to DB");
  }
  private void addTestEntryToDB(Entry entry)
  {
    try
    AddOperation addOp =
        new AddOperationBasis(connection, InternalClientConnection
            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
            entry.getDN(), entry.getObjectClasses(), entry.getUserAttributes(),
            entry.getOperationalAttributes());
    addOp.setInternalOperation(true);
    addOp.run();
    if (addOp.getResultCode() != ResultCode.SUCCESS)
    {
        AddOperationBasis addOp = new AddOperationBasis(connection,
            InternalClientConnection.nextOperationID(), InternalClientConnection
            .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
            entry.getUserAttributes(), entry.getOperationalAttributes());
        addOp.setInternalOperation(true);
        addOp.run();
        if (addOp.getResultCode() != ResultCode.SUCCESS)
        {
          log("addEntry: Failed" + addOp.getResultCode());
        }
      log("addEntry: Failed" + addOp.getResultCode());
    }
        // They will be removed at the end of the test
        entryList.addLast(entry.getDN());
    }
    catch(Exception e)
    {
      fail("addTestEntryToDB Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    }
    // They will be removed at the end of the test
    entryList.addLast(entry.getDN());
  }
  /*
  /**
   * Creates entries necessary to the test.
   */
  private String[] newLDIFEntries(int entriesCnt)
@@ -426,8 +391,6 @@
      bigAttributeValue[i] = Integer.toString(i).charAt(0);
    String[] entries = new String[entriesCnt + 2];
    String filler = "000000000000000000000000000000000000";
    entries[0] = "dn: " + EXAMPLE_DN + "\n"
                 + "objectClass: top\n"
                 + "objectClass: domain\n"
@@ -441,6 +404,7 @@
               + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
               + "\n";
    String filler = "000000000000000000000000000000000000";
    for (int i=0; i<entriesCnt; i++)
    {
      String useri="0000"+i;
@@ -472,36 +436,25 @@
  private void makeBrokerPublishEntries(ReplicationBroker broker,
      int senderID, int destinationServerID, int requestorID)
  {
    // Send entries
    try
    RoutableMsg initTargetMessage =
        new InitializeTargetMsg(EXAMPLE_DN, server2ID, destinationServerID,
            requestorID, updatedEntries.length, initWindow);
    broker.publish(initTargetMessage);
    int cnt = 0;
    for (String entry : updatedEntries)
    {
      RoutableMsg initTargetMessage =
        new InitializeTargetMsg(
          EXAMPLE_DN, server2ID, destinationServerID, requestorID,
          updatedEntries.length, initWindow);
      broker.publish(initTargetMessage);
      log("Broker will publish 1 entry: bytes:" + entry.length());
      int cnt = 0;
      for (String entry : updatedEntries)
      {
        log("Broker will publish 1 entry: bytes:"+ entry.length());
        EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
            entry.getBytes(), ++cnt);
        broker.publish(entryMsg);
      }
      DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
      broker.publish(doneMsg);
      log("Broker " + senderID + " published entries");
      EntryMsg entryMsg =
          new EntryMsg(senderID, destinationServerID, entry.getBytes(), ++cnt);
      broker.publish(entryMsg);
    }
    catch(Exception e)
    {
      fail("makeBrokerPublishEntries Exception:"+ e.getMessage() + " "
          + stackTraceToSingleLineString(e));
    }
    DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
    broker.publish(doneMsg);
    log("Broker " + senderID + " published entries");
  }
  void receiveUpdatedEntries(ReplicationBroker broker, int serverID,
@@ -565,8 +518,7 @@
    broker.setGenerationID(EMPTY_DN_GENID);
    broker.reStart(true);
    try { Thread.sleep(500); } catch(Exception e) {}
    sleep(500);
  }
  /**
@@ -598,19 +550,18 @@
   * @param changelogId The serverID of the replicationServer to create.
   * @return The new replicationServer.
   */
  private ReplicationServer createChangelogServer(int changelogId, String testCase)
  private ReplicationServer createChangelogServer(int changelogId,
      String testCase) throws Exception
  {
    SortedSet<String> servers = new TreeSet<String>();
    try
    {
      if (changelogId != changelog1ID)
          servers.add("localhost:" + getChangelogPort(changelog1ID));
      if (changelogId != changelog2ID)
          servers.add("localhost:" + getChangelogPort(changelog2ID));
      if (changelogId != changelog3ID)
          servers.add("localhost:" + getChangelogPort(changelog3ID));
    if (changelogId != changelog1ID)
      servers.add("localhost:" + getChangelogPort(changelog1ID));
    if (changelogId != changelog2ID)
      servers.add("localhost:" + getChangelogPort(changelog2ID));
    if (changelogId != changelog3ID)
      servers.add("localhost:" + getChangelogPort(changelog3ID));
      ReplServerFakeConfiguration conf =
    ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(
            getChangelogPort(changelogId),
            "initOnlineTest" + getChangelogPort(changelogId) + testCase + "Db",
@@ -619,16 +570,10 @@
            0,
            100,
            servers);
      ReplicationServer replicationServer = new ReplicationServer(conf);
      Thread.sleep(1000);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    Thread.sleep(1000);
      return replicationServer;
    }
    catch (Exception e)
    {
      fail("createChangelog" + stackTraceToSingleLineString(e));
    }
    return null;
    return replicationServer;
  }
  /**
@@ -636,52 +581,42 @@
   * replication Server ID.
   * @param changelogID
   */
  private void connectServer1ToChangelog(int changelogID)
  private void connectServer1ToChangelog(int changelogID) throws Exception
  {
    connectServer1ToChangelog(changelogID, 0);
  }
  private void connectServer1ToChangelog(int changelogID, int heartbeat)
  private void connectServer1ToChangelog(int changelogID, int heartbeat) throws Exception
  {
    // Connect DS to the replicationServer
    try
    {
      // suffix synchronized
      String testName = "initOnLineTest";
      String synchroServerLdif =
        "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
      + "objectClass: top\n"
      + "objectClass: ds-cfg-synchronization-provider\n"
      + "objectClass: ds-cfg-replication-domain\n"
      + "cn: " + testName + "\n"
      + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
      + "ds-cfg-replication-server: localhost:"
      + getChangelogPort(changelogID)+"\n"
      + "ds-cfg-server-id: " + server1ID + "\n"
      + "ds-cfg-receive-status: true\n"
      + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
      + "ds-cfg-window-size: " + WINDOW_SIZE;
    // suffix synchronized
    String testName = "initOnLineTest";
    String synchroServerLdif =
      "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
    + "objectClass: top\n"
    + "objectClass: ds-cfg-synchronization-provider\n"
    + "objectClass: ds-cfg-replication-domain\n"
    + "cn: " + testName + "\n"
    + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
    + "ds-cfg-replication-server: localhost:"
    + getChangelogPort(changelogID)+"\n"
    + "ds-cfg-server-id: " + server1ID + "\n"
    + "ds-cfg-receive-status: true\n"
    + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
    + "ds-cfg-window-size: " + WINDOW_SIZE;
    // Clear the backend
    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
      // Clear the backend
      LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
      synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to add the synchronized server");
      configEntryList.add(synchroServerEntry.getDN());
    configEntryList.add(synchroServerEntry.getDN());
      replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
    replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
      assertTrue(!replDomain.ieRunning(),
    assertTrue(!replDomain.ieRunning(),
        "ReplicationDomain: Import/Export is not expected to be running");
    }
    catch(Exception e)
    {
      log("connectServer1ToChangelog", e);
      fail("connectServer1ToChangelog", e);
    }
  }
  private int getChangelogPort(int changelogID) throws Exception
@@ -748,10 +683,6 @@
      testEntriesInDb();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest(testCase);
@@ -967,10 +898,6 @@
      testEntriesInDb();
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest(testCase);
@@ -1022,10 +949,6 @@
      // createTask(taskInitTargetS2);
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest(testCase);
@@ -1094,10 +1017,6 @@
      // createTask(taskInitTargetS2);
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
    } finally
    {
      afterTest(testCase);
@@ -1155,40 +1074,32 @@
      // Check that the list of connected LDAP servers is correct
      // in each replication servers
      List<String> l1 = changelog1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).
        getConnectedLDAPservers();
      assertEquals(l1.size(), 1);
      assertEquals(l1.get(0), String.valueOf(server1ID));
      Set<Integer> l1 = changelog1.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l1).containsExactly(server1ID);
      List<String> l2;
    l2 = changelog2.getReplicationServerDomain(
        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 2);
      assertTrue(l2.contains(String.valueOf(server2ID)));
      assertTrue(l2.contains(String.valueOf(server3ID)));
      Set<Integer> l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID, server3ID);
      List<String> l3;
    l3 = changelog3.getReplicationServerDomain(
        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l3.size(), 0);
      Set<Integer> l3 = changelog3.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l3).isEmpty();
      // Test updates
      broker3.stop();
      Thread.sleep(1000);
    l2 = changelog2.getReplicationServerDomain(
        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 1);
      assertEquals(l2.get(0), String.valueOf(server2ID));
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server2ID);
      broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
        server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
      broker2.stop();
      Thread.sleep(1000);
    l2 = changelog2.getReplicationServerDomain(
        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
      assertEquals(l2.size(), 1);
      assertEquals(l2.get(0), String.valueOf(server3ID));
      l2 = changelog2.getReplicationServerDomain(
          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
      Assertions.assertThat(l2).containsExactly(server3ID);
    // TODO Test ReplicationServerDomain.getDestinationServers method.
@@ -1262,10 +1173,6 @@
      log("Successfully ending " + testCase);
    }
    catch(Exception e)
    {
      log(testCase + e.getLocalizedMessage());
    }
    finally
    {
      afterTest(testCase);
@@ -1567,18 +1474,11 @@
      // in those cases, loop for a while waiting for completion.
      for (int i = 0; i< 10; i++)
      {
        if (replDomain.ieRunning())
        {
          try
          {
            Thread.sleep(500);
          } catch (InterruptedException e)
          { }
        }
        else
        if (!replDomain.ieRunning())
        {
          break;
        }
        sleep(500);
      }
       assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
@@ -1591,14 +1491,14 @@
    if (server2 != null)
    {
      server2.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server2 = null;
    }
    if (server3 != null)
    {
      server3.stop();
      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
      sleep(100); // give some time to the broker to disconnect
      // from the replicationServer.
      server3 = null;
    }
@@ -1639,7 +1539,7 @@
    log("Successfully cleaned " + testCase);
  }
    /**
  /**
   * Clean up the environment.
   *
   * @throws Exception If the environment could not be set up.