From 0107a1af4a4325c502fe2632c7d0f4dcf3da98ba Mon Sep 17 00:00:00 2001
From: mrossign <mrossign@localhost>
Date: Tue, 16 Dec 2008 17:03:47 +0000
Subject: [PATCH] Assured Replication: - all unit tests for safe data mode - assured replication code corrections (thanks to safe data unit tests) => Still every unit tests for safe read mode to do...

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java                             |   23 
 opendj-sdk/opends/src/messages/messages/replication.properties                                                              |    4 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java                                     |   23 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java | 2099 +++++++++++++++++++++++++++++++++++++++++++
 opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml                             |    2 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                              |  172 ++-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java                             |   47 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java |  469 ++++++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java                                   |   12 
 9 files changed, 2,682 insertions(+), 169 deletions(-)

diff --git a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
index 363c9a9..7e7f7cd 100644
--- a/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
+++ b/opendj-sdk/opends/src/admin/defn/org/opends/server/admin/std/ReplicationDomainConfiguration.xml
@@ -274,7 +274,7 @@
     </adm:description>
     <adm:default-behavior>
       <adm:defined>
-        <adm:value>1000ms</adm:value>
+        <adm:value>2000ms</adm:value>
       </adm:defined>
     </adm:default-behavior>
     <adm:syntax>
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index b519ed2..64354b8 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -347,8 +347,8 @@
 server %s will be aborted. Simultanate cross connection attempt ?
 NOTICE_BAD_GENERATION_ID_FROM_DS_146=On suffix %s, directory server %s presented \
  generation ID=%s when expected generation ID=%s
-SEVERE_ERR_DS_RECEIVED_ACK_ERROR_147=In replication service %s, the assured \
- update message %s was acknowledged with the following errors: %s
+NOTICE_DS_RECEIVED_ACK_ERROR_147=In replication service %s and server id %s, the \
+ assured update message %s was acknowledged with the following errors: %s
 SEVERE_ERR_DS_ACK_TIMEOUT_148=In replication service %s, timeout after %s ms \
  waiting for the acknowledgement of the assured update message: %s
 SEVERE_ERR_DS_UNKNOWN_ASSURED_MODE_149=In directory server %s, received unknown \
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
index 296e499..f81adda 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ExpectedAcksInfo.java
@@ -27,6 +27,9 @@
 
 package org.opends.server.replication.server;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.AckMsg;
@@ -66,18 +69,36 @@
   private boolean completed = false;
 
   /**
+   * This gives the list of servers we are willing to wait acks from and the
+   * information about the ack from the servers.
+   * key: the id of the server.
+   * value: a boolean true if we received the ack from the server,
+   * false otherwise.
+   */
+  protected Map<Short,Boolean> expectedServersAckStatus =
+    new HashMap<Short,Boolean>();
+
+  /**
    * Creates a new ExpectedAcksInfo.
    * @param changeNumber The change number of the assured update message
    * @param requesterServerHandler The server handler of the server that sent
    * the assured update message
    * @param assuredMode The assured mode requested by the assured update message
+   * @param expectedServers The list of servers we want an ack from
    */
   protected ExpectedAcksInfo(ChangeNumber changeNumber,
-    ServerHandler requesterServerHandler, AssuredMode assuredMode)
+    ServerHandler requesterServerHandler, AssuredMode assuredMode,
+    List<Short> expectedServers)
   {
     this.requesterServerHandler = requesterServerHandler;
     this.assuredMode = assuredMode;
     this.changeNumber = changeNumber;
+
+    // Initialize list of servers we expect acks from
+    for (Short serverId : expectedServers)
+    {
+      expectedServersAckStatus.put(serverId, false);
+    }
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index cf68944..9408c24 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -276,10 +276,10 @@
         // According to assured sub-mode, prepare structures to keep track of
         // the acks we are interested in.
         AssuredMode assuredMode = update.getAssuredMode();
-        if (assuredMode != AssuredMode.SAFE_DATA_MODE)
+        if (assuredMode == AssuredMode.SAFE_DATA_MODE)
         {
           preparedAssuredInfo = processSafeDataUpdateMsg(update, sourceHandler);
-        } else if (assuredMode != AssuredMode.SAFE_READ_MODE)
+        } else if (assuredMode == AssuredMode.SAFE_READ_MODE)
         {
           preparedAssuredInfo = processSafeReadUpdateMsg(update, sourceHandler);
         } else
@@ -525,7 +525,15 @@
         for (ServerHandler handler : replicationServers.values())
         {
           if (handler.getGroupId() == groupId)
-            expectedServers.add(handler.getServerId());
+            // No ack expected from a RS with different group id
+          {
+            if ((generationId > 0) &&
+              (generationId == handler.getGenerationId()))
+              // No ack expected from a RS with bad gen id
+            {
+              expectedServers.add(handler.getServerId());
+            }
+          }
         }
       }
 
@@ -538,13 +546,29 @@
           continue;
         }
         if (handler.getGroupId() == groupId)
+          // No ack expected from a DS with different group id
         {
-          if (handler.getStatus() == ServerStatus.NORMAL_STATUS)
+          ServerStatus serverStatus = handler.getStatus();
+          if (serverStatus == ServerStatus.NORMAL_STATUS)
           {
             expectedServers.add(handler.getServerId());
           } else
+            // No ack expected from a DS with wrong status
           {
-            wrongStatusServers.add(handler.getServerId());
+            if (serverStatus == ServerStatus.DEGRADED_STATUS)
+            {
+              wrongStatusServers.add(handler.getServerId());
+            } else
+            {
+              /*
+               * BAD_GEN_ID_STATUS or FULL_UPDATE_STATUS:
+               * We do not want this to be reported as an error to the update
+               * maker -> no pollution or potential missunderstanding when
+               * reading logs or monitoring and it was just administration (for
+               * instance new server is being configured in topo: it goes in bad
+               * gen then then full full update).
+               */
+            }
           }
         }
       }
@@ -589,7 +613,7 @@
     UpdateMsg update, ServerHandler sourceHandler) throws IOException
   {
     ChangeNumber cn = update.getChangeNumber();
-    boolean interestedInAcks = true;
+    boolean interestedInAcks = false;
     byte safeDataLevel = update.getSafeDataLevel();
     byte groupId = replicationServer.getGroupId();
     byte sourceGroupId = sourceHandler.getGroupId();
@@ -600,47 +624,55 @@
         Short.toString(replicationServer.getServerId()),
         Byte.toString(safeDataLevel), baseDn, update.toString());
       logError(errorMsg);
-      interestedInAcks = false;
     } else if (sourceGroupId != groupId)
     {
       // Assured feature does not cross different group ids
-      interestedInAcks = false;
     } else
     {
-      if (sourceHandler.isLDAPserver())
+      if ((generationId > 0) &&
+        (generationId == sourceHandler.getGenerationId()))
+        // Ignore assured updates from wrong generationid servers
       {
-        if (safeDataLevel == (byte) 1)
+        if (sourceHandler.isLDAPserver())
         {
-          // Immediatly return the ack for an assured message in safe data mode
-          // with safe data level 1, coming from a DS. No need to wait for more
-          // acks
-          AckMsg ack = new AckMsg(cn);
-          sourceHandler.sendAck(ack);
-          interestedInAcks = false; // No further acks to obtain
+          if (safeDataLevel == (byte) 1)
+          {
+            // Immediatly return the ack for an assured message in safe data
+            // mode with safe data level 1, coming from a DS. No need to wait
+            // for more acks
+            AckMsg ack = new AckMsg(cn);
+            sourceHandler.sendAck(ack);
+          } else
+          {
+            if (safeDataLevel != (byte) 0)
+            {
+              // level > 1 : We need further acks
+              // The message will be posted in assured mode to elligible
+              // servers. The embedded safe data level is not changed, and his
+              // value will be used by a remote RS to determine if he must send
+              // an ack (level > 1) or not (level = 1)
+              interestedInAcks = true;
+            } else
+            {
+              // Should never happen
+            }
+          }
         } else
-        {
-          // level > 1 : We need further acks
-          // The message will be posted in assured mode to elligible servers.
-          // The embedded safe data level is not changed, and his value will be
-          // used by a remote RS to determine if he must send an ack (level > 1)
-          // or not (level = 1)
-        }
-      } else
-      { // A RS sent us the safe data message, for sure no futher acks to wait
-        interestedInAcks = false;
-        if (safeDataLevel == (byte) 1)
-        {
-          // The original level was 1 so the RS that sent us this message should
-          // have already sent his ack to the sender DS. Level 1 has already
-          // been reached so no further acks to wait
-          // This should not happen in theory as the sender RS server should
-          // have sent us a matching not assured message so we should not come
-          // to here.
-        } else
-        {
-          // level > 1, so Ack this message to originator RS
-          AckMsg ack = new AckMsg(cn);
-          sourceHandler.sendAck(ack);
+        { // A RS sent us the safe data message, for sure no futher acks to wait
+          if (safeDataLevel == (byte) 1)
+          {
+            // The original level was 1 so the RS that sent us this message
+            // should have already sent his ack to the sender DS. Level 1 has
+            // already been reached so no further acks to wait.
+            // This should not happen in theory as the sender RS server should
+            // have sent us a matching not assured message so we should not come
+            // to here.
+          } else
+          {
+            // level > 1, so Ack this message to originator RS
+            AckMsg ack = new AckMsg(cn);
+            sourceHandler.sendAck(ack);
+          }
         }
       }
     }
@@ -654,39 +686,46 @@
         for (ServerHandler handler : replicationServers.values())
         {
           if (handler.getGroupId() == groupId)
-            expectedServers.add(handler.getServerId());
+            // No ack expected from a RS with different group id
+          {
+            if ((generationId > 0) &&
+              (generationId == handler.getGenerationId()))
+              // No ack expected from a RS with bad gen id
+            {
+              expectedServers.add(handler.getServerId());
+            }
+          }
         }
       }
-
-      // Look for DS elligible for assured
-      for (ServerHandler handler : directoryServers.values())
-      {
-        // Don't forward the change to the server that just sent it
-        if (handler == sourceHandler)
-        {
-          continue;
-        }
-        if (handler.getGroupId() == groupId)
-          expectedServers.add(handler.getServerId());
-      }
     }
 
     // Return computed structures
     PreparedAssuredInfo preparedAssuredInfo = new PreparedAssuredInfo();
-    if (interestedInAcks && (expectedServers.size() > 0))
+    int nExpectedServers = expectedServers.size();
+    if (interestedInAcks) // interestedInAcks so level > 1
     {
-      // Some other acks to wait for
-      preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
-        sourceHandler, update.getSafeDataLevel());
-      preparedAssuredInfo.expectedServers = expectedServers;
-    }
-
-    if (interestedInAcks && (preparedAssuredInfo.expectedServers == null))
-    {
-      // level > 1 and source is a DS but no elligible servers found, send the
-      // ack immediatly
-      AckMsg ack = new AckMsg(cn);
-      sourceHandler.sendAck(ack);
+      if (nExpectedServers > 0)
+      {
+        // Some other acks to wait for
+        int sdl = update.getSafeDataLevel();
+        int neededAdditionalServers = sdl - 1;
+        // Change the number of expected acks if not enough available elligible
+        // servers: the level is a best effort thing, we do not want to timeout
+        // at every assured SD update for instance if a RS has had his gen id
+        // resetted
+        byte finalSdl = ((nExpectedServers >= neededAdditionalServers) ?
+          (byte)sdl : // Keep level as it was
+          (byte)(nExpectedServers+1)); // Change level to match what's available
+        preparedAssuredInfo.expectedAcksInfo = new SafeDataExpectedAcksInfo(cn,
+          sourceHandler, finalSdl, expectedServers);
+        preparedAssuredInfo.expectedServers = expectedServers;
+      } else
+      {
+        // level > 1 and source is a DS but no elligible servers found, send the
+        // ack immediatly
+        AckMsg ack = new AckMsg(cn);
+        sourceHandler.sendAck(ack);
+      }
     }
 
     return preparedAssuredInfo;
@@ -1436,6 +1475,9 @@
    */
   public void shutdown()
   {
+    // Terminate the assured timer
+    assuredTimeoutTimer.cancel();
+
     // Close session with other changelogs
     for (ServerHandler serverHandler : replicationServers.values())
     {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
index e4ce2c4..da6a716 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeDataExpectedAcksInfo.java
@@ -27,6 +27,9 @@
 
 package org.opends.server.replication.server;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
 import org.opends.server.loggers.debug.DebugTracer;
@@ -61,11 +64,14 @@
    * message
    * @param safeDataLevel The Safe Data level requested for the assured
    * update message
+   * @param expectedServers The list of servers we want an ack from
    */
   public SafeDataExpectedAcksInfo(ChangeNumber changeNumber,
-    ServerHandler requesterServerHandler, byte safeDataLevel)
+    ServerHandler requesterServerHandler, byte safeDataLevel,
+    List<Short> expectedServers)
   {
-    super(changeNumber, requesterServerHandler, AssuredMode.SAFE_DATA_MODE);
+    super(changeNumber, requesterServerHandler, AssuredMode.SAFE_DATA_MODE,
+      expectedServers);
     this.safeDataLevel = safeDataLevel;
   }
 
@@ -88,11 +94,27 @@
         return false;
      }
 
-    numReceivedAcks++;
-    if (numReceivedAcks == safeDataLevel)
-      return true;
-    else
+    // Get the ack status for the matching server
+    short ackingServerId = ackingServer.getServerId();
+    boolean ackReceived = expectedServersAckStatus.get(ackingServerId);
+    if (ackReceived)
+    {
+      // Sanity check: this should never happen
+      if (debugEnabled())
+        TRACER.debugInfo("Received unexpected ack from server id: " +
+          ackingServerId + " ack message: " + ackMsg);
       return false;
+    } else
+    {
+
+      // Mark this ack received for the server
+      expectedServersAckStatus.put(ackingServerId, true);
+      numReceivedAcks++;
+      if (numReceivedAcks == safeDataLevel)
+        return true;
+      else
+        return false;
+    }
   }
 
   /**
@@ -103,7 +125,20 @@
     AckMsg ack = new AckMsg(changeNumber);
 
     if (timeout)
+    {
+      // Fill collected errors info
       ack.setHasTimeout(true);
+      // Tell wich servers did not send an ack in time
+      List<Short> failedServers = new ArrayList<Short>();
+      Set<Short> serverIds = expectedServersAckStatus.keySet();
+      for (Short serverId : serverIds)
+      {
+        boolean ackReceived = expectedServersAckStatus.get(serverId);
+        if (!ackReceived)
+          failedServers.add(serverId);
+      }
+      ack.setFailedServers(failedServers);
+    }
 
     return ack;
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
index 62fb049..e5cb702 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/SafeReadExpectedAcksInfo.java
@@ -30,9 +30,7 @@
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
 import org.opends.server.loggers.debug.DebugTracer;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import org.opends.server.replication.common.AssuredMode;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.AckMsg;
@@ -65,18 +63,6 @@
   private List<Short> failedServers = null;
 
   /**
-   * This gives the list of servers we are willing to wait acks from and the
-   * information about the ack from the servers.
-   * key: the id of the server.
-   * value: a boolean true if we received the ack from the server,
-   * false otherwise.
-   * This must not include servers we already identified they are in wrong
-   * status, but just servers that are in normal status.
-   */
-  private Map<Short,Boolean> expectedServersAckStatus =
-    new HashMap<Short,Boolean>();
-
-  /**
    * Number of servers we want an ack from and from which we received the ack.
    * Said differently: the number of servers in expectedServersAckStatus whose
    * value is true. When this value reaches the size of expectedServersAckStatus
@@ -100,7 +86,8 @@
     ServerHandler requesterServerHandler, List<Short> expectedServers,
     List<Short> wrongStatusServers)
   {
-    super(changeNumber, requesterServerHandler, AssuredMode.SAFE_READ_MODE);
+    super(changeNumber, requesterServerHandler, AssuredMode.SAFE_READ_MODE,
+      expectedServers);
 
     // Keep track of potential servers detected in wrong status
     if (wrongStatusServers.size() > 0)
@@ -108,12 +95,6 @@
       hasWrongStatus = true;
       failedServers = wrongStatusServers;
     }
-
-    // Initialize list of servers we expect acks from
-    for (Short serverId : expectedServers)
-    {
-      expectedServersAckStatus.put(serverId, false);
-    }
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
index 1301a82..fa95572 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -831,7 +831,7 @@
                   broker.getRsServerId());
                 break;
               default:
-              // Should no happen
+              // Should not happen
             }
 
             throw new TimeoutException("No ack received for message cn: " + cn +
@@ -914,8 +914,8 @@
       {
         // Some problems detected: message not correclty reached every requested
         // servers. Log problem
-        Message errorMsg = ERR_DS_RECEIVED_ACK_ERROR.get(serviceID,
-          update.toString(), ack.errorsToString());
+        Message errorMsg = NOTE_DS_RECEIVED_ACK_ERROR.get(serviceID,
+          Short.toString(serverID), update.toString(), ack.errorsToString());
         logError(errorMsg);
 
         List<Short> failedServers = ack.getFailedServers();
@@ -954,7 +954,7 @@
             }
             break;
           default:
-          // Should no happen
+          // Should not happen
         }
       } else
       {
@@ -969,7 +969,7 @@
             assuredSdAcknowledgedUpdates.incrementAndGet();
             break;
           default:
-          // Should no happen
+          // Should not happen
         }
       }
     }
@@ -2393,7 +2393,7 @@
           assuredSdSentUpdates.incrementAndGet();
           break;
         default:
-          // Should no happen
+          // Should not happen
       }
 
       // Now wait for ack matching the sent assured update
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
index d643ab0..87284b3 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Copyright 2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.plugin;
 
@@ -30,6 +30,7 @@
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
+import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -37,6 +38,8 @@
 import java.util.Map;
 import java.util.NoSuchElementException;
 import java.util.StringTokenizer;
+import java.util.UUID;
+import java.util.concurrent.locks.Lock;
 import org.opends.server.types.ResultCode;
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -63,15 +66,19 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.protocols.internal.InternalSearchOperation;
 import org.opends.server.protocols.ldap.LDAPFilter;
+import org.opends.server.replication.common.ChangeNumberGenerator;
 import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.AddMsg;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.ByteStringFactory;
 import org.testng.annotations.BeforeClass;
 import org.opends.server.types.DN;
 import org.opends.server.types.Entry;
+import org.opends.server.types.LockManager;
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.types.SearchScope;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
@@ -79,6 +86,7 @@
 import static org.testng.Assert.fail;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
@@ -120,6 +128,7 @@
   private static final int NO_TIMEOUT_SCENARIO = 3;
   private static final int SAFE_READ_MANY_ERRORS = 4;
   private static final int SAFE_DATA_MANY_ERRORS = 5;
+  private static final int NO_READ = 6;
 
   // The tracer object for the debug logger
   private static final DebugTracer TRACER = getTracer();
@@ -288,7 +297,10 @@
 
   /**
    * The fake replication server used to emulate RS behaviour the way we want
-   * for assured features test
+   * for assured features test.
+   * This fake replication server is able to receive a DS connection only.
+   * According to the configured scenario, it will answer to updates with acks
+   * as the scenario is requesting.
    */
   private class FakeReplicationServer extends Thread
   {
@@ -323,13 +335,16 @@
     // where the main code can perform test assertion
     private boolean scenarioExecuted = false;
 
+    private ChangeNumberGenerator gen = null;
+
     // Constructor for RS receiving updates in SR assured mode or not assured
     // The assured boolean means:
     // - true: SR mode
     // - false: not assured
-    public FakeReplicationServer(int port, short serverId, boolean assured)
+    public FakeReplicationServer(byte groupId, int port, short serverId, boolean assured)
     {
 
+      this.groupId = groupId;
       this.port = port;
       this.serverId = serverId;
 
@@ -341,9 +356,9 @@
     }
 
     // Constructor for RS receiving updates in SD assured mode
-    public FakeReplicationServer(int port, short serverId, int safeDataLevel)
+    public FakeReplicationServer(byte groupId, int port, short serverId, int safeDataLevel)
     {
-
+      this.groupId = groupId;
       this.port = port;
       this.serverId = serverId;
 
@@ -358,6 +373,8 @@
     public void start(int scenario)
     {
 
+      gen = new ChangeNumberGenerator((short)3, 0L);
+
       // Store expected test case
       this.scenario = scenario;
 
@@ -472,7 +489,6 @@
         serverState = serverStartMsg.getServerState();
         generationId = serverStartMsg.getGenerationId();
         windowSize = serverStartMsg.getWindowSize();
-        groupId = serverStartMsg.getGroupId();
         sslEncryption = serverStartMsg.getSSLEncryption();
 
         // Send replication server start
@@ -586,11 +602,62 @@
         case SAFE_DATA_MANY_ERRORS:
           executeSafeDataManyErrorsScenario();
           break;
+        case NO_READ:
+          // Nothing to execute, just let session opne. This scenario used to
+          // send updates from the RS to the DS (reply test cases)
+          while (!shutdown)
+          {
+            try
+            {
+              sleep(5000);
+            } catch (InterruptedException ex)
+            {
+              // Going shutdown ?
+              break;
+            }
+          }
+          break;
         default:
           fail("Unknown scenario: " + scenario);
       }
     }
 
+    /*
+     * Make the RS send an add message with the passed entry and return the ack
+     * message it receives from the DS
+     */
+    private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws SocketTimeoutException
+    {
+      try
+      {
+        // Create add message        
+        AddMsg addMsg =
+          new AddMsg(gen.newChangeNumber(), entry.getDN().toString(), UUID.randomUUID().toString(),
+                     parentUid,
+                     entry.getObjectClassAttribute(),
+                     entry.getAttributes(), null );
+
+        // Send add message in assured mode
+        addMsg.setAssured(isAssured);
+        addMsg.setAssuredMode(assuredMode);
+        addMsg.setSafeDataLevel(safeDataLevel);
+        session.publish(addMsg);
+
+        // Read and return matching ack
+        AckMsg ackMsg = (AckMsg)session.receive();
+        return ackMsg;
+
+      } catch(SocketTimeoutException e)
+      {
+        throw e;
+      } catch (Throwable t)
+      {
+        fail("Unexpected exception in fake replication server sendAddUpdate " +
+          "processing: " + t);
+        return null;
+      }
+    }
+
     /**
      * Read the coming update and check parameters are not assured
      */
@@ -800,11 +867,26 @@
   }
 
   /**
+   * Return various group id values
+   */
+  @DataProvider(name = "rsGroupIdProvider")
+  private Object[][] rsGroupIdProvider()
+  {
+    return new Object[][]
+    {
+    { (byte)1 },
+    { (byte)2 }
+    };
+  }
+
+  /**
    * Tests that a DS performing a modification in safe data mode waits for
    * the ack of the RS for the configured timeout time, then times out.
+   * If the RS group id is not the same as the DS one, this must not time out
+   * and return immediately.
    */
-  @Test
-  public void testSafeDataModeTimeout() throws Exception
+  @Test(dataProvider = "rsGroupIdProvider")
+  public void testSafeDataModeTimeout(byte rsGroupId) throws Exception
   {
 
     int TIMEOUT = 5000;
@@ -813,7 +895,7 @@
     {
       // Create and start a RS expecting clients in safe data assured mode with
       // safe data level 2
-      replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+      replicationServer = new FakeReplicationServer(rsGroupId, replServerPort, RS_SERVER_ID,
         1);
       replicationServer.start(TIMEOUT_SCENARIO);
 
@@ -830,34 +912,61 @@
         "objectClass: organizationalUnit\n";
       addEntry(TestCaseUtils.entryFromLdifString(entry));
 
-      // In this scenario, the fake RS will not send back an ack so we expect
-      // the add entry code (LDAP client code emulation) to be blocked for the
-      // timeout value at least. If the time we have slept is lower, timeout
-      // handling code is not working...
       long endTime = System.currentTimeMillis();
-      assertTrue((endTime - startTime) >= TIMEOUT);
-      assertTrue(replicationServer.isScenarioExecuted());
 
-      // Check monitoring values
-      DN baseDn = DN.decode(SAFE_DATA_DN);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
-      Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
-      assertTrue(errorsByServer.isEmpty());
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
-      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
-      //  errors by server list for sd mode should be [[rsId:1]]
-      assertEquals(errorsByServer.size(), 1);
-      Integer nError = errorsByServer.get((short)RS_SERVER_ID);
-      assertNotNull(nError);
-      assertEquals(nError.intValue(), 1);
+      if (rsGroupId == (byte)1)
+      {
+        // RS has same group id as DS
+        // In this scenario, the fake RS will not send back an ack so we expect
+        // the add entry code (LDAP client code emulation) to be blocked for the
+        // timeout value at least. If the time we have slept is lower, timeout
+        // handling code is not working...
+        assertTrue((endTime - startTime) >= TIMEOUT);
+        assertTrue(replicationServer.isScenarioExecuted());
 
+        // Check monitoring values
+        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+        DN baseDn = DN.decode(SAFE_DATA_DN);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+        assertTrue(errorsByServer.isEmpty());
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
+        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+        //  errors by server list for sd mode should be [[rsId:1]]
+        assertEquals(errorsByServer.size(), 1);
+        Integer nError = errorsByServer.get((short)RS_SERVER_ID);
+        assertNotNull(nError);
+        assertEquals(nError.intValue(), 1);
+      } else
+      {
+        // RS has a different group id, addEntry should have returned quickly
+        assertTrue((endTime - startTime) < 3000);
+
+        // No error should be seen in monitoring and update should have not been
+        // sent in assured mode
+        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+        DN baseDn = DN.decode(SAFE_DATA_DN);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+        assertTrue(errorsByServer.isEmpty());
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
+        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+        assertTrue(errorsByServer.isEmpty());
+      }
     } finally
     {
       endTest();
@@ -867,9 +976,11 @@
   /**
    * Tests that a DS performing a modification in safe read mode waits for
    * the ack of the RS for the configured timeout time, then times out.
+   * If the RS group id is not the same as the DS one, this must not time out
+   * and return immediately.
    */
-  @Test
-  public void testSafeReadModeTimeout() throws Exception
+  @Test(dataProvider = "rsGroupIdProvider")
+  public void testSafeReadModeTimeout(byte rsGroupId) throws Exception
   {
 
     int TIMEOUT = 5000;
@@ -877,7 +988,7 @@
     try
     {
       // Create and start a RS expecting clients in safe read assured mode
-      replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+      replicationServer = new FakeReplicationServer(rsGroupId, replServerPort, RS_SERVER_ID,
         true);
       replicationServer.start(TIMEOUT_SCENARIO);
 
@@ -894,34 +1005,61 @@
         "objectClass: organizationalUnit\n";
       addEntry(TestCaseUtils.entryFromLdifString(entry));
 
-      // In this scenario, the fake RS will not send back an ack so we expect
-      // the add entry code (LDAP client code emulation) to be blocked for the
-      // timeout value at least. If the time we have slept is lower, timeout
-      // handling code is not working...
       long endTime = System.currentTimeMillis();
-      assertTrue((endTime - startTime) >= TIMEOUT);
-      assertTrue(replicationServer.isScenarioExecuted());
 
-      // Check monitoring values
-      DN baseDn = DN.decode(SAFE_READ_DN);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 1);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 1);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
-      Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
-      //  errors by server list for sr mode should be [[rsId:1]]
-      assertEquals(errorsByServer.size(), 1);
-      Integer nError = errorsByServer.get((short)RS_SERVER_ID);
-      assertNotNull(nError);
-      assertEquals(nError.intValue(), 1);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
-      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
-      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
-      assertTrue(errorsByServer.isEmpty());
+      if (rsGroupId == (byte)1)
+      {
+        // RS has same group id as DS
+        // In this scenario, the fake RS will not send back an ack so we expect
+        // the add entry code (LDAP client code emulation) to be blocked for the
+        // timeout value at least. If the time we have slept is lower, timeout
+        // handling code is not working...
+        assertTrue((endTime - startTime) >= TIMEOUT);
+        assertTrue(replicationServer.isScenarioExecuted());
 
+        // Check monitoring values
+        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+        DN baseDn = DN.decode(SAFE_READ_DN);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 1);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 1);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+        //  errors by server list for sr mode should be [[rsId:1]]
+        assertEquals(errorsByServer.size(), 1);
+        Integer nError = errorsByServer.get((short)RS_SERVER_ID);
+        assertNotNull(nError);
+        assertEquals(nError.intValue(), 1);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
+        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+        assertTrue(errorsByServer.isEmpty());
+      } else
+      {
+        // RS has a different group id, addEntry should have returned quickly
+        assertTrue((endTime - startTime) < 3000);
+
+        // No error should be seen in monitoring and update should have not been
+        // sent in assured mode
+        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+        DN baseDn = DN.decode(SAFE_READ_DN);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
+        Map<Short, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
+        assertTrue(errorsByServer.isEmpty());
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
+        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
+        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
+        assertTrue(errorsByServer.isEmpty());
+      }
     } finally
     {
       endTest();
@@ -940,7 +1078,7 @@
     try
     {
       // Create and start a RS expecting not assured clients
-      replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+      replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
         false);
       replicationServer.start(NOT_ASSURED_SCENARIO);
 
@@ -1034,7 +1172,7 @@
     {
       // Create and start a RS expecting clients in safe data assured mode with
       // safe data level 2
-      replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+      replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
         2);
       replicationServer.start(NO_TIMEOUT_SCENARIO);
 
@@ -1060,6 +1198,7 @@
       assertTrue(replicationServer.isScenarioExecuted());
 
       // Check monitoring values
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       DN baseDn = DN.decode(SAFE_DATA_DN);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1094,7 +1233,7 @@
     try
     {
       // Create and start a RS expecting clients in safe read assured mode
-      replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+      replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
         true);
       replicationServer.start(NO_TIMEOUT_SCENARIO);
 
@@ -1120,6 +1259,7 @@
       assertTrue(replicationServer.isScenarioExecuted());
 
       // Check monitoring values
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       DN baseDn = DN.decode(SAFE_READ_DN);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 1);
@@ -1142,6 +1282,195 @@
   }
 
   /**
+   *  Get the entryUUID for a given DN.
+   *
+   * @throws Exception if the entry does not exist or does not have
+   *                   an entryUUID.
+   */
+  private String getEntryUUID(DN dn) throws Exception
+  {
+    Entry newEntry;
+    int count = 10;
+    if (count<1)
+      count=1;
+    String found = null;
+    while ((count> 0) && (found == null))
+    {
+      Thread.sleep(100);
+
+      Lock lock = null;
+      for (int i=0; i < 3; i++)
+      {
+        lock = LockManager.lockRead(dn);
+        if (lock != null)
+        {
+          break;
+        }
+      }
+
+      if (lock == null)
+      {
+        throw new Exception("could not lock entry " + dn);
+      }
+
+      try
+      {
+        newEntry = DirectoryServer.getEntry(dn);
+
+        if (newEntry != null)
+        {
+          List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
+          Attribute tmpAttr = tmpAttrList.get(0);
+
+          for (AttributeValue val : tmpAttr)
+          {
+            found = val.getStringValue();
+            break;
+          }
+        }
+      }
+      finally
+      {
+        LockManager.unlock(dn, lock);
+      }
+      count --;
+    }
+    if (found == null)
+      throw new Exception("Entry: " + dn + " Could not be found.");
+    return found;
+  }
+
+  /**
+   * Tests that a DS receiving an update from a RS in safe read mode effectively
+   * sends an ack back (with or without error)
+   */
+  @Test(dataProvider = "rsGroupIdProvider")
+  public void testSafeReadModeReply(byte rsGroupId) throws Exception
+  {
+
+    int TIMEOUT = 5000;
+    String testcase = "testSafeReadModeReply";
+    try
+    {
+      // Create and start a RS expecting clients in safe read assured mode
+      replicationServer = new FakeReplicationServer(rsGroupId, replServerPort, RS_SERVER_ID,
+        true);
+      replicationServer.start(NO_READ);
+
+      // Create a safe read assured domain
+      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
+        TIMEOUT);
+      // Wait for connection of domain to RS
+      waitForConnectionToRs(testcase, replicationServer);
+
+      /*
+       *  Send an update from the RS and get the ack
+       */
+
+      // Make the RS send an assured add message
+      String entryStr = "dn: ou=assured-sr-reply-entry," + SAFE_READ_DN + "\n" +
+        "objectClass: top\n" +
+        "objectClass: organizationalUnit\n";
+      Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
+      String parentUid = getEntryUUID(DN.decode(SAFE_READ_DN));
+      AckMsg ackMsg = null;
+
+      try {
+        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
+
+         if (rsGroupId == (byte)2)
+           fail("Should only go here for RS with same group id as DS");
+
+        // Ack received, replay has occurred
+        assertNotNull(DirectoryServer.getEntry(entry.getDN()));
+
+        // Check that DS replied an ack without errors anyway
+        assertFalse(ackMsg.hasTimeout());
+        assertFalse(ackMsg.hasReplayError());
+        assertFalse(ackMsg.hasWrongStatus());
+        assertEquals(ackMsg.getFailedServers().size(), 0);
+      } catch (SocketTimeoutException e)
+      {
+        // Expected
+        if (rsGroupId == (byte)1)
+           fail("Should only go here for RS with group id different from DS one");
+
+        return;
+      }
+
+      /*
+       * Send un update with error from the RS and get the ack with error
+       */
+
+      // Make the RS send a not possible assured add message
+
+      // TODO: make the domain return an error: use a plugin ?
+      // The resolution code does not generate any error so we need to find a
+      // way to have the replay not working to test this...
+
+      // Check that DS replied an ack with errors
+//      assertFalse(ackMsg.hasTimeout());
+//      assertTrue(ackMsg.hasReplayError());
+//      assertFalse(ackMsg.hasWrongStatus());
+//      List<Short> failedServers = ackMsg.getFailedServers();
+//      assertEquals(failedServers.size(), 1);
+//      assertEquals((short)failedServers.get(0), (short)1);
+    } finally
+    {
+      endTest();
+    }
+  }
+
+  /**
+   * Tests that a DS receiving an update from a RS in safe data mode does not
+   * send back and ack (only safe read is taken into account in DS replay)
+   */
+  @Test(dataProvider = "rsGroupIdProvider")
+  public void testSafeDataModeReply(byte rsGroupId) throws Exception
+  {
+
+    int TIMEOUT = 5000;
+    String testcase = "testSafeDataModeReply";
+    try
+    {
+      // Create and start a RS expecting clients in safe data assured mode
+      replicationServer = new FakeReplicationServer(rsGroupId, replServerPort, RS_SERVER_ID,
+        4);
+      replicationServer.start(NO_READ);
+
+      // Create a safe data assured domain
+      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4,
+        TIMEOUT);
+      // Wait for connection of domain to RS
+      waitForConnectionToRs(testcase, replicationServer);
+
+      // Make the RS send an assured add message: we expect a read timeout as
+      // safe data should be ignored by DS
+      String entryStr = "dn: ou=assured-sd-reply-entry," + SAFE_DATA_DN + "\n" +
+        "objectClass: top\n" +
+        "objectClass: organizationalUnit\n";
+      Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
+      String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN));
+
+      AckMsg ackMsg = null;
+      try
+      {
+        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
+      } catch (SocketTimeoutException e)
+      {
+        // Expected
+        return;
+      }
+
+      fail("DS should not reply an ack in safe data mode, however, it replied: " +
+        ackMsg);
+    } finally
+    {
+      endTest();
+    }
+  }
+
+  /**
    * DS performs many successive modifications in safe data mode and receives RS
    * acks with various errors. Check for monitoring right errors
    */
@@ -1155,7 +1484,7 @@
     {
       // Create and start a RS expecting clients in safe data assured mode with
       // safe data level 3
-      replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+      replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
         3);
       replicationServer.start(SAFE_DATA_MANY_ERRORS);
 
@@ -1184,6 +1513,7 @@
       // The expected ack for the first update is:
       // - timeout error
       // - server 10 error
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       DN baseDn = DN.decode(SAFE_DATA_DN);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1218,6 +1548,7 @@
       // The expected ack for the second update is:
       // - timeout error
       // - server 10 error, server 20 error
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       baseDn = DN.decode(SAFE_DATA_DN);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1254,6 +1585,7 @@
 
       // Check monitoring values
       // No ack should have comen back, so timeout incremented (flag and error for rs)
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       baseDn = DN.decode(SAFE_DATA_DN);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1298,7 +1630,7 @@
     try
     {
       // Create and start a RS expecting clients in safe read assured mode
-      replicationServer = new FakeReplicationServer(replServerPort, RS_SERVER_ID,
+      replicationServer = new FakeReplicationServer((byte)1, replServerPort, RS_SERVER_ID,
         true);
       replicationServer.start(SAFE_READ_MANY_ERRORS);
 
@@ -1327,6 +1659,7 @@
       // The expected ack for the first update is:
       // - replay error
       // - server 10 error, server 20 error
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       DN baseDn = DN.decode(SAFE_READ_DN);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
@@ -1366,6 +1699,7 @@
       // - wrong status error
       // - replay error
       // - server 10 error, server 20 error, server 30 error
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 2);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 2);
@@ -1404,6 +1738,7 @@
 
       // Check monitoring values
       // No ack should have comen back, so timeout incremented (flag and error for rs)
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 3);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
       assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 3);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
new file mode 100644
index 0000000..d08f668
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -0,0 +1,2099 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE
+ * or https://OpenDS.dev.java.net/OpenDS.LICENSE.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at
+ * trunk/opends/resource/legal-notices/OpenDS.LICENSE.  If applicable,
+ * add the following below this CDDL HEADER, with the fields enclosed
+ * by brackets "[]" replaced with your own identifying information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2008 Sun Microsystems, Inc.
+ */
+package org.opends.server.replication.server;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.config.ConfigException;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.AssuredMode;
+import org.opends.server.replication.common.ServerState;
+import org.opends.server.replication.common.ServerStatus;
+import org.opends.server.replication.protocol.ProtocolSession;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.ReplServerStartMsg;
+import org.opends.server.replication.protocol.ReplSessionSecurity;
+import org.opends.server.replication.protocol.TopologyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ChangeNumberGenerator;
+import org.opends.server.replication.common.DSInfo;
+import org.opends.server.replication.common.RSInfo;
+import org.opends.server.replication.protocol.AckMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.ErrorMsg;
+import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.service.ReplicationDomain;
+import org.opends.server.types.DirectoryException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import static org.opends.server.TestCaseUtils.*;
+import static org.testng.Assert.fail;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertEquals;
+import static org.opends.server.loggers.ErrorLogger.logError;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+
+/**
+ * Test Server part of the assured feature in both safe data and
+ * safe read modes.
+ */
+public class AssuredReplicationServerTest
+  extends ReplicationTestCase
+{
+
+  private String testName = this.getClass().getSimpleName();
+  // The tracer object for the debug logger
+  private static final DebugTracer TRACER = getTracer();
+  private int rs1Port = -1;
+  private int rs2Port = -1;
+  private int rs3Port = -1;
+  private static final short FDS1_ID = 1;
+  private static final short FDS2_ID = 2;
+  private static final short FDS3_ID = 3;
+  private static final short FRS1_ID = 11;
+  private static final short FRS2_ID = 12;
+  private static final short FRS3_ID = 13;
+  private static final short RS1_ID = 101;
+  private static final short RS2_ID = 102;
+  private static final short RS3_ID = 103;
+  private FakeReplicationDomain fakeRd1 = null;
+  private FakeReplicationDomain fakeRd2 = null;
+  private FakeReplicationDomain fakeRd3 = null;
+  private FakeReplicationServer fakeRs1 = null;
+  private FakeReplicationServer fakeRs2 = null;
+  private FakeReplicationServer fakeRs3 = null;
+  private ReplicationServer rs1 = null;
+  private ReplicationServer rs2 = null;
+  private ReplicationServer rs3 = null;
+
+  // Small assured timeout value (timeout to be used in first RS receiving an
+  // assured update from a DS)
+  private static final int SMALL_TIMEOUT = 3000;
+  // Long assured timeout value (timeout to use in DS when sending an assured
+  // update)
+  private static final int LONG_TIMEOUT = 5000;
+  // Expected max time for sending an assured update and receive its ack
+  // (without errors)
+  private static final int MAX_SEND_UPDATE_TIME = 2000;
+
+  // Default group id
+  private static final int DEFAULT_GID = 1;
+  // Other group id
+  private static final int OTHER_GID = 2;
+
+  // Default generation id
+  private static long DEFAULT_GENID = EMPTY_DN_GENID;
+  // Other generation id
+  private static long OTHER_GENID = 500L;
+
+  /*
+   * Definitions for the scenario of the fake DS
+   */
+  // DS receives updates and replies acks with no errors to every updates
+  private static final int REPLY_OK_DS_SCENARIO = 1;
+  // DS receives acks but does not respond (makes timeouts)
+  private static final int TIMEOUT_DS_SCENARIO = 2;
+  // DS receives updates and replies ack with replay error flags
+  private static final int REPLAY_ERROR_DS_SCENARIO = 3;
+
+  /*
+   * Definitions for the scenario of the fake RS
+   */
+  // RS receives updates and replies acks with no errors to every updates
+  private static final int REPLY_OK_RS_SCENARIO = 11;
+  // RS receives acks but does not respond (makes timeouts)
+  private static final int TIMEOUT_RS_SCENARIO = 12;
+  // RS is used for sending updates (with sendNewFakeUpdate()) and receive acks, synchronously
+  private static final int SENDER_RS_SCENARIO = 13;
+
+  private void debugInfo(String s)
+  {
+    logError(Message.raw(Category.SYNC, Severity.NOTICE, s));
+    if (debugEnabled())
+    {
+      TRACER.debugInfo("** TEST **" + s);
+    }
+  }
+
+  /**
+   * Before starting the tests configure some stuff
+   */
+  @BeforeClass
+  @Override
+  public void setUp() throws Exception
+  {
+    super.setUp();
+
+    // Find  a free port for the replication servers
+    ServerSocket socket1 = TestCaseUtils.bindFreePort();
+    ServerSocket socket2 = TestCaseUtils.bindFreePort();
+    ServerSocket socket3 = TestCaseUtils.bindFreePort();
+    rs1Port = socket1.getLocalPort();
+    rs2Port = socket2.getLocalPort();
+    rs3Port = socket3.getLocalPort();
+    socket1.close();
+    socket2.close();
+    socket3.close();
+  }
+
+  private void initTest()
+  {
+    fakeRd1 = null;
+    fakeRd2 = null;
+    fakeRd3 = null;
+    fakeRs1 = null;
+    fakeRs2 = null;
+    fakeRs3 = null;
+    rs1 = null;
+    rs2 = null;
+    rs3 = null;
+  }
+
+  private void endTest()
+  {
+    // Shutdown fake DSs
+
+    if (fakeRd1 != null)
+    {
+      fakeRd1.disableService();
+      fakeRd1 = null;
+    }
+
+    if (fakeRd2 != null)
+    {
+      fakeRd2.disableService();
+      fakeRd2 = null;
+    }
+
+    if (fakeRd3 != null)
+    {
+      fakeRd3.disableService();
+      fakeRd3 = null;
+    }
+
+    // Shutdown fake RSs
+
+    if (fakeRs1 != null)
+    {
+      fakeRs1.shutdown();
+      fakeRs1 = null;
+    }
+
+    if (fakeRs2 != null)
+    {
+      fakeRs2.shutdown();
+      fakeRs2 = null;
+    }
+    
+    if (fakeRs3 != null)
+    {
+      fakeRs3.shutdown();
+      fakeRs3 = null;
+    }
+
+    // Shutdown RSs
+
+    if (rs1 != null)
+    {
+      rs1.clearDb();
+      rs1.remove();
+      rs1 = null;
+    }
+
+    if (rs2 != null)
+    {
+      rs2.clearDb();
+      rs2.remove();
+      rs2 = null;
+    }
+    if (rs3 != null)
+    {
+      rs3.clearDb();
+      rs3.remove();
+      rs3 = null;
+    }
+  }
+
+  /**
+   * Creates and connects a new fake replication domain, using the passed scenario.
+   */
+  private FakeReplicationDomain createFakeReplicationDomain(short serverId,
+    int groupId, short rsId, long generationId, boolean assured,
+    AssuredMode assuredMode, int safeDataLevel, long assuredTimeout,
+    int scenario)
+  {
+    try
+    {
+      // Set port to right real RS according to its id
+      int rsPort = -1;
+      switch (rsId)
+      {
+        case RS1_ID:
+          rsPort = rs1Port;
+          break;
+        case RS2_ID:
+          rsPort = rs2Port;
+          break;
+        case RS3_ID:
+          rsPort = rs3Port;
+          break;
+        default:
+          fail("Unknown RS id: " + rsId);
+      }
+
+      FakeReplicationDomain fakeReplicationDomain = new FakeReplicationDomain(
+        TEST_ROOT_DN_STRING, serverId, "localhost:" + rsPort, generationId,
+        (byte)groupId, assured, assuredMode, (byte)safeDataLevel, assuredTimeout, scenario);
+
+      // Test connection
+      assertTrue(fakeReplicationDomain.isConnected());
+      int rdPort = -1;
+      // Check connected server port
+      String serverStr = fakeReplicationDomain.getReplicationServer();
+      int index = serverStr.lastIndexOf(':');
+      if ((index == -1) || (index >= serverStr.length()))
+        fail("Enable to find port number in: " + serverStr);
+      String rdPortStr = serverStr.substring(index + 1);
+      try
+      {
+        rdPort = (new Integer(rdPortStr)).intValue();
+      } catch (Exception e)
+      {
+        fail("Enable to get an int from: " + rdPortStr);
+      }
+      assertEquals(rdPort, rsPort);
+
+      return fakeReplicationDomain;
+    } catch (Exception e)
+    {
+      fail("createFakeReplicationDomain " + e.getMessage());
+    }
+    return null;
+  }
+
+  /**
+   * Creates and connects a new fake replication server, using the passed scenario.
+   */
+  private FakeReplicationServer createFakeReplicationServer(short serverId,
+    int groupId, short rsId, long generationId, boolean assured,
+    AssuredMode assuredMode, int safeDataLevel, int scenario)
+  {
+    try
+    {
+      // Set port to right real RS according to its id
+      int rsPort = -1;
+      switch (rsId)
+      {
+        case RS1_ID:
+          rsPort = rs1Port;
+          break;
+        case RS2_ID:
+          rsPort = rs2Port;
+          break;
+        case RS3_ID:
+          rsPort = rs3Port;
+          break;
+        default:
+          fail("Unknown RS id: " + rsId);
+      }
+
+      FakeReplicationServer fakeReplicationServer = new FakeReplicationServer(
+        rsPort, serverId, assured, assuredMode, (byte)safeDataLevel, (byte)groupId,
+        TEST_ROOT_DN_STRING, generationId);
+
+      // Connect fake RS to the real RS
+      assertTrue(fakeReplicationServer.connect());
+
+      // Start wished scenario
+      fakeReplicationServer.start(scenario);
+
+      return fakeReplicationServer;
+    } catch (Exception e)
+    {
+      fail("createFakeReplicationServer " + e.getMessage());
+    }
+    return null;
+  }
+
+  /**
+   * Creates a new real replication server (one which is to be tested).
+   */
+  private ReplicationServer createReplicationServer(short serverId,
+    int groupId, long assuredTimeout, String testCase)
+  {
+    SortedSet<String> replServers = new TreeSet<String>();
+    try
+    {
+      int port = -1;
+      if (serverId == RS1_ID)
+      {
+        port = rs1Port;
+        if (testCase.equals("testSafeDataManyRealRSs"))
+        {
+          // Every 3 RSs connected together
+          replServers.add("localhost:" + rs2Port);
+          replServers.add("localhost:" + rs3Port);
+        } else
+        {
+          // Let this server alone
+        }
+      } else if (serverId == RS2_ID)
+      {
+        port = rs2Port;
+        if (testCase.equals("testSafeDataManyRealRSs"))
+        {
+          // Every 3 RSs connected together
+          replServers.add("localhost:" + rs1Port);
+          replServers.add("localhost:" + rs3Port);
+        } else
+        {
+          // Let this server alone
+        }
+      } else if (serverId == RS3_ID)
+      {
+        port = rs3Port;
+        if (testCase.equals("testSafeDataManyRealRSs"))
+        {          
+          // Every 3 RSs connected together
+          replServers.add("localhost:" + rs1Port);
+          replServers.add("localhost:" + rs2Port);
+        } else
+        {
+          // Let this server alone
+        }
+      } else
+      {
+        fail("Unknown replication server id.");
+      }
+
+      String dir = testName + serverId + testCase + "Db";
+      ReplServerFakeConfiguration conf =
+        new ReplServerFakeConfiguration(port, dir, 0, serverId, 0, 100,
+        replServers, groupId, assuredTimeout, 5000);
+      ReplicationServer replicationServer = new ReplicationServer(conf);
+      return replicationServer;
+
+    } catch (Exception e)
+    {
+      fail("createReplicationServer " + e.getMessage());
+    }
+    return null;
+  }
+
+  /**
+   * Fake replication domain implementation to test the replication server
+   * regarding the assured feature.
+   * According to the configured scenario, it will answer to updates with acks
+   * as the scenario is requesting.
+   */
+  public class FakeReplicationDomain extends ReplicationDomain
+  {
+    // The scenario this DS is expecting
+
+    private int scenario = -1;
+    private long generationId = -1;
+    private ProtocolSession session = null;
+
+    private ChangeNumberGenerator gen = null;
+
+    // False if a received update had assured parameters not as expected
+    private boolean everyUpdatesAreOk = true;
+    // Number of received updates
+    private int nReceivedUpdates = 0;
+
+    /**
+     * Creates a fake replication domain (DS)
+     * @param serviceID The base dn used at connection to RS
+     * @param serverID our server id
+     * @param replicationServer the URS of the RS we will connect to
+     * @param generationId the generation id we use at connection to real RS
+     * @param groupId our group id
+     * @param assured do we expect incoming assured updates (also used for outgoing updates)
+     * @param assuredMode the expected assured mode of the incoming updates (also used for outgoing updates)
+     * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
+     * @param assuredTimeout the assured timeout used when sending updates
+     * @param scenario the scenario we are creating for (implies particular
+     * behaviour upon reception of updates)
+     * @throws org.opends.server.config.ConfigException
+     */
+    public FakeReplicationDomain(
+      String serviceID,
+      short serverID,
+      String replicationServer,
+      long generationId,
+      byte groupId,
+      boolean assured,
+      AssuredMode assuredMode,
+      byte safeDataLevel,
+      long assuredTimeout,
+      int scenario) throws ConfigException
+    {
+      super(serviceID, serverID);
+      List<String> replicationServers = new ArrayList<String>();
+      replicationServers.add(replicationServer);
+      this.generationId = generationId;
+      setGroupId(groupId);
+      setAssured(assured);
+      setAssuredMode(assuredMode);
+      setAssuredSdLevel(safeDataLevel);
+      setAssuredTimeout(assuredTimeout);
+      this.scenario = scenario;
+
+      gen = new ChangeNumberGenerator(serverID, 0L);
+
+      startPublishService(replicationServers, 100, 1000);
+      startListenService();
+    }
+
+    public boolean receivedUpdatesOk()
+    {
+      return everyUpdatesAreOk;
+    }
+
+    public int nReceivedUpdates()
+    {
+      return nReceivedUpdates;
+    }
+
+    /**
+     * To get the session reference to be able to send our own acks
+     */
+    @Override
+    public void sessionInitiated(
+      ServerStatus initStatus,
+      ServerState replicationServerState,
+      long generationId,
+      ProtocolSession session)
+    {
+      super.sessionInitiated(initStatus, replicationServerState, generationId, session);
+      this.session = session;
+    }
+
+    @Override
+    public long countEntries() throws DirectoryException
+    {
+      // Not needed for this test
+      return -1;
+    }
+
+    @Override
+    protected void exportBackend(OutputStream output) throws DirectoryException
+    {
+      // Not needed for this test
+    }
+
+    @Override
+    public long getGenerationID()
+    {
+      return generationId;
+    }
+
+    @Override
+    protected void importBackend(InputStream input) throws DirectoryException
+    {
+      // Not needed for this test
+    }
+
+    @Override
+    public boolean processUpdate(UpdateMsg updateMsg)
+    {
+      try
+      {
+        checkUpdateAssuredParameters(updateMsg);
+        nReceivedUpdates++;
+
+        // Now execute the requested scenario
+        AckMsg ackMsg = null;
+        switch (scenario)
+        {
+          case REPLY_OK_DS_SCENARIO:
+            // Send the ack without errors
+            ackMsg = new AckMsg(updateMsg.getChangeNumber());
+            session.publish(ackMsg);
+            break;
+          case TIMEOUT_DS_SCENARIO:
+            // Let timeout occur
+            break;
+          case REPLAY_ERROR_DS_SCENARIO:
+            // Send the ack with replay error
+            ackMsg = new AckMsg(updateMsg.getChangeNumber());
+            ackMsg.setHasReplayError(true);
+            List<Short> failedServers = new ArrayList<Short>();
+            failedServers.add(getServerId());
+            ackMsg.setFailedServers(failedServers);
+            session.publish(ackMsg);
+            break;
+          default:
+            fail("Unknown scenario: " + scenario);
+        }
+        return true;
+      } catch (IOException ex)
+      {
+        fail("IOException in fake replication domain " + getServerId() + " :" +
+          ex.getMessage());
+        return false;
+      }
+    }
+
+    /**
+     * Check that received update assured parameters are as defined at DS start
+     */
+    private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
+    {
+      boolean ok = true;
+      if (updateMsg.isAssured() != isAssured())
+      {
+        debugInfo("Fake DS " + getServerId() + " received update assured flag is wrong: " + updateMsg);
+        ok = false;
+      }
+      if (updateMsg.getAssuredMode() !=  getAssuredMode())
+      {
+        debugInfo("Fake DS " + getServerId() + " received update assured mode is wrong: " + updateMsg);
+        ok = false;
+      }
+      if (updateMsg.getSafeDataLevel() != getAssuredSdLevel())
+      {
+        debugInfo("Fake DS " + getServerId() + " received update assured sd level is wrong: " + updateMsg);
+        ok = false;
+      }
+      
+      if (ok)
+        debugInfo("Fake DS " + getServerId() + " received update assured parameters are ok: " + updateMsg);
+      else
+        everyUpdatesAreOk = false;
+    }
+
+    /**
+     * Sends a new update from this DS
+     * @throws TimeoutException If timeout waiting for an assured ack
+     */
+    public void sendNewFakeUpdate() throws TimeoutException
+    {
+
+      // Create a new delete update message (the simplest to create)
+      DeleteMsg delMsg = new DeleteMsg(getServiceID(), gen.newChangeNumber(),
+        UUID.randomUUID().toString());
+
+      // Send it (this uses the defined assured conf at constructor time)
+      publish(delMsg);
+    }
+  }
+
+  /**
+   * The fake replication server used to emulate RS behaviour the way we want
+   * for assured features test.
+   * This fake replication server is able to receive another RS connection only.
+   * According to the configured scenario, it will answer to updates with acks
+   * as the scenario is requesting.
+   */
+  private static int fakePort = 0;
+
+  private class FakeReplicationServer extends Thread
+  {
+
+    private boolean shutdown = false;
+    private ProtocolSession session = null;
+
+    // Parameters given at constructor time
+    private int port;
+    private short serverId = -1;
+    boolean isAssured = false; // Default value for config
+    AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE; // Default value for config
+    byte safeDataLevel = (byte) 1; // Default value for config
+    private String baseDn = null;
+    private long generationId = -1L;
+    private byte groupId = (byte) -1;
+    private boolean sslEncryption = false;
+    // The scenario this RS is expecting
+    private int scenario = -1;
+
+    private ChangeNumberGenerator gen = null;
+
+    // False if a received update had assured parameters not as expected
+    private boolean everyUpdatesAreOk = true;
+    // Number of received updates
+    private int nReceivedUpdates = 0;
+
+    // True is an ack has been replied to a received assured update (in assured mode of course)
+    // used in reply scenario
+    private boolean ackReplied = false;
+
+    /**
+     * Creates a fake replication server
+     * @param port port of the real RS we will connect to
+     * @param serverId our server id
+     * @param assured do we expect incoming assured updates (also used for outgoing updates)
+     * @param assuredMode the expected assured mode of the incoming updates (also used for outgoing updates)
+     * @param safeDataLevel the expected safe data level of the incoming updates (also used for outgoing updates)
+     * @param groupId our group id
+     * @param baseDn the basedn we connect with, to the real RS
+     * @param generationId the generation id we use at connection to real RS
+     */
+    public FakeReplicationServer(int port, short serverId, boolean assured,
+      AssuredMode assuredMode, int safeDataLevel,
+      byte groupId, String baseDn, long generationId)
+    {
+      this.port = port;
+      this.serverId = serverId;
+      this.baseDn = baseDn;
+      this.generationId = generationId;
+      this.groupId = groupId;
+      this.isAssured = assured;
+      this.assuredMode = assuredMode;
+      this.safeDataLevel = (byte) safeDataLevel;
+
+      gen = new ChangeNumberGenerator((short)(serverId + 10), 0L);
+    }
+
+    /*
+     * Make the RS send an assured message and return the ack
+     * message it receives from the RS
+     */
+    public AckMsg sendNewFakeUpdate() throws SocketTimeoutException
+    {
+      try
+      {
+
+        // Create a new delete update message (the simplest to create)
+        DeleteMsg delMsg = new DeleteMsg(baseDn, gen.newChangeNumber(),
+        UUID.randomUUID().toString());
+
+        // Send del message in assured mode
+        delMsg.setAssured(isAssured);
+        delMsg.setAssuredMode(assuredMode);
+        delMsg.setSafeDataLevel(safeDataLevel);
+        session.publish(delMsg);
+
+        // Read and return matching ack
+        AckMsg ackMsg = null;
+        ReplicationMsg replMsg = session.receive();
+        if (replMsg instanceof ErrorMsg)
+        {
+          // Support for connection done with bad gen id : we receive an error
+          // message that we must throw away before reading our ack.
+          replMsg = session.receive();
+        }
+        ackMsg = (AckMsg)replMsg;
+
+        return ackMsg;
+
+      } catch(SocketTimeoutException e)
+      {
+        throw e;
+      } catch (Throwable t)
+      {
+        fail("Unexpected exception in fake replication server sendNewFakeUpdate " +
+          "processing: " + t);
+        return null;
+      }
+    }
+
+    /**
+     * Connect to RS
+     * Returns true if connection was made successfuly
+     */
+    public boolean connect()
+    {
+      try
+      {
+        // Create and connect socket
+        InetSocketAddress serverAddr =
+          new InetSocketAddress("localhost", port);
+        Socket socket = new Socket();
+        socket.setTcpNoDelay(true);
+        socket.connect(serverAddr, 500);
+
+        // Create client session
+        fakePort++;
+        String fakeUrl = "localhost:" + fakePort;
+        ReplSessionSecurity replSessionSecurity = new ReplSessionSecurity();
+        session = replSessionSecurity.createClientSession(fakeUrl, socket,
+          ReplSessionSecurity.HANDSHAKE_TIMEOUT);
+
+        // Send our repl server start msg
+        ReplServerStartMsg replServerStartMsg = new ReplServerStartMsg(serverId,
+          fakeUrl, baseDn, 100, new ServerState(),
+          ProtocolVersion.getCurrentVersion(), generationId, sslEncryption,
+          groupId, 5000);
+        session.publish(replServerStartMsg);
+
+        // Read repl server start msg
+        ReplServerStartMsg inReplServerStartMsg = (ReplServerStartMsg) session.
+          receive();
+
+        sslEncryption = inReplServerStartMsg.getSSLEncryption();
+        if (!sslEncryption)
+        {
+          session.stopEncryption();
+        }
+
+        // Send our topo mesg
+        RSInfo rsInfo = new RSInfo(serverId, generationId, groupId);
+        List<RSInfo> rsInfos = new ArrayList<RSInfo>();
+        rsInfos.add(rsInfo);
+        TopologyMsg topoMsg = new TopologyMsg(null, rsInfos);
+        session.publish(topoMsg);
+
+        // Read topo msg
+        TopologyMsg inTopoMsg = (TopologyMsg) session.receive();
+        debugInfo("Fake RS " + serverId + " handshake received the following info:" + inTopoMsg);
+
+      } catch (Throwable ex)
+      {
+        fail("Could not connect to replication server. Error in RS " + serverId +
+          " :" + ex.getMessage());
+        return false;
+      }
+      return true;
+    }
+
+    /**
+     * Starts the fake RS, expecting and testing the passed scenario.
+     */
+    public void start(int scenario)
+    {
+
+      // Store expected test case
+      this.scenario = scenario;
+
+      if (scenario == SENDER_RS_SCENARIO)
+      {
+        // Do not start the listening thread and let the main thread receive
+        // receive acks in sendNewFakeUpdate()
+        return;
+      }
+
+      // Start listening
+      start();
+    }
+
+    /**
+     * Wait for DS connections
+     */
+    @Override
+    public void run()
+    {
+      try
+      {
+        // Loop receiving and treating updates
+        while (!shutdown)
+        {
+          try
+          {
+            ReplicationMsg replicationMsg = session.receive();
+
+            if (!(replicationMsg instanceof UpdateMsg))
+            {
+              debugInfo("Fake RS " + serverId + " received non update message: " +
+                replicationMsg);
+              continue;
+            }
+
+            UpdateMsg updateMsg = (UpdateMsg) replicationMsg;
+            checkUpdateAssuredParameters(updateMsg);
+            nReceivedUpdates++;
+
+            // Now execute the requested scenario
+            switch (scenario)
+            {
+              case REPLY_OK_RS_SCENARIO:
+                if (updateMsg.isAssured())
+                {
+                  // Send the ack without errors
+                  AckMsg ackMsg = new AckMsg(updateMsg.getChangeNumber());
+                  session.publish(ackMsg);
+                  ackReplied = true;
+                }
+                break;
+              case TIMEOUT_RS_SCENARIO:
+                // Let timeout occur
+                break;
+              default:
+                fail("Unknown scenario: " + scenario);
+            }
+          } catch (SocketTimeoutException toe)
+          {
+            // We may timeout reading, in this case just re-read
+            debugInfo("Fake RS " + serverId + " : " + toe.
+              getMessage() + " (this is normal)");
+          }
+        }
+      } catch (Throwable th)
+      {
+        debugInfo("Terminating thread of fake RS " + serverId + " :" + th.
+          getMessage());
+      // Probably thread closure from main thread
+      }
+    }
+
+    /**
+     * Shutdown the Replication Server service and all its connections.
+     */
+    public void shutdown()
+    {
+      if (shutdown)
+      {
+        return;
+      }
+
+      shutdown = true;
+
+      /*
+       * Shutdown any current client handling code
+       */
+      try
+      {
+        if (session != null)
+        {
+          session.close();
+        }
+      } catch (IOException e)
+      {
+        // ignore.
+      }
+
+      try
+      {
+        join();
+      } catch (InterruptedException ie)
+      {
+      }
+    }
+
+    /**
+     * Check that received update assured parameters are as defined at DS start
+     */
+    private void checkUpdateAssuredParameters(UpdateMsg updateMsg)
+    {
+      boolean ok = true;
+      if (updateMsg.isAssured() != isAssured)
+      {
+        debugInfo("Fake RS " + serverId + " received update assured flag is wrong: " + updateMsg);
+        ok = false;
+      }
+      if (updateMsg.getAssuredMode() !=  assuredMode)
+      {
+        debugInfo("Fake RS " + serverId + " received update assured mode is wrong: " + updateMsg);
+        ok = false;
+      }
+      if (updateMsg.getSafeDataLevel() != safeDataLevel)
+      {
+        debugInfo("Fake RS " + serverId + " received update assured sd level is wrong: " + updateMsg);
+        ok = false;
+      }
+
+      if (ok)
+        debugInfo("Fake RS " + serverId + " received update assured parameters are ok: " + updateMsg);
+      else
+        everyUpdatesAreOk = false;
+    }
+
+    public boolean receivedUpdatesOk()
+    {
+      return everyUpdatesAreOk;
+    }
+    
+    public int nReceivedUpdates()
+    {
+      return nReceivedUpdates;
+    }
+
+    /**
+     * Test if the last received updates was acknowledged
+     * WARNING: this must be called once per update as it also immediatly resets the status
+     * for a new test for the next update
+     * @return True if acknowledged
+     */
+    public boolean ackReplied()
+    {
+      boolean result = ackReplied;
+      // reset ack replied status
+      ackReplied = false;
+      return result;
+    }
+  }
+
+  /**
+   * Sleep a while
+   */
+  private void sleep(long time)
+  {
+    try
+    {
+      Thread.sleep(time);
+    } catch (InterruptedException ex)
+    {
+      fail("Error sleeping " + ex);
+    }
+  }
+
+  /**
+   * See testSafeDataLevelOne comment.
+   * This is a facility to run the testSafeDataLevelOne in precommit in simplest
+   * case, so that precommit run test something and is not long.
+   * testSafeDataLevelOne will run in nightly tests (groups = "slow")
+   */
+  @Test(enabled = true)
+  public void testSafeDataLevelOnePrecommit() throws Exception
+  {
+    testSafeDataLevelOne(DEFAULT_GID, false, false, DEFAULT_GID, DEFAULT_GID);
+  }
+
+  /**
+   * Returns possible combinations of parameters for testSafeDataLevelOne test
+   */
+  @DataProvider(name = "testSafeDataLevelOneProvider")
+  private Object[][] testSafeDataLevelOneProvider()
+  {
+    return new Object[][]
+    {
+    { DEFAULT_GID, false, false, DEFAULT_GID, DEFAULT_GID},
+    { DEFAULT_GID, false, false, OTHER_GID, DEFAULT_GID},
+    { DEFAULT_GID, false, false, DEFAULT_GID, OTHER_GID},
+    { DEFAULT_GID, false, false, OTHER_GID, OTHER_GID},
+    { DEFAULT_GID, true, false, DEFAULT_GID, DEFAULT_GID},
+    { DEFAULT_GID, true, false, OTHER_GID, DEFAULT_GID},
+    { DEFAULT_GID, true, false, DEFAULT_GID, OTHER_GID},
+    { DEFAULT_GID, true, false, OTHER_GID, OTHER_GID},
+    { DEFAULT_GID, false, true, DEFAULT_GID, DEFAULT_GID},
+    { DEFAULT_GID, false, true, OTHER_GID, DEFAULT_GID},
+    { DEFAULT_GID, false, true, DEFAULT_GID, OTHER_GID},
+    { DEFAULT_GID, false, true, OTHER_GID, OTHER_GID},
+    { DEFAULT_GID, true, true, DEFAULT_GID, DEFAULT_GID},
+    { DEFAULT_GID, true, true, OTHER_GID, DEFAULT_GID},
+    { DEFAULT_GID, true, true, DEFAULT_GID, OTHER_GID},
+    { DEFAULT_GID, true, true, OTHER_GID, OTHER_GID},
+    { OTHER_GID, false, false, DEFAULT_GID, DEFAULT_GID},
+    { OTHER_GID, false, false, OTHER_GID, DEFAULT_GID},
+    { OTHER_GID, false, false, DEFAULT_GID, OTHER_GID},
+    { OTHER_GID, false, false, OTHER_GID, OTHER_GID},
+    { OTHER_GID, true, false, DEFAULT_GID, DEFAULT_GID},
+    { OTHER_GID, true, false, OTHER_GID, DEFAULT_GID},
+    { OTHER_GID, true, false, DEFAULT_GID, OTHER_GID},
+    { OTHER_GID, true, false, OTHER_GID, OTHER_GID},
+    { OTHER_GID, false, true, DEFAULT_GID, DEFAULT_GID},
+    { OTHER_GID, false, true, OTHER_GID, DEFAULT_GID},
+    { OTHER_GID, false, true, DEFAULT_GID, OTHER_GID},
+    { OTHER_GID, false, true, OTHER_GID, OTHER_GID},
+    { OTHER_GID, true, true, DEFAULT_GID, DEFAULT_GID},
+    { OTHER_GID, true, true, OTHER_GID, DEFAULT_GID},
+    { OTHER_GID, true, true, DEFAULT_GID, OTHER_GID},
+    { OTHER_GID, true, true, OTHER_GID, OTHER_GID}
+    };
+  }
+
+  /**
+   * Test that the RS is able to acknowledge SD updates sent by SD, with level 1.
+   * - 1 main fake DS connected to 1 RS, with same GID as RS or not
+   * - 1 optional other fake DS connected to RS, with same GID as RS or not
+   * - 1 optional other fake RS connected to RS, with same GID as RS or not
+   * All possible combinations tested thanks to the provider
+   */
+  @Test(dataProvider = "testSafeDataLevelOneProvider", groups = "slow", enabled = true)
+  public void testSafeDataLevelOne(int mainDsGid, boolean otherFakeDS, boolean fakeRS, int otherFakeDsGid, int fakeRsGid) throws Exception
+  {
+    String testCase = "testSafeDataLevelOne";
+
+    debugInfo("Starting " + testCase);
+
+    initTest();
+
+    try
+    {
+      /*
+       * Start real RS (the one to be tested)
+       */
+
+      // Create real RS 1
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+        testCase);
+      assertNotNull(rs1);
+
+      /*
+       * Start main DS (the one which sends updates)
+       */
+
+      // Create and connect fake domain 1 to RS 1
+      // Assured mode: SD, level 1
+      fakeRd1 = createFakeReplicationDomain(FDS1_ID, mainDsGid, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
+        TIMEOUT_DS_SCENARIO);
+      assertNotNull(fakeRd1);
+
+      /*
+       * Start one other fake DS
+       */
+
+      // Put another fake domain connected to real RS ?
+      if (otherFakeDS)
+      {
+        // Assured set to false as RS should forward change without assured requested
+        // Timeout scenario used so that no reply is made if however the real RS
+        // by mistake sends an assured error and expects an ack from this DS:
+        // this would timeout. If main DS group id is not the same as the real RS one,
+        // the update will even not come to real RS as asured
+        fakeRd2 = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
+          DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, LONG_TIMEOUT,
+          TIMEOUT_DS_SCENARIO);
+        assertNotNull(fakeRd2);
+      }
+
+      /*
+       * Start 1 fake Rs
+       */
+
+      // Put a fake RS connected to real RS ?
+      if (fakeRS)
+      {
+        // Assured set to false as RS should forward change without assured requested
+        // Timeout scenario used so that no reply is made if however the real RS
+        // by mistake sends an assured error and expects an ack from this fake RS:
+        // this would timeout. If main DS group id is not the same as the real RS one,
+        // the update will even not come to real RS as asured
+        fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
+          DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT_RS_SCENARIO);
+        assertNotNull(fakeRs1);
+      }
+
+      // Send update from DS 1
+      long startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+
+      // Check call time (should have last a lot less than long timeout)
+      // (ack received if group id of DS and real RS are the same, no ack requested
+      // otherwise)
+      long sendUpdateTime = System.currentTimeMillis() - startTime;
+      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
+      if (mainDsGid == DEFAULT_GID)
+      {
+        // Check monitoring values (check that ack has been correctly received)
+        assertEquals(fakeRd1.getAssuredSdSentUpdates(), 1);
+        assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 1);
+        assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
+        assertEquals(fakeRd1.getAssuredSdServerTimeoutUpdates().size(), 0);
+      } else
+      {
+        // Check monitoring values (DS group id (OTHER_GID) is not the same as RS one
+        // (DEFAULT_GID) so update should have been sent in normal mode
+        assertEquals(fakeRd1.getAssuredSdSentUpdates(), 0);
+        assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 0);
+        assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
+        assertEquals(fakeRd1.getAssuredSdServerTimeoutUpdates().size(), 0);
+      }
+
+      // Sanity check
+      sleep(1000); // Let time to update to reach other servers
+      assertEquals(fakeRd1.nReceivedUpdates(), 0);
+      assertTrue(fakeRd1.receivedUpdatesOk());
+      if (otherFakeDS)
+      {
+        assertEquals(fakeRd2.nReceivedUpdates(), 1);
+        assertTrue(fakeRd2.receivedUpdatesOk());
+      }
+      if (fakeRS)
+      {
+        assertEquals(fakeRs1.nReceivedUpdates(), 1);
+        assertTrue(fakeRs1.receivedUpdatesOk());
+      }
+    } finally
+    {
+      endTest();
+    }
+  }
+
+  /**
+   * Returns possible combinations of parameters for testSafeDataLevelHighPrecommit test
+   */
+  @DataProvider(name = "testSafeDataLevelHighPrecommitProvider")
+  private Object[][] testSafeDataLevelHighPrecommitProvider()
+  {
+    return new Object[][]
+    {
+      
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}
+    };
+  }
+
+  /**
+   * See testSafeDataLevelHigh comment.
+   */
+  @Test(dataProvider = "testSafeDataLevelHighPrecommitProvider", enabled = true)
+  public void testSafeDataLevelHighPrecommit(int sdLevel, boolean otherFakeDS, int otherFakeDsGid, long otherFakeDsGenId,
+    int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
+    int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen) throws Exception
+  {
+    testSafeDataLevelHigh(sdLevel, otherFakeDS, otherFakeDsGid, otherFakeDsGenId,
+    fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, fakeRs2Gid, fakeRs2GenId, fakeRs2Scen,
+    fakeRs3Gid, fakeRs3GenId, fakeRs3Scen);
+  }
+
+  /**
+   * Returns possible combinations of parameters for testSafeDataLevelHighNightly test
+   */
+  @DataProvider(name = "testSafeDataLevelHighNightlyProvider")
+  private Object[][] testSafeDataLevelHighNightlyProvider()
+  {
+    return new Object[][]
+    {
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, TIMEOUT_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 2, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, TIMEOUT_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, OTHER_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, DEFAULT_GENID, TIMEOUT_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, OTHER_GID, OTHER_GENID, TIMEOUT_RS_SCENARIO, OTHER_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO},
+      { 3, true, DEFAULT_GID, DEFAULT_GENID, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, OTHER_GENID, REPLY_OK_RS_SCENARIO, DEFAULT_GID, DEFAULT_GENID, REPLY_OK_RS_SCENARIO}
+
+    };
+  }
+
+  /**
+   * See testSafeDataLevelHigh comment.
+   */
+  @Test(dataProvider = "testSafeDataLevelHighNightlyProvider", groups = "slow", enabled = true)
+  public void testSafeDataLevelHighNightly(int sdLevel, boolean otherFakeDS, int otherFakeDsGid, long otherFakeDsGenId,
+    int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
+    int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen) throws Exception
+  {
+    testSafeDataLevelHigh(sdLevel, otherFakeDS, otherFakeDsGid, otherFakeDsGenId,
+    fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, fakeRs2Gid, fakeRs2GenId, fakeRs2Scen,
+    fakeRs3Gid, fakeRs3GenId, fakeRs3Scen);
+  }
+
+  /**
+   * Returns possible combinations of parameters for testSafeDataLevelHigh test
+   */
+  @DataProvider(name = "testSafeDataLevelHighProvider")
+  private Object[][] testSafeDataLevelHighProvider()
+  {
+    // Constrcut all possible combinations of parameters
+    List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
+
+    // Safe Data Level
+    objectArrayList = addPossibleParameters(objectArrayList, 2, 3);
+    // Other fake DS
+    objectArrayList = addPossibleParameters(objectArrayList, true, false);
+    // Other fake DS group id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+    // Other fake DS generation id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+    // Fake RS 1 group id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+    // Fake RS 1 generation id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+    // Fake RS 1 scenario
+    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
+    // Fake RS 2 group id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+    // Fake RS 2 generation id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+    // Fake RS 2 scenario
+    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
+    // Fake RS 3 group id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+    // Fake RS 3 generation id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+    // Fake RS 3 scenario
+    objectArrayList = addPossibleParameters(objectArrayList, REPLY_OK_RS_SCENARIO, TIMEOUT_RS_SCENARIO);
+
+    Object[][] result = new Object[objectArrayList.size()][];
+    int i = 0;
+    for (List<Object> objectArray : objectArrayList)
+    {
+      result[i] = objectArray.toArray();
+      i++;
+    }
+
+    debugInfo("testSafeDataLevelHighProvider: number of possible parameter combinations : " + i);
+
+    return result;
+  }
+
+  // Helper for providers:
+  // Modify the passed object array list adding to each already contained object array each
+  // passed possible values.
+  // Example: to create all possible parameter combinations for a test method which has 2 parameters:
+  // one boolean then an integer, with both 2 possible values: {true|false} and {10|100}:
+  //
+  // List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
+  // // Possible bolean values
+  // objectArrayList = addPossibleParameters(objectArrayList, true, false);
+  // // Possible integer values
+  // objectArrayList = addPossibleParameters(objectArrayList, 10, 100);
+  // Object[][] result = new Object[objectArrayList.size()][];
+  //    int i = 0;
+  //    for (List<Object> objectArray : objectArrayList)
+  //    {
+  //      result[i] = objectArray.toArray();
+  //      i++;
+  //    }
+  // return result;
+  //
+  // The provider will return the equivalent following Object[][]:
+  // new Object[][]
+  // {
+  //   { true, 10},
+  //   { true, 100},
+  //   { false, 10},
+  //   { false, 100}
+  // };
+  private List<List<Object>> addPossibleParameters(List<List<Object>> objectArrayList, Object... possibleParameters)
+  {
+    List<List<Object>> newObjectArrayList = new ArrayList<List<Object>>();
+
+    if (objectArrayList.size() == 0)
+    {
+      // First time we add some parameters, create first object arrays      
+      // Add each possible parameter as initial parameter lists
+      for (Object possibleParameter : possibleParameters)
+      {
+        // Create new empty list
+        List<Object> newObjectArray = new ArrayList<Object>();
+        // Add the new possible parameter
+        newObjectArray.add(possibleParameter);
+        // Store the new object array in the result list
+        newObjectArrayList.add(newObjectArray);
+      }
+      return newObjectArrayList;
+    }
+
+    for (List<Object> objectArray : objectArrayList)
+    {
+      // Add each possible parameter to the already existing list
+      for (Object possibleParameter : possibleParameters)
+      {
+        // Clone the existing object array
+        List<Object> newObjectArray = new ArrayList<Object>();
+        for (Object object : objectArray)
+        {
+          newObjectArray.add(object);
+        }
+        // Add the new possible parameter
+        newObjectArray.add(possibleParameter);
+        // Store the new object array in the result list
+        newObjectArrayList.add(newObjectArray);
+      }
+    }
+
+    return newObjectArrayList;
+  }
+
+  /**
+   * Test that the RS is able to acknowledge SD updates with level higher than 1
+   * and also to return errors is some servers timeout.
+   * - 1 main fake DS connected to 1 RS
+   * - 1 optional other fake DS connected to RS, with same GID as RS or not and same GENID as RS or not
+   * - 3 optional other fake RSs connected to RS, with same GID as RS or not and same GENID as RS or not
+   * All possible combinations tested thanks to the provider.
+   * Fake RSs shutting down 1 after 1 to go from 3 available servers to 0. One update sent at each step.
+   *
+   * NOTE: the following unit test is disabled by default as its testSafeDataLevelHighProvider provider
+   * provides every possible combinations of parameters. This test runs then for hours. We keep this provider
+   * for occasional testing but we disable it.
+   * A simpler set of parameters is instead used in enabled test methods (which run this method in fact):
+   * - testSafeDataLevelHighPrecommit which is used for precommit and runs fast
+   * - testSafeDataLevelHighNightly which is used in nightly tests and takes more time to execute
+   */
+  @Test(dataProvider = "testSafeDataLevelHighProvider", enabled = false)
+  public void testSafeDataLevelHigh(int sdLevel, boolean otherFakeDS, int otherFakeDsGid, long otherFakeDsGenId,
+    int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen,
+    int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen) throws Exception
+  {
+    String testCase = "testSafeDataLevelHigh";
+
+    debugInfo("Starting " + testCase);
+    
+    assertTrue(sdLevel > 1);
+    int nWishedServers = sdLevel - 1; // Number of fake RSs we want an ack from
+
+    initTest();
+
+    try
+    {
+      /*
+       * Start real RS (the one to be tested)
+       */
+
+      // Create real RS 1
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+        testCase);
+      assertNotNull(rs1);
+
+      /*
+       * Start main DS (the one which sends updates)
+       */
+
+      // Create and connect fake domain 1 to RS 1
+      fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
+        TIMEOUT_DS_SCENARIO);
+      assertNotNull(fakeRd1);
+
+      /*
+       * Start one other fake DS
+       */
+
+      // Put another fake domain connected to real RS ?
+      if (otherFakeDS)
+      {
+        fakeRd2 = createFakeReplicationDomain(FDS2_ID, otherFakeDsGid, RS1_ID,
+          otherFakeDsGenId, false, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
+          TIMEOUT_DS_SCENARIO);
+        assertNotNull(fakeRd2);
+      }
+
+      /*
+       * Start 3 fake Rss
+       */
+
+      // Put a fake RS 1 connected to real RS
+      fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRs1Gid, RS1_ID,
+        fakeRs1GenId, ((fakeRs1Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
+        fakeRs1Scen);
+      assertNotNull(fakeRs1);
+
+      // Put a fake RS 2 connected to real RS
+      fakeRs2 = createFakeReplicationServer(FRS2_ID, fakeRs2Gid, RS1_ID,
+        fakeRs2GenId, ((fakeRs2Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
+        fakeRs2Scen);
+      assertNotNull(fakeRs2);
+
+      // Put a fake RS 3 connected to real RS
+      fakeRs3 = createFakeReplicationServer(FRS3_ID, fakeRs3Gid, RS1_ID,
+        fakeRs3GenId, ((fakeRs3Gid == DEFAULT_GID) ? true : false), AssuredMode.SAFE_DATA_MODE, sdLevel,
+        fakeRs3Scen);
+      assertNotNull(fakeRs3);
+      
+      // Wait for connections to be finished
+      // DS must see expected numbers of fake DSs and RSs
+      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 3);
+
+      /***********************************************************************
+       * Send update from DS 1 (3 fake RSs available) and check what happened
+       ***********************************************************************/
+
+      // Keep track of monitoring values for incremental test step
+      int acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
+      int timeoutUpdates = fakeRd1.getAssuredSdTimeoutUpdates();
+      Map<Short,Integer> serverErrors = fakeRd1.getAssuredSdServerTimeoutUpdates();
+      // Compute the list of servers that are elligible for receiving an assured update
+      List<Short> elligibleServers = computeElligibleServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs2Gid, fakeRs2GenId, fakeRs3Gid, fakeRs3GenId);
+      // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+      List<Short> expectedServers = computeExpectedServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, fakeRs2Gid, fakeRs2GenId, fakeRs2Scen, fakeRs3Gid, fakeRs3GenId, fakeRs3Scen);
+
+      // Send update
+      long startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      long sendUpdateTime = System.currentTimeMillis() - startTime;
+      
+      // Check
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+      checkTimeAndMonitoringSafeData(1, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
+      checkWhatHasBeenReceived(1, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, fakeRs3GenId, expectedServers);
+
+      /***********************************************************************
+       * Send update from DS 1 (2 fake RSs available) and check what happened
+       ***********************************************************************/
+
+      // Shutdown fake RS 3
+      fakeRs3.shutdown();
+      fakeRs3 = null;
+
+      // Wait for disconnection to be finished
+      // DS must see expected numbers of fake DSs and RSs
+      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 2);
+
+      // Keep track of monitoring values for incremental test step
+      acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
+      timeoutUpdates = fakeRd1.getAssuredSdTimeoutUpdates();
+      serverErrors = fakeRd1.getAssuredSdServerTimeoutUpdates();
+      // Compute the list of servers that are elligible for receiving an assured update
+      elligibleServers = computeElligibleServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs2Gid, fakeRs2GenId, -1, -1L);
+      // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+      expectedServers = computeExpectedServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, fakeRs2Gid, fakeRs2GenId, fakeRs2Scen, -1, -1L, -1);
+
+      // Send update
+      startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      sendUpdateTime = System.currentTimeMillis() - startTime;
+
+      // Check
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+      checkTimeAndMonitoringSafeData(2, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
+      checkWhatHasBeenReceived(2, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, fakeRs2GenId, -1L, expectedServers);
+
+      /***********************************************************************
+       * Send update from DS 1 (1 fake RS available) and check what happened
+       ***********************************************************************/
+
+      // Shutdown fake RS 2
+      fakeRs2.shutdown();
+      fakeRs2 = null;
+
+      // Wait for disconnection to be finished
+      // DS must see expected numbers of fake DSs and RSs
+      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 1);
+
+      // Keep track of monitoring values for incremental test step
+      acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
+      timeoutUpdates = fakeRd1.getAssuredSdTimeoutUpdates();
+      serverErrors = fakeRd1.getAssuredSdServerTimeoutUpdates();
+      // Compute the list of servers that are elligible for receiving an assured update
+      elligibleServers = computeElligibleServersSafeData(fakeRs1Gid, fakeRs1GenId, -1, -1L, -1, -1L);
+      // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+      expectedServers = computeExpectedServersSafeData(fakeRs1Gid, fakeRs1GenId, fakeRs1Scen, -1, -1L, -1, -1, -1L, -1);
+
+      // Send update
+      startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      sendUpdateTime = System.currentTimeMillis() - startTime;
+      
+      // Check
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+      checkTimeAndMonitoringSafeData(3, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
+      checkWhatHasBeenReceived(3, otherFakeDS, otherFakeDsGenId, fakeRs1GenId, -1L, -1L, expectedServers);
+
+      /***********************************************************************
+       * Send update from DS 1 (no fake RS available) and check what happened
+       ***********************************************************************/
+
+      // Shutdown fake RS 1
+      fakeRs1.shutdown();
+      fakeRs1 = null;
+
+      // Wait for disconnection to be finished
+      // DS must see expected numbers of fake DSs and RSs
+      waitForStableTopo(fakeRd1, (otherFakeDS ? 1 : 0), 0);
+
+      // Keep track of monitoring values for incremental test step
+      acknowledgedUpdates = fakeRd1.getAssuredSdAcknowledgedUpdates();
+      timeoutUpdates = fakeRd1.getAssuredSdTimeoutUpdates();
+      serverErrors = fakeRd1.getAssuredSdServerTimeoutUpdates();
+      // Compute the list of servers that are elligible for receiving an assured update
+      elligibleServers = computeElligibleServersSafeData(-1, -1L, -1, -1L, -1, -1L);
+      // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+      expectedServers = computeExpectedServersSafeData(-1, -1L, -1, -1, -1L, -1, -1, -1L, -1);
+
+      // Send update
+      startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      sendUpdateTime = System.currentTimeMillis() - startTime;      
+            
+      // Check
+      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked and let time the update to reach other servers
+      checkTimeAndMonitoringSafeData(4, acknowledgedUpdates, timeoutUpdates, serverErrors, sendUpdateTime, nWishedServers, elligibleServers, expectedServers);
+      checkWhatHasBeenReceived(4, otherFakeDS, otherFakeDsGenId, -1L, -1L, -1L, expectedServers);
+    } finally
+    {
+      endTest();
+    }
+  }
+
+  // Check that the DSs and the fake RSs of the topology have received/acked what is expected according to the
+  // test step (the number of updates)
+  // -1 for a gen id means no need to test the matching fake RS
+  private void checkWhatHasBeenReceived(int nSentUpdates, boolean otherFakeDS, long otherFakeDsGenId, long fakeRs1GenId, long fakeRs2GenId, long fakeRs3GenId, List<Short> expectedServers)
+  {
+
+    // We should not receive our own update
+    assertEquals(fakeRd1.nReceivedUpdates(), 0);
+    assertTrue(fakeRd1.receivedUpdatesOk());
+
+    // Check what received other fake DS
+    if (otherFakeDS)
+    {
+      if (otherFakeDsGenId == DEFAULT_GENID)
+      {
+        // Update should have been received
+        assertEquals(fakeRd2.nReceivedUpdates(), nSentUpdates);
+        assertTrue(fakeRd2.receivedUpdatesOk());
+      } else
+      {
+        assertEquals(fakeRd2.nReceivedUpdates(), 0);
+        assertTrue(fakeRd2.receivedUpdatesOk());
+      }
+    }
+
+    // Check what received/did fake Rss
+
+    if (nSentUpdates < 4)  // Fake RS 3 is stopped after 3 updates sent
+    {
+      if (fakeRs1GenId != DEFAULT_GENID)
+        assertEquals(fakeRs1.nReceivedUpdates(), 0);
+      else
+        assertEquals(fakeRs1.nReceivedUpdates(), nSentUpdates);
+      assertTrue(fakeRs1.receivedUpdatesOk());
+      if (expectedServers.contains(FRS1_ID))
+        assertTrue(fakeRs1.ackReplied());
+      else
+        assertFalse(fakeRs1.ackReplied());
+    }
+
+    if (nSentUpdates < 3)  // Fake RS 3 is stopped after 2 updates sent
+    {
+      if (fakeRs2GenId != DEFAULT_GENID)
+        assertEquals(fakeRs2.nReceivedUpdates(), 0);
+      else
+        assertEquals(fakeRs2.nReceivedUpdates(), nSentUpdates);
+      assertTrue(fakeRs2.receivedUpdatesOk());
+      if (expectedServers.contains(FRS2_ID))
+        assertTrue(fakeRs2.ackReplied());
+      else
+        assertFalse(fakeRs2.ackReplied());
+    }
+
+    if (nSentUpdates < 2) // Fake RS 3 is stopped after 1 update sent
+    {
+      if (fakeRs3GenId != DEFAULT_GENID)
+        assertEquals(fakeRs3.nReceivedUpdates(), 0);
+      else
+        assertEquals(fakeRs3.nReceivedUpdates(), nSentUpdates);
+      assertTrue(fakeRs3.receivedUpdatesOk());
+      if (expectedServers.contains(FRS3_ID))
+        assertTrue(fakeRs3.ackReplied());
+      else
+        assertFalse(fakeRs3.ackReplied());
+    }
+  }
+
+  /**
+   * Check the time the sending of the safe data assured update took and the monitoring
+   * values according to the test configuration
+   */
+  private void checkTimeAndMonitoringSafeData(int nSentUpdates, int prevNAckUpdates, int prevNTimeoutUpdates, Map<Short,Integer> prevNServerErrors, long sendUpdateTime,
+    int nWishedServers, List<Short> elligibleServers, List<Short> expectedServers)
+  {
+    assertEquals(fakeRd1.getAssuredSdSentUpdates(), nSentUpdates);
+    if (elligibleServers.size() >= nWishedServers) // Enough elligible servers
+    {
+      if (expectedServers.size() >= nWishedServers) // Enough servers should ack
+      {
+        // Enough server ok for acking: ack should come back quickly
+        assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+        // Check monitoring values (check that ack has been correctly received)
+        assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
+        assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
+        checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
+      } else
+      {
+        // Not enough expected servers: should have timed out in RS timeout
+        // (SMALL_TIMEOUT)
+        assertTrue((SMALL_TIMEOUT <= sendUpdateTime) && (sendUpdateTime <=
+          LONG_TIMEOUT));
+        // Check monitoring values (check that timeout occured)
+        assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates);
+        assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates + 1);
+        // Check that the servers that are elligible but not expected have been added in the error by server list
+        List<Short> expectedServersInError = computeExpectedServersInError(elligibleServers, expectedServers);
+        checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, expectedServersInError);
+      }
+    } else // Not enough elligible servers
+    {
+      if (elligibleServers.size() > 0) // Some elligible servers anyway
+      {
+        if (expectedServers.size() == elligibleServers.size()) // All elligible servers should respond in time
+        {
+          // Enough server ok for acking: ack should come back quickly
+          assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+          // Check monitoring values (check that ack has been correctly received)
+          assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
+          assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
+          checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
+        } else
+        { // Some elligible servers should fail
+          // Not enough expected servers: should have timed out in RS timeout
+          // (SMALL_TIMEOUT)
+          assertTrue((SMALL_TIMEOUT <= sendUpdateTime) && (sendUpdateTime <=
+            LONG_TIMEOUT));
+          // Check monitoring values (check that timeout occured)
+          assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates);
+          assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates + 1);
+          // Check that the servers that are elligible but not expected have been added in the error by server list
+          List<Short> expectedServersInError = computeExpectedServersInError(elligibleServers, expectedServers);
+          checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, expectedServersInError);
+        }
+      } else
+      {
+        // No elligible servers at all, RS should not wait for any ack and immediately ack the update
+        assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+        // Check monitoring values (check that ack has been correctly received)
+        assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
+        assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
+        checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
+      }
+    }
+  }
+
+  // Compute a list of servers that are elligibles but that are not able to return an ack
+  // (those in elligibleServers that are not in expectedServers). Result may of course be an empty list
+  private List<Short> computeExpectedServersInError(List<Short> elligibleServers, List<Short> expectedServers)
+  {
+    List<Short> expectedServersInError = new ArrayList<Short>();
+    for (Short serverId : elligibleServers)
+    {
+      if (!expectedServers.contains(serverId))
+        expectedServersInError.add(serverId);
+    }
+    return expectedServersInError;
+  }
+
+  // Check that the passed list of errors by server ids is as expected.
+  // - if expectedServersInError is not null and not empty, each server id in measuredServerErrors should have the value it has
+  // in prevServerErrors + 1, or 1 if it was not in prevServerErrors
+  // - if expectedServersInError is null or empty, both map should be equal
+  private void checkServerErrors(Map<Short,Integer> measuredServerErrors, Map<Short,Integer> prevServerErrors, List<Short> expectedServersInError)
+  {
+    if (expectedServersInError != null)
+    {
+      // Adding an error to each server in expectedServersInError, with prevServerErrors as basis, should give the
+      // same map as measuredServerErrors
+      for (Short serverId : expectedServersInError)
+      {
+        Integer prevInt = prevServerErrors.get(serverId);
+        if (prevInt == null)
+        {
+          // Add this server to the list of servers in error
+          prevServerErrors.put(serverId, 1);
+        } else
+        {
+          // Already errors for this server, increment the value
+          int newVal = prevInt.intValue() + 1;
+          prevServerErrors.put(serverId, newVal);
+        }
+      }
+    }
+
+    // Maps should be the same
+    assertEquals(measuredServerErrors.size(), prevServerErrors.size());
+    Set<Short> measuredKeySet = measuredServerErrors.keySet();
+    for (Short serverId : measuredKeySet)
+    {
+      Integer measuredInt = measuredServerErrors.get(serverId);
+      assertNotNull(measuredInt);
+      assertTrue(measuredInt.intValue() != 0);
+      Integer prevInt = prevServerErrors.get(serverId);
+      assertNotNull(prevInt);
+      assertTrue(prevInt.intValue() != 0);
+      assertEquals(measuredInt, prevInt);
+    }
+  }
+
+  /**
+   * Wait until number of fake DSs and fake RSs are available in the topo view of the passed
+   * fake DS or throw an assertion if timeout waiting.
+   */
+  private void waitForStableTopo(FakeReplicationDomain fakeRd, int expectedDs, int expectedRs)
+  {
+    int nSec = 30;
+    int nDs = 0;
+    int nRs = 0;
+    List<DSInfo> dsInfo = null;
+    List<RSInfo> rsInfo = null;
+    while(nSec > 0)
+    {
+      dsInfo = fakeRd.getDsList();
+      rsInfo = fakeRd.getRsList();
+      nDs = dsInfo.size();
+      nRs = rsInfo.size();
+      if ( (nDs == expectedDs) && (nRs == (expectedRs+1)) ) // Must include real RS so '+1'
+      {
+        debugInfo("waitForStableTopo: expected topo obtained after " + (30-nSec) + " second(s).");
+        return;
+      }
+      sleep(1000);
+      nSec--;
+    }
+    fail("Did not reach expected topo view in time: expected " + expectedDs +
+      " DSs (had " + dsInfo +") and " + expectedRs + " RSs (had " + rsInfo +").");
+  }
+
+  // Compute the list of servers that are elligible for receiving an assured update
+  // according to their group id and generation id. If -1 is used, the server is out of scope
+  private List<Short> computeElligibleServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs2Gid, long fakeRs2GenId, int fakeRs3Gid, long fakeRs3GenId)
+  {
+    List<Short> elligibleServers = new ArrayList<Short>();
+    if (areGroupAndGenerationIdOk(fakeRs1Gid, fakeRs1GenId))
+    {
+      elligibleServers.add(FRS1_ID);
+    }
+    if (areGroupAndGenerationIdOk(fakeRs2Gid, fakeRs2GenId))
+    {
+      elligibleServers.add(FRS2_ID);
+    }
+    if (areGroupAndGenerationIdOk(fakeRs3Gid, fakeRs3GenId))
+    {
+      elligibleServers.add(FRS3_ID);
+    }
+    return elligibleServers;
+  }
+
+  // Are group id and generation id ok for being an elligible RS for assured update ?
+  private boolean areGroupAndGenerationIdOk(int fakeRsGid, long fakeRsGenId)
+  {
+    if ((fakeRsGid != -1) && (fakeRsGenId != -1L))
+    {
+      return ( (fakeRsGid == DEFAULT_GID) && (fakeRsGenId == DEFAULT_GENID) );
+    }
+    return false;
+  }
+
+  // Compute the list of servers that are elligible for receiving an assured update and that are expected to effectively ack the update
+  // If -1 is used, the server is out of scope
+  private List<Short> computeExpectedServersSafeData(int fakeRs1Gid, long fakeRs1GenId, int fakeRs1Scen, int fakeRs2Gid, long fakeRs2GenId, int fakeRs2Scen, int fakeRs3Gid, long fakeRs3GenId, int fakeRs3Scen)
+  {
+    List<Short> exptectedServers = new ArrayList<Short>();
+    if (areGroupAndGenerationIdOk(fakeRs1Gid, fakeRs1GenId))
+    {
+      if (fakeRs1Scen == REPLY_OK_RS_SCENARIO)
+      {
+        exptectedServers.add(FRS1_ID);
+      } else if (fakeRs1Scen != TIMEOUT_RS_SCENARIO)
+      {
+        fail("No other scenario should be used here");
+        return null;
+      }
+    }
+    if (areGroupAndGenerationIdOk(fakeRs2Gid, fakeRs2GenId))
+    {
+      if (fakeRs2Scen == REPLY_OK_RS_SCENARIO)
+      {
+        exptectedServers.add(FRS2_ID);
+      } else if (fakeRs2Scen != TIMEOUT_RS_SCENARIO)
+      {
+        fail("No other scenario should be used here");
+        return null;
+      }
+    }
+    if (areGroupAndGenerationIdOk(fakeRs3Gid, fakeRs3GenId))
+    {
+      if (fakeRs3Scen == REPLY_OK_RS_SCENARIO)
+      {
+        exptectedServers.add(FRS3_ID);
+      } else if (fakeRs3Scen != TIMEOUT_RS_SCENARIO)
+      {
+        fail("No other scenario should be used here");
+        return null;
+      }
+    }
+    return exptectedServers;
+  }
+
+  /**
+   * Returns possible combinations of parameters for testSafeDataFromRS test
+   */
+  @DataProvider(name = "testSafeDataFromRSProvider")
+  private Object[][] testSafeDataFromRSProvider()
+  {
+    List<List<Object>> objectArrayList = new ArrayList<List<Object>>();
+
+    // Safe Data Level
+    objectArrayList = addPossibleParameters(objectArrayList, 1, 2, 3);
+    // Fake RS group id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GID, OTHER_GID);
+    // Fake RS generation id
+    objectArrayList = addPossibleParameters(objectArrayList, DEFAULT_GENID, OTHER_GENID);
+    // Fake RS sends update in assured mode
+    objectArrayList = addPossibleParameters(objectArrayList, true, false);
+
+    Object[][] result = new Object[objectArrayList.size()][];
+    int i = 0;
+    for (List<Object> objectArray : objectArrayList)
+    {
+      result[i] = objectArray.toArray();
+      i++;
+    }
+    return result;
+  }
+
+  /**
+   * Test that the RS is acking or not acking a safe data update sent from another
+   * (fake) RS according to passed parameters
+   */
+  @Test(dataProvider = "testSafeDataFromRSProvider", groups = "slow", enabled = true)
+  public void testSafeDataFromRS(int sdLevel, int fakeRsGid, long fakeRsGenId, boolean sendInAssured) throws Exception
+  {
+    String testCase = "testSafeDataFromRS";
+
+    debugInfo("Starting " + testCase);
+
+    initTest();
+
+    try
+    {
+      /*
+       * Start real RS (the one to be tested)
+       */
+
+      // Create real RS 1
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+        testCase);
+      assertNotNull(rs1);
+
+      /*
+       * Start fake RS to make the RS have the default generation id
+       */
+
+      // Put a fake RS 2 connected to real RS
+      fakeRs2 = createFakeReplicationServer(FRS2_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, false, AssuredMode.SAFE_DATA_MODE, 10,
+        TIMEOUT_RS_SCENARIO);
+      assertNotNull(fakeRs2);
+
+      /*
+       * Start fake RS to send updates
+       */
+
+      // Put a fake RS 1 connected to real RS
+      fakeRs1 = createFakeReplicationServer(FRS1_ID, fakeRsGid, RS1_ID,
+        fakeRsGenId, sendInAssured, AssuredMode.SAFE_DATA_MODE, sdLevel,
+        SENDER_RS_SCENARIO);
+      assertNotNull(fakeRs1);
+
+      /*
+       * Send an assured update using configured assured parameters
+       */
+      
+      long startTime = System.currentTimeMillis();
+      AckMsg ackMsg = null;
+      boolean timeout = false;
+      try
+      {
+        ackMsg = fakeRs1.sendNewFakeUpdate();
+      } catch (SocketTimeoutException e)
+      {
+        debugInfo("testSafeDataFromRS: timeout waiting for update ack");
+        timeout = true;
+      }
+      long sendUpdateTime = System.currentTimeMillis() - startTime;
+      debugInfo("testSafeDataFromRS: send update call time: " + sendUpdateTime);
+
+      /*
+       * Now check timeout or not according to test configuration parameters
+       */
+      if ( (sdLevel == 1) || (fakeRsGid != DEFAULT_GID) ||
+        (fakeRsGenId != DEFAULT_GENID) || (!sendInAssured) )
+      {
+        // Should have timed out (no ack)
+        assertTrue(timeout);
+        assertNull(ackMsg);
+      } else
+      {
+        // Ack should have been received
+        assertFalse(timeout);
+        assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+        assertNotNull(ackMsg);
+        assertFalse(ackMsg.hasTimeout());
+        assertFalse(ackMsg.hasReplayError());
+        assertFalse(ackMsg.hasWrongStatus());
+        assertEquals(ackMsg.getFailedServers().size(), 0);
+      }
+      
+   } finally
+    {
+      endTest();
+    }
+  }
+
+  /**
+   * Returns possible combinations of parameters for testSafeDataManyRealRSs test
+   */
+  @DataProvider(name = "testSafeDataManyRealRSsProvider")
+  private Object[][] testSafeDataManyRealRSsProvider()
+  {
+    return new Object[][]
+    {
+      {1},
+      {2},
+      {3},
+      {4}
+    };
+  }
+
+  /**
+   * Test topo of 3 real RSs.
+   * One assured safe data update sent with different safe data level.
+   * Update should always be acked
+   */
+  @Test(dataProvider = "testSafeDataManyRealRSsProvider", enabled = true)
+  public void testSafeDataManyRealRSs(int sdLevel) throws Exception
+  {
+    String testCase = "testSafeDataManyRealRSs";
+
+    debugInfo("Starting " + testCase);
+
+    initTest();
+
+    try
+    {
+
+      /*
+       * Start 3 real RSs
+       */
+
+      // Create real RS 1
+      rs1 = createReplicationServer(RS1_ID, DEFAULT_GID, SMALL_TIMEOUT,
+        testCase);
+      assertNotNull(rs1);
+
+      // Create real RS 2
+      rs2 = createReplicationServer(RS2_ID, DEFAULT_GID, SMALL_TIMEOUT,
+        testCase);
+      assertNotNull(rs2);
+
+      // Create real RS 3
+      rs3 = createReplicationServer(RS3_ID, DEFAULT_GID, SMALL_TIMEOUT,
+        testCase);
+      assertNotNull(rs3);
+      
+      /*
+       * Start DS that will send updates
+       */
+
+      // Wait for RSs to connect together
+      // Create and connect fake domain 1 to RS 1
+      fakeRd1 = createFakeReplicationDomain(FDS1_ID, DEFAULT_GID, RS1_ID,
+        DEFAULT_GENID, true, AssuredMode.SAFE_DATA_MODE, sdLevel, LONG_TIMEOUT,
+        TIMEOUT_DS_SCENARIO);
+      assertNotNull(fakeRd1);
+
+      // Wait for RSs connections to be finished
+      // DS must see expected numbers of RSs
+      waitForStableTopo(fakeRd1, 0, 2);
+
+      /*
+       * Send update from DS 1 and check result
+       */
+
+      long startTime = System.currentTimeMillis();
+      try
+      {
+        fakeRd1.sendNewFakeUpdate();
+      } catch (TimeoutException e)
+      {
+        fail("No timeout is expected here");
+      }
+      long sendUpdateTime = System.currentTimeMillis() - startTime;
+
+      // Check call time
+      assertTrue(sendUpdateTime < MAX_SEND_UPDATE_TIME);
+
+      // Check monitoring values (check that ack has been correctly received)
+      assertEquals(fakeRd1.getAssuredSdSentUpdates(), 1);
+      assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), 1);
+      assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), 0);
+      assertEquals(fakeRd1.getAssuredSdServerTimeoutUpdates().size(), 0);
+    } finally
+    {
+      endTest();
+    }
+  }
+}
+

--
Gitblit v1.10.0