mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
09.06.2009 76b8bb967a0e20ef38750dbffd893baa117c1f34
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
@@ -329,7 +329,7 @@
    /**
     * The update message equivalent to the originally received update message,
     * but with assured flag disabled. This message is the one that should be
     * sent to non elligible servers for assured mode.
     * sent to non eligible servers for assured mode.
     * We need a clone like of the original message with assured flag off, to be
     * posted to servers we don't want to wait the ack from (not normal status
     * servers or servers with different group id). This must be done because
@@ -479,7 +479,7 @@
       * The list of servers identified as servers we are interested in
       * receiving acks from. If this list is not null, then expectedAcksInfo
       * should be not null.
       * Servers that are not in this list are servers not elligible for an ack
       * Servers that are not in this list are servers not eligible for an ack
       * request.
       *
       */
@@ -496,7 +496,7 @@
   * Process a just received assured update message in Safe Read mode. If the
   * ack can be sent immediately, it is done here. This will also determine to
   * which suitable servers an ack should be requested from, and which ones are
   * not elligible for an ack request.
   * not eligible for an ack request.
   * This method is an helper method for the put method. Have a look at the put
   * method for a better understanding.
   * @param update The just received assured update to process.
@@ -516,12 +516,12 @@
    List<Short> expectedServers = new ArrayList<Short>();
    List<Short> wrongStatusServers = new ArrayList<Short>();
    if (sourceGroupId != groupId)
    if (sourceGroupId == groupId)
      // Assured feature does not cross different group ids
    {
      if (sourceHandler.isLDAPserver())
      {
        // Look for RS elligible for assured
        // Look for RS eligible for assured
        for (ServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
@@ -537,7 +537,7 @@
        }
      }
      // Look for DS elligible for assured
      // Look for DS eligible for assured
      for (ServerHandler handler : directoryServers.values())
      {
        // Don't forward the change to the server that just sent it
@@ -560,10 +560,10 @@
              wrongStatusServers.add(handler.getServerId());
            } else
            {
              /*
              /**
               * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
               * We do not want this to be reported as an error to the update
               * maker -> no pollution or potential missunderstanding when
               * maker -> no pollution or potential misunderstanding when
               * reading logs or monitoring and it was just administration (for
               * instance new server is being configured in topo: it goes in bad
               * gen then then full full update).
@@ -586,7 +586,7 @@
    if (preparedAssuredInfo.expectedServers == null)
    {
      // No elligible servers found, send the ack immediatly
      // No eligible servers found, send the ack immediately
      AckMsg ack = new AckMsg(cn);
      sourceHandler.sendAck(ack);
    }
@@ -598,7 +598,7 @@
   * Process a just received assured update message in Safe Data mode. If the
   * ack can be sent immediately, it is done here. This will also determine to
   * which suitable servers an ack should be requested from, and which ones are
   * not elligible for an ack request.
   * not eligible for an ack request.
   * This method is an helper method for the put method. Have a look at the put
   * method for a better understanding.
   * @param update The just received assured update to process.
@@ -637,20 +637,24 @@
        {
          if (safeDataLevel == (byte) 1)
          {
            // Immediatly return the ack for an assured message in safe data
            // mode with safe data level 1, coming from a DS. No need to wait
            // for more acks
            /**
             * Immediately return the ack for an assured message in safe data
             * mode with safe data level 1, coming from a DS. No need to wait
             * for more acks
             */
            AckMsg ack = new AckMsg(cn);
            sourceHandler.sendAck(ack);
          } else
          {
            if (safeDataLevel != (byte) 0)
            {
              // level > 1 : We need further acks
              // The message will be posted in assured mode to elligible
              // servers. The embedded safe data level is not changed, and his
              // value will be used by a remote RS to determine if he must send
              // an ack (level > 1) or not (level = 1)
              /**
               * level > 1 : We need further acks
               * The message will be posted in assured mode to eligible
               * servers. The embedded safe data level is not changed, and his
               * value will be used by a remote RS to determine if he must send
               * an ack (level > 1) or not (level = 1)
               */
              interestedInAcks = true;
            } else
            {
@@ -661,12 +665,14 @@
        { // A RS sent us the safe data message, for sure no futher acks to wait
          if (safeDataLevel == (byte) 1)
          {
            // The original level was 1 so the RS that sent us this message
            // should have already sent his ack to the sender DS. Level 1 has
            // already been reached so no further acks to wait.
            // This should not happen in theory as the sender RS server should
            // have sent us a matching not assured message so we should not come
            // to here.
            /**
             * The original level was 1 so the RS that sent us this message
             * should have already sent his ack to the sender DS. Level 1 has
             * already been reached so no further acks to wait.
             * This should not happen in theory as the sender RS server should
             * have sent us a matching not assured message so we should not come
             * to here.
             */
          } else
          {
            // level > 1, so Ack this message to originator RS
@@ -682,7 +688,7 @@
    {
      if (sourceHandler.isLDAPserver())
      {
        // Look for RS elligible for assured
        // Look for RS eligible for assured
        for (ServerHandler handler : replicationServers.values())
        {
          if (handler.getGroupId() == groupId)
@@ -709,7 +715,7 @@
        // Some other acks to wait for
        int sdl = update.getSafeDataLevel();
        int neededAdditionalServers = sdl - 1;
        // Change the number of expected acks if not enough available elligible
        // Change the number of expected acks if not enough available eligible
        // servers: the level is a best effort thing, we do not want to timeout
        // at every assured SD update for instance if a RS has had his gen id
        // resetted
@@ -721,8 +727,8 @@
        preparedAssuredInfo.expectedServers = expectedServers;
      } else
      {
        // level > 1 and source is a DS but no elligible servers found, send the
        // ack immediatly
        // level > 1 and source is a DS but no eligible servers found, send the
        // ack immediately
        AckMsg ack = new AckMsg(cn);
        sourceHandler.sendAck(ack);
      }
@@ -755,8 +761,11 @@
          // remove object from the map
          return;
        }
        // If this is the last ack we were waiting from, immediatly create and
        // send the final ack to the original server
        /**
         *
         * If this is the last ack we were waiting from, immediately create and
         * send the final ack to the original server
         */
        if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
        {
          // Remove the object from the map as no more needed
@@ -768,7 +777,7 @@
            origServer.sendAck(finalAck);
          } catch (IOException e)
          {
            /*
            /**
             * An error happened trying the send back an ack to the server.
             * Log an error and close the connection to this server.
             */
@@ -794,7 +803,7 @@
  /**
   * The code run when the timeout occurs while waiting for acks of the
   * elligible servers. This basically sends a timeout ack (with any additional
   * eligible servers. This basically sends a timeout ack (with any additional
   * error info) to the original server that sent an assured update message.
   */
  private class AssuredTimeoutTask extends TimerTask
@@ -846,7 +855,7 @@
            origServer.sendAck(finalAck);
          } catch (IOException e)
          {
            /*
            /**
             * An error happened trying the send back an ack to the server.
             * Log an error and close the connection to this server.
             */
@@ -2408,7 +2417,7 @@
  /**
   * Set the purge delay on all the db Handlers for this Domain
   * of Replicaiton.
   * of Replication.
   *
   * @param delay The new purge delay to use.
   */
opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
@@ -22,15 +22,17 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import java.util.ArrayList;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import java.util.List;
import java.util.Set;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.AckMsg;
@@ -60,7 +62,7 @@
  // The list of server ids that had errors for the sent matching update
  // Each server id of the list had one of the
  // 3 possible errors (timeout, wrong status or replay error)
  private List<Short> failedServers = null;
  private List<Short> failedServers = new ArrayList<Short>();
  /**
   * Number of servers we want an ack from and from which we received the ack.
@@ -210,12 +212,27 @@
    ack.setHasTimeout(hasTimeout);
    ack.setHasWrongStatus(hasWrongStatus);
    ack.setHasReplayError(hasReplayError);
    ack.setFailedServers(failedServers);
    // Force anyway timeout flag if requested
    if (timeout)
    {
      // Force anyway timeout flag if requested
      ack.setHasTimeout(true);
      // Add servers that did not respond in time
      Set<Short> serverIds = expectedServersAckStatus.keySet();
      for (Short serverId : serverIds)
      {
        boolean ackReceived = expectedServersAckStatus.get(serverId);
        if (!ackReceived)
        {
          if (!failedServers.contains(serverId))
            failedServers.add(serverId);
        }
      }
    }
    ack.setFailedServers(failedServers);
    return ack;
  }
}
opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -331,6 +331,29 @@
  }
  /**
   * Creates a ReplicationDomain with the provided parameters.
   * (for unit test purpose only)
   *
   * @param serviceID  The identifier of the Replication Domain to which
   *                   this object is participating.
   * @param serverID   The identifier of the server that is participating
   *                   to the Replication Domain.
   *                   This identifier should be different for each server that
   *                   is participating to a given Replication Domain.
   * @param serverState The serverState to use
   */
  public ReplicationDomain(String serviceID, short serverID,
    ServerState serverState)
  {
    this.serviceID = serviceID;
    this.serverID = serverID;
    this.state = serverState;
    this.generator = new ChangeNumberGenerator(serverID, state);
    domains.put(serviceID, this);
  }
  /**
   * Set the initial status of the domain and perform necessary initializations.
   * This method will be called by the Broker each time the ReplicationBroker
   * establish a new session to a Replication Server.
@@ -773,8 +796,9 @@
    }
    numRcvdUpdates.incrementAndGet();
    if (update.isAssured() && (update.getAssuredMode() ==
      AssuredMode.SAFE_READ_MODE))
     byte rsGroupId = broker.getRsGroupId();
    if ( update.isAssured() && (update.getAssuredMode() ==
      AssuredMode.SAFE_READ_MODE) && (rsGroupId == groupId) )
    {
      receivedAssuredSrUpdates.incrementAndGet();
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
@@ -935,6 +935,9 @@
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
@@ -961,6 +964,9 @@
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1032,6 +1038,9 @@
        Integer nError = errorsByServer.get((short)RS_SERVER_ID);
        assertNotNull(nError);
        assertEquals(nError.intValue(), 1);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1054,6 +1063,9 @@
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1208,6 +1220,9 @@
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1269,6 +1284,9 @@
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1389,6 +1407,25 @@
        assertFalse(ackMsg.hasReplayError());
        assertFalse(ackMsg.hasWrongStatus());
        assertEquals(ackMsg.getFailedServers().size(), 0);
        // Check for monitoring data
        DN baseDn = DN.decode(SAFE_READ_DN);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
        assertTrue(errorsByServer.isEmpty());
      } catch (SocketTimeoutException e)
      {
        // Expected
@@ -1523,6 +1560,9 @@
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
@@ -1558,6 +1598,9 @@
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 2);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 2);
@@ -1595,6 +1638,9 @@
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 3);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 3);
@@ -1676,6 +1722,9 @@
      nError = errorsByServer.get((short)20);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1718,6 +1767,9 @@
      nError = errorsByServer.get((short)30);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1760,6 +1812,9 @@
      nError = errorsByServer.get((short)RS_SERVER_ID);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -34,6 +34,7 @@
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -97,15 +98,20 @@
  private static final short FDS1_ID = 1;
  private static final short FDS2_ID = 2;
  private static final short FDS3_ID = 3;
  private static final short FDS4_ID = 4;
  private static final short FDS5_ID = 5;
  private static final short FRS1_ID = 11;
  private static final short FRS2_ID = 12;
  private static final short FRS3_ID = 13;
  private static final short DS_FRS2_ID = FRS2_ID + 10;
  private static final short RS1_ID = 101;
  private static final short RS2_ID = 102;
  private static final short RS3_ID = 103;
  private FakeReplicationDomain fakeRd1 = null;
  private FakeReplicationDomain fakeRd2 = null;
  private FakeReplicationDomain fakeRd3 = null;
  private FakeReplicationDomain fakeRd4 = null;
  private FakeReplicationDomain fakeRd5 = null;
  private FakeReplicationServer fakeRs1 = null;
  private FakeReplicationServer fakeRs2 = null;
  private FakeReplicationServer fakeRs3 = null;
@@ -138,7 +144,7 @@
   */
  // DS receives updates and replies acks with no errors to every updates
  private static final int REPLY_OK_DS_SCENARIO = 1;
  // DS receives acks but does not respond (makes timeouts)
  // DS receives updates but does not respond (makes timeouts)
  private static final int TIMEOUT_DS_SCENARIO = 2;
  // DS receives updates and replies ack with replay error flags
  private static final int REPLAY_ERROR_DS_SCENARIO = 3;
@@ -148,10 +154,17 @@
   */
  // RS receives updates and replies acks with no errors to every updates
  private static final int REPLY_OK_RS_SCENARIO = 11;
  // RS receives acks but does not respond (makes timeouts)
  // RS receives updates but does not respond (makes timeouts)
  private static final int TIMEOUT_RS_SCENARIO = 12;
  // RS is used for sending updates (with sendNewFakeUpdate()) and receive acks, synchronously
  private static final int SENDER_RS_SCENARIO = 13;
  private static final int SENDER_RS_SCENARIO = 13;
  //   Scenarios only used in safe read tests:
  // RS receives updates and replies ack error as if a DS was connected to it and timed out
  private static final int DS_TIMEOUT_RS_SCENARIO_SAFE_READ = 14;
  // RS receives updates and replies ack error as if a DS was connected to it and was wrong status
  private static final int DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ = 15;
  // RS receives updates and replies ack error as if a DS was connected to it and had a replay error
  private static final int DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ = 16;
  private void debugInfo(String s)
  {
@@ -188,6 +201,8 @@
    fakeRd1 = null;
    fakeRd2 = null;
    fakeRd3 = null;
    fakeRd4 = null;
    fakeRd5 = null;
    fakeRs1 = null;
    fakeRs2 = null;
    fakeRs3 = null;
@@ -218,6 +233,18 @@
      fakeRd3 = null;
    }
    if (fakeRd4 != null)
    {
      fakeRd4.disableService();
      fakeRd4 = null;
    }
    if (fakeRd5 != null)
    {
      fakeRd5.disableService();
      fakeRd5 = null;
    }
    // Shutdown fake RSs
    if (fakeRs1 != null)
@@ -260,6 +287,18 @@
      rs3 = null;
    }
  }
  /**
   * Creates and connects a new fake replication domain, using the passed scenario
   * (no server state constructor version)
   */
  private FakeReplicationDomain createFakeReplicationDomain(short serverId,
    int groupId, short rsId, long generationId, boolean assured,
    AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
    int scenario)
  {
    return createFakeReplicationDomain(serverId, groupId, rsId, generationId, assured,
      assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState());
  }
  /**
   * Creates and connects a new fake replication domain, using the passed scenario.
@@ -267,7 +306,7 @@
  private FakeReplicationDomain createFakeReplicationDomain(short serverId,
    int groupId, short rsId, long generationId, boolean assured,
    AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
    int scenario)
    int scenario, ServerState serverState)
  {
    try
    {
@@ -290,7 +329,8 @@
      FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
        TEST_ROOT_DN_STRING, serverId, "localhost:" + rsPort, generationId,
        (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout, scenario);
        (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout,
        scenario, serverState);
      // Test connection
      assertTrue(fakeReplicationDomain.isConnected());
@@ -323,7 +363,7 @@
   */
  private FakeReplicationServer createFakeReplicationServer(short serverId,
    int groupId, short rsId, long generationId, boolean assured,
    AssuredMode assuredMode, int safeDataLevel, int scenario)
    AssuredMode assuredMode, int safeDataLevel, ServerState serverState, int scenario)
  {
    try
    {
@@ -349,7 +389,7 @@
        TEST_ROOT_DN_STRING, generationId);
      // Connect fake RS to the real RS
      assertTrue(fakeReplicationServer.connect());
      assertTrue(fakeReplicationServer.connect(serverState));
      // Start wished scenario
      fakeReplicationServer.start(scenario);
@@ -375,7 +415,7 @@
      if (serverId == RS1_ID)
      {
        port = rs1Port;
        if (testCase.equals("testSafeDataManyRealRSs"))
        if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
        {
          // Every 3 RSs connected together
          replServers.add("localhost:" + rs2Port);
@@ -387,7 +427,7 @@
      } else if (serverId == RS2_ID)
      {
        port = rs2Port;
        if (testCase.equals("testSafeDataManyRealRSs"))
        if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
        {
          // Every 3 RSs connected together
          replServers.add("localhost:" + rs1Port);
@@ -399,7 +439,7 @@
      } else if (serverId == RS3_ID)
      {
        port = rs3Port;
        if (testCase.equals("testSafeDataManyRealRSs"))
        if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
        {
          // Every 3 RSs connected together
          replServers.add("localhost:" + rs1Port);
@@ -448,6 +488,8 @@
    // Number of received updates
    private int nReceivedUpdates = 0;
    private boolean sameGidAsRs = true;
    /**
     * Creates a fake replication domain (DS)
     * @param serviceID The base dn used at connection to RS
@@ -460,7 +502,7 @@
     * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
     * @param assuredTimeout the assured timeout used when sending updates
     * @param scenario the scenario we are creating for (implies particular
     * behaviour upon reception of updates)
     * behavior upon reception of updates)
     * @throws org.opends.server.config.ConfigException
     */
    public FakeReplicationDomain(
@@ -473,9 +515,10 @@
      AssuredMode assuredMode,
      byte safeDataLevel,
      long assuredTimeout,
      int scenario) throws ConfigException
      int scenario,
      ServerState serverState) throws ConfigException
    {
      super(serviceID, serverID);
      super(serviceID, serverID, serverState);
      List<String> replicationServers = new ArrayList<String>();
      replicationServers.add(replicationServer);
      this.generationId = generationId;
@@ -544,42 +587,40 @@
    @Override
    public boolean processUpdate(UpdateMsg updateMsg)
    {
      try
      {
        checkUpdateAssuredParameters(updateMsg);
        nReceivedUpdates++;
        // Now execute the requested scenario
        AckMsg ackMsg = null;
        switch (scenario)
        {
          case REPLY_OK_DS_SCENARIO:
            // Send the ack without errors
            ackMsg = new AckMsg(updateMsg.getChangeNumber());
            session.publish(ackMsg);
            break;
          case TIMEOUT_DS_SCENARIO:
            // Let timeout occur
            break;
          case REPLAY_ERROR_DS_SCENARIO:
            // Send the ack with replay error
            ackMsg = new AckMsg(updateMsg.getChangeNumber());
            ackMsg.setHasReplayError(true);
            List<Short> failedServers = new ArrayList<Short>();
            failedServers.add(getServerId());
            ackMsg.setFailedServers(failedServers);
            session.publish(ackMsg);
            break;
          default:
            fail("Unknown scenario: " + scenario);
        }
        return true;
      } catch (IOException ex)
      checkUpdateAssuredParameters(updateMsg);
      nReceivedUpdates++;
      // Now execute the requested scenario
      switch (scenario)
      {
        fail("IOException in fake replication domain " + getServerId() + " :" +
          ex.getMessage());
        return false;
        case REPLY_OK_DS_SCENARIO:
          // Send the ack without errors
          // Call processUpdateDone and update the server state is what needs to
          // be done when using asynchronous process update mechanism
          // (see processUpdate javadoc)
          processUpdateDone(updateMsg, null);
          getServerState().update(updateMsg.getChangeNumber());
          break;
        case TIMEOUT_DS_SCENARIO:
          // Let timeout occur
          break;
        case REPLAY_ERROR_DS_SCENARIO:
          // Send the ack with replay error
          // Call processUpdateDone and update the server state is what needs to
          // be done when using asynchronous process update mechanism
          // (see processUpdate javadoc)
          processUpdateDone(updateMsg, "This is the replay error message generated from fake DS " +
            getServerId() + " for update with change number " + updateMsg.
            getChangeNumber());
          getServerState().update(updateMsg.getChangeNumber());
          break;
        default:
          fail("Unknown scenario: " + scenario);
      }
      // IMPORTANT: return false so that we use the asynchronous processUpdate mechanism
      // (see processUpdate javadoc)
      return false;
    }
    /**
@@ -663,7 +704,7 @@
    // Number of received updates
    private int nReceivedUpdates = 0;
    // True is an ack has been replied to a received assured update (in assured mode of course)
    // True if an ack has been replied to a received assured update (in assured mode of course)
    // used in reply scenario
    private boolean ackReplied = false;
@@ -741,7 +782,7 @@
     * Connect to RS
     * Returns true if connection was made successfully
     */
    public boolean connect()
    public boolean connect(ServerState serverState)
    {
      try
      {
@@ -761,7 +802,7 @@
        // Send our repl server start msg
        ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
          fakeUrl, baseDn, 100, new ServerState(),
          fakeUrl, baseDn, 100, serverState,
          ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
          groupId, 5000);
        session.publish(replServerStartMsg);
@@ -857,6 +898,47 @@
              case TIMEOUT_RS_SCENARIO:
                // Let timeout occur
                break;
              case DS_TIMEOUT_RS_SCENARIO_SAFE_READ:
                if (updateMsg.isAssured())
                {
                  // Emulate RS waiting for virtual DS ack
                  sleep(MAX_SEND_UPDATE_TIME);
                  // Send the ack with timeout error from a virtual DS with id (ours + 10)
                  AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
                  ackMsg.setHasTimeout(true);
                  List<Short> failedServers = new ArrayList<Short>();
                  failedServers.add((short)(serverId + 10));
                  ackMsg.setFailedServers(failedServers);
                  session.publish(ackMsg);
                  ackReplied = true;
                }
                break;
              case DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ:
                if (updateMsg.isAssured())
                {
                  // Send the ack with wrong status error from a virtual DS with id (ours + 10)
                  AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
                  ackMsg.setHasWrongStatus(true);
                  List<Short> failedServers = new ArrayList<Short>();
                  failedServers.add((short)(serverId + 10));
                  ackMsg.setFailedServers(failedServers);
                  session.publish(ackMsg);
                  ackReplied = true;
                }
                break;
              case DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ:
                if (updateMsg.isAssured())
                {
                  // Send the ack with replay error from a virtual DS with id (ours + 10)
                  AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
                  ackMsg.setHasReplayError(true);
                  List<Short> failedServers = new ArrayList<Short>();
                  failedServers.add((short)(serverId + 10));
                  ackMsg.setFailedServers(failedServers);
                  session.publish(ackMsg);
                  ackReplied = true;
                }
                break;
              default:
                fail("Unknown scenario: " + scenario);
            }
@@ -910,7 +992,7 @@
    }
    /**
     * Check that received update assured parameters are as defined at DS start
     * Check that received update assured parameters are as defined at RS start
     */
    private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
    {
@@ -948,7 +1030,7 @@
    }
    /**
     * Test if the last received updates was acknowledged
     * Test if the last received updates was acknowledged (ack sent with or without errors)
     * WARNING: this must be called once per update as it also immediatly resets the status
     * for a new test for the next update
     * @return True if acknowledged
@@ -1100,7 +1182,7 @@
        // this would timeout. If main DS group id is not the same as the real RS one,
        // the update will even not come to real RS as asured
        fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
          DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT_RS_SCENARIO);
          DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, new ServerState(), TIMEOUT_RS_SCENARIO);
        assertNotNull(fakeRs1);
      }
@@ -1120,7 +1202,7 @@
      long sendUpdateTime = System.currentTimeMillis() - startTime;
      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      if (mainDsGid == DEFAULT_GID)
      {
        // Check monitoring values (check that ack has been correctly received)
@@ -1139,7 +1221,7 @@
      }
      // Sanity check
      sleep(1000); // Let time to update to reach other servers
      sleep(500); // Let time to update to reach other servers
      assertEquals(fakeRd1.nReceivedUpdates(), 0);
      assertTrue(fakeRd1.receivedUpdatesOk());
      if (otherFakeDS)
@@ -1435,24 +1517,24 @@
      // Put a fake RS 1 connected to real RS
      fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRs1Gid, RS1_ID,
        fakeRs1GenId, ((fakeRs1Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
        fakeRs1Scen);
        new ServerState(), fakeRs1Scen);
      assertNotNull(fakeRs1);
      // Put a fake RS 2 connected to real RS
      fakeRs2 = createFakeReplicationServer(FRS2_ID, fakeRs2Gid, RS1_ID,
        fakeRs2GenId, ((fakeRs2Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
        fakeRs2Scen);
        new ServerState(), fakeRs2Scen);
      assertNotNull(fakeRs2);
      // Put a fake RS 3 connected to real RS
      fakeRs3 = createFakeReplicationServer(FRS3_ID, fakeRs3Gid, RS1_ID,
        fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
        fakeRs3Scen);
        new ServerState(), fakeRs3Scen);
      assertNotNull(fakeRs3);
      // Wait for connections to be finished
      // DS must see expected numbers of fake DSs and RSs
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 4);
      /***********************************************************************
       * Send update from DS 1 (3 fake RSs available) and check what happened
@@ -1479,9 +1561,9 @@
      long sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
      checkWhatHasBeenReceived(1, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, fakeRs3GenId, expectedServers);
      checkWhatHasBeenReceivedSafeData(1, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, fakeRs3GenId, expectedServers);
      /***********************************************************************
       * Send update from DS 1 (2 fake RSs available) and check what happened
@@ -1493,7 +1575,7 @@
      // Wait for disconnection to be finished
      // DS must see expected numbers of fake DSs and RSs
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 2);
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
      // Keep track of monitoring values for incremental test step
      acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1516,9 +1598,9 @@
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      checkTimeAndMonitoringSafeData(2, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
      checkWhatHasBeenReceived(2, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, -1L, expectedServers);
      checkWhatHasBeenReceivedSafeData(2, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, -1L, expectedServers);
      /***********************************************************************
       * Send update from DS 1 (1 fake RS available) and check what happened
@@ -1530,7 +1612,7 @@
      // Wait for disconnection to be finished
      // DS must see expected numbers of fake DSs and RSs
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 1);
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 2);
      // Keep track of monitoring values for incremental test step
      acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1553,9 +1635,9 @@
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
      checkWhatHasBeenReceived(3, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, -1L, -1L, expectedServers);
      checkWhatHasBeenReceivedSafeData(3, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, -1L, -1L, expectedServers);
      /***********************************************************************
       * Send update from DS 1 (no fake RS available) and check what happened
@@ -1567,7 +1649,7 @@
      // Wait for disconnection to be finished
      // DS must see expected numbers of fake DSs and RSs
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 0);
      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 1);
      // Keep track of monitoring values for incremental test step
      acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1590,9 +1672,9 @@
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
      checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
      checkWhatHasBeenReceived(4, otherFakeDS, otherFakeDsGenId, -1L, -1L, -1L, expectedServers);
      checkWhatHasBeenReceivedSafeData(4, otherFakeDS, otherFakeDsGenId, -1L, -1L, -1L, expectedServers);
    } finally
    {
      endTest();
@@ -1602,7 +1684,7 @@
  // Check that the DSs and the fake RSs of the topology have received/acked what is expected according to the
  // test step (the number of updates)
  // -1 for a gen id means no need to test the matching fake RS
  private void checkWhatHasBeenReceived(int nSentUpdates, boolean otherFakeDS, long otherFakeDsGenId, long fakeRs1GenId, long fakeRs2GenId, long fakeRs3GenId, List<Short> expectedServers)
  private void checkWhatHasBeenReceivedSafeData(int nSentUpdates, boolean otherFakeDS, long otherFakeDsGenId, long fakeRs1GenId, long fakeRs2GenId, long fakeRs3GenId, List<Short> expectedServers)
  {
    // We should not receive our own update
@@ -1805,7 +1887,7 @@
      rsInfo = fakeRd.getRsList();
      nDs = dsInfo.size();
      nRs = rsInfo.size();
      if ( (nDs == expectedDs) && (nRs == (expectedRs+1)) ) // Must include real RS so '+1'
      if ( (nDs == expectedDs) && (nRs == expectedRs) ) // Must include real RS so '+1'
      {
        debugInfo("waitForStableTopo: expected topo obtained after " + (30-nSec) + " second(s).");
        return;
@@ -1817,7 +1899,7 @@
      " DSs (had " + dsInfo +") and " + expectedRs + " RSs (had " + rsInfo +").");
  }
  // Compute the list of servers that are elligible for receiving an assured update
  // Compute the list of servers that are elligible for receiving a safe data assured update
  // according to their group id and generation id. If -1 is used, the server is out of scope
  private List<Short> computeElligibleServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs2Gid, long fakeRs2GenId, int fakeRs3Gid, long fakeRs3GenId)
  {
@@ -1847,7 +1929,7 @@
    return false;
  }
  // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
  // Compute the list of servers that are elligible for receiving a safe data assured update and that are expected to effectively ack the update
  // If -1 is used, the server is out of scope
  private List<Short> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
  {
@@ -1946,7 +2028,7 @@
      // Put a fake RS 2 connected to real RS
      fakeRs2 = createFakeReplicationServer(FRS2_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 10,
        TIMEOUT_RS_SCENARIO);
        new ServerState(), TIMEOUT_RS_SCENARIO);
      assertNotNull(fakeRs2);
      /*
@@ -1956,7 +2038,7 @@
      // Put a fake RS 1 connected to real RS
      fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
        fakeRsGenId, sendInAssured, AssuredMode.SAFE_DATA_MODE, sdLevel,
        SENDER_RS_SCENARIO);
        new ServerState(), SENDER_RS_SCENARIO);
      assertNotNull(fakeRs1);
      /*
@@ -2068,7 +2150,7 @@
      // Wait for RSs connections to be finished
      // DS must see expected numbers of RSs
      waitForStableTopo(fakeRd1, 0, 2);
      waitForStableTopo(fakeRd1, 0, 3);
      /*
       * Send update from DS 1 and check result
@@ -2088,6 +2170,7 @@
      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
      // Check monitoring values (check that ack has been correctly received)
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSdSentUpdates(), 1);
      assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 1);
      assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
@@ -2097,5 +2180,684 @@
      endTest();
    }
  }
  /**
   * Test safe read mode with only one real RS deployment. One fake DS sends
   * assured messages to one other fake DS connected to the RS a fake RS
   * connected to the real RS is also expected to send the ack
   */
  @Test(enabled = true)
  public void testSafeReadOneRSBasic() throws Exception
  {
    String testCase = "testSafeReadOneRSBasic";
    debugInfo("Starting " + testCase);
    initTest();
    try
    {
      /*******************
       * Start real RS (the one to be tested)
       */
      // Create real RS 1
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase);
      assertNotNull(rs1);
      /*******************
       * Start main DS 1 (the one which sends updates)
       */
      // Create and connect DS 1 to RS 1
      // Assured mode: SR
      fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
      assertNotNull(fakeRd1);
      /*
       * Send a first assured safe read update
       */
      long startTime = System.currentTimeMillis();
      try
      {
        fakeRd1.sendNewFakeUpdate();
      } catch (TimeoutException e)
      {
        fail("No timeout is expected here");
      }
      long sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check call time (should be short as RS should have acked)
      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
      // Check monitoring values (check that ack has been correctly received)
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
         // Sanity check
      sleep(500); // Let time to update to reach servers
      assertEquals(fakeRd1.nReceivedUpdates(), 0);
      assertTrue(fakeRd1.receivedUpdatesOk());
      /*******************
       * Start another fake DS 2 connected to RS
       */
      // Create and connect DS 2 to RS 1
      // Assured mode: SR
      ServerState serverState = fakeRd1.getServerState();
      fakeRd2 = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO, serverState);
      assertNotNull(fakeRd2);
      // Wait for connections to be established
      waitForStableTopo(fakeRd1, 1, 1);
      /*
       * Send a second assured safe read update
       */
      startTime = System.currentTimeMillis();
      try
      {
        fakeRd1.sendNewFakeUpdate();
      } catch (TimeoutException e)
      {
        fail("No timeout is expected here");
      }
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check call time (should be short as RS should have acked)
      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
      // Check monitoring values (check that ack has been correctly received)
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 2);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 2);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
      assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 1);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 1);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
      // Sanity check
      sleep(500); // Let time to update to reach servers
      assertEquals(fakeRd1.nReceivedUpdates(), 0);
      assertTrue(fakeRd1.receivedUpdatesOk());
      assertEquals(fakeRd2.nReceivedUpdates(), 1);
      assertTrue(fakeRd2.receivedUpdatesOk());
      /*******************
       * Start a fake RS 1 connected to RS
       */
      fakeRs1 = createFakeReplicationServer(FRS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1,
        fakeRd1.getServerState(), REPLY_OK_RS_SCENARIO);
      assertNotNull(fakeRs1);
      // Wait for connections to be established
      waitForStableTopo(fakeRd1, 1, 2);
      /*
       * Send a third assured safe read update
       */
      startTime = System.currentTimeMillis();
      try
      {
        fakeRd1.sendNewFakeUpdate();
      } catch (TimeoutException e)
      {
        fail("No timeout is expected here");
      }
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check call time (should be short as RS should have acked)
      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
      // Check monitoring values (check that ack has been correctly received)
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 3);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 3);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
      assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 2);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 2);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
      // Sanity check
      sleep(500); // Let time to update to reach servers
      assertEquals(fakeRd1.nReceivedUpdates(), 0);
      assertTrue(fakeRd1.receivedUpdatesOk());
      assertEquals(fakeRd2.nReceivedUpdates(), 2);
      assertTrue(fakeRd2.receivedUpdatesOk());
      assertEquals(fakeRs1.nReceivedUpdates(), 1);
      assertTrue(fakeRs1.receivedUpdatesOk());
      /*******************
       * Shutdown fake DS 2
       */
      // Shutdown fake DS 2
      fakeRd2.disableService();
      fakeRd2 = null;
      // Wait for disconnection to be finished
      waitForStableTopo(fakeRd1, 0, 2);
      /*
       * Send a fourth assured safe read update
       */
      startTime = System.currentTimeMillis();
      try
      {
        fakeRd1.sendNewFakeUpdate();
      } catch (TimeoutException e)
      {
        fail("No timeout is expected here");
      }
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check call time (should be short as RS should have acked)
      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
      // Check monitoring values (check that ack has been correctly received)
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
      // Sanity check
      sleep(500); // Let time to update to reach servers
      assertEquals(fakeRd1.nReceivedUpdates(), 0);
      assertTrue(fakeRd1.receivedUpdatesOk());
      assertEquals(fakeRs1.nReceivedUpdates(), 2);
      assertTrue(fakeRs1.receivedUpdatesOk());
      /*******************
       * Shutdown fake RS 1
       */
      // Shutdown fake RS 1
      fakeRs1.shutdown();
      fakeRs1 = null;
      // Wait for disconnection to be finished
      waitForStableTopo(fakeRd1, 0, 1);
      /*
       * Send a fifth assured safe read update
       */
      startTime = System.currentTimeMillis();
      try
      {
        fakeRd1.sendNewFakeUpdate();
      } catch (TimeoutException e)
      {
        fail("No timeout is expected here");
      }
      sendUpdateTime = System.currentTimeMillis() - startTime;
      // Check call time (should be short as RS should have acked)
      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
      // Check monitoring values (check that ack has been correctly received)
      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 5);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
      // Sanity check
      sleep(500); // Let time to update to reach servers
      assertEquals(fakeRd1.nReceivedUpdates(), 0);
      assertTrue(fakeRd1.receivedUpdatesOk());
    } finally
    {
      endTest();
    }
  }
  /**
   * Returns possible combinations of parameters for testSafeReadOneRSComplexPrecommit test
   */
  @DataProvider(name = "testSafeReadOneRSComplexPrecommitProvider")
  private Object[][] testSafeReadOneRSComplexPrecommitProvider()
  {
    return new Object[][]
    {
      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
      {DEFAULT_GID, DEFAULT_GENID, TIMEOUT_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
      {DEFAULT_GID, DEFAULT_GENID, REPLAY_ERROR_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO},
      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_TIMEOUT_RS_SCENARIO_SAFE_READ},
      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ},
      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ},
      {OTHER_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}
    };
  }
  /**
   * See testSafeReadOneRSComplex comment.
   */
  @Test(dataProvider = "testSafeReadOneRSComplexPrecommitProvider", enabled = true)
  public void testSafeReadOneRSComplexPrecommit(int otherFakeDsGid, long otherFakeDsGenId, int otherFakeDsScen,
    int otherFakeRsGid, long otherFakeRsGenId, int otherFakeRsScen) throws Exception
  {
    testSafeReadOneRSComplex(otherFakeDsGid, otherFakeDsGenId, otherFakeDsScen,
    otherFakeRsGid, otherFakeRsGenId, otherFakeRsScen);
  }
  /**
   * Returns possible combinations of parameters for testSafeReadOneRSComplex test
   */
  @DataProvider(name = "testSafeReadOneRSComplexProvider")
  private Object[][] testSafeReadOneRSComplexProvider()
  {
    List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
    // Other additional DS group id
    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
    // Other additional DS generation id
    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
    // Other additional DS scenario
    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_DS_SCENARIO, TIMEOUT_DS_SCENARIO, REPLAY_ERROR_DS_SCENARIO);
    // Other additional RS group id
    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
    // Other additional RS generation id
    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
    // Other additional RS scenario
    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO, DS_TIMEOUT_RS_SCENARIO_SAFE_READ, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ);
    Object[][] result = new Object[objectArrayList.size()][];
    int i = 0;
    for (List<Object> objectArray : objectArrayList)
    {
      result[i] = objectArray.toArray();
      i++;
    }
    return result;
  }
  /**
   * Test safe read mode with only one real RS deployment.
   * Test that the RS is able to acknowledge SR updates with level higher than 1
   * and also to return errors is some errors occur.
   * - 1 main fake DS connected to the RS
   * - 1 other fake DS connected to the RS, with same GID as RS and same GENID as RS and always acking without error
   * - 1 other fake DS connected to the RS, with GID, GENID, scenario...changed through the provider
   * - 1 fake RS connected to the RS (emulating one fake DS connected to it), with same GID as RS and always acking without error
   * - 1 other fake RS connected to the RS (emulating one fake DS connected to it), with GID scenario...changed through the provider
   *
   * All possible combinations tested thanks to the provider.
   */
  @Test(dataProvider = "testSafeReadOneRSComplexProvider", groups = "slow", enabled = false) // Working but disabled as 17.5 minutes to run
  public void testSafeReadOneRSComplex(int otherFakeDsGid, long otherFakeDsGenId, int otherFakeDsScen,
    int otherFakeRsGid, long otherFakeRsGenId, int otherFakeRsScen) throws Exception
  {
    String testCase = "testSafeReadOneRSComplex";
    debugInfo("Starting " + testCase);
    initTest();
    try
    {
      /*
       * Start real RS (the one to be tested)
       */
      // Create real RS 1
      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
        testCase);
      assertNotNull(rs1);
      /*
       * Start main DS 1 (the one which sends updates)
       */
      fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        TIMEOUT_DS_SCENARIO);
      assertNotNull(fakeRd1);
      /*
       * Start another fake DS 2 connected to RS
       */
      fakeRd2 = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        REPLY_OK_DS_SCENARIO);
      assertNotNull(fakeRd2);
      /*
       * Start another fake DS 3 connected to RS
       */
      fakeRd3 = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
        otherFakeDsGenId, ((otherFakeDsGid == DEFAULT_GID) ? true : false),
        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
        otherFakeDsScen);
      assertNotNull(fakeRd3);
      /*
       * Start fake RS (RS 1) connected to RS
       */
      fakeRs1 = createFakeReplicationServer(FRS1_ID, DEFAULT_GID, RS1_ID,
        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1,
        new ServerState(), REPLY_OK_RS_SCENARIO);
      assertNotNull(fakeRs1);
      /*
       * Start another fake RS (RS 2) connected to RS
       */
      fakeRs2 = createFakeReplicationServer(FRS2_ID, otherFakeRsGid, RS1_ID,
        otherFakeRsGenId, ((otherFakeRsGid == DEFAULT_GID) ? true : false),
        AssuredMode.SAFE_READ_MODE, 1, new ServerState(), otherFakeRsScen);
      assertNotNull(fakeRs2);
      // Wait for connections to be established
      waitForStableTopo(fakeRd1, 2, 3);
      /*
       * Send an assured safe read update
       */
      long startTime = System.currentTimeMillis();
      try
      {
        fakeRd1.sendNewFakeUpdate();
      } catch (TimeoutException e)
      {
        fail("No timeout is expected here");
      }
      long sendUpdateTime = System.currentTimeMillis() - startTime;
      // Compute some thing that will help determine what to check according to
      // the current test configurarion: compute if DS and RS subject to conf
      // change are elligible and expected for safe read assured
      // elligible: the server should receive the ack request
      // expected: the server should send back an ack (with or without error)
      boolean dsIsEligible = areGroupAndGenerationIdOk(otherFakeDsGid, otherFakeDsGenId);
      boolean rsIsEligible = areGroupAndGenerationIdOk(otherFakeRsGid, otherFakeRsGenId);
      boolean dsIsExpected = false;
      boolean rsIsExpected = false;
      // Booleans to tell if we expect to see the timeout, wrong status and replay error flags
      boolean shouldSeeTimeout = false;
      boolean shouldSeeWrongStatus = false;
      boolean shouldSeeReplayError = false;
      // Booleans to tell if we expect to see the ds, rs and virtual ds connected to fake rs in server id error list
      boolean shouldSeeDsIdInError = false;
      boolean shouldSeeRsIdInError = false;
      boolean shouldSeeDsRsIdInError = false;
      if (dsIsEligible)
      {
        switch (otherFakeDsScen)
        {
          case REPLY_OK_DS_SCENARIO:
            dsIsExpected = true;
            break;
          case TIMEOUT_DS_SCENARIO:
            shouldSeeDsIdInError = true;
            shouldSeeTimeout = true;
            break;
          case REPLAY_ERROR_DS_SCENARIO:
            shouldSeeDsIdInError = true;
            shouldSeeReplayError = true;
            break;
          default:
            fail("No other scenario should be used here");
        }
      }
      if (rsIsEligible)
      {
        switch (otherFakeRsScen)
        {
          case REPLY_OK_RS_SCENARIO:
            rsIsExpected = true;
            break;
          case TIMEOUT_RS_SCENARIO:
            shouldSeeRsIdInError = true;
            shouldSeeTimeout = true;
            break;
          case DS_TIMEOUT_RS_SCENARIO_SAFE_READ:
            shouldSeeDsRsIdInError = true;
            shouldSeeTimeout = true;
            break;
          case DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ:
            shouldSeeDsRsIdInError = true;
            shouldSeeReplayError = true;
            break;
          case DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ:
            shouldSeeDsRsIdInError = true;
            shouldSeeWrongStatus = true;
            break;
          default:
            fail("No other scenario should be used here");
        }
      }
      if (!shouldSeeTimeout)
      {
        // Call time should have been short
        assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
      } else // Timeout
      {
        if (shouldSeeDsRsIdInError) // Virtual DS timeout
        {
          // Should have timed out
          assertTrue((MAX_SEND_UPDATE_TIME <= sendUpdateTime) && (sendUpdateTime <=
            LONG_TIMEOUT));
        } else // Normal rimeout case
        {
          // Should have timed out
          assertTrue((SMALL_TIMEOUT <= sendUpdateTime) && (sendUpdateTime <=
            LONG_TIMEOUT));
        }
      }
      // Sleep a while as counters are updated just after sending thread is unblocked
      sleep(500);
      // Check monitoring values in DS 1
      //
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
      if (( (otherFakeDsGid == DEFAULT_GID) && (otherFakeDsGenId == DEFAULT_GENID) && (otherFakeDsScen != REPLY_OK_DS_SCENARIO) )
         || ( (otherFakeRsGid == DEFAULT_GID) && (otherFakeRsGenId == DEFAULT_GENID) && (otherFakeRsScen != REPLY_OK_RS_SCENARIO) ))
      {
        assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 0);
        assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 1);
      }
      else
      {
        assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
        assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
      }
      if (shouldSeeTimeout)
        assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 1);
      else
        assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
      if (shouldSeeWrongStatus)
        assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 1);
      else
        assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      if (shouldSeeReplayError)
        assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 1);
      else
        assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      // Check for servers in error list
      Map<Short, Integer> expectedErrors = new HashMap<Short, Integer>();
      if (shouldSeeDsIdInError)
        expectedErrors.put(FDS3_ID, 1);
      if (shouldSeeRsIdInError)
        expectedErrors.put(FRS2_ID, 1);
      if (shouldSeeDsRsIdInError)
        expectedErrors.put(DS_FRS2_ID, 1);
      checkServerErrorListsAreEqual(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates(), expectedErrors);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
      // Check monitoring values in DS 2
      //
      assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 1);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 1);
      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
      // Check monitoring values in DS 3
      //
      assertEquals(fakeRd3.getAssuredSrSentUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrAcknowledgedUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrReplayErrorUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
      if (dsIsEligible)
      {
        assertEquals(fakeRd3.getReceivedAssuredSrUpdates(), 1);
        if (dsIsExpected)
        {
          assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 1);
          assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
        } else
        {
          if (shouldSeeReplayError && (otherFakeDsScen == REPLAY_ERROR_DS_SCENARIO))
          {
            // Replay error for the other DS
            assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
            assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 1);
          } else
          {
            assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
            assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
          }
        }
      }
      else
      {
        assertEquals(fakeRd3.getReceivedAssuredSrUpdates(), 0);
        assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
        assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
      }
      // Sanity check
      //
      assertEquals(fakeRd1.nReceivedUpdates(), 0);
      assertTrue(fakeRd1.receivedUpdatesOk());
      assertEquals(fakeRd2.nReceivedUpdates(), 1);
      assertTrue(fakeRd2.receivedUpdatesOk());
      if (otherFakeDsGenId == DEFAULT_GENID)
        assertEquals(fakeRd3.nReceivedUpdates(), 1);
      else
        assertEquals(fakeRd3.nReceivedUpdates(), 0);
      assertTrue(fakeRd3.receivedUpdatesOk());
      assertEquals(fakeRs1.nReceivedUpdates(), 1);
      assertTrue(fakeRs1.receivedUpdatesOk());
      if (otherFakeRsGenId == DEFAULT_GENID)
        assertEquals(fakeRs2.nReceivedUpdates(), 1);
      else
        assertEquals(fakeRs2.nReceivedUpdates(), 0);
      assertTrue(fakeRs2.receivedUpdatesOk());
    } finally
    {
      endTest();
    }
  }
  /**
   * Check that the passed server error lists are equivalent
   */
  private void checkServerErrorListsAreEqual(Map<Short, Integer> list1, Map<Short, Integer> list2)
  {
    assertNotNull(list1);
    assertNotNull(list2);
    assertEquals(list1.size(), list2.size());
    for (Short s : list1.keySet())
    {
      assertEquals(list1.get(s), list2.get(s));
    }
  }
}