mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.41.2013 dbf51252e03a227a87e866a37847551d8f6e8007
AssuredReplicationPluginTest.java:
Factorized common code in tests:
- Added inner class MonitorAssertions with fluent interface to help assert the monitor attribute values.
- Extracted methods assertNoServerErrors(), assertServerErrorsSafeDataMode(), assertServerErrorsSafeReadMode().
1 files modified
424 ■■■■■ changed files
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java 424 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -34,6 +34,8 @@
import java.net.SocketTimeoutException;
import java.util.*;
import org.assertj.core.api.Assertions;
import org.assertj.core.data.MapEntry;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -53,6 +55,7 @@
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.assertj.core.data.MapEntry.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -68,6 +71,41 @@
public class AssuredReplicationPluginTest extends ReplicationTestCase
{
  public class MonitorAssertions
  {
    private Map<String, Long> attributeValues = new HashMap<String, Long>();
    public MonitorAssertions(DN baseDN) throws Exception
    {
      List<String> attributes = Arrays.asList(
          "assured-sr-sent-updates",               "assured-sr-acknowledged-updates",
          "assured-sr-not-acknowledged-updates",   "assured-sr-timeout-updates",
          "assured-sr-wrong-status-updates",       "assured-sr-replay-error-updates",
          "assured-sr-received-updates",           "assured-sr-received-updates-acked",
          "assured-sr-received-updates-not-acked", "assured-sd-sent-updates",
          "assured-sd-acknowledged-updates",       "assured-sd-timeout-updates");
      for (String attribute : attributes)
      {
        attributeValues.put(attribute, getMonitorAttrValue(baseDN, attribute));
      }
    }
    public MonitorAssertions assertValue(String attribute, long expected)
    {
      assertEquals(attributeValues.remove(attribute), (Long) expected);
      return this;
    }
    public void assertRemainingValuesAreZero()
    {
      for (String attribute : attributeValues.keySet())
      {
        assertValue(attribute, 0L);
      }
    }
  }
  /** The port of the replicationServer. */
  private int replServerPort;
  private final int RS_SERVER_ID = 90;
@@ -896,27 +934,12 @@
        // Check monitoring values
        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        DN baseDn = DN.decode(SAFE_DATA_DN);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
        //  errors by server list for sd mode should be [[rsId:1]]
        assertEquals(errorsByServer.size(), 1);
        Integer nError = errorsByServer.get(RS_SERVER_ID);
        assertNotNull(nError);
        assertEquals(nError.intValue(), 1);
        DN baseDN = DN.decode(SAFE_DATA_DN);
        new MonitorAssertions(baseDN)
          .assertValue("assured-sd-sent-updates", 1)
          .assertValue("assured-sd-timeout-updates", 1)
          .assertRemainingValuesAreZero();
        assertServerErrorsSafeDataMode(baseDN, entry(RS_SERVER_ID, 1));
      } else
      {
        // RS has a different group id, addEntry should have returned quickly
@@ -925,23 +948,9 @@
        // No error should be seen in monitoring and update should have not been
        // sent in assured mode
        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        DN baseDn = DN.decode(NOT_ASSURED_DN);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
        assertTrue(errorsByServer.isEmpty());
        DN baseDN = DN.decode(NOT_ASSURED_DN);
        new MonitorAssertions(baseDN).assertRemainingValuesAreZero();
        assertNoServerErrors(baseDN);
      }
    } finally
    {
@@ -949,6 +958,30 @@
    }
  }
  private void assertNoServerErrors(DN baseDN) throws Exception
  {
    assertTrue(getErrorsByServers(baseDN, AssuredMode.SAFE_READ_MODE).isEmpty());
    assertTrue(getErrorsByServers(baseDN, AssuredMode.SAFE_DATA_MODE).isEmpty());
  }
  private void assertServerErrorsSafeDataMode(DN baseDN, MapEntry... entries) throws Exception
  {
    assertTrue(getErrorsByServers(baseDN, AssuredMode.SAFE_READ_MODE).isEmpty());
    Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDN, AssuredMode.SAFE_DATA_MODE);
    Assertions.assertThat(errorsByServer).hasSize(entries.length);
    Assertions.assertThat(errorsByServer).contains(entries);
  }
  private void assertServerErrorsSafeReadMode(DN baseDN, MapEntry... entries) throws Exception
  {
    assertTrue(getErrorsByServers(baseDN, AssuredMode.SAFE_DATA_MODE).isEmpty());
    Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDN, AssuredMode.SAFE_READ_MODE);
    Assertions.assertThat(errorsByServer).hasSize(entries.length);
    Assertions.assertThat(errorsByServer).contains(entries);
  }
  /**
   * Tests that a DS performing a modification in safe read mode waits for
   * the ack of the RS for the configured timeout time, then times out.
@@ -1017,27 +1050,13 @@
        // Check monitoring values
        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        DN baseDn = DN.decode(SAFE_READ_DN);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        //  errors by server list for sr mode should be [[rsId:1]]
        assertEquals(errorsByServer.size(), 1);
        Integer nError = errorsByServer.get(RS_SERVER_ID);
        assertNotNull(nError);
        assertEquals(nError.intValue(), 1);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
        assertTrue(errorsByServer.isEmpty());
        DN baseDN = DN.decode(SAFE_READ_DN);
        new MonitorAssertions(baseDN)
          .assertValue("assured-sr-sent-updates", 1)
          .assertValue("assured-sr-not-acknowledged-updates", 1)
          .assertValue("assured-sr-timeout-updates", 1)
          .assertRemainingValuesAreZero();
        assertServerErrorsSafeReadMode(baseDN, entry(RS_SERVER_ID, 1));
      } else
      {
        // RS has a different group id, addEntry should have returned quickly
@@ -1046,23 +1065,9 @@
        // No error should be seen in monitoring and update should have not been
        // sent in assured mode
        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        DN baseDn = DN.decode(NOT_ASSURED_DN);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        DN baseDN = DN.decode(NOT_ASSURED_DN);
        new MonitorAssertions(baseDN).assertRemainingValuesAreZero();
        assertNoServerErrors(baseDN);
      }
    } finally
    {
@@ -1191,24 +1196,12 @@
      // Check monitoring values
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      DN baseDn = DN.decode(SAFE_DATA_DN);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
      assertTrue(errorsByServer.isEmpty());
      DN baseDN = DN.decode(SAFE_DATA_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sd-sent-updates", 1)
        .assertValue("assured-sd-acknowledged-updates", 1)
        .assertRemainingValuesAreZero();
      assertNoServerErrors(baseDN);
    } finally
    {
      endTest(testcase);
@@ -1255,24 +1248,12 @@
      // Check monitoring values
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      DN baseDn = DN.decode(SAFE_READ_DN);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
      assertTrue(errorsByServer.isEmpty());
      DN baseDN = DN.decode(SAFE_READ_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sr-sent-updates", 1)
        .assertValue("assured-sr-acknowledged-updates", 1)
        .assertRemainingValuesAreZero();
      assertNoServerErrors(baseDN);
    } finally
    {
      endTest(testcase);
@@ -1329,23 +1310,12 @@
        assertEquals(ackMsg.getFailedServers().size(), 0);
        // Check for monitoring data
        DN baseDn = DN.decode(SAFE_READ_DN);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
        Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
        assertTrue(errorsByServer.isEmpty());
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 1);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
        assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
        errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
        assertTrue(errorsByServer.isEmpty());
        DN baseDN = DN.decode(SAFE_READ_DN);
        new MonitorAssertions(baseDN)
          .assertValue("assured-sr-received-updates", 1)
          .assertValue("assured-sr-received-updates-acked", 1)
          .assertRemainingValuesAreZero();
        assertNoServerErrors(baseDN);
      } catch (SocketTimeoutException e)
      {
        // Expected
@@ -1471,27 +1441,12 @@
      // - timeout error
      // - server 10 error
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      DN baseDn = DN.decode(SAFE_DATA_DN);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 1);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
      //  errors by server list for sd mode should be [[10:1]]
      assertEquals(errorsByServer.size(), 1);
      Integer nError = errorsByServer.get(10);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      DN baseDN = DN.decode(SAFE_DATA_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sd-sent-updates", 1)
        .assertValue("assured-sd-timeout-updates", 1)
        .assertRemainingValuesAreZero();
      assertServerErrorsSafeDataMode(baseDN, entry(10, 1));
      // Make a second LDAP update (delete the entry)
      startTime = System.currentTimeMillis(); // Time the update has been initiated
@@ -1509,30 +1464,12 @@
      // - timeout error
      // - server 10 error, server 20 error
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      baseDn = DN.decode(SAFE_DATA_DN);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 2);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 2);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
      //  errors by server list for sd mode should be [[10:2],[20:1]]
      assertEquals(errorsByServer.size(), 2);
      nError = errorsByServer.get(10);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 2);
      nError = errorsByServer.get(20);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      baseDN = DN.decode(SAFE_DATA_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sd-sent-updates", 2)
        .assertValue("assured-sd-timeout-updates", 2)
        .assertRemainingValuesAreZero();
      assertServerErrorsSafeDataMode(baseDN, entry(10, 2), entry(20, 1));
      // Make a third LDAP update (re-add the entry)
      startTime = System.currentTimeMillis(); // Time the update has been initiated
@@ -1549,33 +1486,13 @@
      // Check monitoring values
      // No ack should have comen back, so timeout incremented (flag and error for rs)
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      baseDn = DN.decode(SAFE_DATA_DN);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      assertTrue(errorsByServer.isEmpty());
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 3);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 3);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
      //  errors by server list for sd mode should be [[10:2],[20:1],[rsId:1]]
      assertEquals(errorsByServer.size(), 3);
      nError = errorsByServer.get(10);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 2);
      nError = errorsByServer.get(20);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      nError = errorsByServer.get(RS_SERVER_ID);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      baseDN = DN.decode(SAFE_DATA_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sd-sent-updates", 3)
        .assertValue("assured-sd-timeout-updates", 3)
        .assertRemainingValuesAreZero();
      assertServerErrorsSafeDataMode(baseDN,
          entry(10, 2), entry(20, 1), entry(RS_SERVER_ID, 1));
    } finally
    {
@@ -1626,30 +1543,13 @@
      // - replay error
      // - server 10 error, server 20 error
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      DN baseDn = DN.decode(SAFE_READ_DN);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 1);
      Map<Integer, Integer> errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      //  errors by server list for sr mode should be [[10:1],[20:1]]
      assertEquals(errorsByServer.size(), 2);
      Integer nError = errorsByServer.get(10);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      nError = errorsByServer.get(20);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
      assertTrue(errorsByServer.isEmpty());
      DN baseDN = DN.decode(SAFE_READ_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sr-sent-updates", 1)
        .assertValue("assured-sr-not-acknowledged-updates", 1)
        .assertValue("assured-sr-replay-error-updates", 1)
        .assertRemainingValuesAreZero();
      assertServerErrorsSafeReadMode(baseDN, entry(10, 2), entry(20, 1));
      // Make a second LDAP update (delete the entry)
      startTime = System.currentTimeMillis(); // Time the update has been initiated
@@ -1669,32 +1569,15 @@
      // - replay error
      // - server 10 error, server 20 error, server 30 error
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 2);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 2);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 2);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      //  errors by server list for sr mode should be [[10:2],[20:2],[30:1]]
      assertEquals(errorsByServer.size(), 3);
      nError = errorsByServer.get(10);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 2);
      nError = errorsByServer.get(20);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 2);
      nError = errorsByServer.get(30);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
      assertTrue(errorsByServer.isEmpty());
      new MonitorAssertions(baseDN)
        .assertValue("assured-sr-sent-updates", 2)
        .assertValue("assured-sr-not-acknowledged-updates", 2)
        .assertValue("assured-sr-timeout-updates", 1)
        .assertValue("assured-sr-wrong-status-updates", 1)
        .assertValue("assured-sr-replay-error-updates", 2)
        .assertRemainingValuesAreZero();
      assertServerErrorsSafeReadMode(baseDN,
          entry(10, 2), entry(20, 2), entry(30, 1));
      // Make a third LDAP update (re-add the entry)
      startTime = System.currentTimeMillis(); // Time the update has been initiated
@@ -1711,36 +1594,15 @@
      // Check monitoring values
      // No ack should have comen back, so timeout incremented (flag and error for rs)
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-sent-updates"), 3);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-not-acknowledged-updates"), 3);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-timeout-updates"), 2);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-wrong-status-updates"), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-replay-error-updates"), 2);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_READ_MODE);
      //  errors by server list for sr mode should be [[10:2],[20:2],[30:1],[rsId:1]]
      assertEquals(errorsByServer.size(), 4);
      nError = errorsByServer.get(10);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 2);
      nError = errorsByServer.get(20);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 2);
      nError = errorsByServer.get(30);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      nError = errorsByServer.get(RS_SERVER_ID);
      assertNotNull(nError);
      assertEquals(nError.intValue(), 1);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sr-received-updates-not-acked"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-sent-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-acknowledged-updates"), 0);
      assertEquals(getMonitorAttrValue(baseDn, "assured-sd-timeout-updates"), 0);
      errorsByServer = getErrorsByServers(baseDn, AssuredMode.SAFE_DATA_MODE);
      assertTrue(errorsByServer.isEmpty());
      new MonitorAssertions(baseDN)
        .assertValue("assured-sr-sent-updates", 3)
        .assertValue("assured-sr-not-acknowledged-updates", 3)
        .assertValue("assured-sr-timeout-updates", 2)
        .assertValue("assured-sr-wrong-status-updates", 1)
        .assertValue("assured-sr-replay-error-updates", 2)
        .assertRemainingValuesAreZero();
      assertServerErrorsSafeReadMode(baseDN,
          entry(10, 2), entry(20, 2), entry(30, 1), entry(RS_SERVER_ID, 1));
    } finally
    {
      endTest(testcase);