mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
27.30.2013 80f29a511a0681bc55293f0ea82e368097f2fa9a
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -77,8 +77,9 @@
  private static final String baseDnStr = TEST_ROOT_DN_STRING;
  private static final String testName = "generationIdTest";
  private static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id";
  private static final int   WINDOW_SIZE = 10;
  private static final int WINDOW_SIZE = 10;
  private static final int server1ID = 1;
  private static final int server2ID = 2;
  private static final int server3ID = 3;
@@ -87,11 +88,11 @@
  private static final int changelog3ID = 13;
  private DN baseDN;
  private ReplicationBroker broker2 = null;
  private ReplicationBroker broker3 = null;
  private ReplicationServer replServer1 = null;
  private ReplicationServer replServer2 = null;
  private ReplicationServer replServer3 = null;
  private ReplicationBroker broker2;
  private ReplicationBroker broker3;
  private ReplicationServer replServer1;
  private ReplicationServer replServer2;
  private ReplicationServer replServer3;
  private boolean emptyOldChanges = true;
  private Entry taskInitRemoteS2;
  private String[] updatedEntries;
@@ -148,7 +149,8 @@
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  protected void debugInfo(String message, Exception e)
  private void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
@@ -183,31 +185,21 @@
        "ds-task-initialize-replica-server-id: " + server2ID);
  }
  /** Tests that entries have been written in the db */
  private int testEntriesInDb()
  /** Tests that entries have been written in the db. */
  private int countUpdatedEntriesInDb() throws Exception
  {
    debugInfo("TestEntriesInDb");
    short found = 0;
    debugInfo("countUpdatedEntriesInDb");
    int found = 0;
    for (String entry : updatedEntries)
    {
      int dns = entry.indexOf("dn: ");
      int dne = entry.indexOf(TestCaseUtils.TEST_ROOT_DN_STRING);
      String dn = entry.substring(dns + 4,
        dne + TestCaseUtils.TEST_ROOT_DN_STRING.length());
      int dne = entry.indexOf(TEST_ROOT_DN_STRING);
      String dn = entry.substring(dns + 4, dne + TEST_ROOT_DN_STRING.length());
      debugInfo("Search Entry: " + dn);
      DN entryDN = null;
      try
      {
        entryDN = DN.decode(dn);
      }
      catch(Exception e)
      {
        debugInfo("TestEntriesInDb/" + e);
      }
      DN entryDN = DN.decode(dn);
      try
      {
@@ -272,62 +264,49 @@
    };
  }
  private int receiveImport(ReplicationBroker broker, int serverID,
      String[] updatedEntries)
  private int receiveImport(ReplicationBroker broker, int serverID, String[] updatedEntries) throws Exception
  {
    // Expect the broker to receive the entries
    ReplicationMsg msg;
    short entriesReceived = -100;
    int entriesReceived = -100;
    while (true)
    {
      try
      debugInfo("Broker " + serverID + " Wait for entry or done msg");
      ReplicationMsg msg = broker.receive();
      if (msg == null)
      {
        debugInfo("Broker " + serverID + " Wait for entry or done msg");
        msg = broker.receive();
        if (msg == null)
        {
          break;
        }
        if (msg instanceof InitializeTargetMsg)
        {
          debugInfo("Broker " + serverID + " receives InitializeTargetMessage ");
          entriesReceived = 0;
        }
        else if (msg instanceof EntryMsg)
        {
          EntryMsg em = (EntryMsg)msg;
          debugInfo("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
          entriesReceived++;
        }
        else if (msg instanceof DoneMsg)
        {
          debugInfo("Broker " + serverID + " receives done ");
          break;
        }
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg em = (ErrorMsg)msg;
          debugInfo("Broker " + serverID + " receives ERROR " + em);
          break;
        }
        else
        {
          debugInfo("Broker " + serverID + " receives and trashes " + msg);
        }
        break;
      }
      catch(Exception e)
      if (msg instanceof InitializeTargetMsg)
      {
        debugInfo("receiveUpdatedEntries" + stackTraceToSingleLineString(e));
        debugInfo("Broker " + serverID + " receives InitializeTargetMessage ");
        entriesReceived = 0;
      }
      else if (msg instanceof EntryMsg)
      {
        EntryMsg em = (EntryMsg)msg;
        debugInfo("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
        entriesReceived++;
      }
      else if (msg instanceof DoneMsg)
      {
        debugInfo("Broker " + serverID + " receives done ");
        break;
      }
      else if (msg instanceof ErrorMsg)
      {
        debugInfo("Broker " + serverID + " receives ERROR " + msg);
        break;
      }
      else
      {
        debugInfo("Broker " + serverID + " receives and trashes " + msg);
      }
    }
    if (updatedEntries != null)
    {
      assertTrue(entriesReceived == updatedEntries.length,
          " Received entries("+entriesReceived +
          ") == Expected entries("+updatedEntries.length+")");
      assertEquals(updatedEntries.length, entriesReceived);
    }
    return entriesReceived;
@@ -387,7 +366,7 @@
      // Must be no connection already done or disconnectFromReplServer should
      // have been called
      assertTrue(synchroServerEntry == null);
      assertNull(synchroServerEntry);
      synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
@@ -423,7 +402,7 @@
      String synchroServerStringDN = "cn=" + testName + ", cn=domains," +
      SYNCHRO_PLUGIN_DN;
      // Must have called connectServer1ToChangelog previously
      assertTrue(synchroServerEntry != null);
      assertNotNull(synchroServerEntry);
      DN synchroServerDN = DN.decode(synchroServerStringDN);
@@ -434,8 +413,7 @@
        DirectoryServer.getConfigHandler().deleteEntry(ecle.getDN(), null);
      }
      DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN, null);
      assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()) ==
        null,
      assertNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to delete the synchronized domain");
      synchroServerEntry = null;
@@ -451,7 +429,7 @@
          Thread.sleep(200);
          waitCo++;
        }
        assert(replDomainToDis==null);
        assertNull(replDomainToDis);
      }
      catch (DirectoryException e)
      {
@@ -470,39 +448,29 @@
    return replServerPort[changelogID];
  }
  protected static final String REPLICATION_GENERATION_ID =
    "ds-sync-generation-id";
  private long readGenIdFromSuffixRootEntry() throws Exception
  {
    long genId=-1;
    Entry resultEntry = getEntry(baseDN, 1000, true);
    if (resultEntry == null)
    {
      Entry resultEntry = getEntry(baseDN, 1000, true);
      if (resultEntry==null)
      {
        debugInfo("Entry not found <" + baseDN + ">");
      }
      else
      {
        debugInfo("Entry found <" + baseDN + ">");
      debugInfo("Entry not found <" + baseDN + ">");
    }
    else
    {
      debugInfo("Entry found <" + baseDN + ">");
        AttributeType synchronizationGenIDType =
          DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
        List<Attribute> attrs =
          resultEntry.getAttribute(synchronizationGenIDType);
        if (attrs != null)
      AttributeType synchronizationGenIDType = DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
      List<Attribute> attrs = resultEntry.getAttribute(synchronizationGenIDType);
      if (attrs != null)
      {
        Attribute attr = attrs.get(0);
        if (attr.size() == 1)
        {
          Attribute attr = attrs.get(0);
          if (attr.size() == 1)
          {
            genId =
                Long.decode(attr.iterator().next().getValue().toString());
          }
          return Long.decode(attr.iterator().next().getValue().toString());
        }
      }
    }
    return genId;
    return -1;
  }
  private void performLdifImport() throws Exception
@@ -545,22 +513,18 @@
        + "userPassword: password\n" + "initials: AA\n";
  }
  static protected ReplicationMsg createAddMsg() throws Exception
  private static AddMsg createAddMsg() throws Exception
  {
    Entry personWithUUIDEntry = null;
    String user1entryUUID;
    String baseUUID = null;
    String user1dn;
    /*
     * Create a CSN generator to generate new CSNs when we need to send
     * operation messages to the replicationServer.
     */
    CSNGenerator gen = new CSNGenerator(2, 0);
    user1entryUUID = "33333333-3333-3333-3333-333333333333";
    user1dn = "uid=user1,ou=People," + baseDnStr;
    String entryWithUUIDldif = "dn: "+ user1dn + "\n"
    String user1entryUUID = "33333333-3333-3333-3333-333333333333";
    String user1dn = "uid=user1,ou=People," + baseDnStr;
    Entry personWithUUIDEntry = TestCaseUtils.entryFromLdifString(
    "dn: "+ user1dn + "\n"
    + "objectClass: top\n" + "objectClass: person\n"
    + "objectClass: organizationalPerson\n"
    + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
@@ -574,15 +538,13 @@
    + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
    + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
    + "userPassword: password\n" + "initials: AA\n"
    + "entryUUID: " + user1entryUUID + "\n";
    personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
    + "entryUUID: " + user1entryUUID + "\n");
    // Create and publish an update message to add an entry.
    return new AddMsg(gen.newCSN(),
        personWithUUIDEntry.getDN(),
        user1entryUUID,
        baseUUID,
        null,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
  }
@@ -591,31 +553,24 @@
   * Check that the expected number of changes are in the replication server
   * database.
   */
  private void checkChangelogSize(int expectedCount)
  private void checkChangelogSize(int expectedCount) throws Exception
  {
    try
    SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)");
    InternalSearchOperation searchOperation =
      connection.processSearch(DN.decode("dc=replicationchanges"),
          SearchScope.SUBORDINATE_SUBTREE,
          filter);
    if (debugEnabled())
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      InternalSearchOperation searchOperation =
        connection.processSearch(DN.decode("dc=replicationchanges"),
            SearchScope.SUBORDINATE_SUBTREE,
            filter);
      if (debugEnabled())
      if (searchOperation.getSearchEntries().size() != expectedCount)
      {
        if (searchOperation.getSearchEntries().size() != expectedCount)
        for (SearchResultEntry sre : searchOperation.getSearchEntries())
        {
          for (SearchResultEntry sre : searchOperation.getSearchEntries())
          {
            debugInfo("Entry found: " + sre.toLDIFString());
          }
          debugInfo("Entry found: " + sre.toLDIFString());
        }
      }
      assertEquals(searchOperation.getSearchEntries().size(), expectedCount);
    }
    catch(Exception e)
    {
    }
    assertEquals(searchOperation.getSearchEntries().size(), expectedCount);
  }
  /**
@@ -706,10 +661,10 @@
        broker3.receive();
        fail("No update message is supposed to be received here.");
      }
      catch(SocketTimeoutException e)
      catch (SocketTimeoutException expected)
      {
        // This is the expected result
        // Note that timeout should be lower than RS montoring publisher period
        // Note that timeout should be lower than RS monitoring publisher period
        // so that timeout occurs
      }
@@ -717,8 +672,7 @@
      debugInfo(testCase + " ** TEST ** The part of the topology with the right gen ID should work well");
      // Now create a change that must be replicated
      String ent1[] = { createEntry(UUID.randomUUID()) };
      addTestEntriesToDB(ent1);
      addTestEntriesToDB(createEntry(UUID.randomUUID()));
      // Verify that RS1 does contain the change related to this ADD.
      Thread.sleep(500);
@@ -731,12 +685,12 @@
      //===========================================================
      debugInfo(testCase + " ** TEST ** Persistence of the generation ID in RS1");
      long genIdBeforeShut = replServer1.getGenerationId(baseDN);
      final long genIdBeforeShut = replServer1.getGenerationId(baseDN);
      debugInfo("Shutdown replServer1");
      stop(broker2, broker3);
      broker2 = broker3 = null;
      remove(replServer1);
      replServer1.remove();
      replServer1 = null;
      debugInfo("Create again replServer1");
@@ -749,10 +703,10 @@
      debugInfo("Delay to allow DS to reconnect to replServer1");
      long genIdAfterRestart = replServer1.getGenerationId(baseDN);
      final long genIdAfterRestart = replServer1.getGenerationId(baseDN);
      debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
      assertTrue(replServer1!=null, "Replication server creation failed.");
      assertTrue(genIdBeforeShut == genIdAfterRestart,
      assertNotNull(replServer1, "Replication server creation failed.");
      assertEquals(genIdAfterRestart, genIdBeforeShut,
        "generationId is expected to have the same value" +
        " after replServer1 restart. Before : " + genIdBeforeShut +
        " after : " + genIdAfterRestart);
@@ -799,20 +753,14 @@
      // to enter the bad gen id status
      ChangeStatusMsg csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker2,
        ChangeStatusMsg.class.getName());
      if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
      {
        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
          " to enter the bad gen id status"
            + csMsg);
      }
      assertEquals(csMsg.getRequestedStatus(), ServerStatus.BAD_GEN_ID_STATUS,
          "Broker 2 connection is expected to receive 1 ChangeStatusMsg"
              + " to enter the bad gen id status" + csMsg);
      csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker3,
        ChangeStatusMsg.class.getName());
      if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
      {
        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
          " to enter the bad gen id status"
            + csMsg);
      }
      assertEquals(csMsg.getRequestedStatus(), ServerStatus.BAD_GEN_ID_STATUS,
          "Broker 2 connection is expected to receive 1 ChangeStatusMsg"
              + " to enter the bad gen id status" + csMsg);
      debugInfo("DS1 root entry must contain the new gen ID");
      genId = readGenIdFromSuffixRootEntry();
@@ -841,39 +789,24 @@
          "Expecting that DS3 with old gen ID is in bad gen id from RS1");
      debugInfo("Add entries to DS1, update should not be sent to DS2 and DS3 that are in bad gen id");
      String[] ent3 = { createEntry(UUID.randomUUID()) };
      addTestEntriesToDB(ent3);
      addTestEntriesToDB(createEntry(UUID.randomUUID()));
      debugInfo("RS1 must have stored that update.");
      Thread.sleep(500);
      checkChangelogSize(1);
      try
      {
        ReplicationMsg msg2 = broker2.receive();
        fail("No update message is supposed to be received by broker2 in bad gen id. " + msg2);
      } catch(SocketTimeoutException e) { /* expected */ }
      try
      {
        ReplicationMsg msg2 = broker3.receive();
        fail("No update message is supposed to be received by broker3 in bad gen id. " + msg2);
      } catch(SocketTimeoutException e) { /* expected */ }
      assertNoMessageReceivedBadGenId(broker2, "broker2");
      assertNoMessageReceivedBadGenId(broker3, "broker3");
      debugInfo("DS2 is publishing a change and RS1 must ignore this change, DS3 must not receive it.");
      AddMsg emsg = (AddMsg)createAddMsg();
      AddMsg emsg = createAddMsg();
      broker2.publish(emsg);
      // Updates count in RS1 must stay unchanged = to 1
      Thread.sleep(500);
      checkChangelogSize(1);
      try
      {
        ReplicationMsg msg2 = broker3.receive();
        fail("No update message is supposed to be received by broker3 in bad gen id. "+ msg2);
      } catch(SocketTimeoutException e) { /* expected */ }
      assertNoMessageReceivedBadGenId(broker3, "broker3");
      //===============================================================
@@ -941,7 +874,7 @@
      assertTrue(msg instanceof AddMsg, "Expected to receive an AddMsg but received: " + msg);
      debugInfo("DS2 is publishing a change and RS1 must store this change, DS3 must receive it.");
      emsg = (AddMsg)createAddMsg();
      emsg = createAddMsg();
      broker2.publish(emsg);
      Thread.sleep(500);
@@ -965,7 +898,19 @@
    }
  }
  /**
  private void assertNoMessageReceivedBadGenId(ReplicationBroker broker, String brokerName)
  {
    try
    {
      ReplicationMsg msg = broker.receive();
      fail("No update message is supposed to be received by " + brokerName
          + " with bad gen id. " + msg);
    }
    catch (SocketTimeoutException expected)
    { /* expected */
    }
  }
  /**
   * testMultiRS tests basic features of generationID
   * with more than one Replication Server.
@@ -997,38 +942,13 @@
      connectServer1ToChangelog(changelog1ID);
      debugInfo("Expect genId are set in all replServers.");
      int waitRes=0;
      while(waitRes<100)
      {
        if (replServer1.getGenerationId(baseDN) == EMPTY_DN_GENID
            && replServer2.getGenerationId(baseDN) == EMPTY_DN_GENID
            && replServer3.getGenerationId(baseDN) == EMPTY_DN_GENID)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      assertEquals(replServer1.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer1");
      assertEquals(replServer2.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer2");
      assertEquals(replServer3.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer3");
      waitForStableGenerationId(EMPTY_DN_GENID);
      debugInfo("Disconnect DS from replServer1.");
      disconnectFromReplServer(changelog1ID);
      waitRes=0;
      while(waitRes<100)
      {
        if (replServer1.getGenerationId(baseDN) == -1
            && replServer2.getGenerationId(baseDN) == -1
            && replServer3.getGenerationId(baseDN) == -1)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      debugInfo(
        "Expect genIds to be resetted in all servers to -1 as no more DS in topo - after 10 sec");
      assertEquals(replServer1.getGenerationId(baseDN), -1);
      assertEquals(replServer2.getGenerationId(baseDN), -1);
      assertEquals(replServer3.getGenerationId(baseDN), -1);
      debugInfo("Expect genIds to be resetted in all servers to -1 as no more DS in topo - after 10 sec");
      waitForStableGenerationId(-1);
      debugInfo("Add entries to DS");
      addTestEntriesToDB(updatedEntries);
@@ -1036,23 +956,10 @@
      debugInfo("Connecting DS to replServer2");
      connectServer1ToChangelog(changelog2ID);
      debugInfo(
        "Expect genIds to be set in all servers based on the added entries.");
      debugInfo("Expect genIds to be set in all servers based on the added entries.");
      genId = readGenIdFromSuffixRootEntry();
      assertTrue(genId != -1);
      waitRes=0;
      while(waitRes<100)
      {
        if (replServer1.getGenerationId(baseDN) == genId
            && replServer2.getGenerationId(baseDN) == genId
            && replServer3.getGenerationId(baseDN) == genId)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      assertEquals(replServer1.getGenerationId(baseDN), genId);
      assertEquals(replServer2.getGenerationId(baseDN), genId);
      assertEquals(replServer3.getGenerationId(baseDN), genId);
      waitForStableGenerationId(genId);
      debugInfo("Connecting broker2 to replServer3 with a good genId");
      broker2 = openReplicationSession(baseDN, server2ID, 100,
@@ -1067,18 +974,8 @@
      debugInfo("Verifying that all replservers genIds have been reset.");
      debugInfo(
      "Expect all genIds to keep their value since broker2 is still connected.");
      waitRes=0;
      while(waitRes<100)
      {
        if (replServer1.getGenerationId(baseDN) == genId
            && replServer2.getGenerationId(baseDN) == genId
            && replServer3.getGenerationId(baseDN) == genId)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      debugInfo("Expect all genIds to keep their value since broker2 is still connected.");
      waitForStableGenerationId(genId);
      assertEquals(replServer1.getGenerationId(baseDN), genId);
      assertEquals(replServer2.getGenerationId(baseDN), genId);
      assertEquals(replServer3.getGenerationId(baseDN), genId);
@@ -1092,10 +989,7 @@
      debugInfo("Expecting that broker3 is in bad gen id since it has a bad genId");
      assertTrue(isDegradedDueToGenerationId(replServer1, server3ID));
      int found = testEntriesInDb();
      assertEquals(found, updatedEntries.length,
        " Entries present in DB :" + found +
        " Expected entries :" + updatedEntries.length);
      assertEquals(countUpdatedEntriesInDb(), updatedEntries.length);
      debugInfo("Connecting DS to replServer1.");
      connectServer1ToChangelog(changelog1ID);
@@ -1134,7 +1028,7 @@
      waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
      debugInfo("Verifying that all replservers genIds have been reset.");
      waitRes=0;
      int waitRes = 0;
      while(waitRes<100)
      {
        readGenIdFromSuffixRootEntry();
@@ -1160,6 +1054,23 @@
    }
  }
  private void waitForStableGenerationId(long expectedGenId) throws Exception
  {
    int wait = 0;
    while (wait < 100)
    {
      if (replServer1.getGenerationId(baseDN) == expectedGenId
          && replServer2.getGenerationId(baseDN) == expectedGenId
          && replServer3.getGenerationId(baseDN) == expectedGenId)
        break;
      wait++;
      Thread.sleep(100);
    }
    assertEquals(replServer1.getGenerationId(baseDN), expectedGenId, " in replServer1");
    assertEquals(replServer2.getGenerationId(baseDN), expectedGenId, " in replServer2");
    assertEquals(replServer3.getGenerationId(baseDN), expectedGenId, " in replServer3");
  }
  private boolean isDegradedDueToGenerationId(ReplicationServer rs, int serverId)
  {
    ReplicationServerDomain domain = rs.getReplicationServerDomain(baseDN);
@@ -1169,7 +1080,7 @@
  /**
   * Disconnect broker and remove entries from the local DB
   */
  protected void postTest()
  private void postTest() throws Exception
  {
    debugInfo("Post test cleaning.");
@@ -1182,12 +1093,8 @@
    Arrays.fill(replServerPort, 0);
    debugInfo("Clearing DS backend");
    try
    {
      TestCaseUtils.initializeTestBackend(false);
    } catch (Exception ex)
    {debugInfo("postTest(): error cleaning memory backend: " + ex);}
    debugInfo("Clearing DJ backend");
    TestCaseUtils.initializeTestBackend(false);
  }
  /**
@@ -1208,8 +1115,6 @@
    try
    {
      long genId;
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      /*
@@ -1223,14 +1128,12 @@
      connectServer1ToChangelog(changelog1ID);
      debugInfo(testCase + " Expect genId attribute to be not retrievable");
      genId = readGenIdFromSuffixRootEntry();
      assertEquals(genId,-1);
      assertEquals(readGenIdFromSuffixRootEntry(), -1);
      addTestEntriesToDB(updatedEntries);
      debugInfo(testCase + " Expect genId attribute to be retrievable");
      genId = readGenIdFromSuffixRootEntry();
      assertEquals(genId, EMPTY_DN_GENID);
      assertEquals(readGenIdFromSuffixRootEntry(), EMPTY_DN_GENID);
      disconnectFromReplServer(changelog1ID);
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -327,33 +327,6 @@
    return broker;
  }
  /**
   * Open a replicationServer session with flow control to the local
   * ReplicationServer.
   */
  protected ReplicationBroker openReplicationSession(
      final DN baseDN, int serverId, int window_size,
      int port, int timeout, int maxSendQueue, int maxRcvQueue,
      boolean emptyOldChanges)
      throws Exception, SocketException
  {
    ServerState state = new ServerState();
    if (emptyOldChanges)
       new PersistentServerState(baseDN, serverId, new ServerState());
    ReplicationBroker broker = new ReplicationBroker(null,
        state, baseDN, serverId, window_size,
        getGenerationId(baseDN), 0, getReplSessionSecurity(), (byte)1, 500);
    List<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    checkConnection(30, broker, port);
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    return broker;
  }
  protected void deleteEntry(DN dn)
  {
    try
@@ -364,10 +337,9 @@
    catch(Exception e)
    {}
    DeleteOperationBasis op;
    op = new DeleteOperationBasis(connection, InternalClientConnection
        .nextOperationID(), InternalClientConnection.nextMessageID(), null,
        dn);
    DeleteOperationBasis op = new DeleteOperationBasis(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(),
        null, dn);
    op.run();
    if ((op.getResultCode() != ResultCode.SUCCESS) &&
        (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
@@ -897,102 +869,76 @@
  }
  protected void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
      Message expectedMessage) throws Exception
  {
    TaskState taskState = null;
    int cpt=40;
    try
    SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    do
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          fail("Task entry was not returned from the search.");
          continue;
        }
      InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(), SearchScope.BASE_OBJECT, filter);
      resultEntry = searchOperation.getSearchEntries().getFirst();
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(500);
        cpt--;
      }
      while ((taskState != expectedTaskState) &&
             (taskState != TaskState.STOPPED_BY_ERROR) &&
             (taskState != TaskState.COMPLETED_SUCCESSFULLY) &&
             (cpt > 0));
      // Check that the task state is as expected.
      AttributeType taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
      String stateString =
          resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER);
      taskState = TaskState.fromString(stateString);
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      List<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      Thread.sleep(500);
      cpt--;
    }
    while ((taskState != expectedTaskState)
        && (taskState != TaskState.STOPPED_BY_ERROR)
        && (taskState != TaskState.COMPLETED_SUCCESSFULLY) && (cpt > 0));
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
      }
      if (logMessages.size() != 0)
      {
          TRACER.debugInfo(logMessages.get(0));
          if (expectedMessage != null)
          {
            TRACER.debugInfo(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>0);
          }
      }
    // Check that the task contains some log messages.
    AttributeType logMessagesType =
        DirectoryServer.getAttributeType(ATTR_TASK_LOG_MESSAGES.toLowerCase());
    List<String> logMessages = new ArrayList<String>();
    resultEntry.getAttributeValues(logMessagesType,
        DirectoryStringSyntax.DECODER, logMessages);
      if ((expectedTaskState == TaskState.RUNNING)
          && (taskState == TaskState.COMPLETED_SUCCESSFULLY))
    if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
        && (taskState != TaskState.RUNNING))
    {
      if (logMessages.size() == 0)
      {
        // We usually wait the running state after adding the task
        // and if the task is fast enough then it may be already done
        // and we can go on.
      }
      else
      {
        assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
        fail("No log messages were written to the task entry on a failed task");
      }
    }
    catch(Exception e)
    if (logMessages.size() != 0)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
      TRACER.debugInfo(logMessages.get(0));
      if (expectedMessage != null)
      {
        TRACER.debugInfo(expectedMessage.toString());
        assertTrue(logMessages.get(0).indexOf(expectedMessage.toString()) > 0);
      }
    }
    if ((expectedTaskState == TaskState.RUNNING)
        && (taskState == TaskState.COMPLETED_SUCCESSFULLY))
    {
      // We usually wait the running state after adding the task
      // and if the task is fast enough then it may be already done
      // and we can go on.
    }
    else
    {
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState
          + " Expected task state:" + expectedTaskState);
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
  protected void addTestEntriesToDB(String[] ldifEntries)
  protected void addTestEntriesToDB(String... ldifEntries)
  {
    try
    {
@@ -1040,10 +986,7 @@
   */
  protected String getEntryUUID(DN dn) throws Exception
  {
    Entry newEntry;
    int count = 10;
    if (count<1)
      count=1;
    String found = null;
    while ((count> 0) && (found == null))
    {
@@ -1057,14 +1000,11 @@
      try
      {
        newEntry = DirectoryServer.getEntry(dn);
        Entry newEntry = DirectoryServer.getEntry(dn);
        if (newEntry != null)
        {
          List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
          Attribute tmpAttr = tmpAttrList.get(0);
          for (AttributeValue val : tmpAttr)
          for (AttributeValue val : tmpAttrList.get(0))
          {
            found = val.getValue().toString();
            break;
@@ -1104,15 +1044,13 @@
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(Session session, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      ReplicationMsg replMsg = null;
      try
      {
        replMsg = session.receive();
@@ -1147,15 +1085,13 @@
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      ReplicationMsg replMsg = null;
      try
      {
        replMsg = broker.receive();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -251,41 +251,16 @@
       */
      unknownCSNServer1 = new CSN(time + 1, 1, 1);
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       */
      DeleteMsg msg = new DeleteMsg(EXAMPLE_DN, firstCSNServer1, "uid");
      server1.publish(msg);
      ReplicationMsg msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      assertDeleteMsgBodyEquals(msg, msg2);
      sendAndReceiveDeleteMsg(server1, server2, EXAMPLE_DN, firstCSNServer1, "uid");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg(TEST_ROOT_DN, secondCSNServer1, "uid");
      server1.publish(msg);
      msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      assertDeleteMsgBodyEquals(msg, msg2);
      // Send and receive a second Delete Msg
      sendAndReceiveDeleteMsg(server1, server2, TEST_ROOT_DN, secondCSNServer1, "uid");
      /*
       * Send and receive a Delete Msg from server 2 to server 1
       */
      msg = new DeleteMsg(EXAMPLE_DN, firstCSNServer2, "other-uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      assertDeleteMsgBodyEquals(msg, msg2);
      // Send and receive a Delete Msg from server 2 to server 1
      sendAndReceiveDeleteMsg(server2, server1, EXAMPLE_DN, firstCSNServer2, "other-uid");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg(TEST_ROOT_DN, secondCSNServer2, "uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      assertDeleteMsgBodyEquals(msg, msg2);
      // Send and receive a second Delete Msg
      sendAndReceiveDeleteMsg(server2, server1, TEST_ROOT_DN, secondCSNServer2, "uid");
      debugInfo("Ending changelogBasic");
    }
@@ -295,6 +270,16 @@
    }
  }
  private void sendAndReceiveDeleteMsg(ReplicationBroker sender, ReplicationBroker receiver,
      DN dn, CSN csn, String entryUUID) throws Exception
  {
    DeleteMsg sentMsg = new DeleteMsg(dn, csn, entryUUID);
    sender.publish(sentMsg);
    ReplicationMsg receivedMsg = receiver.receive();
    receiver.updateWindowAfterReplay();
    assertDeleteMsgBodyEquals(sentMsg, receivedMsg);
  }
  private void assertDeleteMsgBodyEquals(DeleteMsg msg, ReplicationMsg msg2)
  {
    assertTrue(msg2 instanceof DeleteMsg,
@@ -305,6 +290,16 @@
        "ReplicationServer basic : incorrect message body received.");
  }
  private ServerState newServerState(CSN... csns)
  {
    ServerState state = new ServerState();
    for (CSN csn : csns)
    {
      state.update(csn);
    }
    return state;
  }
  /**
   * Test that a new client see the change that was sent in the
   * previous test.
@@ -319,9 +314,9 @@
          3, 100, replicationServerPort, 1000, false);
      assertTrue(broker.isConnected());
      ReplicationMsg msg2 = broker.receive();
      ReplicationMsg receivedMsg = broker.receive();
      broker.updateWindowAfterReplay();
      assertDeleteMsgCSNEquals(msg2, firstCSNServer1, "first");
      assertDeleteMsgCSNEquals(receivedMsg, firstCSNServer1, "first");
      debugInfo("Ending newClient");
    }
    finally
@@ -330,8 +325,6 @@
    }
  }
  /**
   * Test that a client that has already seen some changes now receive
   * the correct next change.
@@ -345,9 +338,9 @@
      broker = openReplicationSession(TEST_ROOT_DN,
          3, 100, replicationServerPort, 5000, state);
      ReplicationMsg msg2 = broker.receive();
      ReplicationMsg receivedMsg = broker.receive();
      broker.updateWindowAfterReplay();
      assertDeleteMsgCSNEquals(msg2, nextCSN, "second");
      assertDeleteMsgCSNEquals(receivedMsg, nextCSN, "second");
    }
    finally
    {
@@ -383,10 +376,7 @@
     * Create a ServerState updated with the first changes from both servers
     * done in test changelogBasic.
     */
    ServerState state = new ServerState();
    state.update(firstCSNServer1);
    state.update(firstCSNServer2);
    ServerState state = newServerState(firstCSNServer1, firstCSNServer2);
    newClientWithChanges(state, secondCSNServer1);
    debugInfo("Ending newClientWithFirstChanges");
  }
@@ -398,13 +388,7 @@
  private void newClientWithUnknownChanges() throws Exception
  {
    debugInfo("Starting newClientWithUnknownChanges");
    /*
     * Create a ServerState with wrongCSNServer1
     */
    ServerState state = new ServerState();
    state.update(unknownCSNServer1);
    state.update(secondCSNServer2);
    ServerState state = newServerState(unknownCSNServer1, secondCSNServer2);
    newClientWithChanges(state, secondCSNServer1);
    debugInfo("Ending newClientWithUnknownChanges");
  }
@@ -416,12 +400,7 @@
  private void newClientWithChangefromServer1() throws Exception
  {
    debugInfo("Starting newClientWithChangefromServer1");
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstCSNServer1);
    ServerState state = newServerState(firstCSNServer1);
    newClientWithChanges(state, firstCSNServer2);
    debugInfo("Ending newClientWithChangefromServer1");
  }
@@ -433,12 +412,7 @@
  private void newClientWithChangefromServer2() throws Exception
  {
    debugInfo("Starting newClientWithChangefromServer2");
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstCSNServer2);
    ServerState state = newServerState(firstCSNServer2);
    newClientWithChanges(state, firstCSNServer1);
    debugInfo("Ending newClientWithChangefromServer2");
  }
@@ -450,13 +424,7 @@
  private void newClientLateServer1() throws Exception
  {
    debugInfo("Starting newClientLateServer1");
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(secondCSNServer2);
    state.update(firstCSNServer1);
    ServerState state = newServerState(secondCSNServer2, firstCSNServer1);
    newClientWithChanges(state, secondCSNServer1);
    debugInfo("Ending newClientLateServer1");
  }
@@ -511,7 +479,7 @@
       * Open a sender session
       */
      server = openReplicationSession(TEST_ROOT_DN,
          5, 100, replicationServerPort, 100000, 1000, 0, false);
          5, 100, replicationServerPort, 100000, false);
      assertTrue(server.isConnected());
      reader = new BrokerReader(server, TOTAL_MSG);
@@ -596,7 +564,7 @@
        int serverId = 10 + i;
        CSNGenerator gen = new CSNGenerator(serverId , 0);
        broker[i] = openReplicationSession(TEST_ROOT_DN,
            serverId, 100, replicationServerPort, 3000, 1000, 0, true);
            serverId, 100, replicationServerPort, 3000, true);
        assertTrue(broker[i].isConnected());
        producer[i] = new BrokerWriter(broker[i], gen, TOTAL_MSG/THREADS);
@@ -723,9 +691,8 @@
        // - Modify
        Attribute attr1 = Attributes.create("description", "new value");
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        List<Modification> mods =
            Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
        broker1.publish(modMsg);
@@ -821,9 +788,8 @@
        // - Modify
        Attribute attr1 = Attributes.create("description", "new value");
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        List<Modification> mods =
            Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
        broker1.publish(modMsg);
@@ -1188,9 +1154,8 @@
      debugInfo("Ending export");
    }
   private Entry createBackupTask()
   throws Exception
   {
  private Entry createBackupTask() throws Exception
  {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
@@ -1200,12 +1165,10 @@
     "ds-backup-directory-path: bak" + File.separator +
                        "replicationChanges",
     "ds-task-backup-backend-id: replicationChanges");
  }
   }
   private Entry createRestoreTask()
   throws Exception
   {
  private Entry createRestoreTask() throws Exception
  {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
@@ -1214,11 +1177,10 @@
     "ds-task-class-name: org.opends.server.tasks.RestoreTask",
     "ds-backup-directory-path: bak" + File.separator +
                        "replicationChanges");
   }
  }
   private Entry createExportAllTask()
   throws Exception
   {
  private Entry createExportAllTask() throws Exception
  {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
@@ -1228,11 +1190,10 @@
     "ds-task-export-ldif-file: " + exportLDIFAllFile,
     "ds-task-export-backend-id: replicationChanges",
     "ds-task-export-include-branch: dc=replicationChanges");
   }
  }
   private Entry createExportDomainTask(String suffix)
   throws Exception
   {
  private Entry createExportDomainTask(String suffix) throws Exception
  {
     String root = suffix.substring(suffix.indexOf('=')+1, suffix.indexOf(','));
     exportLDIFDomainFile = "exportLDIF" + root +".ldif";
     return TestCaseUtils.makeEntry(
@@ -1244,7 +1205,7 @@
     "ds-task-export-ldif-file: " + exportLDIFDomainFile,
     "ds-task-export-backend-id: replicationChanges",
     "ds-task-export-include-branch: "+suffix+",dc=replicationChanges");
   }
  }
   private List<UpdateMsg> createChanges(String suffix, int serverId) throws Exception
   {
@@ -1293,13 +1254,13 @@
       // - Modify
       Attribute attr1 = Attributes.create("description", "new value");
       Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
       Attribute attr2 = Attributes.create("modifiersName", "cn=Directory Manager,cn=Root DNs,cn=config");
       Modification mod2 = new Modification(ModificationType.REPLACE, attr2);
       Attribute attr3 = Attributes.create("modifyTimestamp", "20070917172420Z");
       Modification mod3 = new Modification(ModificationType.REPLACE, attr3);
       List<Modification> mods = Arrays.asList(mod1, mod2, mod3);
       List<Modification> mods = Arrays.asList(
           new Modification(ModificationType.REPLACE, attr1),
           new Modification(ModificationType.REPLACE, attr2),
           new Modification(ModificationType.REPLACE, attr3));
       DN dn = exampleSuffixDN;
       ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), dn, mods, "fakeuniqueid");
@@ -1629,9 +1590,8 @@
         // - Modify
         Attribute attr1 = Attributes.create("description", "new value");
         Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
         List<Modification> mods = new ArrayList<Modification>();
         mods.add(mod1);
      List<Modification> mods =
          Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
         ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
         broker1.publish(modMsg);