mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.24.2014 8b2e622ff8e5b8b36a1e9c919d2ff377158ac82c
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/AssuredReplicationServerTest.java
@@ -34,6 +34,7 @@
import java.util.*;
import java.util.concurrent.TimeoutException;
import org.assertj.core.api.SoftAssertions;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
@@ -583,17 +584,121 @@
      waitForAckIfAssuredEnabled(delMsg);
    }
    /**
     * Checks the nReceivedUpdates and nWrongReceivedUpdates counters against
     * the expected values.
     *
     * @param expectedNbUpdates
     *          the value nReceivedUpdates is compared with
     * @param expectedNbWrongUpdates
     *          the value nWrongReceivedUpdates is compared with
     */
    private void assertReceivedWrongUpdates(int expectedNbUpdates,
        int expectedNbWrongUpdates)
    private void assertReceivedWrongUpdates(int expectedNbUpdates, int expectedNbWrongUpdates)
    {
      assertEquals(nReceivedUpdates, expectedNbUpdates);
      assertEquals(nWrongReceivedUpdates, expectedNbWrongUpdates);
      // Soft assertions: record both comparisons, then report them together in assertAll()
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(nReceivedUpdates).as("receivedUpdates").isEqualTo(expectedNbUpdates);
      softly.assertThat(nWrongReceivedUpdates).as("wrongReceivedUpdates").isEqualTo(expectedNbWrongUpdates);
      softly.assertAll();
    }
    /**
     * Checks that the expected number of updates was received and that no
     * wrong update was received.
     *
     * @param expectedNbUpdates
     *          the expected number of received updates
     */
    private void assertReceivedUpdates(int expectedNbUpdates)
    {
      // Delegates with an expected number of wrong updates of zero
      assertReceivedWrongUpdates(expectedNbUpdates, 0);
    }
    /**
     * Creates a new SafeReadAssertions object built from this instance.
     *
     * @return a new SafeReadAssertions constructed with {@code this}
     */
    public SafeReadAssertions newSafeReadAssertions()
    {
      return new SafeReadAssertions(this);
    }
  }
  private static class SafeReadAssertions
  {
    private ReplicationDomain domain;
    private int sentUpdates;
    private int acknowledgedUpdates;
    private int notAcknowledgedUpdates;
    private int timeoutUpdates;
    private int wrongStatusUpdates;
    private int replayErrorUpdates;
    private final Map<Integer, Integer> serverNotAcknowledgedUpdates =
        new HashMap<Integer, Integer>();
    private int receivedUpdates;
    private int receivedUpdatesAcked;
    private int receivedUpdatesNotAcked;
    public SafeReadAssertions(FakeReplicationDomain domain)
    {
      this.domain = domain;
    }
    public void runAsserts()
    {
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(domain.getAssuredSrSentUpdates()).as("sentUpdates").isEqualTo(sentUpdates);
      softly.assertThat(domain.getAssuredSrAcknowledgedUpdates()).as("acknowledgedUpdates").isEqualTo(acknowledgedUpdates);
      softly.assertThat(domain.getAssuredSrNotAcknowledgedUpdates()).as("notAcknowledgedUpdates").isEqualTo(notAcknowledgedUpdates);
      softly.assertThat(domain.getAssuredSrTimeoutUpdates()).as("timeoutUpdates").isEqualTo(timeoutUpdates);
      softly.assertThat(domain.getAssuredSrWrongStatusUpdates()).as("wrongStatusUpdates").isEqualTo(wrongStatusUpdates);
      softly.assertThat(domain.getAssuredSrReplayErrorUpdates()).as("replayErrorUpdates").isEqualTo(replayErrorUpdates);
      softly.assertThat(domain.getAssuredSrServerNotAcknowledgedUpdates()).as("serverNotAcknowledgedUpdates").isEqualTo(
          serverNotAcknowledgedUpdates);
      softly.assertThat(domain.getAssuredSrReceivedUpdates()).as("receivedUpdates").isEqualTo(receivedUpdates);
      softly.assertThat(domain.getAssuredSrReceivedUpdatesAcked()).as("receivedUpdatesAcked").isEqualTo(receivedUpdatesAcked);
      softly.assertThat(domain.getAssuredSrReceivedUpdatesNotAcked()).as("receivedUpdatesNotAcked").isEqualTo(receivedUpdatesNotAcked);
      softly.assertAll();
    }
    public SafeReadAssertions sentUpdates(int value)
    {
      sentUpdates = value;
      return this;
    }
    public SafeReadAssertions acknowledgedUpdates(int value)
    {
      acknowledgedUpdates = value;
      return this;
    }
    public SafeReadAssertions notAcknowledgedUpdates(int value)
    {
      notAcknowledgedUpdates = value;
      return this;
    }
    public SafeReadAssertions timeoutUpdates(int value)
    {
      timeoutUpdates = value;
      return this;
    }
    public SafeReadAssertions wrongStatusUpdates(int value)
    {
      wrongStatusUpdates = value;
      return this;
    }
    public SafeReadAssertions replayErrorUpdates(int value)
    {
      replayErrorUpdates = value;
      return this;
    }
    public SafeReadAssertions serverNotAcknowledgedUpdates(int key, int value)
    {
      serverNotAcknowledgedUpdates.put(key, value);
      return this;
    }
    public SafeReadAssertions receivedUpdates(int value)
    {
      receivedUpdates = value;
      return this;
    }
    public SafeReadAssertions receivedUpdatesAcked(int value)
    {
      receivedUpdatesAcked = value;
      return this;
    }
    public SafeReadAssertions receivedUpdatesNotAcked(int value)
    {
      receivedUpdatesNotAcked = value;
      return this;
    }
  }
  /**
@@ -1555,21 +1660,11 @@
    {
      if (expectedServers.size() >= nWishedServers) // Enough servers should ack
      {
        // Enough server ok for acking: ack should come back quickly
        assertThat(sendUpdateTime).isLessThan(MAX_SEND_UPDATE_TIME);
        // Check monitoring values (check that ack has been correctly received)
        assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
        assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
        checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
        checkAckOccured(sendUpdateTime, prevNAckUpdates, prevNTimeoutUpdates, prevNServerErrors);
      } else
      {
        assertBetweenInclusive(sendUpdateTime, SMALL_TIMEOUT, LONG_TIMEOUT);
        // Check monitoring values (check that timeout occurred)
        assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates);
        assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates + 1);
        // Check that the servers that are eligible but not expected have been added in the error by server list
        List<Integer> expectedServersInError = computeExpectedServersInError(eligibleServers, expectedServers);
        checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, expectedServersInError);
        Set<Integer> serversInError = computeExpectedServersInError(eligibleServers, expectedServers);
        checkTimeOutOccured(sendUpdateTime, prevNAckUpdates, prevNTimeoutUpdates, prevNServerErrors, serversInError);
      }
    }
    else
@@ -1578,50 +1673,53 @@
      {
        if (expectedServers.size() == eligibleServers.size()) // All eligible servers should respond in time
        {
          // Enough server ok for acking: ack should come back quickly
          assertThat(sendUpdateTime).isLessThan(MAX_SEND_UPDATE_TIME);
          // Check monitoring values (check that ack has been correctly received)
          assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
          assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
          checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
          checkAckOccured(sendUpdateTime, prevNAckUpdates, prevNTimeoutUpdates, prevNServerErrors);
        } else
        { // Some eligible servers should fail
          // Not enough expected servers: should have timed out in RS timeout (SMALL_TIMEOUT)
          assertBetweenInclusive(sendUpdateTime, SMALL_TIMEOUT, LONG_TIMEOUT);
          // Check monitoring values (check that timeout occurred)
          assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates);
          assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates + 1);
          // Check that the servers that are eligible but not expected have been added in the error by server list
          List<Integer> expectedServersInError = computeExpectedServersInError(eligibleServers, expectedServers);
          checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, expectedServersInError);
          Set<Integer> serversInError = computeExpectedServersInError(eligibleServers, expectedServers);
          checkTimeOutOccured(sendUpdateTime, prevNAckUpdates, prevNTimeoutUpdates, prevNServerErrors, serversInError);
        }
      } else
      {
        // No eligible servers at all, RS should not wait for any ack and immediately ack the update
        assertThat(sendUpdateTime).isLessThan(MAX_SEND_UPDATE_TIME);
        // Check monitoring values (check that ack has been correctly received)
        assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
        assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
        checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
        checkAckOccured(sendUpdateTime, prevNAckUpdates, prevNTimeoutUpdates, prevNServerErrors);
      }
    }
  }
  private void checkAckOccured(long sendUpdateTime, int prevNAckUpdates,
      int prevNTimeoutUpdates, Map<Integer, Integer> prevNServerErrors)
  {
    final FakeReplicationDomain fakeRd1 = fakeRDs[1];
    assertThat(sendUpdateTime).isLessThan(MAX_SEND_UPDATE_TIME);
    // Check monitoring values (check that ack has been correctly received)
    assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates + 1);
    assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates);
    checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, null); // Should have same value as previous one
  }
  private void checkTimeOutOccured(long sendUpdateTime, int prevNAckUpdates,
      int prevNTimeoutUpdates, Map<Integer, Integer> prevNServerErrors, Set<Integer> expectedServersInError)
  {
    final FakeReplicationDomain fakeRd1 = fakeRDs[1];
    assertBetweenInclusive(sendUpdateTime, SMALL_TIMEOUT, LONG_TIMEOUT);
    // Check monitoring values (check that timeout occurred)
    assertEquals(fakeRd1.getAssuredSdAcknowledgedUpdates(), prevNAckUpdates);
    assertEquals(fakeRd1.getAssuredSdTimeoutUpdates(), prevNTimeoutUpdates + 1);
    // Check that the servers that are eligible but not expected have been added in the error by server list
    checkServerErrors(fakeRd1.getAssuredSdServerTimeoutUpdates(), prevNServerErrors, expectedServersInError);
  }
  /**
   * Computes the servers that are eligible but that are not able to return an
   * ack (those in eligibleServers that are not in expectedServers).
   * The result may be empty.
   */
  private List<Integer> computeExpectedServersInError(List<Integer> eligibleServers, List<Integer> expectedServers)
  private Set<Integer> computeExpectedServersInError(List<Integer> eligibleServers, List<Integer> expectedServers)
  {
    List<Integer> expectedServersInError = new ArrayList<Integer>();
    for (Integer serverId : eligibleServers)
    {
      if (!expectedServers.contains(serverId))
      {
        expectedServersInError.add(serverId);
      }
    }
    // Set difference: start from the eligible servers and drop the expected ones
    Set<Integer> expectedServersInError = new HashSet<Integer>(eligibleServers);
    expectedServersInError.removeAll(expectedServers);
    return expectedServersInError;
  }
@@ -1634,7 +1732,7 @@
   * <li>if expectedServersInError is null or empty, both map should be equal</li>
   * </ul>
   */
  private void checkServerErrors(Map<Integer,Integer> measuredServerErrors, Map<Integer,Integer> prevServerErrors, List<Integer> expectedServersInError)
  private void checkServerErrors(Map<Integer,Integer> measuredServerErrors, Map<Integer,Integer> prevServerErrors, Set<Integer> expectedServersInError)
  {
    if (expectedServersInError != null)
    {
@@ -1650,25 +1748,12 @@
        } else
        {
          // Already errors for this server, increment the value
          int newVal = prevInt + 1;
          prevServerErrors.put(serverId, newVal);
          prevServerErrors.put(serverId, prevInt + 1);
        }
      }
    }
    // Maps should be the same
    assertEquals(measuredServerErrors.size(), prevServerErrors.size());
    Set<Integer> measuredKeySet = measuredServerErrors.keySet();
    for (Integer serverId : measuredKeySet)
    {
      Integer measuredInt = measuredServerErrors.get(serverId);
      assertNotNull(measuredInt);
      assertTrue(measuredInt != 0);
      Integer prevInt = prevServerErrors.get(serverId);
      assertNotNull(prevInt);
      assertTrue(prevInt != 0);
      assertEquals(measuredInt, prevInt);
    }
    assertThat(measuredServerErrors).isEqualTo(prevServerErrors);
  }
  /**
@@ -2344,80 +2429,48 @@
      Thread.sleep(500);
      // Check monitoring values in DS 1
      //
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
      final SafeReadAssertions srAssertsRD1 = fakeRd1.newSafeReadAssertions().sentUpdates(1);
      if (( (otherFakeDsGid == DEFAULT_GID) && (otherFakeDsGenId == DEFAULT_GENID) && (otherFakeDsScen != REPLY_OK_DS_SCENARIO) )
         || ( (otherFakeRsGid == DEFAULT_GID) && (otherFakeRsGenId == DEFAULT_GENID) && (otherFakeRsScen != REPLY_OK_RS_SCENARIO) ))
      {
        assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 0);
        assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 1);
        srAssertsRD1.notAcknowledgedUpdates(1);
      }
      else
      {
        assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
        assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 0);
        srAssertsRD1.acknowledgedUpdates(1);
      }
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), shouldSeeTimeout ? 1 : 0);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), shouldSeeWrongStatus ? 1 : 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), shouldSeeReplayError ? 1 : 0);
      // Check for servers in error list
      Map<Integer, Integer> expectedErrors =
          buildExpectedErrors(shouldSeeDsIdInError, shouldSeeRsIdInError, shouldSeeDsRsIdInError);
      checkServerErrorListsAreEqual(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates(), expectedErrors);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesAcked(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesNotAcked(), 0);
      srAssertsRD1.timeoutUpdates(shouldSeeTimeout ? 1 : 0)
               .wrongStatusUpdates(shouldSeeWrongStatus ? 1 : 0)
               .replayErrorUpdates(shouldSeeReplayError ? 1 : 0);
      addExpectedErrors(srAssertsRD1, shouldSeeDsIdInError, shouldSeeRsIdInError, shouldSeeDsRsIdInError);
      srAssertsRD1.runAsserts();
      // Check monitoring values in DS 2
      //
      final FakeReplicationDomain fakeRd2 = fakeRDs[2];
      checkDSReceivedAndAcked(fakeRd2, 1);
      checkDSReceivedAndAcked(fakeRDs[2], 1);
      // Check monitoring values in DS 3
      //
      final FakeReplicationDomain fakeRd3 = fakeRDs[3];
      assertEquals(fakeRd3.getAssuredSrSentUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrAcknowledgedUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd3.getAssuredSrReplayErrorUpdates(), 0);
      assertThat(fakeRd3.getAssuredSrServerNotAcknowledgedUpdates()).isEmpty();
      final SafeReadAssertions srAssertsRD3 = fakeRDs[3].newSafeReadAssertions();
      if (dsIsEligible)
      {
        assertEquals(fakeRd3.getAssuredSrReceivedUpdates(), 1);
        srAssertsRD3.receivedUpdates(1);
        if (dsIsExpected)
        {
          assertEquals(fakeRd3.getAssuredSrReceivedUpdatesAcked(), 1);
          assertEquals(fakeRd3.getAssuredSrReceivedUpdatesNotAcked(), 0);
          srAssertsRD3.receivedUpdatesAcked(1);
        }
        else if (shouldSeeReplayError
            && otherFakeDsScen == REPLAY_ERROR_DS_SCENARIO)
        {
          // Replay error for the other DS
          assertEquals(fakeRd3.getAssuredSrReceivedUpdatesAcked(), 0);
          assertEquals(fakeRd3.getAssuredSrReceivedUpdatesNotAcked(), 1);
        } else
        {
          assertEquals(fakeRd3.getAssuredSrReceivedUpdatesAcked(), 0);
          assertEquals(fakeRd3.getAssuredSrReceivedUpdatesNotAcked(), 0);
          srAssertsRD3.receivedUpdatesNotAcked(1);
        }
      }
      else
      {
        assertEquals(fakeRd3.getAssuredSrReceivedUpdates(), 0);
        assertEquals(fakeRd3.getAssuredSrReceivedUpdatesAcked(), 0);
        assertEquals(fakeRd3.getAssuredSrReceivedUpdatesNotAcked(), 0);
      }
      srAssertsRD3.runAsserts();
      // Sanity check
      fakeRd1.assertReceivedUpdates(0);
      fakeRd2.assertReceivedUpdates(1);
      fakeRd3.assertReceivedUpdates(otherFakeDsGenId == DEFAULT_GENID ? 1 : 0);
      fakeRDs[1].assertReceivedUpdates(0);
      fakeRDs[2].assertReceivedUpdates(1);
      fakeRDs[3].assertReceivedUpdates(otherFakeDsGenId == DEFAULT_GENID ? 1 : 0);
      fakeRs1.assertReceivedUpdates(1);
      fakeRs2.assertReceivedUpdates(otherFakeRsGenId == DEFAULT_GENID ? 1 : 0);
@@ -2434,35 +2487,19 @@
        + "> inclusive");
  }
  /**
   * Records one expected not acknowledged update for each server flagged as
   * being in error: FDS3_ID when dsInError is set, FRS2_ID when rsInError is
   * set and DS_FRS2_ID when dsRsInError is set.
   */
  private Map<Integer, Integer> buildExpectedErrors(boolean dsInError, boolean rsInError, boolean dsRsInError)
  private void addExpectedErrors(SafeReadAssertions srAsserts, boolean dsInError, boolean rsInError, boolean dsRsInError)
  {
    Map<Integer, Integer> expectedErrors = new HashMap<Integer, Integer>();
    if (dsInError)
    {
      expectedErrors.put(FDS3_ID, 1);
      srAsserts.serverNotAcknowledgedUpdates(FDS3_ID, 1);
    }
    if (rsInError)
    {
      expectedErrors.put(FRS2_ID, 1);
      srAsserts.serverNotAcknowledgedUpdates(FRS2_ID, 1);
    }
    if (dsRsInError)
    {
      expectedErrors.put(DS_FRS2_ID, 1);
    }
    return expectedErrors;
  }
  /**
   * Checks that the passed per-server error maps are equivalent: both are
   * non-null, have the same size and hold equal values for every key of the
   * first map.
   */
  private void checkServerErrorListsAreEqual(Map<Integer, Integer> list1, Map<Integer, Integer> list2)
  {
    assertNotNull(list1);
    assertNotNull(list2);
    assertEquals(list1.size(), list2.size());
    for (int s : list1.keySet())
    {
      assertEquals(list1.get(s), list2.get(s));
      srAsserts.serverNotAcknowledgedUpdates(DS_FRS2_ID, 1);
    }
  }
@@ -2692,45 +2729,24 @@
  /**
   * Helper method for some safe read test methods: checks that the passed
   * fake domain received and acknowledged nPacket safe read updates, and that
   * every other safe read monitoring value is zero (or empty for the
   * per-server map).
   *
   * @param fakeRd
   *          the fake domain whose monitoring values are checked
   * @param nPacket
   *          the expected number of received and acked updates
   */
  private void checkDSReceivedAndAcked(FakeReplicationDomain fakeRd, int nPacket)
  {
    assertEquals(fakeRd.getAssuredSrSentUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrAcknowledgedUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrNotAcknowledgedUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrTimeoutUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrWrongStatusUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrReplayErrorUpdates(), 0);
    assertThat(fakeRd.getAssuredSrServerNotAcknowledgedUpdates()).isEmpty();
    assertEquals(fakeRd.getAssuredSrReceivedUpdates(), nPacket);
    assertEquals(fakeRd.getAssuredSrReceivedUpdatesAcked(), nPacket);
    assertEquals(fakeRd.getAssuredSrReceivedUpdatesNotAcked(), 0);
    // Only the received and received-acked expectations are set on the assertions object
    fakeRd.newSafeReadAssertions()
        .receivedUpdates(nPacket)
        .receivedUpdatesAcked(nPacket)
        .runAsserts();
  }
  /**
   * Helper method for some safe read test methods: checks that the passed
   * fake domain sent nPacket safe read updates which were all acknowledged,
   * and that every other safe read monitoring value is zero (or empty for the
   * per-server map).
   *
   * @param fakeRd
   *          the fake domain whose monitoring values are checked
   * @param nPacket
   *          the expected number of sent and acknowledged updates
   */
  private void checkDSSentAndAcked(FakeReplicationDomain fakeRd, int nPacket)
  {
    assertEquals(fakeRd.getAssuredSrSentUpdates(), nPacket);
    assertEquals(fakeRd.getAssuredSrAcknowledgedUpdates(), nPacket);
    assertEquals(fakeRd.getAssuredSrNotAcknowledgedUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrTimeoutUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrWrongStatusUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrReplayErrorUpdates(), 0);
    assertThat(fakeRd.getAssuredSrServerNotAcknowledgedUpdates()).isEmpty();
    assertEquals(fakeRd.getAssuredSrReceivedUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrReceivedUpdatesAcked(), 0);
    assertEquals(fakeRd.getAssuredSrReceivedUpdatesNotAcked(), 0);
    // Only the sent and acknowledged expectations are set on the assertions object
    fakeRd.newSafeReadAssertions()
        .sentUpdates(nPacket)
        .acknowledgedUpdates(nPacket)
        .runAsserts();
  }
  /**
   * Checks that the passed fake domain neither sent nor received any safe
   * read update: every safe read monitoring counter is zero and the
   * per-server not acknowledged map is empty.
   *
   * @param fakeRd
   *          the fake domain whose monitoring values are checked
   */
  private void checkDSNothingReceivedOrSent(final FakeReplicationDomain fakeRd)
  {
    assertEquals(fakeRd.getAssuredSrSentUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrAcknowledgedUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrNotAcknowledgedUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrTimeoutUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrWrongStatusUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrReplayErrorUpdates(), 0);
    assertThat(fakeRd.getAssuredSrServerNotAcknowledgedUpdates()).isEmpty();
    assertEquals(fakeRd.getAssuredSrReceivedUpdates(), 0);
    assertEquals(fakeRd.getAssuredSrReceivedUpdatesAcked(), 0);
    assertEquals(fakeRd.getAssuredSrReceivedUpdatesNotAcked(), 0);
    // No expectation is set on the assertions object before it is run
    fakeRd.newSafeReadAssertions().runAsserts();
  }
  /**
@@ -2909,50 +2925,27 @@
            checkDSReceivedAndAcked(fakeRd2, 1);
            break;
          case TIMEOUT_DS_SCENARIO:
            assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
            assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 0);
            assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 1);
            assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 1);
            assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
            assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
            assertThat(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates()).containsOnly(entry(FDS2_ID, 1));
            assertEquals(fakeRd1.getAssuredSrReceivedUpdates(), 0);
            assertEquals(fakeRd1.getAssuredSrReceivedUpdatesAcked(), 0);
            assertEquals(fakeRd1.getAssuredSrReceivedUpdatesNotAcked(), 0);
            fakeRd1.newSafeReadAssertions()
                .sentUpdates(1)
                .notAcknowledgedUpdates(1)
                .timeoutUpdates(1)
                .serverNotAcknowledgedUpdates(FDS2_ID, 1)
                .runAsserts();
            assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
            assertThat(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates()).isEmpty();
            assertEquals(fakeRd2.getAssuredSrReceivedUpdates(), 1);
            assertEquals(fakeRd2.getAssuredSrReceivedUpdatesAcked(), 0);
            assertEquals(fakeRd2.getAssuredSrReceivedUpdatesNotAcked(), 0);
            fakeRd2.newSafeReadAssertions().receivedUpdates(1).runAsserts();
            break;
          case REPLAY_ERROR_DS_SCENARIO:
            assertEquals(fakeRd1.getAssuredSrSentUpdates(), 1);
            assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 0);
            assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 1);
            assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 0);
            assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
            assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 1);
            assertThat(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates()).containsOnly(entry(FDS2_ID, 1));
            assertEquals(fakeRd1.getAssuredSrReceivedUpdates(), 0);
            assertEquals(fakeRd1.getAssuredSrReceivedUpdatesAcked(), 0);
            assertEquals(fakeRd1.getAssuredSrReceivedUpdatesNotAcked(), 0);
            fakeRd1.newSafeReadAssertions()
                .sentUpdates(1)
                .notAcknowledgedUpdates(1)
                .replayErrorUpdates(1)
                .serverNotAcknowledgedUpdates(FDS2_ID, 1)
                .runAsserts();
            assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
            assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
            assertThat(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates()).isEmpty();
            assertEquals(fakeRd2.getAssuredSrReceivedUpdates(), 1);
            assertEquals(fakeRd2.getAssuredSrReceivedUpdatesAcked(), 0);
            assertEquals(fakeRd2.getAssuredSrReceivedUpdatesNotAcked(), 1);
            fakeRd2.newSafeReadAssertions()
                .receivedUpdates(1)
                .receivedUpdatesNotAcked(1)
                .runAsserts();
            break;
          default:
            Assert.fail("Unknown scenario: " + fakeDsScen);
@@ -3040,16 +3033,12 @@
      expectStatusForDS(fakeRd1, ServerStatus.DEGRADED_STATUS, FDS2_ID);
      Thread.sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertThat(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates()).containsOnly(entry(FDS2_ID, 4));
      assertEquals(fakeRd1.getAssuredSrReceivedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesAcked(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesNotAcked(), 0);
      fakeRd1.newSafeReadAssertions()
          .sentUpdates(4)
          .notAcknowledgedUpdates(4)
          .timeoutUpdates(4)
          .serverNotAcknowledgedUpdates(FDS2_ID, 4)
          .runAsserts();
      final FakeReplicationDomain fakeRd2 = fakeRDs[2];
      checkDSNothingReceivedOrSent(fakeRd2);
@@ -3068,16 +3057,13 @@
      assertThat(sendUpdateTime).isLessThan(MAX_SEND_UPDATE_TIME);
      Thread.sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertThat(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates()).containsOnly(entry(FDS2_ID, 4));
      assertEquals(fakeRd1.getAssuredSrReceivedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesAcked(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesNotAcked(), 0);
      fakeRd1.newSafeReadAssertions()
          .sentUpdates(5)
          .acknowledgedUpdates(1)
          .notAcknowledgedUpdates(4)
          .timeoutUpdates(4)
          .serverNotAcknowledgedUpdates(FDS2_ID, 4)
          .runAsserts();
      checkDSNothingReceivedOrSent(fakeRd2);
@@ -3094,27 +3080,18 @@
      expectStatusForDS(fakeRd1, ServerStatus.NORMAL_STATUS, FDS2_ID);
      // DS2 should also change status so reset its assured monitoring data so no received sr updates
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 5);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 1);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertThat(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates()).containsOnly(entry(FDS2_ID, 4));
      assertEquals(fakeRd1.getAssuredSrReceivedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesAcked(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesNotAcked(), 0);
      fakeRd1.newSafeReadAssertions()
          .sentUpdates(5)
          .acknowledgedUpdates(1)
          .notAcknowledgedUpdates(4)
          .timeoutUpdates(4)
          .serverNotAcknowledgedUpdates(FDS2_ID, 4)
          .runAsserts();
      assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
      assertThat(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates()).isEmpty();
      assertEquals(fakeRd2.getAssuredSrReceivedUpdates(), 4);
      assertEquals(fakeRd2.getAssuredSrReceivedUpdatesAcked(), 4);
      assertEquals(fakeRd2.getAssuredSrReceivedUpdatesNotAcked(), 0);
      fakeRd2.newSafeReadAssertions()
          .receivedUpdates(4)
          .receivedUpdatesAcked(4)
          .runAsserts();
      fakeRd1.assertReceivedUpdates(0);
@@ -3131,27 +3108,18 @@
      assertThat(sendUpdateTime).isLessThan(MAX_SEND_UPDATE_TIME);
      Thread.sleep(500); // Sleep a while as counters are updated just after sending thread is unblocked
      assertEquals(fakeRd1.getAssuredSrSentUpdates(), 6);
      assertEquals(fakeRd1.getAssuredSrAcknowledgedUpdates(), 2);
      assertEquals(fakeRd1.getAssuredSrNotAcknowledgedUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrTimeoutUpdates(), 4);
      assertEquals(fakeRd1.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReplayErrorUpdates(), 0);
      assertThat(fakeRd1.getAssuredSrServerNotAcknowledgedUpdates()).containsOnly(entry(FDS2_ID, 4));
      assertEquals(fakeRd1.getAssuredSrReceivedUpdates(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesAcked(), 0);
      assertEquals(fakeRd1.getAssuredSrReceivedUpdatesNotAcked(), 0);
      fakeRd1.newSafeReadAssertions()
          .sentUpdates(6)
          .acknowledgedUpdates(2)
          .notAcknowledgedUpdates(4)
          .timeoutUpdates(4)
          .serverNotAcknowledgedUpdates(FDS2_ID, 4)
          .runAsserts();
      assertEquals(fakeRd2.getAssuredSrSentUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrNotAcknowledgedUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrTimeoutUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrWrongStatusUpdates(), 0);
      assertEquals(fakeRd2.getAssuredSrReplayErrorUpdates(), 0);
      assertThat(fakeRd2.getAssuredSrServerNotAcknowledgedUpdates()).isEmpty();
      assertEquals(fakeRd2.getAssuredSrReceivedUpdates(), 5);
      assertEquals(fakeRd2.getAssuredSrReceivedUpdatesAcked(), 5);
      assertEquals(fakeRd2.getAssuredSrReceivedUpdatesNotAcked(), 0);
      fakeRd2.newSafeReadAssertions()
          .receivedUpdates(5)
          .receivedUpdatesAcked(5)
          .runAsserts();
      fakeRd1.assertReceivedUpdates(0);
      fakeRd2.assertReceivedWrongUpdates(6, 1);