From 76b8bb967a0e20ef38750dbffd893baa117c1f34 Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Fri, 09 Jan 2009 10:06:40 +0000
Subject: [PATCH] - Assured replication (Safe Read) bug fixes - Some assured replication (Safe Read) unit tests

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java |   57 ++
 opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java                             |   25 +
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java |  914 ++++++++++++++++++++++++++++++++++++++++++++----
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                              |   79 ++-
 opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |   28 +
 5 files changed, 985 insertions(+), 118 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index df6b067..6d82905 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2006-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
@@ -329,7 +329,7 @@
     /**
      * The update message equivalent to the originally received update message,
      * but with assured flag disabled. This message is the one that should be
-     * sent to non elligible servers for assured mode.
+     * sent to non eligible servers for assured mode.
      * We need a clone like of the original message with assured flag off, to be
      * posted to servers we don't want to wait the ack from (not normal status
      * servers or servers with different group id). This must be done because
@@ -479,7 +479,7 @@
        * The list of servers identified as servers we are interested in
        * receiving acks from. If this list is not null, then expectedAcksInfo
        * should be not null.
-       * Servers that are not in this list are servers not elligible for an ack
+       * Servers that are not in this list are servers not eligible for an ack
        * request.
        *
        */
@@ -496,7 +496,7 @@
    * Process a just received assured update message in Safe Read mode. If the
    * ack can be sent immediately, it is done here. This will also determine to
    * which suitable servers an ack should be requested from, and which ones are
-   * not elligible for an ack request.
+   * not eligible for an ack request.
    * This method is an helper method for the put method. Have a look at the put
    * method for a better understanding.
    * @param update The just received assured update to process.
@@ -516,12 +516,12 @@
     List<Short> expectedServers = new ArrayList<Short>();
     List<Short> wrongStatusServers = new ArrayList<Short>();
 
-    if (sourceGroupId != groupId)
+    if (sourceGroupId == groupId)
       // Assured feature does not cross different group ids
     {
       if (sourceHandler.isLDAPserver())
       {
-        // Look for RS elligible for assured
+        // Look for RS eligible for assured
         for (ServerHandler handler : replicationServers.values())
         {
           if (handler.getGroupId() == groupId)
@@ -537,7 +537,7 @@
         }
       }
 
-      // Look for DS elligible for assured
+      // Look for DS eligible for assured
       for (ServerHandler handler : directoryServers.values())
       {
         // Don't forward the change to the server that just sent it
@@ -560,10 +560,10 @@
               wrongStatusServers.add(handler.getServerId());
             } else
             {
-              /*
+              /**
                * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
                * We do not want this to be reported as an error to the update
-               * maker -> no pollution or potential missunderstanding when
+               * maker -> no pollution or potential misunderstanding when
                * reading logs or monitoring and it was just administration (for
                * instance new server is being configured in topo: it goes in bad
                * gen then then full full update).
@@ -586,7 +586,7 @@
 
     if (preparedAssuredInfo.expectedServers == null)
     {
-      // No elligible servers found, send the ack immediatly
+      // No eligible servers found, send the ack immediately
       AckMsg ack = new AckMsg(cn);
       sourceHandler.sendAck(ack);
     }
@@ -598,7 +598,7 @@
    * Process a just received assured update message in Safe Data mode. If the
    * ack can be sent immediately, it is done here. This will also determine to
    * which suitable servers an ack should be requested from, and which ones are
-   * not elligible for an ack request.
+   * not eligible for an ack request.
    * This method is an helper method for the put method. Have a look at the put
    * method for a better understanding.
    * @param update The just received assured update to process.
@@ -637,20 +637,24 @@
         {
           if (safeDataLevel == (byte) 1)
           {
-            // Immediatly return the ack for an assured message in safe data
-            // mode with safe data level 1, coming from a DS. No need to wait
-            // for more acks
+            /**
+             * Immediately return the ack for an assured message in safe data
+             * mode with safe data level 1, coming from a DS. No need to wait
+             * for more acks
+             */
             AckMsg ack = new AckMsg(cn);
             sourceHandler.sendAck(ack);
           } else
           {
             if (safeDataLevel != (byte) 0)
             {
-              // level > 1 : We need further acks
-              // The message will be posted in assured mode to elligible
-              // servers. The embedded safe data level is not changed, and his
-              // value will be used by a remote RS to determine if he must send
-              // an ack (level > 1) or not (level = 1)
+              /**
+               * level > 1 : We need further acks
+               * The message will be posted in assured mode to eligible
+               * servers. The embedded safe data level is not changed, and his
+               * value will be used by a remote RS to determine if he must send
+               * an ack (level > 1) or not (level = 1)
+               */
               interestedInAcks = true;
             } else
             {
@@ -661,12 +665,14 @@
         { // A RS sent us the safe data message, for sure no futher acks to wait
           if (safeDataLevel == (byte) 1)
           {
-            // The original level was 1 so the RS that sent us this message
-            // should have already sent his ack to the sender DS. Level 1 has
-            // already been reached so no further acks to wait.
-            // This should not happen in theory as the sender RS server should
-            // have sent us a matching not assured message so we should not come
-            // to here.
+            /**
+             * The original level was 1 so the RS that sent us this message
+             * should have already sent his ack to the sender DS. Level 1 has
+             * already been reached so no further acks to wait.
+             * This should not happen in theory as the sender RS server should
+             * have sent us a matching not assured message so we should not come
+             * to here.
+             */
           } else
           {
             // level > 1, so Ack this message to originator RS
@@ -682,7 +688,7 @@
     {
       if (sourceHandler.isLDAPserver())
       {
-        // Look for RS elligible for assured
+        // Look for RS eligible for assured
         for (ServerHandler handler : replicationServers.values())
         {
           if (handler.getGroupId() == groupId)
@@ -709,7 +715,7 @@
         // Some other acks to wait for
         int sdl = update.getSafeDataLevel();
         int neededAdditionalServers = sdl - 1;
-        // Change the number of expected acks if not enough available elligible
+        // Change the number of expected acks if not enough available eligible
         // servers: the level is a best effort thing, we do not want to timeout
         // at every assured SD update for instance if a RS has had his gen id
         // resetted
@@ -721,8 +727,8 @@
         preparedAssuredInfo.expectedServers = expectedServers;
       } else
       {
-        // level > 1 and source is a DS but no elligible servers found, send the
-        // ack immediatly
+        // level > 1 and source is a DS but no eligible servers found, send the
+        // ack immediately
         AckMsg ack = new AckMsg(cn);
         sourceHandler.sendAck(ack);
       }
@@ -755,8 +761,11 @@
           // remove object from the map
           return;
         }
-        // If this is the last ack we were waiting from, immediatly create and
-        // send the final ack to the original server
+        /**
+         *
+         * If this is the last ack we were waiting from, immediately create and
+         * send the final ack to the original server
+         */
         if (expectedAcksInfo.processReceivedAck(ackingServer, ack))
         {
           // Remove the object from the map as no more needed
@@ -768,7 +777,7 @@
             origServer.sendAck(finalAck);
           } catch (IOException e)
           {
-            /*
+            /**
              * An error happened trying the send back an ack to the server.
              * Log an error and close the connection to this server.
              */
@@ -794,7 +803,7 @@
 
   /**
    * The code run when the timeout occurs while waiting for acks of the
-   * elligible servers. This basically sends a timeout ack (with any additional
+   * eligible servers. This basically sends a timeout ack (with any additional
    * error info) to the original server that sent an assured update message.
    */
   private class AssuredTimeoutTask extends TimerTask
@@ -846,7 +855,7 @@
             origServer.sendAck(finalAck);
           } catch (IOException e)
           {
-            /*
+            /**
              * An error happened trying the send back an ack to the server.
              * Log an error and close the connection to this server.
              */
@@ -2408,7 +2417,7 @@
 
   /**
    * Set the purge delay on all the db Handlers for this Domain
-   * of Replicaiton.
+   * of Replication.
    *
    * @param delay The new purge delay to use.
    */
diff --git a/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java b/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
index e5cb702..f61b938 100644
--- a/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
+++ b/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
@@ -22,15 +22,17 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2008-2009 Sun Microsystems, Inc.
  */
 
 package org.opends.server.replication.server;
 
+import java.util.ArrayList;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
 import org.opends.server.loggers.debug.DebugTracer;
 import java.util.List;
+import java.util.Set;
 import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.AckMsg;
@@ -60,7 +62,7 @@
   // The list of server ids that had errors for the sent matching update
   // Each server id of the list had one of the
   // 3 possible errors (timeout, wrong status or replay error)
-  private List<Short> failedServers = null;
+  private List<Short> failedServers = new ArrayList<Short>();
 
   /**
    * Number of servers we want an ack from and from which we received the ack.
@@ -210,12 +212,27 @@
     ack.setHasTimeout(hasTimeout);
     ack.setHasWrongStatus(hasWrongStatus);
     ack.setHasReplayError(hasReplayError);
-    ack.setFailedServers(failedServers);
 
-    // Force anyway timeout flag if requested
     if (timeout)
+    {
+      // Force anyway timeout flag if requested
       ack.setHasTimeout(true);
 
+      // Add servers that did not respond in time
+      Set<Short> serverIds = expectedServersAckStatus.keySet();
+      for (Short serverId : serverIds)
+      {
+        boolean ackReceived = expectedServersAckStatus.get(serverId);
+        if (!ackReceived)
+        {
+          if (!failedServers.contains(serverId))
+            failedServers.add(serverId);
+        }
+      }
+    }
+
+    ack.setFailedServers(failedServers);
+
     return ack;
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 1f54b7f..5c5fef1 100644
--- a/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -331,6 +331,29 @@
   }
 
   /**
+   * Creates a ReplicationDomain with the provided parameters.
+   * (for unit test purpose only)
+   *
+   * @param serviceID  The identifier of the Replication Domain to which
+   *                   this object is participating.
+   * @param serverID   The identifier of the server that is participating
+   *                   to the Replication Domain.
+   *                   This identifier should be different for each server that
+   *                   is participating to a given Replication Domain.
+   * @param serverState The serverState to use
+   */
+  public ReplicationDomain(String serviceID, short serverID,
+    ServerState serverState)
+  {
+    this.serviceID = serviceID;
+    this.serverID = serverID;
+    this.state = serverState;
+    this.generator = new ChangeNumberGenerator(serverID, state);
+
+    domains.put(serviceID, this);
+  }
+
+  /**
    * Set the initial status of the domain and perform necessary initializations.
    * This method will be called by the Broker each time the ReplicationBroker
    * establish a new session to a Replication Server.
@@ -773,8 +796,9 @@
     }
 
     numRcvdUpdates.incrementAndGet();
-    if (update.isAssured() && (update.getAssuredMode() ==
-      AssuredMode.SAFE_READ_MODE))
+     byte rsGroupId = broker.getRsGroupId();
+    if ( update.isAssured() && (update.getAssuredMode() ==
+      AssuredMode.SAFE_READ_MODE) && (rsGroupId == groupId) )
     {
       receivedAssuredSrUpdates.incrementAndGet();
     }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index 87284b3..033dbcf 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2008 Sun Microsystems, Inc.
+ *      Copyright 2008-2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 
@@ -935,6 +935,9 @@
         assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
         Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
         assertTrue(errorsByServer.isEmpty());
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
@@ -961,6 +964,9 @@
         assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
         Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
         assertTrue(errorsByServer.isEmpty());
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1032,6 +1038,9 @@
         Integer nError = errorsByServer.get((short)RS_SERVER_ID);
         assertNotNull(nError);
         assertEquals(nError.intValue(), 1);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1054,6 +1063,9 @@
         assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
         Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
         assertTrue(errorsByServer.isEmpty());
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
         assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1208,6 +1220,9 @@
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
       Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
       assertTrue(errorsByServer.isEmpty());
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 1);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1269,6 +1284,9 @@
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
       Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
       assertTrue(errorsByServer.isEmpty());
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1389,6 +1407,25 @@
         assertFalse(ackMsg.hasReplayError());
         assertFalse(ackMsg.hasWrongStatus());
         assertEquals(ackMsg.getFailedServers().size(), 0);
+
+        // Check for monitoring data
+        DN baseDn = DN.decode(SAFE_READ_DN);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+        assertTrue(errorsByServer.isEmpty());
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 1);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 1);
+        assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
+        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+        assertTrue(errorsByServer.isEmpty());
       } catch (SocketTimeoutException e)
       {
         // Expected
@@ -1523,6 +1560,9 @@
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
       Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
       assertTrue(errorsByServer.isEmpty());
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
@@ -1558,6 +1598,9 @@
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
       errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
       assertTrue(errorsByServer.isEmpty());
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 2);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 2);
@@ -1595,6 +1638,9 @@
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
       errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
       assertTrue(errorsByServer.isEmpty());
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 3);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 3);
@@ -1676,6 +1722,9 @@
       nError = errorsByServer.get((short)20);
       assertNotNull(nError);
       assertEquals(nError.intValue(), 1);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1718,6 +1767,9 @@
       nError = errorsByServer.get((short)30);
       assertNotNull(nError);
       assertEquals(nError.intValue(), 1);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
@@ -1760,6 +1812,9 @@
       nError = errorsByServer.get((short)RS_SERVER_ID);
       assertNotNull(nError);
       assertEquals(nError.intValue(), 1);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-acked"), 0);
+      assertEquals(getMonitorAttrValue(baseDn, "received-assured-sr-updates-not-acked"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
index e049d81..e0d4512 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -34,6 +34,7 @@
 import java.net.Socket;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -97,15 +98,20 @@
   private static final short FDS1_ID = 1;
   private static final short FDS2_ID = 2;
   private static final short FDS3_ID = 3;
+  private static final short FDS4_ID = 4;
+  private static final short FDS5_ID = 5;
   private static final short FRS1_ID = 11;
   private static final short FRS2_ID = 12;
   private static final short FRS3_ID = 13;
+  private static final short DS_FRS2_ID = FRS2_ID + 10;
   private static final short RS1_ID = 101;
   private static final short RS2_ID = 102;
   private static final short RS3_ID = 103;
   private FakeReplicationDomain fakeRd1 = null;
   private FakeReplicationDomain fakeRd2 = null;
   private FakeReplicationDomain fakeRd3 = null;
+  private FakeReplicationDomain fakeRd4 = null;
+  private FakeReplicationDomain fakeRd5 = null;
   private FakeReplicationServer fakeRs1 = null;
   private FakeReplicationServer fakeRs2 = null;
   private FakeReplicationServer fakeRs3 = null;
@@ -138,7 +144,7 @@
    */
   // DS receives updates and replies acks with no errors to every updates
   private static final int REPLY_OK_DS_SCENARIO = 1;
-  // DS receives acks but does not respond (makes timeouts)
+  // DS receives updates but does not respond (makes timeouts)
   private static final int TIMEOUT_DS_SCENARIO = 2;
   // DS receives updates and replies ack with replay error flags
   private static final int REPLAY_ERROR_DS_SCENARIO = 3;
@@ -148,10 +154,17 @@
    */
   // RS receives updates and replies acks with no errors to every updates
   private static final int REPLY_OK_RS_SCENARIO = 11;
-  // RS receives acks but does not respond (makes timeouts)
+  // RS receives updates but does not respond (makes timeouts)
   private static final int TIMEOUT_RS_SCENARIO = 12;
   // RS is used for sending updates (with sendNewFakeUpdate()) and receive acks, synchronously
-  private static final int SENDER_RS_SCENARIO = 13;
+  private static final int SENDER_RS_SCENARIO = 13;  
+  //   Scenarios only used in safe read tests:
+  // RS receives updates and replies ack error as if a DS was connected to it and timed out
+  private static final int DS_TIMEOUT_RS_SCENARIO_SAFE_READ = 14;
+  // RS receives updates and replies ack error as if a DS was connected to it and was wrong status
+  private static final int DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ = 15;
+  // RS receives updates and replies ack error as if a DS was connected to it and had a replay error
+  private static final int DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ = 16;
 
   private void debugInfo(String s)
   {
@@ -188,6 +201,8 @@
     fakeRd1 = null;
     fakeRd2 = null;
     fakeRd3 = null;
+    fakeRd4 = null;
+    fakeRd5 = null;
     fakeRs1 = null;
     fakeRs2 = null;
     fakeRs3 = null;
@@ -218,6 +233,18 @@
       fakeRd3 = null;
     }
 
+    if (fakeRd4 != null)
+    {
+      fakeRd4.disableService();
+      fakeRd4 = null;
+    }
+
+    if (fakeRd5 != null)
+    {
+      fakeRd5.disableService();
+      fakeRd5 = null;
+    }
+
     // Shutdown fake RSs
 
     if (fakeRs1 != null)
@@ -260,6 +287,18 @@
       rs3 = null;
     }
   }
+  /**
+   * Creates and connects a new fake replication domain, using the passed scenario
+   * (no server state constructor version)
+   */
+  private FakeReplicationDomain createFakeReplicationDomain(short serverId,
+    int groupId, short rsId, long generationId, boolean assured,
+    AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
+    int scenario)
+  {
+    return createFakeReplicationDomain(serverId, groupId, rsId, generationId, assured,
+      assuredMode, safeDataLevel, assuredTimeout, scenario, new ServerState());
+  }
 
   /**
    * Creates and connects a new fake replication domain, using the passed scenario.
@@ -267,7 +306,7 @@
   private FakeReplicationDomain createFakeReplicationDomain(short serverId,
     int groupId, short rsId, long generationId, boolean assured,
     AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
-    int scenario)
+    int scenario, ServerState serverState)
   {
     try
     {
@@ -290,7 +329,8 @@
 
       FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
         TEST_ROOT_DN_STRING, serverId, "localhost:" + rsPort, generationId,
-        (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout, scenario);
+        (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout,
+        scenario, serverState);
 
       // Test connection
       assertTrue(fakeReplicationDomain.isConnected());
@@ -323,7 +363,7 @@
    */
   private FakeReplicationServer createFakeReplicationServer(short serverId,
     int groupId, short rsId, long generationId, boolean assured,
-    AssuredMode assuredMode, int safeDataLevel, int scenario)
+    AssuredMode assuredMode, int safeDataLevel, ServerState serverState, int scenario)
   {
     try
     {
@@ -349,7 +389,7 @@
         TEST_ROOT_DN_STRING, generationId);
 
       // Connect fake RS to the real RS
-      assertTrue(fakeReplicationServer.connect());
+      assertTrue(fakeReplicationServer.connect(serverState));
 
       // Start wished scenario
       fakeReplicationServer.start(scenario);
@@ -375,7 +415,7 @@
       if (serverId == RS1_ID)
       {
         port = rs1Port;
-        if (testCase.equals("testSafeDataManyRealRSs"))
+        if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
         {
           // Every 3 RSs connected together
           replServers.add("localhost:" + rs2Port);
@@ -387,7 +427,7 @@
       } else if (serverId == RS2_ID)
       {
         port = rs2Port;
-        if (testCase.equals("testSafeDataManyRealRSs"))
+        if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
         {
           // Every 3 RSs connected together
           replServers.add("localhost:" + rs1Port);
@@ -399,7 +439,7 @@
       } else if (serverId == RS3_ID)
       {
         port = rs3Port;
-        if (testCase.equals("testSafeDataManyRealRSs"))
+        if (testCase.equals("testSafeDataManyRealRSs") || testCase.equals("testSafeReadOneManyRSsAndDSs"))
         {
           // Every 3 RSs connected together
           replServers.add("localhost:" + rs1Port);
@@ -448,6 +488,8 @@
     // Number of received updates
     private int nReceivedUpdates = 0;
 
+    private boolean sameGidAsRs = true;
+
     /**
      * Creates a fake replication domain (DS)
      * @param serviceID The base dn used at connection to RS
@@ -460,7 +502,7 @@
      * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
      * @param assuredTimeout the assured timeout used when sending updates
      * @param scenario the scenario we are creating for (implies particular
-     * behaviour upon reception of updates)
+     * behavior upon reception of updates)
      * @throws org.opends.server.config.ConfigException
      */
     public FakeReplicationDomain(
@@ -473,9 +515,10 @@
       AssuredMode assuredMode,
       byte safeDataLevel,
       long assuredTimeout,
-      int scenario) throws ConfigException
+      int scenario,
+      ServerState serverState) throws ConfigException
     {
-      super(serviceID, serverID);
+      super(serviceID, serverID, serverState);
       List<String> replicationServers = new ArrayList<String>();
       replicationServers.add(replicationServer);
       this.generationId = generationId;
@@ -544,42 +587,40 @@
     @Override
     public boolean processUpdate(UpdateMsg updateMsg)
     {
-      try
-      {
-        checkUpdateAssuredParameters(updateMsg);
-        nReceivedUpdates++;
 
-        // Now execute the requested scenario
-        AckMsg ackMsg = null;
-        switch (scenario)
-        {
-          case REPLY_OK_DS_SCENARIO:
-            // Send the ack without errors
-            ackMsg = new AckMsg(updateMsg.getChangeNumber());
-            session.publish(ackMsg);
-            break;
-          case TIMEOUT_DS_SCENARIO:
-            // Let timeout occur
-            break;
-          case REPLAY_ERROR_DS_SCENARIO:
-            // Send the ack with replay error
-            ackMsg = new AckMsg(updateMsg.getChangeNumber());
-            ackMsg.setHasReplayError(true);
-            List<Short> failedServers = new ArrayList<Short>();
-            failedServers.add(getServerId());
-            ackMsg.setFailedServers(failedServers);
-            session.publish(ackMsg);
-            break;
-          default:
-            fail("Unknown scenario: " + scenario);
-        }
-        return true;
-      } catch (IOException ex)
+      checkUpdateAssuredParameters(updateMsg);
+      nReceivedUpdates++;
+
+      // Now execute the requested scenario
+      switch (scenario)
       {
-        fail("IOException in fake replication domain " + getServerId() + " :" +
-          ex.getMessage());
-        return false;
+        case REPLY_OK_DS_SCENARIO:
+          // Send the ack without errors
+          // Call processUpdateDone and update the server state is what needs to
+          // be done when using asynchronous process update mechanism
+          // (see processUpdate javadoc)
+          processUpdateDone(updateMsg, null);
+          getServerState().update(updateMsg.getChangeNumber());
+          break;
+        case TIMEOUT_DS_SCENARIO:
+          // Let timeout occur
+          break;
+        case REPLAY_ERROR_DS_SCENARIO:
+          // Send the ack with replay error
+          // Call processUpdateDone and update the server state is what needs to
+          // be done when using asynchronous process update mechanism
+          // (see processUpdate javadoc)
+          processUpdateDone(updateMsg, "This is the replay error message generated from fake DS " +
+            getServerId() + " for update with change number " + updateMsg.
+            getChangeNumber());
+          getServerState().update(updateMsg.getChangeNumber());
+          break;
+        default:
+          fail("Unknown scenario: " + scenario);
       }
+      // IMPORTANT: return false so that we use the asynchronous processUpdate mechanism
+      // (see processUpdate javadoc)
+      return false;
     }
 
     /**
@@ -663,7 +704,7 @@
     // Number of received updates
     private int nReceivedUpdates = 0;
 
-    // True is an ack has been replied to a received assured update (in assured mode of course)
+    // True if an ack has been replied to a received assured update (in assured mode of course)
     // used in reply scenario
     private boolean ackReplied = false;
 
@@ -741,7 +782,7 @@
      * Connect to RS
      * Returns true if connection was made successfully
      */
-    public boolean connect()
+    public boolean connect(ServerState serverState)
     {
       try
       {
@@ -761,7 +802,7 @@
 
         // Send our repl server start msg
         ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
-          fakeUrl, baseDn, 100, new ServerState(),
+          fakeUrl, baseDn, 100, serverState,
           ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
           groupId, 5000);
         session.publish(replServerStartMsg);
@@ -857,6 +898,47 @@
               case TIMEOUT_RS_SCENARIO:
                 // Let timeout occur
                 break;
+              case DS_TIMEOUT_RS_SCENARIO_SAFE_READ:
+                if (updateMsg.isAssured())
+                {
+                  // Emulate RS waiting for virtual DS ack
+                  sleep(MAX_SEND_UPDATE_TIME);
+                  // Send the ack with timeout error from a virtual DS with id (ours + 10)
+                  AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
+                  ackMsg.setHasTimeout(true);
+                  List<Short> failedServers = new ArrayList<Short>();
+                  failedServers.add((short)(serverId + 10));
+                  ackMsg.setFailedServers(failedServers);
+                  session.publish(ackMsg);
+                  ackReplied = true;
+                }
+                break;
+              case DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ:
+                if (updateMsg.isAssured())
+                {
+                  // Send the ack with wrong status error from a virtual DS with id (ours + 10)
+                  AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
+                  ackMsg.setHasWrongStatus(true);
+                  List<Short> failedServers = new ArrayList<Short>();
+                  failedServers.add((short)(serverId + 10));
+                  ackMsg.setFailedServers(failedServers);
+                  session.publish(ackMsg);
+                  ackReplied = true;
+                }
+                break;
+              case DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ:
+                if (updateMsg.isAssured())
+                {
+                  // Send the ack with replay error from a virtual DS with id (ours + 10)
+                  AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
+                  ackMsg.setHasReplayError(true);
+                  List<Short> failedServers = new ArrayList<Short>();
+                  failedServers.add((short)(serverId + 10));
+                  ackMsg.setFailedServers(failedServers);
+                  session.publish(ackMsg);
+                  ackReplied = true;
+                }
+                break;
               default:
                 fail("Unknown scenario: " + scenario);
             }
@@ -910,7 +992,7 @@
     }
 
     /**
-     * Check that received update assured parameters are as defined at DS start
+     * Check that received update assured parameters are as defined at RS start
      */
     private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
     {
@@ -948,7 +1030,7 @@
     }
 
     /**
-     * Test if the last received updates was acknowledged
+     * Test if the last received updates was acknowledged (ack sent with or without errors)
      * WARNING: this must be called once per update as it also immediatly resets the status
      * for a new test for the next update
      * @return True if acknowledged
@@ -1100,7 +1182,7 @@
         // this would timeout. If main DS group id is not the same as the real RS one,
         // the update will even not come to real RS as asured
         fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
-          DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT_RS_SCENARIO);
+          DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, new ServerState(), TIMEOUT_RS_SCENARIO);
         assertNotNull(fakeRs1);
       }
 
@@ -1120,7 +1202,7 @@
       long sendUpdateTime = System.currentTimeMillis() - startTime;
       assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
 
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
       if (mainDsGid == DEFAULT_GID)
       {
         // Check monitoring values (check that ack has been correctly received)
@@ -1139,7 +1221,7 @@
       }
 
       // Sanity check
-      sleep(1000); // Let time to update to reach other servers
+      sleep(500); // Let time to update to reach other servers
       assertEquals(fakeRd1.nReceivedUpdates(), 0);
       assertTrue(fakeRd1.receivedUpdatesOk());
       if (otherFakeDS)
@@ -1435,24 +1517,24 @@
       // Put a fake RS 1 connected to real RS
       fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRs1Gid, RS1_ID,
         fakeRs1GenId, ((fakeRs1Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
-        fakeRs1Scen);
+        new ServerState(), fakeRs1Scen);
       assertNotNull(fakeRs1);
 
       // Put a fake RS 2 connected to real RS
       fakeRs2 = createFakeReplicationServer(FRS2_ID, fakeRs2Gid, RS1_ID,
         fakeRs2GenId, ((fakeRs2Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
-        fakeRs2Scen);
+        new ServerState(), fakeRs2Scen);
       assertNotNull(fakeRs2);
 
       // Put a fake RS 3 connected to real RS
       fakeRs3 = createFakeReplicationServer(FRS3_ID, fakeRs3Gid, RS1_ID,
         fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
-        fakeRs3Scen);
+        new ServerState(), fakeRs3Scen);
       assertNotNull(fakeRs3);
 
       // Wait for connections to be finished
       // DS must see expected numbers of fake DSs and RSs
-      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
+      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 4);
 
       /***********************************************************************
        * Send update from DS 1 (3 fake RSs available) and check what happened
@@ -1479,9 +1561,9 @@
       long sendUpdateTime = System.currentTimeMillis() - startTime;
 
       // Check
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
       checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
-      checkWhatHasBeenReceived(1, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, fakeRs3GenId, expectedServers);
+      checkWhatHasBeenReceivedSafeData(1, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, fakeRs3GenId, expectedServers);
 
       /***********************************************************************
        * Send update from DS 1 (2 fake RSs available) and check what happened
@@ -1493,7 +1575,7 @@
 
       // Wait for disconnection to be finished
       // DS must see expected numbers of fake DSs and RSs
-      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 2);
+      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
 
       // Keep track of monitoring values for incremental test step
       acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1516,9 +1598,9 @@
       sendUpdateTime = System.currentTimeMillis() - startTime;
 
       // Check
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
       checkTimeAndMonitoringSafeData(2, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
-      checkWhatHasBeenReceived(2, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, -1L, expectedServers);
+      checkWhatHasBeenReceivedSafeData(2, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, -1L, expectedServers);
 
       /***********************************************************************
        * Send update from DS 1 (1 fake RS available) and check what happened
@@ -1530,7 +1612,7 @@
 
       // Wait for disconnection to be finished
       // DS must see expected numbers of fake DSs and RSs
-      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 1);
+      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 2);
 
       // Keep track of monitoring values for incremental test step
       acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1553,9 +1635,9 @@
       sendUpdateTime = System.currentTimeMillis() - startTime;
 
       // Check
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
       checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
-      checkWhatHasBeenReceived(3, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, -1L, -1L, expectedServers);
+      checkWhatHasBeenReceivedSafeData(3, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, -1L, -1L, expectedServers);
 
       /***********************************************************************
        * Send update from DS 1 (no fake RS available) and check what happened
@@ -1567,7 +1649,7 @@
 
       // Wait for disconnection to be finished
       // DS must see expected numbers of fake DSs and RSs
-      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 0);
+      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 1);
 
       // Keep track of monitoring values for incremental test step
       acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
@@ -1590,9 +1672,9 @@
       sendUpdateTime = System.currentTimeMillis() - startTime;
 
       // Check
-      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
       checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
-      checkWhatHasBeenReceived(4, otherFakeDS, otherFakeDsGenId, -1L, -1L, -1L, expectedServers);
+      checkWhatHasBeenReceivedSafeData(4, otherFakeDS, otherFakeDsGenId, -1L, -1L, -1L, expectedServers);
     } finally
     {
       endTest();
@@ -1602,7 +1684,7 @@
   // Check that the DSs and the fake RSs of the topology have received/acked what is expected according to the
   // test step (the number of updates)
   // -1 for a gen id means no need to test the matching fake RS
-  private void checkWhatHasBeenReceived(int nSentUpdates, boolean otherFakeDS, long otherFakeDsGenId, long fakeRs1GenId, long fakeRs2GenId, long fakeRs3GenId, List<Short> expectedServers)
+  private void checkWhatHasBeenReceivedSafeData(int nSentUpdates, boolean otherFakeDS, long otherFakeDsGenId, long fakeRs1GenId, long fakeRs2GenId, long fakeRs3GenId, List<Short> expectedServers)
   {
 
     // We should not receive our own update
@@ -1805,7 +1887,7 @@
       rsInfo = fakeRd.getRsList();
       nDs = dsInfo.size();
       nRs = rsInfo.size();
-      if ( (nDs == expectedDs) && (nRs == (expectedRs+1)) ) // Must include real RS so '+1'
+      if ( (nDs == expectedDs) && (nRs == expectedRs) ) // Must include real RS so '+1'
       {
         debugInfo("waitForStableTopo: expected topo obtained after " + (30-nSec) + " second(s).");
         return;
@@ -1817,7 +1899,7 @@
       " DSs (had " + dsInfo +") and " + expectedRs + " RSs (had " + rsInfo +").");
   }
 
-  // Compute the list of servers that are elligible for receiving an assured update
+  // Compute the list of servers that are elligible for receiving a safe data assured update
   // according to their group id and generation id. If -1 is used, the server is out of scope
   private List<Short> computeElligibleServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs2Gid, long fakeRs2GenId, int fakeRs3Gid, long fakeRs3GenId)
   {
@@ -1847,7 +1929,7 @@
     return false;
   }
 
-  // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+  // Compute the list of servers that are elligible for receiving a safe data assured update and that are expected to effectively ack the update
   // If -1 is used, the server is out of scope
   private List<Short> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
   {
@@ -1946,7 +2028,7 @@
       // Put a fake RS 2 connected to real RS
       fakeRs2 = createFakeReplicationServer(FRS2_ID, DEFAULT_GID, RS1_ID,
         DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 10,
-        TIMEOUT_RS_SCENARIO);
+        new ServerState(), TIMEOUT_RS_SCENARIO);
       assertNotNull(fakeRs2);
 
       /*
@@ -1956,7 +2038,7 @@
       // Put a fake RS 1 connected to real RS
       fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
         fakeRsGenId, sendInAssured, AssuredMode.SAFE_DATA_MODE, sdLevel,
-        SENDER_RS_SCENARIO);
+        new ServerState(), SENDER_RS_SCENARIO);
       assertNotNull(fakeRs1);
 
       /*
@@ -2068,7 +2150,7 @@
 
       // Wait for RSs connections to be finished
       // DS must see expected numbers of RSs
-      waitForStableTopo(fakeRd1, 0, 2);
+      waitForStableTopo(fakeRd1, 0, 3);
 
       /*
        * Send update from DS 1 and check result
@@ -2088,6 +2170,7 @@
       assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
 
       // Check monitoring values (check that ack has been correctly received)
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
       assertEquals(fakeRd1.getAssuredSdSentUpdates(), 1);
       assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 1);
       assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
@@ -2097,5 +2180,684 @@
       endTest();
     }
   }
+
+  /**
+   * Test safe read mode with only one real RS deployment. One fake DS sends
+   * assured messages to one other fake DS connected to the RS a fake RS
+   * connected to the real RS is also expected to send the ack
+   */
+  @Test(enabled = true)
+  public void testSafeReadOneRSBasic() throws Exception
+  {
+    String testCase = "testSafeReadOneRSBasic";
+
+    debugInfo("Starting " + testCase);
+
+    initTest();
+
+    try
+    {
+      /*******************
+       * Start real RS (the one to be tested)
+       */
+
+      // Create real RS 1
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+        testCase);
+      assertNotNull(rs1);
+
+      /*******************
+       * Start main DS 1 (the one which sends updates)
+       */
+
+      // Create and connect DS 1 to RS 1
+      // Assured mode: SR
+      fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+        TIMEOUT_DS_SCENARIO);
+      assertNotNull(fakeRd1);
+
+      /*
+       * Send a first assured safe read update
+       */
+
+      long startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      long sendUpdateTime = System.currentTimeMillis() - startTime;
+
+      // Check call time (should be short as RS should have acked)
+      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+      // Check monitoring values (check that ack has been correctly received)
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
+      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
+      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+         // Sanity check
+      sleep(500); // Let time to update to reach servers
+      assertEquals(fakeRd1.nReceivedUpdates(), 0);
+      assertTrue(fakeRd1.receivedUpdatesOk());
+
+      /*******************
+       * Start another fake DS 2 connected to RS
+       */
+
+      // Create and connect DS 2 to RS 1
+      // Assured mode: SR
+      ServerState serverState = fakeRd1.getServerState();
+      fakeRd2 = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+        REPLY_OK_DS_SCENARIO, serverState);
+      assertNotNull(fakeRd2);
+
+      // Wait for connections to be established
+      waitForStableTopo(fakeRd1, 1, 1);
+
+      /*
+       * Send a second assured safe read update
+       */
+
+      startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      sendUpdateTime = System.currentTimeMillis() - startTime;
+
+      // Check call time (should be short as RS should have acked)
+      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+      // Check monitoring values (check that ack has been correctly received)
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 2);
+      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 2);
+      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+      assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 1);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 1);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+      // Sanity check
+      sleep(500); // Let time to update to reach servers
+      assertEquals(fakeRd1.nReceivedUpdates(), 0);
+      assertTrue(fakeRd1.receivedUpdatesOk());
+
+      assertEquals(fakeRd2.nReceivedUpdates(), 1);
+      assertTrue(fakeRd2.receivedUpdatesOk());
+
+      /*******************
+       * Start a fake RS 1 connected to RS
+       */
+
+      fakeRs1 = createFakeReplicationServer(FRS1_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1,
+        fakeRd1.getServerState(), REPLY_OK_RS_SCENARIO);
+      assertNotNull(fakeRs1);
+
+      // Wait for connections to be established
+      waitForStableTopo(fakeRd1, 1, 2);
+
+      /*
+       * Send a third assured safe read update
+       */
+
+      startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      sendUpdateTime = System.currentTimeMillis() - startTime;
+
+      // Check call time (should be short as RS should have acked)
+      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+      // Check monitoring values (check that ack has been correctly received)
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 3);
+      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 3);
+      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+      assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 2);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 2);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+      // Sanity check
+      sleep(500); // Let time to update to reach servers
+      assertEquals(fakeRd1.nReceivedUpdates(), 0);
+      assertTrue(fakeRd1.receivedUpdatesOk());
+
+      assertEquals(fakeRd2.nReceivedUpdates(), 2);
+      assertTrue(fakeRd2.receivedUpdatesOk());
+
+      assertEquals(fakeRs1.nReceivedUpdates(), 1);
+      assertTrue(fakeRs1.receivedUpdatesOk());
+
+      /*******************
+       * Shutdown fake DS 2
+       */
+
+      // Shutdown fake DS 2
+      fakeRd2.disableService();
+      fakeRd2 = null;
+
+      // Wait for disconnection to be finished
+      waitForStableTopo(fakeRd1, 0, 2);
+
+      /*
+       * Send a fourth assured safe read update
+       */
+
+      startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      sendUpdateTime = System.currentTimeMillis() - startTime;
+
+      // Check call time (should be short as RS should have acked)
+      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+      // Check monitoring values (check that ack has been correctly received)
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 4);
+      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 4);
+      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+      // Sanity check
+      sleep(500); // Let time to update to reach servers
+      assertEquals(fakeRd1.nReceivedUpdates(), 0);
+      assertTrue(fakeRd1.receivedUpdatesOk());
+
+      assertEquals(fakeRs1.nReceivedUpdates(), 2);
+      assertTrue(fakeRs1.receivedUpdatesOk());
+
+      /*******************
+       * Shutdown fake RS 1
+       */
+
+      // Shutdown fake RS 1
+      fakeRs1.shutdown();
+      fakeRs1 = null;
+
+      // Wait for disconnection to be finished
+      waitForStableTopo(fakeRd1, 0, 1);
+
+      /*
+       * Send a fifth assured safe read update
+       */
+
+      startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      sendUpdateTime = System.currentTimeMillis() - startTime;
+
+      // Check call time (should be short as RS should have acked)
+      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+      // Check monitoring values (check that ack has been correctly received)
+      sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
+      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5);
+      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 5);
+      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+      // Sanity check
+      sleep(500); // Let time to update to reach servers
+      assertEquals(fakeRd1.nReceivedUpdates(), 0);
+      assertTrue(fakeRd1.receivedUpdatesOk());
+    } finally
+    {
+      endTest();
+    }
+  }
+
+  /**
+   * Returns possible combinations of parameters for testSafeReadOneRSComplexPrecommit test
+   */
+  @DataProvider(name = "testSafeReadOneRSComplexPrecommitProvider")
+  private Object[][] testSafeReadOneRSComplexPrecommitProvider()
+  {
+    return new Object[][]
+    {
+      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      {DEFAULT_GID, DEFAULT_GENID, TIMEOUT_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      {DEFAULT_GID, DEFAULT_GENID, REPLAY_ERROR_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO},
+      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_TIMEOUT_RS_SCENARIO_SAFE_READ},
+      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ},
+      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ},
+      {OTHER_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      {DEFAULT_GID, DEFAULT_GENID, REPLY_OK_DS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}
+    };
+  }
+
+  /**
+   * See testSafeReadOneRSComplex comment.
+   */
+  @Test(dataProvider = "testSafeReadOneRSComplexPrecommitProvider", enabled = true)
+  public void testSafeReadOneRSComplexPrecommit(int otherFakeDsGid, long otherFakeDsGenId, int otherFakeDsScen,
+    int otherFakeRsGid, long otherFakeRsGenId, int otherFakeRsScen) throws Exception
+  {
+    testSafeReadOneRSComplex(otherFakeDsGid, otherFakeDsGenId, otherFakeDsScen,
+    otherFakeRsGid, otherFakeRsGenId, otherFakeRsScen);
+  }
+
+  /**
+   * Returns possible combinations of parameters for testSafeReadOneRSComplex test
+   */
+  @DataProvider(name = "testSafeReadOneRSComplexProvider")
+  private Object[][] testSafeReadOneRSComplexProvider()
+  {
+    List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
+
+    // Other additional DS group id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+    // Other additional DS generation id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+    // Other additional DS scenario
+    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_DS_SCENARIO, TIMEOUT_DS_SCENARIO, REPLAY_ERROR_DS_SCENARIO);
+    // Other additional RS group id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+    // Other additional RS generation id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+    // Other additional RS scenario
+    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO, DS_TIMEOUT_RS_SCENARIO_SAFE_READ, DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ, DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ);
+
+    Object[][] result = new Object[objectArrayList.size()][];
+    int i = 0;
+    for (List<Object> objectArray : objectArrayList)
+    {
+      result[i] = objectArray.toArray();
+      i++;
+    }
+    return result;
+  }
+
+  /**
+   * Test safe read mode with only one real RS deployment.
+   * Test that the RS is able to acknowledge SR updates with level higher than 1
+   * and also to return errors is some errors occur.
+   * - 1 main fake DS connected to the RS
+   * - 1 other fake DS connected to the RS, with same GID as RS and same GENID as RS and always acking without error
+   * - 1 other fake DS connected to the RS, with GID, GENID, scenario...changed through the provider
+   * - 1 fake RS connected to the RS (emulating one fake DS connected to it), with same GID as RS and always acking without error
+   * - 1 other fake RS connected to the RS (emulating one fake DS connected to it), with GID scenario...changed through the provider
+   *
+   * All possible combinations tested thanks to the provider.
+   */
+  @Test(dataProvider = "testSafeReadOneRSComplexProvider", groups = "slow", enabled = false) // Working but disabled as 17.5 minutes to run
+  public void testSafeReadOneRSComplex(int otherFakeDsGid, long otherFakeDsGenId, int otherFakeDsScen,
+    int otherFakeRsGid, long otherFakeRsGenId, int otherFakeRsScen) throws Exception
+  {
+    String testCase = "testSafeReadOneRSComplex";
+
+    debugInfo("Starting " + testCase);
+
+    initTest();
+
+    try
+    {
+      /*
+       * Start real RS (the one to be tested)
+       */
+
+      // Create real RS 1
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+        testCase);
+      assertNotNull(rs1);
+
+      /*
+       * Start main DS 1 (the one which sends updates)
+       */
+
+      fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+        TIMEOUT_DS_SCENARIO);
+      assertNotNull(fakeRd1);
+
+      /*
+       * Start another fake DS 2 connected to RS
+       */
+   
+      fakeRd2 = createFakeReplicationDomain(FDS2_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+        REPLY_OK_DS_SCENARIO);
+      assertNotNull(fakeRd2);
+     
+      /*
+       * Start another fake DS 3 connected to RS
+       */
+      
+      fakeRd3 = createFakeReplicationDomain(FDS3_ID, otherFakeDsGid, RS1_ID,
+        otherFakeDsGenId, ((otherFakeDsGid == DEFAULT_GID) ? true : false),
+        AssuredMode.SAFE_READ_MODE, 1, LONG_TIMEOUT,
+        otherFakeDsScen);
+      assertNotNull(fakeRd3);
+      
+      /*
+       * Start fake RS (RS 1) connected to RS
+       */
+      
+      fakeRs1 = createFakeReplicationServer(FRS1_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_READ_MODE, 1,
+        new ServerState(), REPLY_OK_RS_SCENARIO);
+      assertNotNull(fakeRs1);
+      
+      /*
+       * Start another fake RS (RS 2) connected to RS
+       */
+      
+      fakeRs2 = createFakeReplicationServer(FRS2_ID, otherFakeRsGid, RS1_ID,
+        otherFakeRsGenId, ((otherFakeRsGid == DEFAULT_GID) ? true : false),
+        AssuredMode.SAFE_READ_MODE, 1, new ServerState(), otherFakeRsScen);
+      assertNotNull(fakeRs2);
+
+      // Wait for connections to be established
+      waitForStableTopo(fakeRd1, 2, 3);
+
+      /*
+       * Send an assured safe read update
+       */
+
+      long startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      long sendUpdateTime = System.currentTimeMillis() - startTime;
+
+      // Compute some thing that will help determine what to check according to
+      // the current test configurarion: compute if DS and RS subject to conf
+      // change are elligible and expected for safe read assured
+      // elligible: the server should receive the ack request
+      // expected: the server should send back an ack (with or without error)
+      boolean dsIsEligible = areGroupAndGenerationIdOk(otherFakeDsGid, otherFakeDsGenId);
+      boolean rsIsEligible = areGroupAndGenerationIdOk(otherFakeRsGid, otherFakeRsGenId);
+      boolean dsIsExpected = false;
+      boolean rsIsExpected = false;
+      // Booleans to tell if we expect to see the timeout, wrong status and replay error flags
+      boolean shouldSeeTimeout = false;
+      boolean shouldSeeWrongStatus = false;
+      boolean shouldSeeReplayError = false;
+      // Booleans to tell if we expect to see the ds, rs and virtual ds connected to fake rs in server id error list
+      boolean shouldSeeDsIdInError = false;
+      boolean shouldSeeRsIdInError = false;
+      boolean shouldSeeDsRsIdInError = false;
+      if (dsIsEligible)
+      {
+        switch (otherFakeDsScen)
+        {
+          case REPLY_OK_DS_SCENARIO:
+            dsIsExpected = true;
+            break;
+          case TIMEOUT_DS_SCENARIO:
+            shouldSeeDsIdInError = true;
+            shouldSeeTimeout = true;
+            break;
+          case REPLAY_ERROR_DS_SCENARIO:
+            shouldSeeDsIdInError = true;
+            shouldSeeReplayError = true;
+            break;
+          default:
+            fail("No other scenario should be used here");
+        }
+      }
+      if (rsIsEligible)
+      {
+        switch (otherFakeRsScen)
+        {
+          case REPLY_OK_RS_SCENARIO:
+            rsIsExpected = true;
+            break;
+          case TIMEOUT_RS_SCENARIO:
+            shouldSeeRsIdInError = true;
+            shouldSeeTimeout = true;
+            break;
+          case DS_TIMEOUT_RS_SCENARIO_SAFE_READ:
+            shouldSeeDsRsIdInError = true;
+            shouldSeeTimeout = true;
+            break;
+          case DS_REPLAY_ERROR_RS_SCENARIO_SAFE_READ:
+            shouldSeeDsRsIdInError = true;
+            shouldSeeReplayError = true;
+            break;
+          case DS_WRONG_STATUS_RS_SCENARIO_SAFE_READ:
+            shouldSeeDsRsIdInError = true;
+            shouldSeeWrongStatus = true;
+            break;
+          default:
+            fail("No other scenario should be used here");
+        }
+      }
+
+      if (!shouldSeeTimeout)
+      {
+        // Call time should have been short
+        assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+      } else // Timeout
+      {
+        if (shouldSeeDsRsIdInError) // Virtual DS timeout
+        {
+          // Should have timed out
+          assertTrue((MAX_SEND_UPDATE_TIME <= sendUpdateTime) && (sendUpdateTime <=
+            LONG_TIMEOUT));
+        } else // Normal rimeout case
+        {
+          // Should have timed out
+          assertTrue((SMALL_TIMEOUT <= sendUpdateTime) && (sendUpdateTime <=
+            LONG_TIMEOUT));
+        }
+      }
+
+      // Sleep a while as counters are updated just after sending thread is unblocked
+      sleep(500);
+
+      // Check monitoring values in DS 1
+      //
+      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
+      if (( (otherFakeDsGid == DEFAULT_GID) && (otherFakeDsGenId == DEFAULT_GENID) && (otherFakeDsScen != REPLY_OK_DS_SCENARIO) )
+         || ( (otherFakeRsGid == DEFAULT_GID) && (otherFakeRsGenId == DEFAULT_GENID) && (otherFakeRsScen != REPLY_OK_RS_SCENARIO) ))
+      {
+        assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 0);
+        assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 1);
+      }
+      else
+      {
+        assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
+        assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
+      }
+
+
+      if (shouldSeeTimeout)
+        assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 1);
+      else
+        assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
+      if (shouldSeeWrongStatus)
+        assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 1);
+      else
+        assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
+      if (shouldSeeReplayError)
+        assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 1);
+      else
+        assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
+
+      // Check for servers in error list
+      Map<Short, Integer> expectedErrors = new HashMap<Short, Integer>();
+      if (shouldSeeDsIdInError)
+        expectedErrors.put(FDS3_ID, 1);
+      if (shouldSeeRsIdInError)
+        expectedErrors.put(FRS2_ID, 1);
+      if (shouldSeeDsRsIdInError)
+        expectedErrors.put(DS_FRS2_ID, 1);
+      checkServerErrorListsAreEqual(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates(), expectedErrors);
+      
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdates(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesAcked(), 0);
+      assertEquals(fakeRd1.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+      // Check monitoring values in DS 2
+      //
+      assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdates(), 1);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesAcked(), 1);
+      assertEquals(fakeRd2.getReceivedAssuredSrUpdatesNotAcked(), 0);
+
+      // Check monitoring values in DS 3
+      //
+      assertEquals(fakeRd3.getAssuredSrSentUpdates(), 0);
+      assertEquals(fakeRd3.getAssuredSrAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd3.getAssuredSrNotAcknowledgedUpdates(), 0);
+      assertEquals(fakeRd3.getAssuredSrTimeoutUpdates(), 0);
+      assertEquals(fakeRd3.getAssuredSrWrongStatusUpdates(), 0);
+      assertEquals(fakeRd3.getAssuredSrReplayErrorUpdates(), 0);
+      assertEquals(fakeRd3.getAssuredSrServerNotAcknowledgedUpdates().size(), 0);
+      if (dsIsEligible)
+      {
+        assertEquals(fakeRd3.getReceivedAssuredSrUpdates(), 1);
+        if (dsIsExpected)
+        {
+          assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 1);
+          assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
+        } else
+        {
+          if (shouldSeeReplayError && (otherFakeDsScen == REPLAY_ERROR_DS_SCENARIO))
+          {
+            // Replay error for the other DS
+            assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
+            assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 1);
+          } else
+          {
+            assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
+            assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
+          }
+        }
+      }
+      else
+      {
+        assertEquals(fakeRd3.getReceivedAssuredSrUpdates(), 0);
+        assertEquals(fakeRd3.getReceivedAssuredSrUpdatesAcked(), 0);
+        assertEquals(fakeRd3.getReceivedAssuredSrUpdatesNotAcked(), 0);
+      }
+
+      // Sanity check
+      //
+      assertEquals(fakeRd1.nReceivedUpdates(), 0);
+      assertTrue(fakeRd1.receivedUpdatesOk());
+
+      assertEquals(fakeRd2.nReceivedUpdates(), 1);
+      assertTrue(fakeRd2.receivedUpdatesOk());
+
+      if (otherFakeDsGenId == DEFAULT_GENID)
+        assertEquals(fakeRd3.nReceivedUpdates(), 1);
+      else
+        assertEquals(fakeRd3.nReceivedUpdates(), 0);
+      assertTrue(fakeRd3.receivedUpdatesOk());
+
+      assertEquals(fakeRs1.nReceivedUpdates(), 1);
+      assertTrue(fakeRs1.receivedUpdatesOk());
+
+      if (otherFakeRsGenId == DEFAULT_GENID)
+        assertEquals(fakeRs2.nReceivedUpdates(), 1);
+      else
+        assertEquals(fakeRs2.nReceivedUpdates(), 0);
+      assertTrue(fakeRs2.receivedUpdatesOk());
+
+    } finally
+    {
+      endTest();
+    }
+  }
+
+  /**
+   * Check that the passed server error lists are equivalent
+   */
+  private void checkServerErrorListsAreEqual(Map<Short, Integer> list1, Map<Short, Integer> list2)
+  {
+    assertNotNull(list1);
+    assertNotNull(list2);
+    assertEquals(list1.size(), list2.size());
+    for (Short s : list1.keySet())
+    {
+      assertEquals(list1.get(s), list2.get(s));
+    }
+  }
 }
 

--
Gitblit v1.10.0