From 5d1c7d62d688c64af2627ceb8b1556ef313954ec Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 14 Aug 2013 14:22:52 +0000
Subject: [PATCH] ReplicationServerDomain.java Renamed directoryServers to connectedDSs. Renamed replicationServers to connectedRSs. Removed the useless getConnectedLDAPservers(), replaced with getConnectedDSs(). Renamed a few local variables.

---
 opends/src/server/org/opends/server/replication/server/DataServerHandler.java               |  179 +++++-------
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java         |  172 +++++-------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java |  422 ++++++++++++--------------------
 3 files changed, 311 insertions(+), 462 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
index 2dbda07..8ec6405 100644
--- a/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DataServerHandler.java
@@ -29,7 +29,6 @@
 
 import java.io.IOException;
 import java.util.*;
-import java.util.zip.DataFormatException;
 
 import org.opends.messages.Message;
 import org.opends.server.replication.common.*;
@@ -39,6 +38,7 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.common.ServerStatus.*;
 import static org.opends.server.replication.common.StatusMachine.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
 
@@ -92,76 +92,10 @@
    */
   public void changeStatusForResetGenId(long newGenId) throws IOException
   {
-    final int localRsServerId = replicationServer.getServerId();
-
-    StatusMachineEvent event;
-    if (newGenId == -1)
+    StatusMachineEvent event = getStatusMachineEvent(newGenId);
+    if (event == null)
     {
-      // The generation id is being made invalid, let's put the DS
-      // into BAD_GEN_ID_STATUS
-      event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
-    } else
-    {
-      if (newGenId == generationId)
-      {
-        if (status == ServerStatus.BAD_GEN_ID_STATUS)
-        {
-          // This server has the good new reference generation id.
-          // Close connection with him to force his reconnection: DS will
-          // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
-
-          if (debugEnabled())
-          {
-            TRACER.debugInfo(
-                "In RS " + localRsServerId +
-                ", closing connection to DS " + getServerId() +
-                " for baseDn " + getBaseDN() +
-                " to force reconnection as new local" +
-                " generationId and remote one match and DS is in bad gen id: " +
-                newGenId);
-          }
-
-          // Connection closure must not be done calling RSD.stopHandler() as it
-          // would rewait the RSD lock that we already must have entering this
-          // method. This would lead to a reentrant lock which we do not want.
-          // So simply close the session, this will make the hang up appear
-          // after the reader thread that took the RSD lock releases it.
-          if (session != null
-              // V4 protocol introduced a StopMsg to properly close the
-              // connection between servers
-             && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-          {
-            try
-            {
-              session.publish(new StopMsg());
-            }
-            catch (IOException ioe)
-            {
-              // Anyway, going to close session, so nothing to do
-            }
-          }
-
-          // NOT_CONNECTED_STATUS is the last one in RS session life: handler
-          // will soon disappear after this method call...
-          status = ServerStatus.NOT_CONNECTED_STATUS;
-          return;
-        } else
-        {
-          if (debugEnabled())
-          {
-            TRACER.debugInfo("In RS " + localRsServerId + ". DS "
-                + getServerId() + " for baseDn " + getBaseDN()
-                + " has already generation id " + newGenId
-                + " so no ChangeStatusMsg sent to him.");
-          }
-          return;
-        }
-      } else
-      {
-        // This server has a bad generation id compared to new reference one,
-        // let's put it into BAD_GEN_ID_STATUS
-        event = StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
-      }
+      return;
     }
 
     if (event == StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT
@@ -170,7 +104,7 @@
       // Prevent useless error message (full update status cannot lead to bad
       // gen status)
       Message message = NOTE_BAD_GEN_ID_IN_FULL_UPDATE.get(
-              Integer.toString(localRsServerId),
+              Integer.toString(replicationServer.getServerId()),
               getBaseDN(),
               Integer.toString(serverId),
               Long.toString(generationId),
@@ -179,30 +113,73 @@
       return;
     }
 
-    ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
+    changeStatus(event, "for reset gen id");
+  }
 
-    if (newStatus == ServerStatus.INVALID_STATUS)
+  private StatusMachineEvent getStatusMachineEvent(long newGenId)
+  {
+    if (newGenId == -1)
     {
-      Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
-          Integer.toString(serverId), status.toString(), event.toString());
-      logError(msg);
-      return;
+      // The generation id is being made invalid, let's put the DS
+      // into BAD_GEN_ID_STATUS
+      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
+    }
+    if (newGenId != generationId)
+    {
+      // This server has a bad generation id compared to new reference one,
+      // let's put it into BAD_GEN_ID_STATUS
+      return StatusMachineEvent.TO_BAD_GEN_ID_STATUS_EVENT;
     }
 
-    // Send message requesting to change the DS status
-    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
-        ServerStatus.INVALID_STATUS);
+    if (status != ServerStatus.BAD_GEN_ID_STATUS)
+    {
+      if (debugEnabled())
+      {
+        TRACER.debugInfo("In RS " + replicationServer.getServerId()
+            + ", DS " + getServerId() + " for baseDn " + getBaseDN()
+            + " has already generation id " + newGenId
+            + " so no ChangeStatusMsg sent to him.");
+      }
+      return null;
+    }
+
+    // This server has the good new reference generation id.
+    // Close connection with him to force his reconnection: DS will
+    // reconnect in NORMAL_STATUS or DEGRADED_STATUS.
 
     if (debugEnabled())
     {
-      TRACER.debugInfo("In RS " + localRsServerId
-          + " Sending change status for reset gen id to " + getServerId()
-          + " for baseDn " + getBaseDN() + ":\n" + csMsg);
+      TRACER.debugInfo("In RS " + replicationServer.getServerId()
+          + ", closing connection to DS " + getServerId() + " for baseDn "
+          + getBaseDN() + " to force reconnection as new local"
+          + " generationId and remote one match and DS is in bad gen id: "
+          + newGenId);
     }
 
-    session.publish(csMsg);
+    // Connection closure must not be done calling RSD.stopHandler() as it
+    // would rewait the RSD lock that we already must have entering this
+    // method. This would lead to a reentrant lock which we do not want.
+    // So simply close the session, this will make the hang up appear
+    // after the reader thread that took the RSD lock releases it.
+    if (session != null
+        // V4 protocol introduced a StopMsg to properly close the
+        // connection between servers
+        && getProtocolVersion() >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+    {
+      try
+      {
+        session.publish(new StopMsg());
+      }
+      catch (IOException ioe)
+      {
+        // Anyway, going to close session, so nothing to do
+      }
+    }
 
-    status = newStatus;
+    // NOT_CONNECTED_STATUS is the last one in RS session life: handler
+    // will soon disappear after this method call...
+    status = ServerStatus.NOT_CONNECTED_STATUS;
+    return null;
   }
 
   /**
@@ -215,6 +192,12 @@
   public ServerStatus changeStatusFromStatusAnalyzer(StatusMachineEvent event)
   throws IOException
   {
+    return changeStatus(event, "from status analyzer");
+  }
+
+  private ServerStatus changeStatus(StatusMachineEvent event, String origin)
+      throws IOException
+  {
     // Check state machine allows this new status (Sanity check)
     ServerStatus newStatus = StatusMachine.computeNewStatus(status, event);
     if (newStatus == ServerStatus.INVALID_STATUS)
@@ -222,22 +205,20 @@
       Message msg = ERR_RS_CANNOT_CHANGE_STATUS.get(getBaseDN(),
           Integer.toString(serverId), status.toString(), event.toString());
       logError(msg);
-      // Status analyzer must only change from NORMAL_STATUS to DEGRADED_STATUS
-      // and vice versa. We may are being trying to change the status while for
-      // instance another status has just been entered: e.g a full update has
-      // just been engaged. In that case, just ignore attempt to change the
-      // status
+      // Only change allowed is from NORMAL_STATUS to DEGRADED_STATUS and vice
+      // versa. We may be trying to change the status while another status has
+      // just been entered: e.g a full update has just been engaged.
+      // In that case, just ignore attempt to change the status
       return newStatus;
     }
 
     // Send message requesting to change the DS status
-    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus,
-        ServerStatus.INVALID_STATUS);
+    ChangeStatusMsg csMsg = new ChangeStatusMsg(newStatus, INVALID_STATUS);
 
     if (debugEnabled())
     {
       TRACER.debugInfo("In RS " + replicationServer.getServerId()
-          + " Sending change status from status analyzer to " + getServerId()
+          + " Sending change status " + origin + " to " + getServerId()
           + " for baseDn " + getBaseDN() + ":\n" + csMsg);
     }
 
@@ -589,7 +570,7 @@
           localGenerationId, sslEncryption, getLocalGroupId(),
           replicationServer.getDegradedStatusThreshold(),
           replicationServer.getWeight(),
-          replicationServerDomain.getConnectedLDAPservers().size());
+          replicationServerDomain.getConnectedDSs().size());
     }
 
     send(startMsg);
@@ -626,16 +607,10 @@
    * receiving a StopMsg to properly stop the handshake procedure.
    * @return the startSessionMsg received or null DS sent a stop message to
    *         not finish the handshake.
-   * @throws DirectoryException
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws DataFormatException
-   * @throws NotSupportedOldVersionPDUException
+   * @throws Exception
    */
   private StartSessionMsg waitAndProcessStartSessionFromRemoteDS()
-  throws DirectoryException, IOException, ClassNotFoundException,
-  DataFormatException,
-  NotSupportedOldVersionPDUException
+      throws Exception
   {
     ReplicationMsg msg = session.receive();
 
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 940192b..31156f3 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -92,7 +92,7 @@
    * we are currently publishing the first update in the balanced tree is the
    * next change that we must push to this particular server.
    */
-  private final Map<Integer, DataServerHandler> directoryServers =
+  private final Map<Integer, DataServerHandler> connectedDSs =
     new ConcurrentHashMap<Integer, DataServerHandler>();
 
   /**
@@ -101,7 +101,7 @@
    * in the balanced tree is the next change that we must push to this
    * particular server.
    */
-  private final Map<Integer, ReplicationServerHandler> replicationServers =
+  private final Map<Integer, ReplicationServerHandler> connectedRSs =
     new ConcurrentHashMap<Integer, ReplicationServerHandler>();
 
   private final Queue<MessageHandler> otherHandlers =
@@ -358,12 +358,10 @@
      */
     NotAssuredUpdateMsg notAssuredUpdate = null;
 
-    /*
-     * Push the message to the replication servers
-     */
+    // Push the message to the replication servers
     if (sourceHandler.isDataServer())
     {
-      for (ReplicationServerHandler handler : replicationServers.values())
+      for (ReplicationServerHandler handler : connectedRSs.values())
       {
         /**
          * Ignore updates to RS with bad gen id
@@ -392,10 +390,8 @@
       }
     }
 
-    /*
-     * Push the message to the LDAP servers
-     */
-    for (DataServerHandler handler : directoryServers.values())
+    // Push the message to the LDAP servers
+    for (DataServerHandler handler : connectedDSs.values())
     {
       // Don't forward the change to the server that just sent it
       if (handler == sourceHandler)
@@ -576,7 +572,7 @@
       }
 
       // Look for DS eligible for assured
-      for (DataServerHandler handler : directoryServers.values())
+      for (DataServerHandler handler : connectedDSs.values())
       {
         // Don't forward the change to the server that just sent it
         if (handler == sourceHandler)
@@ -735,7 +731,7 @@
   private void collectRSsEligibleForAssuredReplication(byte groupId,
       List<Integer> expectedServers)
   {
-    for (ReplicationServerHandler handler : replicationServers.values())
+    for (ReplicationServerHandler handler : connectedRSs.values())
     {
       if (handler.getGroupId() == groupId
       // No ack expected from a RS with different group id
@@ -908,9 +904,8 @@
           List<Integer> serversInTimeout = expectedAcksInfo.getTimeoutServers();
           for (Integer serverId : serversInTimeout)
           {
-            ServerHandler expectedDSInTimeout = directoryServers.get(serverId);
-            ServerHandler expectedRSInTimeout =
-                replicationServers.get(serverId);
+            ServerHandler expectedDSInTimeout = connectedDSs.get(serverId);
+            ServerHandler expectedRSInTimeout = connectedRSs.get(serverId);
             if (expectedDSInTimeout != null)
             {
               if (safeRead)
@@ -950,7 +945,7 @@
    */
   public void stopReplicationServers(Collection<String> replServers)
   {
-    for (ReplicationServerHandler handler : replicationServers.values())
+    for (ReplicationServerHandler handler : connectedRSs.values())
     {
       if (replServers.contains(handler.getServerAddressURL()))
       {
@@ -968,13 +963,13 @@
   public void stopAllServers(boolean shutdown)
   {
     // Close session with other replication servers
-    for (ReplicationServerHandler serverHandler : replicationServers.values())
+    for (ReplicationServerHandler serverHandler : connectedRSs.values())
     {
       stopServer(serverHandler, shutdown);
     }
 
     // Close session with other LDAP servers
-    for (DataServerHandler serverHandler : directoryServers.values())
+    for (DataServerHandler serverHandler : connectedDSs.values())
     {
       stopServer(serverHandler, shutdown);
     }
@@ -988,12 +983,12 @@
    */
   public boolean checkForDuplicateDS(DataServerHandler handler)
   {
-    if (directoryServers.containsKey(handler.getServerId()))
+    if (connectedDSs.containsKey(handler.getServerId()))
     {
       // looks like two connected LDAP servers have the same serverId
       Message message = ERR_DUPLICATE_SERVER_ID.get(
           localReplicationServer.getMonitorInstanceName(),
-          directoryServers.get(handler.getServerId()).toString(),
+          connectedDSs.get(handler.getServerId()).toString(),
           handler.toString(), handler.getServerId());
       logError(message);
       return false;
@@ -1047,7 +1042,7 @@
       try
       {
         // Stop useless monitoring publisher if no more RS or DS in domain
-        if ( (directoryServers.size() + replicationServers.size() )== 1)
+        if ( (connectedDSs.size() + connectedRSs.size() )== 1)
         {
           if (debugEnabled())
           {
@@ -1062,7 +1057,7 @@
 
         if (handler.isReplicationServer())
         {
-          if (replicationServers.containsKey(handler.getServerId()))
+          if (connectedRSs.containsKey(handler.getServerId()))
           {
             unregisterServerHandler(handler);
             handler.shutdown();
@@ -1076,11 +1071,11 @@
               buildAndSendTopoInfoToDSs(null);
             }
           }
-        } else if (directoryServers.containsKey(handler.getServerId()))
+        } else if (connectedDSs.containsKey(handler.getServerId()))
         {
           // If this is the last DS for the domain,
           // shutdown the status analyzer
-          if (directoryServers.size() == 1)
+          if (connectedDSs.size() == 1)
           {
             if (debugEnabled())
             {
@@ -1193,11 +1188,11 @@
   {
     if (handler.isReplicationServer())
     {
-      replicationServers.remove(handler.getServerId());
+      connectedRSs.remove(handler.getServerId());
     }
     else
     {
-      directoryServers.remove(handler.getServerId());
+      connectedDSs.remove(handler.getServerId());
     }
   }
 
@@ -1224,9 +1219,9 @@
     // topology and the generationId has never been saved, then we can reset
     // it and the next LDAP server to connect will become the new reference.
     boolean lDAPServersConnectedInTheTopology = false;
-    if (directoryServers.isEmpty())
+    if (connectedDSs.isEmpty())
     {
-      for (ReplicationServerHandler rsh : replicationServers.values())
+      for (ReplicationServerHandler rsh : connectedRSs.values())
       {
         if (generationId != rsh.getGenerationId())
         {
@@ -1283,7 +1278,7 @@
   throws DirectoryException
   {
     ReplicationServerHandler oldHandler =
-      replicationServers.get(handler.getServerId());
+      connectedRSs.get(handler.getServerId());
     if (oldHandler != null)
     {
       if (oldHandler.getServerAddressURL().equals(
@@ -1339,7 +1334,7 @@
   public Set<String> getChangelogs()
   {
     Set<String> results = new LinkedHashSet<String>();
-    for (ReplicationServerHandler handler : replicationServers.values())
+    for (ReplicationServerHandler handler : connectedRSs.values())
     {
       results.add(handler.getServerAddressURL());
     }
@@ -1358,22 +1353,6 @@
   }
 
   /**
-   * Returns as a set of String the list of LDAP servers connected to us.
-   * Each string is the serverID of a connected LDAP server.
-   *
-   * @return The set of connected LDAP servers
-   */
-  public List<String> getConnectedLDAPservers()
-  {
-    List<String> results = new ArrayList<String>(0);
-    for (DataServerHandler handler : directoryServers.values())
-    {
-      results.add(String.valueOf(handler.getServerId()));
-    }
-    return results;
-  }
-
-  /**
    * Creates and returns an iterator.
    * When the iterator is not used anymore, the caller MUST call the
    * ReplicationIterator.releaseCursor() method to free the resources
@@ -1493,7 +1472,7 @@
       {
         // Send to all replication servers with a least one remote
         // server connected
-        for (ReplicationServerHandler rsh : replicationServers.values())
+        for (ReplicationServerHandler rsh : connectedRSs.values())
         {
           if (rsh.hasRemoteLDAPServers())
           {
@@ -1503,7 +1482,7 @@
       }
 
       // Sends to all connected LDAP servers
-      for (DataServerHandler destinationHandler : directoryServers.values())
+      for (DataServerHandler destinationHandler : connectedDSs.values())
       {
         // Don't loop on the sender
         if (destinationHandler == senderHandler)
@@ -1516,7 +1495,7 @@
     {
       // Destination is one server
       DataServerHandler destinationHandler =
-        directoryServers.get(msg.getDestination());
+        connectedDSs.get(msg.getDestination());
       if (destinationHandler != null)
       {
         servers.add(destinationHandler);
@@ -1527,13 +1506,13 @@
         // have the targeted server connected.
         if (senderHandler.isDataServer())
         {
-          for (ReplicationServerHandler h : replicationServers.values())
+          for (ReplicationServerHandler rsHandler : connectedRSs.values())
           {
             // Send to all replication servers with a least one remote
             // server connected
-            if (h.isRemoteLDAPServer(msg.getDestination()))
+            if (rsHandler.isRemoteLDAPServer(msg.getDestination()))
             {
-              servers.add(h);
+              servers.add(rsHandler);
             }
           }
         }
@@ -1808,14 +1787,14 @@
       // from the states stored in the serverHandler.
       // - the server state
       // - the older missing change
-      for (DataServerHandler lsh : this.directoryServers.values())
+      for (DataServerHandler lsh : this.connectedDSs.values())
       {
         monitorMsg.setServerState(lsh.getServerId(),
             lsh.getServerState(), lsh.getApproxFirstMissingDate(), true);
       }
 
       // Same for the connected RS
-      for (ReplicationServerHandler rsh : this.replicationServers.values())
+      for (ReplicationServerHandler rsh : this.connectedRSs.values())
       {
         monitorMsg.setServerState(rsh.getServerId(),
             rsh.getServerState(), rsh.getApproxFirstMissingDate(), false);
@@ -1891,7 +1870,7 @@
    */
   public void buildAndSendTopoInfoToDSs(ServerHandler notThisOne)
   {
-    for (DataServerHandler handler : directoryServers.values())
+    for (DataServerHandler handler : connectedDSs.values())
     {
       if (notThisOne == null || handler != notThisOne)
         // All except passed one
@@ -1932,7 +1911,7 @@
   public void buildAndSendTopoInfoToRSs()
   {
     TopologyMsg topoMsg = createTopologyMsgForRS();
-    for (ReplicationServerHandler handler : replicationServers.values())
+    for (ReplicationServerHandler handler : connectedRSs.values())
     {
       for (int i=1; i<=2; i++)
       {
@@ -1976,7 +1955,7 @@
     List<DSInfo> dsInfos = new ArrayList<DSInfo>();
 
     // Go through every DSs
-    for (DataServerHandler serverHandler : directoryServers.values())
+    for (DataServerHandler serverHandler : connectedDSs.values())
     {
       dsInfos.add(serverHandler.toDSInfo());
     }
@@ -2001,7 +1980,7 @@
   {
     // Go through every DSs (except recipient of msg)
     List<DSInfo> dsInfos = new ArrayList<DSInfo>();
-    for (DataServerHandler serverHandler : directoryServers.values())
+    for (DataServerHandler serverHandler : connectedDSs.values())
     {
       if (serverHandler.getServerId() == destDsId)
       {
@@ -2017,7 +1996,7 @@
 
     // Go through every peer RSs (and get their connected DSs), also add info
     // for RSs
-    for (ReplicationServerHandler serverHandler : replicationServers.values())
+    for (ReplicationServerHandler serverHandler : connectedRSs.values())
     {
       rsInfos.add(serverHandler.toRSInfo());
 
@@ -2150,7 +2129,7 @@
 
       // If we are the first replication server warned,
       // then forwards the reset message to the remote replication servers
-      for (ServerHandler rsHandler : replicationServers.values())
+      for (ServerHandler rsHandler : connectedRSs.values())
       {
         try
         {
@@ -2170,7 +2149,7 @@
 
       // Change status of the connected DSs according to the requested new
       // reference generation id
-      for (DataServerHandler dsHandler : directoryServers.values())
+      for (DataServerHandler dsHandler : connectedDSs.values())
       {
         try
         {
@@ -2362,8 +2341,7 @@
           // TODO: i18n
           MessageBuilder mb = new MessageBuilder();
           mb.append(ERR_ERROR_CLEARING_DB.get(dbHandler.toString(),
-            e.getMessage() + " " +
-            stackTraceToSingleLineString(e)));
+              e.getMessage() + " " + stackTraceToSingleLineString(e)));
           logError(mb.toMessage());
         }
       }
@@ -2400,10 +2378,10 @@
           + " given local generation Id=" + this.generationId);
     }
 
-    ServerHandler handler = replicationServers.get(serverId);
+    ServerHandler handler = connectedRSs.get(serverId);
     if (handler == null)
     {
-      handler = directoryServers.get(serverId);
+      handler = connectedDSs.get(serverId);
       if (handler == null)
       {
         return false;
@@ -2549,7 +2527,7 @@
             initializePendingMonitorData();
 
             // Send the monitor requests to the connected replication servers.
-            for (ReplicationServerHandler rs : replicationServers.values())
+            for (ReplicationServerHandler rs : connectedRSs.values())
             {
               // Add server ID to pending table.
               int serverId = rs.getServerId();
@@ -2657,13 +2635,12 @@
     // So for a given DS connected we can take the state and the max from
     // the DS/state.
 
-    for (ServerHandler ds : directoryServers.values())
+    for (ServerHandler ds : connectedDSs.values())
     {
       int serverID = ds.getServerId();
 
       // the state comes from the state stored in the SH
-      ServerState dsState = ds.getServerState()
-          .duplicate();
+      ServerState dsState = ds.getServerState().duplicate();
 
       // the max CN sent by that LS also comes from the SH
       ChangeNumber maxcn = dsState.getChangeNumber(serverID);
@@ -2725,46 +2702,44 @@
         pendingMonitorData.setRSState(msg.getSenderID(), replServerState);
 
         // Store the remote LDAP servers states
-        Iterator<Integer> lsidIterator = msg.ldapIterator();
-        while (lsidIterator.hasNext())
+        Iterator<Integer> dsServerIdIterator = msg.ldapIterator();
+        while (dsServerIdIterator.hasNext())
         {
-          int sid = lsidIterator.next();
-          ServerState dsServerState = msg.getLDAPServerState(sid);
+          int dsServerId = dsServerIdIterator.next();
+          ServerState dsServerState = msg.getLDAPServerState(dsServerId);
           pendingMonitorData.setMaxCNs(dsServerState);
-          pendingMonitorData.setLDAPServerState(sid, dsServerState);
-          pendingMonitorData.setFirstMissingDate(sid,
-              msg.getLDAPApproxFirstMissingDate(sid));
+          pendingMonitorData.setLDAPServerState(dsServerId, dsServerState);
+          pendingMonitorData.setFirstMissingDate(dsServerId,
+              msg.getLDAPApproxFirstMissingDate(dsServerId));
         }
 
         // Process the latency reported by the remote RSi on its connections
         // to the other RSes
-        Iterator<Integer> rsidIterator = msg.rsIterator();
-        while (rsidIterator.hasNext())
+        Iterator<Integer> rsServerIdIterator = msg.rsIterator();
+        while (rsServerIdIterator.hasNext())
         {
-          int rsid = rsidIterator.next();
-          if (rsid == localReplicationServer.getServerId())
+          int rsServerId = rsServerIdIterator.next();
+          long newFmd = msg.getRSApproxFirstMissingDate(rsServerId);
+          if (rsServerId == localReplicationServer.getServerId())
           {
             // this is the latency of the remote RSi regarding the current RS
             // let's update the fmd of my connected LS
-            for (ServerHandler connectedlsh : directoryServers
-                .values())
+            for (DataServerHandler connectedDS : connectedDSs.values())
             {
-              int connectedlsid = connectedlsh.getServerId();
-              Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
-              pendingMonitorData.setFirstMissingDate(connectedlsid, newfmd);
+              int connectedServerId = connectedDS.getServerId();
+              pendingMonitorData.setFirstMissingDate(connectedServerId, newFmd);
             }
           }
           else
           {
             // this is the latency of the remote RSi regarding another RSj
             // let's update the latency of the LSes connected to RSj
-            ReplicationServerHandler rsjHdr = replicationServers.get(rsid);
+            ReplicationServerHandler rsjHdr = connectedRSs.get(rsServerId);
             if (rsjHdr != null)
             {
-              for (int remotelsid : rsjHdr.getConnectedDirectoryServerIds())
+              for (int remoteServerId : rsjHdr.getConnectedDirectoryServerIds())
               {
-                Long newfmd = msg.getRSApproxFirstMissingDate(rsid);
-                pendingMonitorData.setFirstMissingDate(remotelsid, newfmd);
+                pendingMonitorData.setFirstMissingDate(remoteServerId, newFmd);
               }
             }
           }
@@ -2773,8 +2748,8 @@
       catch (RuntimeException e)
       {
         // FIXME: do we really expect these???
-        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(e
-            .getMessage() + stackTraceToSingleLineString(e)));
+        logError(ERR_PROCESSING_REMOTE_MONITOR_DATA.get(
+            e.getMessage() + stackTraceToSingleLineString(e)));
       }
       finally
       {
@@ -2808,7 +2783,7 @@
    */
   public Map<Integer, DataServerHandler> getConnectedDSs()
   {
-    return directoryServers;
+    return connectedDSs;
   }
 
   /**
@@ -2817,7 +2792,7 @@
    */
   public Map<Integer, ReplicationServerHandler> getConnectedRSs()
   {
-    return replicationServers;
+    return connectedRSs;
   }
 
 
@@ -3131,9 +3106,8 @@
             result.update(mostRecentDbCN);
           }
         } catch (Exception e) {
-          Message errMessage = ERR_WRITER_UNEXPECTED_EXCEPTION.get(
-              " " + stackTraceToSingleLineString(e));
-          logError(errMessage);
+          logError(ERR_WRITER_UNEXPECTED_EXCEPTION
+              .get(stackTraceToSingleLineString(e)));
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
         }
       }
@@ -3236,13 +3210,13 @@
 
   private boolean isServerConnected(int serverId)
   {
-    if (directoryServers.containsKey(serverId))
+    if (connectedDSs.containsKey(serverId))
     {
       return true;
     }
 
     // not directly connected
-    for (ReplicationServerHandler rsHandler : replicationServers.values())
+    for (ReplicationServerHandler rsHandler : connectedRSs.values())
     {
       if (rsHandler.isRemoteLDAPServer(serverId))
       {
@@ -3283,7 +3257,7 @@
       {
         // If we are the first replication server warned,
         // then forwards the message to the remote replication servers
-        for (ReplicationServerHandler rsHandler : replicationServers.values())
+        for (ReplicationServerHandler rsHandler : connectedRSs.values())
         {
           try
           {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
index 68601a4..47976f4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/InitOnLineTest.java
@@ -27,23 +27,17 @@
  */
 package org.opends.server.replication;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.messages.TaskMessages.*;
-import static org.opends.server.config.ConfigConstants.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-import static org.testng.Assert.*;
-
 import java.io.File;
 import java.net.SocketTimeoutException;
 import java.util.*;
 
+import org.assertj.core.api.Assertions;
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
 import org.opends.server.TestCaseUtils;
 import org.opends.server.backends.task.TaskState;
+import org.opends.server.core.AddOperation;
 import org.opends.server.core.AddOperationBasis;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.loggers.debug.DebugTracer;
@@ -63,6 +57,14 @@
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.messages.TaskMessages.*;
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.testng.Assert.*;
+
 /**
  * Tests contained here:
  *
@@ -202,7 +204,7 @@
         "ds-task-initialize-replica-server-id: all");
   }
 
-  // Tests that entries have been written in the db
+  /** Tests that entries have been written in the db */
   private void testEntriesInDb()
   {
     log("TestEntriesInDb");
@@ -260,62 +262,19 @@
    * @param expectedDone The expected number of entries to be processed.
    */
   private void waitTaskCompleted(Entry taskEntry, TaskState expectedState,
-      long expectedLeft, long expectedDone)
+      long expectedLeft, long expectedDone) throws Exception
   {
     log("waitTaskCompleted " + taskEntry.toLDIFString());
-    try
+
     {
-      // FIXME - Factorize with TasksTestCase
-      // Wait until the task completes.
-      int timeout = 2000;
-
-      AttributeType completionTimeType = DirectoryServer.getAttributeType(
-          ATTR_TASK_COMPLETION_TIME.toLowerCase());
-      SearchFilter filter =
-        SearchFilter.createFilterFromString("(objectclass=*)");
-      Entry resultEntry = null;
-      String completionTime = null;
-      long startMillisecs = System.currentTimeMillis();
-      do
-      {
-        InternalSearchOperation searchOperation =
-          connection.processSearch(taskEntry.getDN(),
-              SearchScope.BASE_OBJECT,
-              filter);
-        try
-        {
-          resultEntry = searchOperation.getSearchEntries().getFirst();
-        } catch (Exception e)
-        {
-          // FIXME How is this possible?  Must be issue 858.
-          fail("Task entry was not returned from the search.");
-          continue;
-        }
-        completionTime =
-          resultEntry.getAttributeValue(completionTimeType,
-              DirectoryStringSyntax.DECODER);
-
-        if (completionTime == null)
-        {
-          if (System.currentTimeMillis() - startMillisecs > 1000*timeout)
-          {
-            break;
-          }
-          Thread.sleep(100);
-        }
-      } while (completionTime == null);
-
-      if (completionTime == null)
-      {
-        fail("The task had not completed after " + timeout + " seconds.");
-      }
+      Entry resultEntry = getCompletionTime(taskEntry);
 
       // Check that the task state is as expected.
       AttributeType taskStateType =
-        DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
+          DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
       String stateString =
-        resultEntry.getAttributeValue(taskStateType,
-            DirectoryStringSyntax.DECODER);
+          resultEntry.getAttributeValue(taskStateType,
+              DirectoryStringSyntax.DECODER);
       TaskState taskState = TaskState.fromString(stateString);
       assertEquals(taskState, expectedState,
           "The task completed in an unexpected state");
@@ -323,7 +282,7 @@
       // Check that the task contains some log messages.
       AttributeType logMessagesType = DirectoryServer.getAttributeType(
           ATTR_TASK_LOG_MESSAGES.toLowerCase());
-      ArrayList<String> logMessages = new ArrayList<String>();
+      List<String> logMessages = new ArrayList<String>();
       resultEntry.getAttributeValues(logMessagesType,
           DirectoryStringSyntax.DECODER,
           logMessages);
@@ -333,88 +292,94 @@
         fail("No log messages were written to the task entry on a failed task");
       }
 
-      try
-      {
-        // Check that the task state is as expected.
-        taskStateType =
-          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_LEFT, true);
-        stateString =
-          resultEntry.getAttributeValue(taskStateType,
-              DirectoryStringSyntax.DECODER);
+      // Check that the task state is as expected.
+      assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_LEFT,
+          expectedLeft, "The number of entries to process is not correct.");
 
-        assertEquals(Long.decode(stateString).longValue(),expectedLeft,
-            "The number of entries to process is not correct.");
-
-        // Check that the task state is as expected.
-        taskStateType =
-          DirectoryServer.getAttributeType(ATTR_TASK_INITIALIZE_DONE, true);
-        stateString =
-          resultEntry.getAttributeValue(taskStateType,
-              DirectoryStringSyntax.DECODER);
-
-        assertEquals(Long.decode(stateString).longValue(),expectedDone,
-            "The number of entries processed is not correct.");
-
-      }
-      catch(Exception e)
-      {
-        fail("Exception"+ e.getMessage()+e.getStackTrace());
-      }
-
+      // Check that the task state is as expected.
+      assertAttributeValue(resultEntry, ATTR_TASK_INITIALIZE_DONE,
+          expectedDone, "The number of entries processed is not correct.");
     }
-    catch(Exception e)
+  }
+
+  private Entry getCompletionTime(Entry taskEntry) throws Exception
+  {
+    // FIXME - Factorize with TasksTestCase
+    // Wait until the task completes.
+    int timeout = 2000;
+
+    AttributeType completionTimeType = DirectoryServer.getAttributeType(
+        ATTR_TASK_COMPLETION_TIME.toLowerCase());
+    SearchFilter filter =
+        SearchFilter.createFilterFromString("(objectclass=*)");
+
+    long startMillisecs = System.currentTimeMillis();
+    do
     {
-      fail("Exception"+ e.getMessage()+e.getStackTrace());
+      InternalSearchOperation searchOperation = connection.processSearch(
+          taskEntry.getDN(), SearchScope.BASE_OBJECT, filter);
+      Entry resultEntry = searchOperation.getSearchEntries().getFirst();
+
+      String completionTime = resultEntry.getAttributeValue(
+          completionTimeType, DirectoryStringSyntax.DECODER);
+
+      if (completionTime != null)
+      {
+        return resultEntry;
+      }
+
+      if (System.currentTimeMillis() - startMillisecs > 1000 * timeout)
+      {
+        fail("The task had not completed after " + timeout + " seconds.");
+      }
+      Thread.sleep(100);
     }
+    while (true);
+  }
+
+  private void assertAttributeValue(Entry resultEntry, String lowerAttrName,
+      long expected, String message) throws DirectoryException
+  {
+    AttributeType type = DirectoryServer.getAttributeType(lowerAttrName, true);
+    String value = resultEntry.getAttributeValue(type, DirectoryStringSyntax.DECODER);
+    assertEquals(Long.decode(value).longValue(), expected, message);
   }
 
   /**
    * Add to the current DB the entries necessary to the test.
    */
-  private void addTestEntriesToDB()
+  private void addTestEntriesToDB() throws Exception
   {
-    try
+    for (String ldifEntry : updatedEntries)
     {
-      for (String ldifEntry : updatedEntries)
-      {
-        Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
-        addTestEntryToDB(entry);
-        // They will be removed at the end of the test
-        entryList.addLast(entry.getDN());
-      }
-      log("addTestEntriesToDB : " + updatedEntries.length + " successfully added to DB");
+      Entry entry = TestCaseUtils.entryFromLdifString(ldifEntry);
+      addTestEntryToDB(entry);
+      // They will be removed at the end of the test
+      entryList.addLast(entry.getDN());
     }
-    catch(Exception e)
-    {
-      fail("addEntries Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
-    }
+    log("addTestEntriesToDB : " + updatedEntries.length
+        + " successfully added to DB");
   }
 
   private void addTestEntryToDB(Entry entry)
   {
-    try
+    AddOperation addOp =
+        new AddOperationBasis(connection, InternalClientConnection
+            .nextOperationID(), InternalClientConnection.nextMessageID(), null,
+            entry.getDN(), entry.getObjectClasses(), entry.getUserAttributes(),
+            entry.getOperationalAttributes());
+    addOp.setInternalOperation(true);
+    addOp.run();
+    if (addOp.getResultCode() != ResultCode.SUCCESS)
     {
-        AddOperationBasis addOp = new AddOperationBasis(connection,
-            InternalClientConnection.nextOperationID(), InternalClientConnection
-            .nextMessageID(), null, entry.getDN(), entry.getObjectClasses(),
-            entry.getUserAttributes(), entry.getOperationalAttributes());
-        addOp.setInternalOperation(true);
-        addOp.run();
-        if (addOp.getResultCode() != ResultCode.SUCCESS)
-        {
-          log("addEntry: Failed" + addOp.getResultCode());
-        }
+      log("addEntry: Failed" + addOp.getResultCode());
+    }
 
-        // They will be removed at the end of the test
-        entryList.addLast(entry.getDN());
-    }
-    catch(Exception e)
-    {
-      fail("addTestEntryToDB Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
-    }
+    // They will be removed at the end of the test
+    entryList.addLast(entry.getDN());
   }
 
-  /*
+  /**
    * Creates entries necessary to the test.
    */
   private String[] newLDIFEntries(int entriesCnt)
@@ -426,8 +391,6 @@
       bigAttributeValue[i] = Integer.toString(i).charAt(0);
 
     String[] entries = new String[entriesCnt + 2];
-    String filler = "000000000000000000000000000000000000";
-
     entries[0] = "dn: " + EXAMPLE_DN + "\n"
                  + "objectClass: top\n"
                  + "objectClass: domain\n"
@@ -441,6 +404,7 @@
                + "entryUUID: 21111111-1111-1111-1111-111111111112\n"
                + "\n";
 
+    String filler = "000000000000000000000000000000000000";
     for (int i=0; i<entriesCnt; i++)
     {
       String useri="0000"+i;
@@ -472,36 +436,25 @@
   private void makeBrokerPublishEntries(ReplicationBroker broker,
       int senderID, int destinationServerID, int requestorID)
   {
-    // Send entries
-    try
+    RoutableMsg initTargetMessage =
+        new InitializeTargetMsg(EXAMPLE_DN, server2ID, destinationServerID,
+            requestorID, updatedEntries.length, initWindow);
+    broker.publish(initTargetMessage);
+
+    int cnt = 0;
+    for (String entry : updatedEntries)
     {
-      RoutableMsg initTargetMessage =
-        new InitializeTargetMsg(
-          EXAMPLE_DN, server2ID, destinationServerID, requestorID,
-          updatedEntries.length, initWindow);
-      broker.publish(initTargetMessage);
+      log("Broker will publish 1 entry: bytes:" + entry.length());
 
-      int cnt = 0;
-      for (String entry : updatedEntries)
-      {
-        log("Broker will publish 1 entry: bytes:"+ entry.length());
-
-        EntryMsg entryMsg = new EntryMsg(senderID, destinationServerID,
-            entry.getBytes(), ++cnt);
-        broker.publish(entryMsg);
-      }
-
-      DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
-      broker.publish(doneMsg);
-
-      log("Broker " + senderID + " published entries");
-
+      EntryMsg entryMsg =
+          new EntryMsg(senderID, destinationServerID, entry.getBytes(), ++cnt);
+      broker.publish(entryMsg);
     }
-    catch(Exception e)
-    {
-      fail("makeBrokerPublishEntries Exception:"+ e.getMessage() + " "
-          + stackTraceToSingleLineString(e));
-    }
+
+    DoneMsg doneMsg = new DoneMsg(senderID, destinationServerID);
+    broker.publish(doneMsg);
+
+    log("Broker " + senderID + " published entries");
   }
 
   void receiveUpdatedEntries(ReplicationBroker broker, int serverID,
@@ -565,8 +518,7 @@
 
     broker.setGenerationID(EMPTY_DN_GENID);
     broker.reStart(true);
-    try { Thread.sleep(500); } catch(Exception e) {}
-
+    sleep(500);
   }
 
   /**
@@ -598,19 +550,18 @@
    * @param changelogId The serverID of the replicationServer to create.
    * @return The new replicationServer.
    */
-  private ReplicationServer createChangelogServer(int changelogId, String testCase)
+  private ReplicationServer createChangelogServer(int changelogId,
+      String testCase) throws Exception
   {
     SortedSet<String> servers = new TreeSet<String>();
-    try
-    {
-      if (changelogId != changelog1ID)
-          servers.add("localhost:" + getChangelogPort(changelog1ID));
-      if (changelogId != changelog2ID)
-          servers.add("localhost:" + getChangelogPort(changelog2ID));
-      if (changelogId != changelog3ID)
-          servers.add("localhost:" + getChangelogPort(changelog3ID));
+    if (changelogId != changelog1ID)
+      servers.add("localhost:" + getChangelogPort(changelog1ID));
+    if (changelogId != changelog2ID)
+      servers.add("localhost:" + getChangelogPort(changelog2ID));
+    if (changelogId != changelog3ID)
+      servers.add("localhost:" + getChangelogPort(changelog3ID));
 
-      ReplServerFakeConfiguration conf =
+    ReplServerFakeConfiguration conf =
         new ReplServerFakeConfiguration(
             getChangelogPort(changelogId),
             "initOnlineTest" + getChangelogPort(changelogId) + testCase + "Db",
@@ -619,16 +570,10 @@
             0,
             100,
             servers);
-      ReplicationServer replicationServer = new ReplicationServer(conf);
-      Thread.sleep(1000);
+    ReplicationServer replicationServer = new ReplicationServer(conf);
+    Thread.sleep(1000);
 
-      return replicationServer;
-    }
-    catch (Exception e)
-    {
-      fail("createChangelog" + stackTraceToSingleLineString(e));
-    }
-    return null;
+    return replicationServer;
   }
 
   /**
@@ -636,52 +581,42 @@
    * replication Server ID.
    * @param changelogID
    */
-  private void connectServer1ToChangelog(int changelogID)
+  private void connectServer1ToChangelog(int changelogID) throws Exception
   {
     connectServer1ToChangelog(changelogID, 0);
   }
 
-  private void connectServer1ToChangelog(int changelogID, int heartbeat)
+  private void connectServer1ToChangelog(int changelogID, int heartbeat) throws Exception
   {
-    // Connect DS to the replicationServer
-    try
-    {
-      // suffix synchronized
-      String testName = "initOnLineTest";
-      String synchroServerLdif =
-        "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
-      + "objectClass: top\n"
-      + "objectClass: ds-cfg-synchronization-provider\n"
-      + "objectClass: ds-cfg-replication-domain\n"
-      + "cn: " + testName + "\n"
-      + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
-      + "ds-cfg-replication-server: localhost:"
-      + getChangelogPort(changelogID)+"\n"
-      + "ds-cfg-server-id: " + server1ID + "\n"
-      + "ds-cfg-receive-status: true\n"
-      + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
-      + "ds-cfg-window-size: " + WINDOW_SIZE;
+    // suffix synchronized
+    String testName = "initOnLineTest";
+    String synchroServerLdif =
+      "dn: cn=" + testName + ", cn=domains," + SYNCHRO_PLUGIN_DN + "\n"
+    + "objectClass: top\n"
+    + "objectClass: ds-cfg-synchronization-provider\n"
+    + "objectClass: ds-cfg-replication-domain\n"
+    + "cn: " + testName + "\n"
+    + "ds-cfg-base-dn: " + EXAMPLE_DN + "\n"
+    + "ds-cfg-replication-server: localhost:"
+    + getChangelogPort(changelogID)+"\n"
+    + "ds-cfg-server-id: " + server1ID + "\n"
+    + "ds-cfg-receive-status: true\n"
+    + (heartbeat>0?"ds-cfg-heartbeat-interval: "+heartbeat+" ms\n":"")
+    + "ds-cfg-window-size: " + WINDOW_SIZE;
 
+    // Clear the backend
+    LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
 
-      // Clear the backend
-      LDAPReplicationDomain.clearJEBackend(false, "userRoot", EXAMPLE_DN);
-
-      synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
-      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
-      assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
+    synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
+    DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
+    assertNotNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
         "Unable to add the synchronized server");
-      configEntryList.add(synchroServerEntry.getDN());
+    configEntryList.add(synchroServerEntry.getDN());
 
-      replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
+    replDomain = LDAPReplicationDomain.retrievesReplicationDomain(baseDn);
 
-      assertTrue(!replDomain.ieRunning(),
+    assertTrue(!replDomain.ieRunning(),
         "ReplicationDomain: Import/Export is not expected to be running");
-    }
-    catch(Exception e)
-    {
-      log("connectServer1ToChangelog", e);
-      fail("connectServer1ToChangelog", e);
-    }
   }
 
   private int getChangelogPort(int changelogID) throws Exception
@@ -748,10 +683,6 @@
       testEntriesInDb();
 
       log("Successfully ending " + testCase);
-    }
-    catch(Exception e)
-    {
-      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     } finally
     {
       afterTest(testCase);
@@ -967,10 +898,6 @@
       testEntriesInDb();
 
       log("Successfully ending " + testCase);
-    }
-    catch(Exception e)
-    {
-      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     } finally
     {
       afterTest(testCase);
@@ -1022,10 +949,6 @@
       // createTask(taskInitTargetS2);
 
       log("Successfully ending " + testCase);
-    }
-    catch(Exception e)
-    {
-      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     } finally
     {
       afterTest(testCase);
@@ -1094,10 +1017,6 @@
       // createTask(taskInitTargetS2);
 
       log("Successfully ending " + testCase);
-    }
-    catch(Exception e)
-    {
-      fail(testCase + " Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
     } finally
     {
       afterTest(testCase);
@@ -1155,40 +1074,32 @@
 
       // Check that the list of connected LDAP servers is correct
       // in each replication servers
-      List<String> l1 = changelog1.getReplicationServerDomain(
-          baseDn.toNormalizedString(), false).
-        getConnectedLDAPservers();
-      assertEquals(l1.size(), 1);
-      assertEquals(l1.get(0), String.valueOf(server1ID));
+      Set<Integer> l1 = changelog1.getReplicationServerDomain(
+          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+      Assertions.assertThat(l1).containsExactly(server1ID);
 
-      List<String> l2;
-    l2 = changelog2.getReplicationServerDomain(
-        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
-      assertEquals(l2.size(), 2);
-      assertTrue(l2.contains(String.valueOf(server2ID)));
-      assertTrue(l2.contains(String.valueOf(server3ID)));
+      Set<Integer> l2 = changelog2.getReplicationServerDomain(
+          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+      Assertions.assertThat(l2).containsExactly(server2ID, server3ID);
 
-      List<String> l3;
-    l3 = changelog3.getReplicationServerDomain(
-        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
-      assertEquals(l3.size(), 0);
+      Set<Integer> l3 = changelog3.getReplicationServerDomain(
+          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+      Assertions.assertThat(l3).isEmpty();
 
       // Test updates
       broker3.stop();
       Thread.sleep(1000);
-    l2 = changelog2.getReplicationServerDomain(
-        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
-      assertEquals(l2.size(), 1);
-      assertEquals(l2.get(0), String.valueOf(server2ID));
+      l2 = changelog2.getReplicationServerDomain(
+          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+      Assertions.assertThat(l2).containsExactly(server2ID);
 
       broker3 = openReplicationSession(DN.decode(EXAMPLE_DN),
         server3ID, 100, getChangelogPort(changelog2ID), 1000, emptyOldChanges);
       broker2.stop();
       Thread.sleep(1000);
-    l2 = changelog2.getReplicationServerDomain(
-        baseDn.toNormalizedString(), false).getConnectedLDAPservers();
-      assertEquals(l2.size(), 1);
-      assertEquals(l2.get(0), String.valueOf(server3ID));
+      l2 = changelog2.getReplicationServerDomain(
+          baseDn.toNormalizedString(), false).getConnectedDSs().keySet();
+      Assertions.assertThat(l2).containsExactly(server3ID);
 
     // TODO Test ReplicationServerDomain.getDestinationServers method.
 
@@ -1262,10 +1173,6 @@
 
       log("Successfully ending " + testCase);
     }
-    catch(Exception e)
-    {
-      log(testCase + e.getLocalizedMessage());
-    }
     finally
     {
       afterTest(testCase);
@@ -1567,18 +1474,11 @@
       // in those cases, loop for a while waiting for completion.
       for (int i = 0; i< 10; i++)
       {
-        if (replDomain.ieRunning())
-        {
-          try
-          {
-            Thread.sleep(500);
-          } catch (InterruptedException e)
-          { }
-        }
-        else
+        if (!replDomain.ieRunning())
         {
           break;
         }
+        sleep(500);
       }
        assertTrue(!replDomain.ieRunning(),
          "ReplicationDomain: Import/Export is not expected to be running");
@@ -1591,14 +1491,14 @@
     if (server2 != null)
     {
       server2.stop();
-      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
+      sleep(100); // give some time to the broker to disconnect
       // from the replicationServer.
       server2 = null;
     }
     if (server3 != null)
     {
       server3.stop();
-      TestCaseUtils.sleep(100); // give some time to the broker to disconnect
+      sleep(100); // give some time to the broker to disconnect
       // from the replicationServer.
       server3 = null;
     }
@@ -1639,7 +1539,7 @@
     log("Successfully cleaned " + testCase);
   }
 
-    /**
+  /**
    * Clean up the environment.
    *
    * @throws Exception If the environment could not be set up.

--
Gitblit v1.10.0