mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.30.2013 9cdfa916fa7bdc0d4b56b8eddc5d623f5d4e2a95
Code cleanup.


LDAPReplicationDomain.java:
In buildAndPublishMissingChanges(), removed ReplicationBroker parameter and used broker field instead.


*Test.java:
Removed try / catch / fail test anti pattern.
Inlined local variables.

AssuredReplicationPluginTest.java:
Extracted methods assertBlockedForLessThanTimeout() assertBlockedLongerThanTimeout(),
Inlined sleep().
In getErrorsByServers(), improved the code.

ReplicationServerTest.java:
Split changelogChaining() test into changelogChaining0() and changelogChaining1() to match the previous double testing + then simplified the code.
Extracted methods receiveReplicationMsgs(), assertOnlyTopologyMsgsReceived().
Used CSNGenerator.
4 files modified
935 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 46 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java 93 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java 368 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 428 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -41,7 +41,7 @@
import org.opends.messages.MessageBuilder;
import org.opends.messages.Severity;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.*;
import org.opends.server.admin.std.meta.ReplicationDomainCfgDefn.IsolationPolicy;
import org.opends.server.admin.std.server.ExternalChangelogDomainCfg;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.api.AlertGenerator;
@@ -62,7 +62,6 @@
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.tasks.PurgeConflictsHistoricalTask;
import org.opends.server.tasks.TaskUtils;
@@ -71,7 +70,7 @@
import org.opends.server.util.LDIFReader;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.*;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyOperation;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.messages.ToolMessages.*;
@@ -436,7 +435,7 @@
       */
      try
      {
        if (buildAndPublishMissingChanges(startCSN, broker))
        if (buildAndPublishMissingChanges(startCSN))
        {
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
@@ -1262,8 +1261,8 @@
          break;
        }
      }
      boolean attributeToBeFiltered = (fractionalExclusive && found)
          || (!fractionalExclusive && !found);
      boolean attributeToBeFiltered = fractionalExclusive && found
          || !fractionalExclusive && !found;
      if (attributeToBeFiltered
          && !newRdn.hasAttributeType(attributeType)
          && !modifyDNOperation.deleteOldRDN())
@@ -1437,7 +1436,7 @@
   private static boolean isFractionalProhibited(AttributeType attrType)
   {
     String attributeName = attrType.getPrimaryName();
     return (attributeName != null && isFractionalProhibitedAttr(attributeName))
     return attributeName != null && isFractionalProhibitedAttr(attributeName)
         || isFractionalProhibitedAttr(attrType.getOID());
   }
@@ -1453,8 +1452,8 @@
    // Now remove the attribute or modification if:
    // - exclusive mode and attribute is in configuration
    // - inclusive mode and attribute is not in configuration
    return (foundAttribute && fractionalExclusive)
        || (!foundAttribute && !fractionalExclusive);
    return foundAttribute && fractionalExclusive
        || !foundAttribute && !fractionalExclusive;
  }
  private static boolean contains(Set<String> fractionalConcernedAttributes,
@@ -1857,16 +1856,8 @@
      // this policy imply that we always accept updates.
      return true;
    }
    if (isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES))
    {
      // this isolation policy specifies that the updates are denied
      // when the broker had problems during the connection phase
      // Updates are still accepted if the broker is currently connecting..
      return !hasConnectionError();
    }
    // we should never get there as the only possible policies are
    // ACCEPT_ALL_UPDATES and REJECT_ALL_UPDATES
    return true;
    return !isolationPolicy.equals(IsolationPolicy.REJECT_ALL_UPDATES)
        || !hasConnectionError();
  }
@@ -2473,7 +2464,7 @@
        op = msg.createOperation(conn);
        dependency = remotePendingChanges.checkDependencies(op, msg);
        while (!dependency && !replayDone && (retryCount-- > 0))
        while (!dependency && !replayDone && retryCount-- > 0)
        {
          if (shutdown.get())
          {
@@ -2824,8 +2815,8 @@
      for (Modification mod : mods)
      {
        AttributeType modAttrType = mod.getAttribute().getAttributeType();
        if ((mod.getModificationType() == ModificationType.DELETE
            || mod.getModificationType() == ModificationType.REPLACE)
        if (mod.getModificationType() == ModificationType.DELETE
            || mod.getModificationType() == ModificationType.REPLACE
            && currentRDN.hasAttributeType(modAttrType))
        {
          if (currentRDN.hasAttributeType(modAttrType))
@@ -4429,14 +4420,11 @@
   *
   * @param startCSN
   *          The CSN where we need to start the search
   * @param session
   *          The session to use to publish the changes
   * @return A boolean indicating he success of the operation.
   * @throws Exception
   *           if an Exception happens during the search.
   */
  public boolean buildAndPublishMissingChanges(CSN startCSN,
      ReplicationBroker session) throws Exception
  public boolean buildAndPublishMissingChanges(CSN startCSN) throws Exception
  {
    // Trim the changes in replayOperations that are older than the startCSN.
    synchronized (replayOperations)
@@ -4494,7 +4482,7 @@
      for (FakeOperation opToSend : opsToSend)
      {
        session.publishRecovery(opToSend.generateMessage());
        broker.publishRecovery(opToSend.generateMessage());
      }
      opsToSend.clear();
      if (lastRetrievedChange != null)
@@ -5205,8 +5193,8 @@
        return false;
      // Compare modes
      if ((cfg1.isFractional() != cfg2.isFractional())
          || (cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive()))
      if (cfg1.isFractional() != cfg2.isFractional()
          || cfg1.isFractionalExclusive() != cfg2.isFractionalExclusive())
        return false;
      // Compare all classes attributes
opends/tests/unit-tests-testng/src/server/org/opends/server/core/networkgroups/ResourceLimitsPolicyTest.java
@@ -23,11 +23,13 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS
 */
package org.opends.server.core.networkgroups;
import java.util.ArrayList;
import java.util.List;
import org.opends.messages.Message;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
@@ -35,18 +37,18 @@
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.types.DN;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchScope;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.*;
/*
/**
 * This set of tests test the resource limits.
 */
@SuppressWarnings("javadoc")
public class ResourceLimitsPolicyTest extends DirectoryServerTestCase {
  //===========================================================================
  //
@@ -60,8 +62,7 @@
   * @throws Exception if the environment could not be set up.
   */
  @BeforeClass
  public void setUp()
    throws Exception
  public void setUp() throws Exception
  {
    // This test suite depends on having the schema available,
    // so we'll start the server.
@@ -119,13 +120,11 @@
  public void testMaxNumberOfConnections()
          throws Exception
  {
    ArrayList<Message> messages = new ArrayList<Message>();
    List<Message> messages = new ArrayList<Message>();
    ResourceLimitsPolicyFactory factory =
        new ResourceLimitsPolicyFactory();
    ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
    ResourceLimitsPolicy limits =
        factory
            .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
        factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
              {
                @Override
@@ -139,17 +138,14 @@
    InternalClientConnection conn1 = new InternalClientConnection(DN.NULL_DN);
    limits.addConnection(conn1);
    boolean check = limits.isAllowed(conn1, null, true, messages);
    assertTrue(check);
    assertTrue(limits.isAllowed(conn1, null, true, messages));
    InternalClientConnection conn2 = new InternalClientConnection(DN.NULL_DN);
    limits.addConnection(conn2);
    check = limits.isAllowed(conn2, null, true, messages);
    assertFalse(check);
    assertFalse(limits.isAllowed(conn2, null, true, messages));
    limits.removeConnection(conn1);
    check = limits.isAllowed(conn2, null, true, messages);
    assertTrue(check);
    assertTrue(limits.isAllowed(conn2, null, true, messages));
    limits.removeConnection(conn2);
  }
@@ -162,13 +158,11 @@
  public void testMaxNumberOfConnectionsFromSameIp()
          throws Exception
  {
    ArrayList<Message> messages = new ArrayList<Message>();
    List<Message> messages = new ArrayList<Message>();
    ResourceLimitsPolicyFactory factory =
        new ResourceLimitsPolicyFactory();
    ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
    ResourceLimitsPolicy limits =
        factory
            .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
        factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
              {
                @Override
@@ -182,17 +176,14 @@
    InternalClientConnection conn1 = new InternalClientConnection(DN.NULL_DN);
    limits.addConnection(conn1);
    boolean check = limits.isAllowed(conn1, null, true, messages);
    assertTrue(check);
    assertTrue(limits.isAllowed(conn1, null, true, messages));
    InternalClientConnection conn2 = new InternalClientConnection(DN.NULL_DN);
    limits.addConnection(conn2);
    check = limits.isAllowed(conn2, null, true, messages);
    assertFalse(check);
    assertFalse(limits.isAllowed(conn2, null, true, messages));
    limits.removeConnection(conn1);
    check = limits.isAllowed(conn2, null, true, messages);
    assertTrue(check);
    assertTrue(limits.isAllowed(conn2, null, true, messages));
    limits.removeConnection(conn2);
  }
@@ -213,11 +204,9 @@
  {
    List<Message> messages = new ArrayList<Message>();
    ResourceLimitsPolicyFactory factory =
        new ResourceLimitsPolicyFactory();
    ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
    ResourceLimitsPolicy limits =
        factory
            .createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
        factory.createQOSPolicy(new MockResourceLimitsQOSPolicyCfg()
              {
                @Override
@@ -236,12 +225,7 @@
        SearchScope.BASE_OBJECT,
        LDAPFilter.decode(searchFilter).toSearchFilter());
    boolean check = limits.isAllowed(conn1, search, true, messages);
    if (success) {
      assertTrue(check);
    } else {
      assertFalse(check);
    }
    assertEquals(limits.isAllowed(conn1, search, true, messages), success);
    limits.removeConnection(conn1);
  }
@@ -254,7 +238,7 @@
  public void testMaxThroughput()
          throws Exception
  {
    ArrayList<Message> messages = new ArrayList<Message>();
    List<Message> messages = new ArrayList<Message>();
    final long interval = 1000; // Unit is milliseconds
    ResourceLimitsPolicyFactory factory = new ResourceLimitsPolicyFactory();
@@ -276,35 +260,26 @@
    InternalClientConnection conn = new InternalClientConnection(DN.NULL_DN);
    limits.addConnection(conn);
    InternalSearchOperation search1 = conn.processSearch(
      DN.decode("dc=example,dc=com"),
      SearchScope.BASE_OBJECT,
      LDAPFilter.decode("(objectclass=*)").toSearchFilter());
    final DN dn = DN.decode("dc=example,dc=com");
    final SearchFilter all = SearchFilter.createFilterFromString("(objectclass=*)");
    // First operation is allowed
    boolean check = limits.isAllowed(conn, search1, true, messages);
    assertTrue(check);
    InternalSearchOperation search2 = conn.processSearch(
      DN.decode("dc=example,dc=com"),
      SearchScope.BASE_OBJECT,
      LDAPFilter.decode("(objectclass=*)").toSearchFilter());
    InternalSearchOperation search1 =
        conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
    assertTrue(limits.isAllowed(conn, search1, true, messages));
    // Second operation in the same interval is refused
    check = limits.isAllowed(conn, search2, true, messages);
    assertFalse(check);
    InternalSearchOperation search2 =
        conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
    assertFalse(limits.isAllowed(conn, search2, true, messages));
    // Wait for the end of the interval => counters are reset
    Thread.sleep(interval);
    InternalSearchOperation search3 = conn.processSearch(
      DN.decode("dc=example,dc=com"),
      SearchScope.BASE_OBJECT,
      LDAPFilter.decode("(objectclass=*)").toSearchFilter());
    // The operation is allowed
    check = limits.isAllowed(conn, search3, true, messages);
    assertTrue(check);
    InternalSearchOperation search3 =
        conn.processSearch(dn, SearchScope.BASE_OBJECT, all);
    assertTrue(limits.isAllowed(conn, search3, true, messages));
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/AssuredReplicationPluginTest.java
@@ -46,7 +46,6 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.*;
import org.opends.server.replication.protocol.*;
@@ -462,9 +461,8 @@
     * Handle the handshake processing with the connecting DS
     * returns true if handshake was performed without errors
     */
    private boolean performHandshake()
    private boolean performHandshake() throws Exception
    {
      try
      {
        // Receive server start
        ServerStartMsg serverStartMsg = (ServerStartMsg) session.receive();
@@ -520,17 +518,6 @@
        TopologyMsg topologyMsg = new TopologyMsg(new ArrayList<DSInfo>(),
          rsList);
        session.publish(topologyMsg);
      } catch (IOException e)
      {
        fail("Unexpected io exception in fake replication server handshake " +
          "processing: " + e);
        return false;
      } catch (Exception e)
      {
        fail("Unexpected exception in fake replication server handshake " +
          "processing: " + e);
        return false;
      }
      return true;
    }
@@ -558,7 +545,7 @@
    /**
     * Handle client connection then call code specific to configured test
     */
    private void handleClientConnection()
    private void handleClientConnection() throws Exception
    {
      debugInfo("handleClientConnection " + testcase + " " + scenario);
      // Handle DS connection
@@ -610,13 +597,12 @@
      debugInfo("handleClientConnection " + testcase + " " + scenario + " done");
    }
    /*
    /**
     * Make the RS send an add message with the passed entry and return the ack
     * message it receives from the DS
     */
    private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws SocketTimeoutException
    private AckMsg sendAssuredAddMsg(Entry entry, String parentUid) throws Exception
    {
      try
      {
        AddMsg addMsg =
          new AddMsg(gen.newCSN(), entry.getDN(), UUID.randomUUID().toString(),
@@ -632,86 +618,51 @@
        // Read and return matching ack
        return (AckMsg)session.receive();
      } catch(SocketTimeoutException e)
      {
        throw e;
      } catch (Throwable t)
      {
        fail("Unexpected exception in fake replication server sendAddUpdate " +
          "processing: " + t);
        return null;
      }
    }
    /**
     * Read the coming update and check parameters are not assured
     */
    private void executeNotAssuredScenario()
    private void executeNotAssuredScenario() throws Exception
    {
      UpdateMsg updateMsg = (UpdateMsg) session.receive();
      checkUpdateAssuredParameters(updateMsg);
      try
      {
        UpdateMsg updateMsg = (UpdateMsg) session.receive();
        checkUpdateAssuredParameters(updateMsg);
        scenarioExecuted = true;
      } catch (Exception e)
      {
        fail("Unexpected exception in fake replication server executeNotAssuredScenario " +
          "processing: " + e);
      }
      scenarioExecuted = true;
    }
    /**
     * Read the coming update and make the client time out by not sending back
     * the ack
     */
    private void executeTimeoutScenario()
    private void executeTimeoutScenario() throws Exception
    {
      UpdateMsg updateMsg = (UpdateMsg) session.receive();
      checkUpdateAssuredParameters(updateMsg);
      try
      {
        UpdateMsg updateMsg = (UpdateMsg) session.receive();
        checkUpdateAssuredParameters(updateMsg);
      scenarioExecuted = true;
        scenarioExecuted = true;
        // We do not send back an ack and the client code is expected to be
        // blocked at least for the programmed timeout time.
      } catch (Exception e)
      {
        fail("Unexpected exception in fake replication server executeTimeoutScenario " +
          "processing: " + e + " testcase= " + testcase +
          " groupId=" + groupId);
      }
      // We do not send back an ack and the client code is expected to be
      // blocked at least for the programmed timeout time.
    }
    /**
     * Read the coming update, sleep some time then send back an ack
     */
    private void executeNoTimeoutScenario()
    private void executeNoTimeoutScenario() throws Exception
    {
      try
      {
        UpdateMsg updateMsg = (UpdateMsg) session.receive();
        checkUpdateAssuredParameters(updateMsg);
      UpdateMsg updateMsg = (UpdateMsg) session.receive();
      checkUpdateAssuredParameters(updateMsg);
        // Sleep before sending back the ack
        sleep(NO_TIMEOUT_RS_SLEEP_TIME);
      // Sleep before sending back the ack
      sleep(NO_TIMEOUT_RS_SLEEP_TIME);
        // Send the ack without errors
        AckMsg ackMsg = new AckMsg(updateMsg.getCSN());
        session.publish(ackMsg);
      // Send the ack without errors
      AckMsg ackMsg = new AckMsg(updateMsg.getCSN());
      session.publish(ackMsg);
        scenarioExecuted = true;
      } catch (Exception e)
      {
        fail("Unexpected exception in fake replication server executeNoTimeoutScenario " +
          "processing: " + e);
      }
      scenarioExecuted = true;
    }
    /**
@@ -733,9 +684,8 @@
    /**
     * Read the coming safe read mode updates and send back acks with errors
     */
    private void executeSafeReadManyErrorsScenario()
    private void executeSafeReadManyErrorsScenario() throws Exception
    {
      try
      {
        // Read first update
        UpdateMsg updateMsg = (UpdateMsg) session.receive();
@@ -779,20 +729,14 @@
        // let timeout occur
        scenarioExecuted = true;
      } catch (Exception e)
      {
        fail("Unexpected exception in fake replication server executeSafeReadManyErrorsScenario " +
          "processing: " + e);
      }
    }
    /**
     * Read the coming seaf data mode updates and send back acks with errors
     * Read the coming safe data mode updates and send back acks with errors
     */
    private void executeSafeDataManyErrorsScenario()
    private void executeSafeDataManyErrorsScenario() throws Exception
    {
      try
      {
        // Read first update
        UpdateMsg updateMsg = (UpdateMsg) session.receive();
@@ -830,32 +774,12 @@
        checkUpdateAssuredParameters(updateMsg);
        // let timeout occur
        scenarioExecuted = true;
      } catch (Exception e)
      {
        fail("Unexpected exception in fake replication server executeSafeDataManyErrorsScenario " +
          "processing: " + e);
      }
    }
  }
  /**
   * Sleep a while
   */
  private void sleep(long time)
  {
    try
    {
      Thread.sleep(time);
    } catch (InterruptedException ex)
    {
      fail("Error sleeping " + ex);
    }
  }
  /**
   * Return various group id values
   */
  @DataProvider(name = "rsGroupIdProvider")
@@ -877,7 +801,6 @@
  @Test(dataProvider = "rsGroupIdProvider")
  public void testSafeDataModeTimeout(byte rsGroupId) throws Exception
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeDataModeTimeout" + rsGroupId;
    try
@@ -894,8 +817,7 @@
      // Create a safe data assured domain
      if (rsGroupId == (byte)1)
      {
        safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 1,
        TIMEOUT);
        safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 1, TIMEOUT);
        // Wait for connection of domain to RS
        waitForConnectionToRs(testcase, replicationServer);
@@ -926,15 +848,11 @@
      if (rsGroupId == (byte)1)
      {
        // RS has same group id as DS
        // In this scenario, the fake RS will not send back an ack so we expect
        // the add entry code (LDAP client code emulation) to be blocked for the
        // timeout value at least. If the time we have slept is lower, timeout
        // handling code is not working...
        assertTrue((endTime - startTime) >= TIMEOUT);
        assertBlockedLongerThanTimeout(startTime, endTime, TIMEOUT);
        assertTrue(replicationServer.isScenarioExecuted());
        // Check monitoring values
        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        DN baseDN = DN.decode(SAFE_DATA_DN);
        new MonitorAssertions(baseDN)
          .assertValue("assured-sd-sent-updates", 1)
@@ -948,7 +866,7 @@
        // No error should be seen in monitoring and update should have not been
        // sent in assured mode
        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        DN baseDN = DN.decode(NOT_ASSURED_DN);
        new MonitorAssertions(baseDN).assertRemainingValuesAreZero();
        assertNoServerErrors(baseDN);
@@ -992,7 +910,6 @@
  @Test(dataProvider = "rsGroupIdProvider")
  public void testSafeReadModeTimeout(byte rsGroupId) throws Exception
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeReadModeTimeout" + rsGroupId;
    try
@@ -1010,8 +927,7 @@
      if (rsGroupId == (byte)1)
      {
        // Create a safe read assured domain
        safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
            TIMEOUT);
        safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
        // Wait for connection of domain to RS
        waitForConnectionToRs(testcase, replicationServer);
@@ -1041,16 +957,11 @@
      if (rsGroupId == (byte)1)
      {
        // RS has same group id as DS
        // In this scenario, the fake RS will not send back an ack so we expect
        // the add entry code (LDAP client code emulation) to be blocked for the
        // timeout value at least. If the time we have slept is lower, timeout
        // handling code is not working...
        assertTrue((endTime - startTime) >= TIMEOUT);
        assertBlockedLongerThanTimeout(startTime, endTime, TIMEOUT);
        assertTrue(replicationServer.isScenarioExecuted());
        // Check monitoring values
        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        DN baseDN = DN.decode(SAFE_READ_DN);
        new MonitorAssertions(baseDN)
          .assertValue("assured-sr-sent-updates", 1)
@@ -1065,7 +976,7 @@
        // No error should be seen in monitoring and update should have not been
        // sent in assured mode
        sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
        DN baseDN = DN.decode(NOT_ASSURED_DN);
        new MonitorAssertions(baseDN).assertRemainingValuesAreZero();
        assertNoServerErrors(baseDN);
@@ -1117,7 +1028,7 @@
   * Wait for connection to the fake replication server or times out with error
   * after some seconds
   */
  private void waitForConnectionToRs(String testCase, FakeReplicationServer rs)
  private void waitForConnectionToRs(String testCase, FakeReplicationServer rs) throws Exception
  {
    int nsec = -1;
    do
@@ -1125,7 +1036,7 @@
      nsec++;
      if (nsec == 10) // 10 seconds timeout
        fail(testCase + ": timeout waiting for domain connection to fake RS after " + nsec + " seconds.");
      sleep(1000);
      Thread.sleep(1000);
    } while (!rs.isHandshakeOk());
  }
@@ -1133,7 +1044,7 @@
   * Wait for the scenario to be executed by the fake replication server or
   * times out with error after some seconds
   */
  private void waitForScenarioExecutedOnRs(String testCase, FakeReplicationServer rs)
  private void waitForScenarioExecutedOnRs(String testCase, FakeReplicationServer rs) throws Exception
  {
    int nsec = -1;
    do
@@ -1141,7 +1052,7 @@
      nsec++;
      if (nsec == 10) // 10 seconds timeout
        fail(testCase + ": timeout waiting for scenario to be exectued on fake RS after " + nsec + " seconds.");
      sleep(1000);
      Thread.sleep(1000);
    } while (!rs.isScenarioExecuted());
  }
@@ -1163,7 +1074,6 @@
  @Test
  public void testSafeDataModeAck() throws Exception
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeDataModeAck";
    try
@@ -1175,8 +1085,7 @@
      replicationServer.start(NO_TIMEOUT_SCENARIO);
      // Create a safe data assured domain
      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 2,
        TIMEOUT);
      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 2, TIMEOUT);
      // Wait for connection of domain to RS
      waitForConnectionToRs(testcase, replicationServer);
@@ -1187,16 +1096,11 @@
        "objectClass: organizationalUnit\n";
      addEntry(TestCaseUtils.entryFromLdifString(entry));
      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
      // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
      long endTime = System.currentTimeMillis();
      long callTime = endTime - startTime;
      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
      assertTrue(replicationServer.isScenarioExecuted());
      // Check monitoring values
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      DN baseDN = DN.decode(SAFE_DATA_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sd-sent-updates", 1)
@@ -1216,7 +1120,6 @@
  @Test
  public void testSafeReadModeAck() throws Exception
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeReadModeAck";
    try
@@ -1227,8 +1130,7 @@
      replicationServer.start(NO_TIMEOUT_SCENARIO);
      // Create a safe read assured domain
      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
        TIMEOUT);
      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
      // Wait for connection of domain to RS
      waitForConnectionToRs(testcase, replicationServer);
@@ -1239,16 +1141,11 @@
        "objectClass: organizationalUnit\n";
      addEntry(TestCaseUtils.entryFromLdifString(entry));
      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
      // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
      long endTime = System.currentTimeMillis();
      long callTime = endTime - startTime;
      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
      assertTrue(replicationServer.isScenarioExecuted());
      // Check monitoring values
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      DN baseDN = DN.decode(SAFE_READ_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sr-sent-updates", 1)
@@ -1268,7 +1165,6 @@
  @Test(dataProvider = "rsGroupIdProvider", groups = "slow")
  public void testSafeReadModeReply(byte rsGroupId) throws Exception
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeReadModeReply";
    try
@@ -1279,8 +1175,7 @@
      replicationServer.start(NO_READ);
      // Create a safe read assured domain
      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
        TIMEOUT);
      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
      // Wait for connection of domain to RS
      waitForConnectionToRs(testcase, replicationServer);
@@ -1356,7 +1251,6 @@
  @Test(dataProvider = "rsGroupIdProvider", groups = "slow")
  public void testSafeDataModeReply(byte rsGroupId) throws Exception
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeDataModeReply";
    try
@@ -1367,8 +1261,7 @@
      replicationServer.start(NO_READ);
      // Create a safe data assured domain
      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4,
        TIMEOUT);
      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 4, TIMEOUT);
      // Wait for connection of domain to RS
      waitForConnectionToRs(testcase, replicationServer);
@@ -1380,19 +1273,14 @@
      Entry entry = TestCaseUtils.entryFromLdifString(entryStr);
      String parentUid = getEntryUUID(DN.decode(SAFE_DATA_DN));
      AckMsg ackMsg;
      try
      {
        ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
      } catch (SocketTimeoutException e)
      {
        // Expected
        return;
      }
      fail("DS should not reply an ack in safe data mode, however, it replied: " +
        ackMsg);
    } finally
      AckMsg ackMsg = replicationServer.sendAssuredAddMsg(entry, parentUid);
      fail("DS should not reply an ack in safe data mode, however, it replied: " + ackMsg);
    }
    catch (SocketTimeoutException expected)
    {
      return;
    }
    finally
    {
      endTest(testcase);
    }
@@ -1405,7 +1293,6 @@
  @Test(groups = "slow")
  public void testSafeDataManyErrors() throws Exception
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeDataManyErrors";
    try
@@ -1417,8 +1304,7 @@
      replicationServer.start(SAFE_DATA_MANY_ERRORS);
      // Create a safe data assured domain
      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 3,
        TIMEOUT);
      safeDataDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_DATA_MODE, 3, TIMEOUT);
      // Wait for connection of domain to RS
      waitForConnectionToRs(testcase, replicationServer);
@@ -1430,18 +1316,13 @@
        "objectClass: organizationalUnit\n";
      addEntry(TestCaseUtils.entryFromLdifString(entry));
      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
      // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
      long endTime = System.currentTimeMillis();
      long callTime = endTime - startTime;
      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
      // Check monitoring values
      // The expected ack for the first update is:
      // - timeout error
      // - server 10 error
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      DN baseDN = DN.decode(SAFE_DATA_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sd-sent-updates", 1)
@@ -1453,18 +1334,13 @@
      startTime = System.currentTimeMillis(); // Time the update has been initiated
      deleteEntry(entryDn);
      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
      // seconds, so we expect the delete entry code (LDAP client code emulation) to be blocked
      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
      endTime = System.currentTimeMillis();
      callTime = endTime - startTime;
      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
      // Check monitoring values
      // The expected ack for the second update is:
      // - timeout error
      // - server 10 error, server 20 error
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      baseDN = DN.decode(SAFE_DATA_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sd-sent-updates", 2)
@@ -1476,17 +1352,12 @@
      startTime = System.currentTimeMillis(); // Time the update has been initiated
      addEntry(TestCaseUtils.entryFromLdifString(entry));
      // In this scenario, the fake RS will not send back an ack so we expect
      // the add entry code (LDAP client code emulation) to be blocked for the
      // timeout value at least. If the time we have slept is lower, timeout
      // handling code is not working...
      endTime = System.currentTimeMillis();
      assertTrue((endTime - startTime) >= TIMEOUT);
      assertBlockedLongerThanTimeout(startTime, System.currentTimeMillis(), TIMEOUT);
      assertTrue(replicationServer.isScenarioExecuted());
      // Check monitoring values
      // No ack should have comen back, so timeout incremented (flag and error for rs)
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      baseDN = DN.decode(SAFE_DATA_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sd-sent-updates", 3)
@@ -1494,7 +1365,6 @@
        .assertRemainingValuesAreZero();
      assertServerErrorsSafeDataMode(baseDN,
          entry(10, 2), entry(20, 1), entry(RS_SERVER_ID, 1));
    } finally
    {
      endTest(testcase);
@@ -1502,13 +1372,36 @@
  }
  /**
   * In this scenario, the fake RS will not send back an ack so we expect the
   * add entry code (LDAP client code emulation) to be blocked for the timeout
   * value at least. If the time we have slept is lower, timeout handling code
   * is not working...
   */
  private void assertBlockedLongerThanTimeout(long startTime, long endTime, int TIMEOUT)
  {
    assertTrue((endTime - startTime) >= TIMEOUT);
  }
  /**
   * In this scenario, the fake RS will send back an ack after
   * NO_TIMEOUT_RS_SLEEP_TIME seconds, so we expect the add/delete entry code
   * (LDAP client code emulation) to be blocked for more than
   * NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
   */
  private void assertBlockedForLessThanTimeout(long startTime, int TIMEOUT)
  {
    long endTime = System.currentTimeMillis();
    long callTime = endTime - startTime;
    assertTrue(NO_TIMEOUT_RS_SLEEP_TIME <= callTime && callTime <= TIMEOUT);
  }
  /**
   * DS performs many successive modifications in safe read mode and receives RS
   * acks with various errors. Check for monitoring right errors
   */
  @Test(groups = "slow")
  public void testSafeReadManyErrors() throws Exception
  {
    int TIMEOUT = 5000;
    String testcase = "testSafeReadManyErrors";
    try
@@ -1519,8 +1412,7 @@
      replicationServer.start(SAFE_READ_MANY_ERRORS);
      // Create a safe read assured domain
      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0,
        TIMEOUT);
      safeReadDomainCfgEntry = createAssuredDomain(AssuredMode.SAFE_READ_MODE, 0, TIMEOUT);
      // Wait for connection of domain to RS
      waitForConnectionToRs(testcase, replicationServer);
@@ -1532,18 +1424,13 @@
        "objectClass: organizationalUnit\n";
      addEntry(TestCaseUtils.entryFromLdifString(entry));
      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
      // seconds, so we expect the add entry code (LDAP client code emulation) to be blocked
      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
      long endTime = System.currentTimeMillis();
      long callTime = endTime - startTime;
      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
      // Check monitoring values
      // The expected ack for the first update is:
      // - replay error
      // - server 10 error, server 20 error
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      DN baseDN = DN.decode(SAFE_READ_DN);
      new MonitorAssertions(baseDN)
        .assertValue("assured-sr-sent-updates", 1)
@@ -1556,12 +1443,7 @@
      startTime = System.currentTimeMillis(); // Time the update has been initiated
      deleteEntry(entryDn);
      // In this scenario, the fake RS will send back an ack after NO_TIMEOUT_RS_SLEEP_TIME
      // seconds, so we expect the delete entry code (LDAP client code emulation) to be blocked
      // for more than NO_TIMEOUT_RS_SLEEP_TIME seconds but no more than the timeout value.
      endTime = System.currentTimeMillis();
      callTime = endTime - startTime;
      assertTrue( (callTime >= NO_TIMEOUT_RS_SLEEP_TIME) && (callTime <= TIMEOUT));
      assertBlockedForLessThanTimeout(startTime, TIMEOUT);
      // Check monitoring values
      // The expected ack for the second update is:
@@ -1569,7 +1451,7 @@
      // - wrong status error
      // - replay error
      // - server 10 error, server 20 error, server 30 error
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      new MonitorAssertions(baseDN)
        .assertValue("assured-sr-sent-updates", 2)
        .assertValue("assured-sr-not-acknowledged-updates", 2)
@@ -1584,17 +1466,12 @@
      startTime = System.currentTimeMillis(); // Time the update has been initiated
      addEntry(TestCaseUtils.entryFromLdifString(entry));
      // In this scenario, the fake RS will not send back an ack so we expect
      // the add entry code (LDAP client code emulation) to be blocked for the
      // timeout value at least. If the time we have slept is lower, timeout
      // handling code is not working...
      endTime = System.currentTimeMillis();
      assertTrue((endTime - startTime) >= TIMEOUT);
      assertBlockedLongerThanTimeout(startTime, System.currentTimeMillis(), TIMEOUT);
      assertTrue(replicationServer.isScenarioExecuted());
      // Check monitoring values
      // No ack should have comen back, so timeout incremented (flag and error for rs)
      sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      Thread.sleep(1000); // Sleep a while as counters are updated just after sending thread is unblocked
      new MonitorAssertions(baseDN)
        .assertValue("assured-sr-sent-updates", 3)
        .assertValue("assured-sr-not-acknowledged-updates", 3)
@@ -1615,14 +1492,14 @@
   */
  private void deleteEntry(String dn) throws Exception
  {
    DN realDn = DN.decode(dn);
    DN realDN = DN.decode(dn);
    DeleteOperationBasis delOp = new DeleteOperationBasis(connection,
      InternalClientConnection.nextOperationID(), InternalClientConnection.
      nextMessageID(), null, realDn);
      InternalClientConnection.nextOperationID(),
      InternalClientConnection.nextMessageID(), null, realDN);
    delOp.setInternalOperation(true);
    delOp.run();
    waitOpResult(delOp, ResultCode.SUCCESS);
    assertNull(DirectoryServer.getEntry(realDn));
    assertNull(DirectoryServer.getEntry(realDN));
  }
  /**
@@ -1636,8 +1513,9 @@
    AssuredMode assuredMode) throws Exception
  {
    // Find monitoring entry for requested base DN
    String monitorFilter =
         "(&(cn=Directory server*)(domain-name=" + baseDN + "))";
    SearchFilter monitorFilter = SearchFilter.createFilterFromString(
        "(&(cn=Directory server*)(domain-name=" + baseDN + "))");
    DN dn = DN.decode("cn=replication,cn=monitor");
    InternalSearchOperation op;
    int count = 0;
@@ -1645,19 +1523,14 @@
    {
      if (count++>0)
        Thread.sleep(100);
      op = connection.processSearch(
                                    ByteString.valueOf("cn=replication,cn=monitor"),
                                    SearchScope.WHOLE_SUBTREE,
                                    LDAPFilter.decode(monitorFilter));
      op = connection.processSearch(dn, SearchScope.WHOLE_SUBTREE, monitorFilter);
    }
    while (op.getSearchEntries().isEmpty() && (count<100));
    if (op.getSearchEntries().isEmpty())
      throw new Exception("Could not read monitoring information");
    while (op.getSearchEntries().isEmpty() && count < 100);
    Assertions.assertThat(op.getSearchEntries()).isNotEmpty();
    SearchResultEntry entry = op.getSearchEntries().getFirst();
    if (entry == null)
      throw new Exception("Could not find monitoring entry");
    assertNotNull(entry);
    /*
     * Find the multi valued attribute matching the requested assured mode
@@ -1676,37 +1549,33 @@
    }
    List<Attribute> attrs = entry.getAttribute(assuredAttr);
    if (attrs == null || attrs.isEmpty())
      return Collections.emptyMap();
    Map<Integer,Integer> resultMap = new HashMap<Integer,Integer>();
    if ( (attrs == null) || (attrs.isEmpty()) )
      return resultMap; // Empty map
    Attribute attr = attrs.get(0);
    // Parse and store values
    for (AttributeValue val : attr) {
      String srvStr = val.toString();
      StringTokenizer strtok = new StringTokenizer(srvStr, ":");
      String token = strtok.nextToken();
      if (token != null) {
        int serverId = Integer.valueOf(token);
        token = strtok.nextToken();
        if (token != null) {
          Integer nerrors = Integer.valueOf(token);
          resultMap.put(serverId, nerrors);
    Map<Integer,Integer> resultMap = new HashMap<Integer,Integer>();
    for (AttributeValue val : attrs.get(0))
    {
      StringTokenizer strtok = new StringTokenizer(val.toString(), ":");
      String serverId = strtok.nextToken();
      if (serverId != null) {
        String nbErrors = strtok.nextToken();
        if (nbErrors != null) {
          resultMap.put(Integer.valueOf(serverId), Integer.valueOf(nbErrors));
        }
      }
    }
    return resultMap;
  }
  private void waitOpResult(Operation operation, ResultCode expectedResult)
  private void waitOpResult(Operation operation, ResultCode expectedResult) throws Exception
  {
    int ii=0;
    while((operation.getResultCode()==ResultCode.UNDEFINED) ||
        (operation.getResultCode()!=expectedResult))
    {
      sleep(50);
      Thread.sleep(50);
      ii++;
      if (ii>10)
        assertEquals(operation.getResultCode(), expectedResult,
@@ -1714,4 +1583,3 @@
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -35,6 +35,7 @@
import java.net.SocketTimeoutException;
import java.util.*;
import org.assertj.core.api.Assertions;
import org.opends.server.TestCaseUtils;
import org.opends.server.api.SynchronizationProvider;
import org.opends.server.backends.task.TaskState;
@@ -646,33 +647,23 @@
  }
  /**
   * Chaining tests of the replication Server code with 2 replication servers involved
   * 2 tests are done here (itest=0 or itest=1)
   *
   * Test 1
   * - Create replication server 1
   * - Create replication server 2 connected with replication server 1
   * - Create and connect client 1 to replication server 1
   * - Create and connect client 2 to replication server 2
   * - Make client1 publish changes
   * - Check that client 2 receives the changes published by client 1
   *
   * Test 2
   * - Create replication server 1
   * - Create and connect client1 to replication server 1
   * - Make client1 publish changes
   * - Create replication server 2 connected with replication server 1
   * - Create and connect client 2 to replication server 2
   * - Check that client 2 receives the changes published by client 1
   * <ol>
   * <li>Create replication server 1</li>
   * <li>Create replication server 2 connected with replication server 1</li>
   * <li>Create and connect client 1 to replication server 1</li>
   * <li>Create and connect client 2 to replication server 2</li>
   * <li>Make client1 publish changes</li>
   * <li>Check that client 2 receives the changes published by client 1</li>
   * </ol>
   */
  @Test(enabled=true, dependsOnMethods = { "searchBackend"})
  public void changelogChaining() throws Exception
  public void changelogChaining0() throws Exception
  {
    debugInfo("Starting changelogChaining");
    final String tn = "changelogChaining0";
    debugInfo("Starting " + tn);
    replicationServer.clearDb();
    TestCaseUtils.initializeTestBackend(true);
    for (int itest = 0; itest <2; itest++)
    {
      ReplicationBroker broker2 = null;
      boolean emptyOldChanges = true;
@@ -683,13 +674,11 @@
      int[] changelogIds = new int[] { 80, 81 };
      int[] brokerIds = new int[] { 100, 101 };
      for (int i = 0; i < ((itest == 0) ? 2 : 1); i++)
      for (int i = 0; i < 2; i++)
      {
        changelogs[i] = null;
        // for itest=0, create the 2 connected replicationServer
        // for itest=1, create the 1rst replicationServer, the second
        // one will be created later
        // create the 2 connected replicationServer
        SortedSet<String> servers = new TreeSet<String>();
        servers.add(
          "localhost:" + ((i == 0) ? changelogPorts[1] : changelogPorts[0]));
@@ -703,30 +692,118 @@
      try
      {
        // For itest=0, create and connect client1 to changelog1
        //              and client2 to changelog2
        // For itest=1, only create and connect client1 to changelog1
        //              client2 will be created later
        // create and connect client1 to changelog1 and client2 to changelog2
        broker1 = openReplicationSession(TEST_ROOT_DN,
             brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
        assertTrue(broker1.isConnected());
        if (itest == 0)
        {
          broker2 = openReplicationSession(TEST_ROOT_DN,
             brokerIds[1], 100, changelogPorts[0], 1000, !emptyOldChanges);
          assertTrue(broker2.isConnected());
        }
        broker2 = openReplicationSession(TEST_ROOT_DN, brokerIds[1], 100,
             changelogPorts[0], 1000, !emptyOldChanges);
        assertTrue(broker2.isConnected());
        // - Test messages between clients by publishing now
        // - Delete
        long time = TimeThread.getTime();
        int ts = 1;
        CSN csn = new CSN(time, ts++, brokerIds[0]);
        CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
        DN dn = DN.decode("o=example" + 0 + "," + TEST_ROOT_DN_STRING);
        DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
        broker1.publish(delMsg);
        DN dn = DN.decode("o=example" + itest + "," + TEST_ROOT_DN_STRING);
        DeleteMsg delMsg = new DeleteMsg(dn, csn, "uid");
        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
        String baseUUID = "22222222-2222-2222-2222-222222222222";
        // - Add
        Entry entry = TestCaseUtils.entryFromLdifString(
        "dn: o=example," + TEST_ROOT_DN_STRING + "\n"
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
        AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
            entry.getAttributes(), new ArrayList<Attribute>());
        broker1.publish(addMsg);
        // - Modify
        Attribute attr1 = Attributes.create("description", "new value");
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
        broker1.publish(modMsg);
        // - ModifyDN
        ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
            EXAMPLE_DN, RDN.decode("o=example2"), true, null);
        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
        LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
        ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
        broker1.publish(modDNMsg);
        // - Check msg receives by broker, through changeLog2
        List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4);
        Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg);
        debugInfo("Ending " + tn);
      }
      finally
      {
        remove(changelogs);
        stop(broker1, broker2);
      }
    }
  }
  /**
   * <ol>
   * <li>Create replication server 1</li>
   * <li>Create and connect client1 to replication server 1</li>
   * <li>Make client1 publish changes</li>
   * <li>Create replication server 2 connected with replication server 1</li>
   * <li>Create and connect client 2 to replication server 2</li>
   * <li>Check that client 2 receives the changes published by client 1</li>
   * <ol>
   */
  @Test(enabled=true, dependsOnMethods = { "searchBackend"})
  public void changelogChaining1() throws Exception
  {
    final String tn = "changelogChaining1";
    debugInfo("Starting " + tn);
    replicationServer.clearDb();
    TestCaseUtils.initializeTestBackend(true);
    {
      ReplicationBroker broker2 = null;
      boolean emptyOldChanges = true;
      // - Create 2 connected replicationServer
      ReplicationServer[] changelogs = new ReplicationServer[2];
      int[] changelogPorts = TestCaseUtils.findFreePorts(2);
      int[] changelogIds = new int[] { 80, 81 };
      int[] brokerIds = new int[] { 100, 101 };
      {
        // create the 1rst replicationServer, the second one will be created later
        SortedSet<String> servers = new TreeSet<String>();
        servers.add("localhost:" + changelogPorts[1]);
        ReplServerFakeConfiguration conf =
          new ReplServerFakeConfiguration(changelogPorts[0], "replicationServerTestChangelogChainingDb"+0, 0,
                                         changelogIds[0], 0, 100, servers);
        changelogs[0] = new ReplicationServer(conf);
      }
      ReplicationBroker broker1 = null;
      try
      {
        // only create and connect client1 to changelog1 client2 will be created later
        broker1 = openReplicationSession(TEST_ROOT_DN,
             brokerIds[0], 100, changelogPorts[0], 1000, !emptyOldChanges);
        assertTrue(broker1.isConnected());
        // - Test messages between clients by publishing now
        // - Delete
        CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
        DN dn = DN.decode("o=example" + 1 + "," + TEST_ROOT_DN_STRING);
        DeleteMsg delMsg = new DeleteMsg(dn, csnGen.newCSN(), "uid");
        broker1.publish(delMsg);
        String user1entryUUID = "33333333-3333-3333-3333-333333333333";
@@ -737,8 +814,7 @@
            + "objectClass: top\n" + "objectClass: domain\n"
            + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
        Entry entry = TestCaseUtils.entryFromLdifString(lentry);
        csn = new CSN(time, ts++, brokerIds[0]);
        AddMsg addMsg = new AddMsg(csn, EXAMPLE_DN,
        AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
            user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
            entry.getAttributes(), new ArrayList<Attribute>());
        broker1.publish(addMsg);
@@ -748,88 +824,32 @@
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        csn = new CSN(time, ts++, brokerIds[0]);
        ModifyMsg modMsg = new ModifyMsg(csn, EXAMPLE_DN, mods, "fakeuniqueid");
        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
        broker1.publish(modMsg);
        // - ModifyDN
        csn = new CSN(time, ts++, brokerIds[0]);
        ModifyDNOperationBasis op = new ModifyDNOperationBasis(connection, 1, 1, null,
            EXAMPLE_DN, RDN.decode("o=example2"), true, null);
        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csn, "uniqueid",
        "newparentId"));
        LocalBackendModifyDNOperation localOp =
          new LocalBackendModifyDNOperation(op);
        op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csnGen.newCSN(), "uniqueid", "newparentId"));
        LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
        ModifyDNMsg modDNMsg = new ModifyDNMsg(localOp);
        broker1.publish(modDNMsg);
        if (itest > 0)
        {
          SortedSet<String> servers = new TreeSet<String>();
          servers.add("localhost:"+changelogPorts[0]);
          ReplServerFakeConfiguration conf =
            new ReplServerFakeConfiguration(changelogPorts[1], null, 0,
                                           changelogIds[1], 0, 100, null);
          changelogs[1] = new ReplicationServer(conf);
        SortedSet<String> servers = new TreeSet<String>();
        servers.add("localhost:"+changelogPorts[0]);
        ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(
            changelogPorts[1], null, 0, changelogIds[1], 0, 100, null);
        changelogs[1] = new ReplicationServer(conf);
          // Connect broker 2 to changelog2
          broker2 = openReplicationSession(TEST_ROOT_DN,
              brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
          assertTrue(broker2.isConnected());
        }
        // Connect broker 2 to changelog2
        broker2 = openReplicationSession(TEST_ROOT_DN,
            brokerIds[1], 100, changelogPorts[1], 2000, !emptyOldChanges);
        assertTrue(broker2.isConnected());
        // - Check msg receives by broker, through changeLog2
        while (ts > 1)
        {
          ReplicationMsg msg2;
          try
          {
            msg2 = broker2.receive();
            if (msg2 == null)
              break;
            broker2.updateWindowAfterReplay();
          }
          catch (Exception e)
          {
            fail("Broker receive failed: " + e.getMessage() + "#Msg:" + ts + "#itest:" + itest);
            break;
          }
          if (msg2 instanceof DeleteMsg)
          {
            if (delMsg.equals(msg2))
              ts--;
          }
          else if (msg2 instanceof AddMsg)
          {
            if (addMsg.equals(msg2))
              ts--;
          }
          else if (msg2 instanceof ModifyMsg)
          {
            if (modMsg.equals(msg2))
              ts--;
          }
          else if (msg2 instanceof ModifyDNMsg)
          {
            if (modDNMsg.equals(msg2))
              ts--;
          }
          else if (msg2 instanceof TopologyMsg)
          {
            // Nothing to test here.
          }
          else
          {
            fail("ReplicationServer transmission failed: no expected message" +
              " class: " + msg2);
            break;
          }
        }
        // Check that everything expected has been received
        assertEquals(ts, 1, "Broker2 did not receive the complete set of"
            + " expected messages: #msg received " + ts);
        debugInfo("Ending changelogChaining");
        List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 4);
        Assertions.assertThat(msgs).containsExactly(delMsg, addMsg, modMsg, modDNMsg);
        debugInfo("Ending " + tn);
      }
      finally
      {
@@ -839,6 +859,30 @@
    }
  }
  private List<ReplicationMsg> receiveReplicationMsgs(ReplicationBroker broker2, int nbMessagesExpected)
  {
    List<ReplicationMsg> msgs = new ArrayList<ReplicationMsg>(nbMessagesExpected);
    for (int i = 0; i < nbMessagesExpected; i++)
    {
      try
      {
        ReplicationMsg msg = broker2.receive();
        if (msg == null)
          break;
        if (msg instanceof TopologyMsg)
          continue; // ignore
        msgs.add(msg);
        broker2.updateWindowAfterReplay();
      }
      catch (SocketTimeoutException e)
      {
        fail("Broker receive failed: " + e.getMessage() + "#Msg:" + i);
      }
    }
    return msgs;
  }
  /**
   * Test that the Replication sends back correctly WindowsUpdate
   * when we send a WindowProbeMsg.
@@ -859,7 +903,7 @@
     * Some other tests may have been running before and therefore
     * may have pushed some changes to the Replication Server
     * When a new session is opened, the Replication Server is therefore
     * going to send all thoses old changes to this Replication Server.
     * going to send all these old changes to this Replication Server.
     * To avoid this, this test open a first session, save the
     * ServerState from the ReplicationServer, close the session
     * and re-open a new connection providing the ServerState it just
@@ -1207,28 +1251,26 @@
   private List<UpdateMsg> createChanges(String suffix, int serverId) throws Exception
   {
     List<UpdateMsg> l = new ArrayList<UpdateMsg>();
     long time = TimeThread.getTime();
     int ts = 1;
     {
       String user1entryUUID = "33333333-3333-3333-3333-333333333333";
       String baseUUID       = "22222222-2222-2222-2222-222222222222";
       // - Add
       String lentry = "dn: "+suffix+"\n"
       Entry entry = TestCaseUtils.entryFromLdifString(
       "dn: "+suffix+"\n"
           + "objectClass: top\n"
           + "objectClass: domain\n"
           + "entryUUID: 11111111-1111-1111-1111-111111111111\n";
       Entry entry = TestCaseUtils.entryFromLdifString(lentry);
       CSN csn = new CSN(time, ts++, serverId);
           + "entryUUID: 11111111-1111-1111-1111-111111111111\n");
       CSNGenerator csnGen = new CSNGenerator(serverId, TimeThread.getTime());
       DN exampleSuffixDN = DN.decode("o=example," + suffix);
       AddMsg addMsg = new AddMsg(csn, exampleSuffixDN,
       AddMsg addMsg = new AddMsg(csnGen.newCSN(), exampleSuffixDN,
           user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
           entry.getAttributes(), new ArrayList<Attribute>());
       l.add(addMsg);
       // - Add
       String luentry =
       Entry uentry = TestCaseUtils.entryFromLdifString(
             "dn: cn=Fiona Jensen,ou=People,"+suffix+"\n"
           + "objectClass: top\n"
           + "objectclass: person\n"
@@ -1239,12 +1281,10 @@
           + "givenName: fjensen\n"
           + "telephonenumber: +1 408 555 1212\n"
           + "entryUUID: " + user1entryUUID +"\n"
           + "userpassword: fjen$$en" + "\n";
       Entry uentry = TestCaseUtils.entryFromLdifString(luentry);
       csn = new CSN(time, ts++, serverId);
           + "userpassword: fjen$$en" + "\n");
       DN newPersonDN = DN.decode("uid=new person,ou=People,"+suffix);
       AddMsg addMsg2 = new AddMsg(
           csn,
           csnGen.newCSN(),
           newPersonDN,
           user1entryUUID,
           baseUUID,
@@ -1263,22 +1303,18 @@
       List<Modification> mods = Arrays.asList(mod1, mod2, mod3);
       csn = new CSN(time, ts++, serverId);
       DN dn = exampleSuffixDN;
       ModifyMsg modMsg = new ModifyMsg(csn, dn,
           mods, "fakeuniqueid");
       ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), dn, mods, "fakeuniqueid");
       l.add(modMsg);
       // Modify DN
       csn = new CSN(time, ts++, serverId);
       ModifyDNMsg  modDnMsg = new ModifyDNMsg(newPersonDN, csn,
       ModifyDNMsg  modDnMsg = new ModifyDNMsg(newPersonDN, csnGen.newCSN(),
           user1entryUUID, baseUUID, false,
           "uid=wrong, ou=people,"+suffix, "uid=newrdn");
       l.add(modDnMsg);
       // Del
       csn = new CSN(time, ts++, serverId);
       DeleteMsg delMsg = new DeleteMsg(exampleSuffixDN, csn, "uid");
       DeleteMsg delMsg = new DeleteMsg(exampleSuffixDN, csnGen.newCSN(), "uid");
       l.add(delMsg);
     }
     return l;
@@ -1567,8 +1603,8 @@
         changelogs[i] = new ReplicationServer(conf);
       }
       try
       {
    try
    {
         // Create and connect client1 to changelog1
         // and client2 to changelog2
         broker1 = openReplicationSession(TEST_ROOT_DN,
@@ -1580,9 +1616,7 @@
         assertTrue(broker2.isConnected());
         // - Test messages between clients by publishing now
         long time = TimeThread.getTime();
         int ts = 1;
         CSN csn;
         CSNGenerator csnGen = new CSNGenerator(brokerIds[0], TimeThread.getTime());
         String user1entryUUID = "33333333-3333-3333-3333-333333333333";
         String baseUUID  = "22222222-2222-2222-2222-222222222222";
@@ -1591,10 +1625,9 @@
             + "objectClass: top\n" + "objectClass: domain\n"
             + "entryUUID: " + user1entryUUID + "\n";
         Entry entry = TestCaseUtils.entryFromLdifString(lentry);
         csn = new CSN(time, ts++, brokerIds[0]);
         AddMsg addMsg = new AddMsg(csn, EXAMPLE_DN,
             user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry
             .getAttributes(), new ArrayList<Attribute>());
         AddMsg addMsg = new AddMsg(csnGen.newCSN(), EXAMPLE_DN,
             user1entryUUID, baseUUID, entry.getObjectClassAttribute(),
             entry.getAttributes(), new ArrayList<Attribute>());
         broker1.publish(addMsg);
         // - Modify
@@ -1602,47 +1635,12 @@
         Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
         List<Modification> mods = new ArrayList<Modification>();
         mods.add(mod1);
         csn = new CSN(time, ts++, brokerIds[0]);
         ModifyMsg modMsg = new ModifyMsg(csn, EXAMPLE_DN, mods, "fakeuniqueid");
         ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
         broker1.publish(modMsg);
         // - Check msg received by broker, through changeLog2
         while (ts > 1)
         {
           ReplicationMsg msg2;
           try
           {
             msg2 = broker2.receive();
             if (msg2 == null)
               break;
             broker2.updateWindowAfterReplay();
           }
           catch (Exception e)
           {
             fail("Broker receive failed: " + e.getMessage() + "#Msg: " + ts);
             break;
           }
           if (msg2 instanceof AddMsg)
           {
             if (addMsg.equals(msg2))
               ts--;
           }
           else if (msg2 instanceof ModifyMsg)
           {
             if (modMsg.equals(msg2))
               ts--;
           }
           else
           {
          fail("ReplicationServer transmission failed: did not expect message of class: " + msg2);
             break;
           }
         }
         // Check that everything expected has been received
         assertEquals(ts, 1, "Broker2 did not receive the complete set of"
             + " expected messages: #msg received " + ts);
         List<ReplicationMsg> msgs = receiveReplicationMsgs(broker2, 2);
         Assertions.assertThat(msgs).containsExactly(addMsg, modMsg);
         // Then change the config to remove replicationServer[1] from
         // the configuration of replicationServer[0]
@@ -1657,41 +1655,43 @@
         // The link between RS[0] & RS[1] should be destroyed by the new configuration.
         // So we expect a timeout exception when calling receive on RS[1].
         // Send an update and check that RS[1] does not receive the message after the timeout
         try
         {
           // - Del
           csn = new CSN(time, ts++, brokerIds[0]);
           DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csn, user1entryUUID);
           broker1.publish(delMsg);
           // Should receive some TopologyMsg messages for disconnection
           // between the 2 RSs
           ReplicationMsg msg = null;
           while (true)
           {
             msg = broker2.receive();
             if (msg instanceof TopologyMsg)
             {
               debugInfo("Broker 2 received: " + msg);
             } else
             {
               fail("Broker: receive successed when it should fail. " +
                 "This broker was disconnected by configuration." +
                 " Received: " + msg);
             }
           }
         }
         catch (SocketTimeoutException soExc)
         {
         // the receive fail as expected
         debugInfo("Ending replicationServerConnected");
         return;
         }
       }
       finally
       {
      // - Del
      DeleteMsg delMsg = new DeleteMsg(EXAMPLE_DN, csnGen.newCSN(), user1entryUUID);
      broker1.publish(delMsg);
      // Should receive some TopologyMsg messages for disconnection between the 2 RSs
      assertOnlyTopologyMsgsReceived(broker2);
    }
    finally
    {
      remove(changelogs);
      stop(broker1, broker2);
       }
    }
  }
  private void assertOnlyTopologyMsgsReceived(ReplicationBroker broker2)
  {
    try
    {
      while (true)
      {
        ReplicationMsg msg = broker2.receive();
        if (msg instanceof TopologyMsg)
        {
          debugInfo("Broker 2 received: " + msg);
        }
        else
        {
          fail("Broker: receive successed when it should fail. "
              + "This broker was disconnected by configuration."
              + " Received: " + msg);
        }
      }
    }
    catch (SocketTimeoutException expected)
    {
      debugInfo("Ending replicationServerConnected");
    }
  }
}