mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
27.30.2013 80f29a511a0681bc55293f0ea82e368097f2fa9a
Fixed the failing nightly test GenerationIdTest.testSingleRS().


GenerationIdTest.java:
In testSingleRS(), went back to calling ReplicationServer.remove() instead of ReplicationServerTestCase.remove(), because the test does not expect the DB to be wiped out.
Changed all protected keywords to private since it is never subclassed.
Renamed testEntriesInDb() to countUpdatedEntriesInDb().
Removed useless try / catch and let the exceptions go up.
Replaced uses of assertTrue() with the appropriate assert* method.
Changed the return type of createAddMsg() to return the appropriate type.
Extracted methods assertNoMessageReceivedBadGenId() and waitForStableGenerationId().

ReplicationTestCase.java:
In addTestEntriesToDB(), used varargs.
Removed outdated openReplicationSession().
Removed useless try / catch and let the exceptions go up.

ReplicationServerTest.java:
Extracted methods sendAndReceiveDeleteMsg() and newServerState().
Used Arrays.asList().
3 files modified
735 ■■■■■ changed files
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java 393 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java 182 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 160 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/GenerationIdTest.java
@@ -77,8 +77,9 @@
  private static final String baseDnStr = TEST_ROOT_DN_STRING;
  private static final String testName = "generationIdTest";
  private static final String REPLICATION_GENERATION_ID = "ds-sync-generation-id";
  private static final int   WINDOW_SIZE = 10;
  private static final int WINDOW_SIZE = 10;
  private static final int server1ID = 1;
  private static final int server2ID = 2;
  private static final int server3ID = 3;
@@ -87,11 +88,11 @@
  private static final int changelog3ID = 13;
  private DN baseDN;
  private ReplicationBroker broker2 = null;
  private ReplicationBroker broker3 = null;
  private ReplicationServer replServer1 = null;
  private ReplicationServer replServer2 = null;
  private ReplicationServer replServer3 = null;
  private ReplicationBroker broker2;
  private ReplicationBroker broker3;
  private ReplicationServer replServer1;
  private ReplicationServer replServer2;
  private ReplicationServer replServer3;
  private boolean emptyOldChanges = true;
  private Entry taskInitRemoteS2;
  private String[] updatedEntries;
@@ -148,7 +149,8 @@
      TRACER.debugInfo("** TEST **" + s);
    }
  }
  protected void debugInfo(String message, Exception e)
  private void debugInfo(String message, Exception e)
  {
    debugInfo(message + stackTraceToSingleLineString(e));
  }
@@ -183,31 +185,21 @@
        "ds-task-initialize-replica-server-id: " + server2ID);
  }
  /** Tests that entries have been written in the db */
  private int testEntriesInDb()
  /** Tests that entries have been written in the db. */
  private int countUpdatedEntriesInDb() throws Exception
  {
    debugInfo("TestEntriesInDb");
    short found = 0;
    debugInfo("countUpdatedEntriesInDb");
    int found = 0;
    for (String entry : updatedEntries)
    {
      int dns = entry.indexOf("dn: ");
      int dne = entry.indexOf(TestCaseUtils.TEST_ROOT_DN_STRING);
      String dn = entry.substring(dns + 4,
        dne + TestCaseUtils.TEST_ROOT_DN_STRING.length());
      int dne = entry.indexOf(TEST_ROOT_DN_STRING);
      String dn = entry.substring(dns + 4, dne + TEST_ROOT_DN_STRING.length());
      debugInfo("Search Entry: " + dn);
      DN entryDN = null;
      try
      {
        entryDN = DN.decode(dn);
      }
      catch(Exception e)
      {
        debugInfo("TestEntriesInDb/" + e);
      }
      DN entryDN = DN.decode(dn);
      try
      {
@@ -272,62 +264,49 @@
    };
  }
  private int receiveImport(ReplicationBroker broker, int serverID,
      String[] updatedEntries)
  private int receiveImport(ReplicationBroker broker, int serverID, String[] updatedEntries) throws Exception
  {
    // Expect the broker to receive the entries
    ReplicationMsg msg;
    short entriesReceived = -100;
    int entriesReceived = -100;
    while (true)
    {
      try
      debugInfo("Broker " + serverID + " Wait for entry or done msg");
      ReplicationMsg msg = broker.receive();
      if (msg == null)
      {
        debugInfo("Broker " + serverID + " Wait for entry or done msg");
        msg = broker.receive();
        if (msg == null)
        {
          break;
        }
        if (msg instanceof InitializeTargetMsg)
        {
          debugInfo("Broker " + serverID + " receives InitializeTargetMessage ");
          entriesReceived = 0;
        }
        else if (msg instanceof EntryMsg)
        {
          EntryMsg em = (EntryMsg)msg;
          debugInfo("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
          entriesReceived++;
        }
        else if (msg instanceof DoneMsg)
        {
          debugInfo("Broker " + serverID + " receives done ");
          break;
        }
        else if (msg instanceof ErrorMsg)
        {
          ErrorMsg em = (ErrorMsg)msg;
          debugInfo("Broker " + serverID + " receives ERROR " + em);
          break;
        }
        else
        {
          debugInfo("Broker " + serverID + " receives and trashes " + msg);
        }
        break;
      }
      catch(Exception e)
      if (msg instanceof InitializeTargetMsg)
      {
        debugInfo("receiveUpdatedEntries" + stackTraceToSingleLineString(e));
        debugInfo("Broker " + serverID + " receives InitializeTargetMessage ");
        entriesReceived = 0;
      }
      else if (msg instanceof EntryMsg)
      {
        EntryMsg em = (EntryMsg)msg;
        debugInfo("Broker " + serverID + " receives entry " + new String(em.getEntryBytes()));
        entriesReceived++;
      }
      else if (msg instanceof DoneMsg)
      {
        debugInfo("Broker " + serverID + " receives done ");
        break;
      }
      else if (msg instanceof ErrorMsg)
      {
        debugInfo("Broker " + serverID + " receives ERROR " + msg);
        break;
      }
      else
      {
        debugInfo("Broker " + serverID + " receives and trashes " + msg);
      }
    }
    if (updatedEntries != null)
    {
      assertTrue(entriesReceived == updatedEntries.length,
          " Received entries("+entriesReceived +
          ") == Expected entries("+updatedEntries.length+")");
      assertEquals(updatedEntries.length, entriesReceived);
    }
    return entriesReceived;
@@ -387,7 +366,7 @@
      // Must be no connection already done or disconnectFromReplServer should
      // have been called
      assertTrue(synchroServerEntry == null);
      assertNull(synchroServerEntry);
      synchroServerEntry = TestCaseUtils.entryFromLdifString(synchroServerLdif);
      DirectoryServer.getConfigHandler().addEntry(synchroServerEntry, null);
@@ -423,7 +402,7 @@
      String synchroServerStringDN = "cn=" + testName + ", cn=domains," +
      SYNCHRO_PLUGIN_DN;
      // Must have called connectServer1ToChangelog previously
      assertTrue(synchroServerEntry != null);
      assertNotNull(synchroServerEntry);
      DN synchroServerDN = DN.decode(synchroServerStringDN);
@@ -434,8 +413,7 @@
        DirectoryServer.getConfigHandler().deleteEntry(ecle.getDN(), null);
      }
      DirectoryServer.getConfigHandler().deleteEntry(synchroServerDN, null);
      assertTrue(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()) ==
        null,
      assertNull(DirectoryServer.getConfigEntry(synchroServerEntry.getDN()),
        "Unable to delete the synchronized domain");
      synchroServerEntry = null;
@@ -451,7 +429,7 @@
          Thread.sleep(200);
          waitCo++;
        }
        assert(replDomainToDis==null);
        assertNull(replDomainToDis);
      }
      catch (DirectoryException e)
      {
@@ -470,39 +448,29 @@
    return replServerPort[changelogID];
  }
  protected static final String REPLICATION_GENERATION_ID =
    "ds-sync-generation-id";
  private long readGenIdFromSuffixRootEntry() throws Exception
  {
    long genId=-1;
    Entry resultEntry = getEntry(baseDN, 1000, true);
    if (resultEntry == null)
    {
      Entry resultEntry = getEntry(baseDN, 1000, true);
      if (resultEntry==null)
      {
        debugInfo("Entry not found <" + baseDN + ">");
      }
      else
      {
        debugInfo("Entry found <" + baseDN + ">");
      debugInfo("Entry not found <" + baseDN + ">");
    }
    else
    {
      debugInfo("Entry found <" + baseDN + ">");
        AttributeType synchronizationGenIDType =
          DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
        List<Attribute> attrs =
          resultEntry.getAttribute(synchronizationGenIDType);
        if (attrs != null)
      AttributeType synchronizationGenIDType = DirectoryServer.getAttributeType(REPLICATION_GENERATION_ID);
      List<Attribute> attrs = resultEntry.getAttribute(synchronizationGenIDType);
      if (attrs != null)
      {
        Attribute attr = attrs.get(0);
        if (attr.size() == 1)
        {
          Attribute attr = attrs.get(0);
          if (attr.size() == 1)
          {
            genId =
                Long.decode(attr.iterator().next().getValue().toString());
          }
          return Long.decode(attr.iterator().next().getValue().toString());
        }
      }
    }
    return genId;
    return -1;
  }
  private void performLdifImport() throws Exception
@@ -545,22 +513,18 @@
        + "userPassword: password\n" + "initials: AA\n";
  }
  static protected ReplicationMsg createAddMsg() throws Exception
  private static AddMsg createAddMsg() throws Exception
  {
    Entry personWithUUIDEntry = null;
    String user1entryUUID;
    String baseUUID = null;
    String user1dn;
    /*
     * Create a CSN generator to generate new CSNs when we need to send
     * operation messages to the replicationServer.
     */
    CSNGenerator gen = new CSNGenerator(2, 0);
    user1entryUUID = "33333333-3333-3333-3333-333333333333";
    user1dn = "uid=user1,ou=People," + baseDnStr;
    String entryWithUUIDldif = "dn: "+ user1dn + "\n"
    String user1entryUUID = "33333333-3333-3333-3333-333333333333";
    String user1dn = "uid=user1,ou=People," + baseDnStr;
    Entry personWithUUIDEntry = TestCaseUtils.entryFromLdifString(
    "dn: "+ user1dn + "\n"
    + "objectClass: top\n" + "objectClass: person\n"
    + "objectClass: organizationalPerson\n"
    + "objectClass: inetOrgPerson\n" + "uid: user.1\n"
@@ -574,15 +538,13 @@
    + "telephoneNumber: 216-564-6748\n" + "employeeNumber: 1\n"
    + "sn: Amar\n" + "givenName: Aaccf\n" + "postalCode: 85762\n"
    + "userPassword: password\n" + "initials: AA\n"
    + "entryUUID: " + user1entryUUID + "\n";
    personWithUUIDEntry = TestCaseUtils.entryFromLdifString(entryWithUUIDldif);
    + "entryUUID: " + user1entryUUID + "\n");
    // Create and publish an update message to add an entry.
    return new AddMsg(gen.newCSN(),
        personWithUUIDEntry.getDN(),
        user1entryUUID,
        baseUUID,
        null,
        personWithUUIDEntry.getObjectClassAttribute(),
        personWithUUIDEntry.getAttributes(), new ArrayList<Attribute>());
  }
@@ -591,31 +553,24 @@
   * Check that the expected number of changes are in the replication server
   * database.
   */
  private void checkChangelogSize(int expectedCount)
  private void checkChangelogSize(int expectedCount) throws Exception
  {
    try
    SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)");
    InternalSearchOperation searchOperation =
      connection.processSearch(DN.decode("dc=replicationchanges"),
          SearchScope.SUBORDINATE_SUBTREE,
          filter);
    if (debugEnabled())
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      InternalSearchOperation searchOperation =
        connection.processSearch(DN.decode("dc=replicationchanges"),
            SearchScope.SUBORDINATE_SUBTREE,
            filter);
      if (debugEnabled())
      if (searchOperation.getSearchEntries().size() != expectedCount)
      {
        if (searchOperation.getSearchEntries().size() != expectedCount)
        for (SearchResultEntry sre : searchOperation.getSearchEntries())
        {
          for (SearchResultEntry sre : searchOperation.getSearchEntries())
          {
            debugInfo("Entry found: " + sre.toLDIFString());
          }
          debugInfo("Entry found: " + sre.toLDIFString());
        }
      }
      assertEquals(searchOperation.getSearchEntries().size(), expectedCount);
    }
    catch(Exception e)
    {
    }
    assertEquals(searchOperation.getSearchEntries().size(), expectedCount);
  }
  /**
@@ -706,10 +661,10 @@
        broker3.receive();
        fail("No update message is supposed to be received here.");
      }
      catch(SocketTimeoutException e)
      catch (SocketTimeoutException expected)
      {
        // This is the expected result
        // Note that timeout should be lower than RS montoring publisher period
        // Note that timeout should be lower than RS monitoring publisher period
        // so that timeout occurs
      }
@@ -717,8 +672,7 @@
      debugInfo(testCase + " ** TEST ** The part of the topology with the right gen ID should work well");
      // Now create a change that must be replicated
      String ent1[] = { createEntry(UUID.randomUUID()) };
      addTestEntriesToDB(ent1);
      addTestEntriesToDB(createEntry(UUID.randomUUID()));
      // Verify that RS1 does contain the change related to this ADD.
      Thread.sleep(500);
@@ -731,12 +685,12 @@
      //===========================================================
      debugInfo(testCase + " ** TEST ** Persistence of the generation ID in RS1");
      long genIdBeforeShut = replServer1.getGenerationId(baseDN);
      final long genIdBeforeShut = replServer1.getGenerationId(baseDN);
      debugInfo("Shutdown replServer1");
      stop(broker2, broker3);
      broker2 = broker3 = null;
      remove(replServer1);
      replServer1.remove();
      replServer1 = null;
      debugInfo("Create again replServer1");
@@ -749,10 +703,10 @@
      debugInfo("Delay to allow DS to reconnect to replServer1");
      long genIdAfterRestart = replServer1.getGenerationId(baseDN);
      final long genIdAfterRestart = replServer1.getGenerationId(baseDN);
      debugInfo("Aft restart / replServer.genId=" + genIdAfterRestart);
      assertTrue(replServer1!=null, "Replication server creation failed.");
      assertTrue(genIdBeforeShut == genIdAfterRestart,
      assertNotNull(replServer1, "Replication server creation failed.");
      assertEquals(genIdAfterRestart, genIdBeforeShut,
        "generationId is expected to have the same value" +
        " after replServer1 restart. Before : " + genIdBeforeShut +
        " after : " + genIdAfterRestart);
@@ -799,20 +753,14 @@
      // to enter the bad gen id status
      ChangeStatusMsg csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker2,
        ChangeStatusMsg.class.getName());
      if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
      {
        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
          " to enter the bad gen id status"
            + csMsg);
      }
      assertEquals(csMsg.getRequestedStatus(), ServerStatus.BAD_GEN_ID_STATUS,
          "Broker 2 connection is expected to receive 1 ChangeStatusMsg"
              + " to enter the bad gen id status" + csMsg);
      csMsg = (ChangeStatusMsg)waitForSpecificMsg(broker3,
        ChangeStatusMsg.class.getName());
      if (csMsg.getRequestedStatus() != ServerStatus.BAD_GEN_ID_STATUS)
      {
        fail("Broker 2 connection is expected to receive 1 ChangeStatusMsg" +
          " to enter the bad gen id status"
            + csMsg);
      }
      assertEquals(csMsg.getRequestedStatus(), ServerStatus.BAD_GEN_ID_STATUS,
          "Broker 2 connection is expected to receive 1 ChangeStatusMsg"
              + " to enter the bad gen id status" + csMsg);
      debugInfo("DS1 root entry must contain the new gen ID");
      genId = readGenIdFromSuffixRootEntry();
@@ -841,39 +789,24 @@
          "Expecting that DS3 with old gen ID is in bad gen id from RS1");
      debugInfo("Add entries to DS1, update should not be sent to DS2 and DS3 that are in bad gen id");
      String[] ent3 = { createEntry(UUID.randomUUID()) };
      addTestEntriesToDB(ent3);
      addTestEntriesToDB(createEntry(UUID.randomUUID()));
      debugInfo("RS1 must have stored that update.");
      Thread.sleep(500);
      checkChangelogSize(1);
      try
      {
        ReplicationMsg msg2 = broker2.receive();
        fail("No update message is supposed to be received by broker2 in bad gen id. " + msg2);
      } catch(SocketTimeoutException e) { /* expected */ }
      try
      {
        ReplicationMsg msg2 = broker3.receive();
        fail("No update message is supposed to be received by broker3 in bad gen id. " + msg2);
      } catch(SocketTimeoutException e) { /* expected */ }
      assertNoMessageReceivedBadGenId(broker2, "broker2");
      assertNoMessageReceivedBadGenId(broker3, "broker3");
      debugInfo("DS2 is publishing a change and RS1 must ignore this change, DS3 must not receive it.");
      AddMsg emsg = (AddMsg)createAddMsg();
      AddMsg emsg = createAddMsg();
      broker2.publish(emsg);
      // Updates count in RS1 must stay unchanged = to 1
      Thread.sleep(500);
      checkChangelogSize(1);
      try
      {
        ReplicationMsg msg2 = broker3.receive();
        fail("No update message is supposed to be received by broker3 in bad gen id. "+ msg2);
      } catch(SocketTimeoutException e) { /* expected */ }
      assertNoMessageReceivedBadGenId(broker3, "broker3");
      //===============================================================
@@ -941,7 +874,7 @@
      assertTrue(msg instanceof AddMsg, "Expected to receive an AddMsg but received: " + msg);
      debugInfo("DS2 is publishing a change and RS1 must store this change, DS3 must receive it.");
      emsg = (AddMsg)createAddMsg();
      emsg = createAddMsg();
      broker2.publish(emsg);
      Thread.sleep(500);
@@ -965,7 +898,19 @@
    }
  }
  /**
  private void assertNoMessageReceivedBadGenId(ReplicationBroker broker, String brokerName)
  {
    try
    {
      ReplicationMsg msg = broker.receive();
      fail("No update message is supposed to be received by " + brokerName
          + " with bad gen id. " + msg);
    }
    catch (SocketTimeoutException expected)
    { /* expected */
    }
  }
  /**
   * testMultiRS tests basic features of generationID
   * with more than one Replication Server.
@@ -997,38 +942,13 @@
      connectServer1ToChangelog(changelog1ID);
      debugInfo("Expect genId are set in all replServers.");
      int waitRes=0;
      while(waitRes<100)
      {
        if (replServer1.getGenerationId(baseDN) == EMPTY_DN_GENID
            && replServer2.getGenerationId(baseDN) == EMPTY_DN_GENID
            && replServer3.getGenerationId(baseDN) == EMPTY_DN_GENID)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      assertEquals(replServer1.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer1");
      assertEquals(replServer2.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer2");
      assertEquals(replServer3.getGenerationId(baseDN), EMPTY_DN_GENID, " in replServer3");
      waitForStableGenerationId(EMPTY_DN_GENID);
      debugInfo("Disconnect DS from replServer1.");
      disconnectFromReplServer(changelog1ID);
      waitRes=0;
      while(waitRes<100)
      {
        if (replServer1.getGenerationId(baseDN) == -1
            && replServer2.getGenerationId(baseDN) == -1
            && replServer3.getGenerationId(baseDN) == -1)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      debugInfo(
        "Expect genIds to be resetted in all servers to -1 as no more DS in topo - after 10 sec");
      assertEquals(replServer1.getGenerationId(baseDN), -1);
      assertEquals(replServer2.getGenerationId(baseDN), -1);
      assertEquals(replServer3.getGenerationId(baseDN), -1);
      debugInfo("Expect genIds to be resetted in all servers to -1 as no more DS in topo - after 10 sec");
      waitForStableGenerationId(-1);
      debugInfo("Add entries to DS");
      addTestEntriesToDB(updatedEntries);
@@ -1036,23 +956,10 @@
      debugInfo("Connecting DS to replServer2");
      connectServer1ToChangelog(changelog2ID);
      debugInfo(
        "Expect genIds to be set in all servers based on the added entries.");
      debugInfo("Expect genIds to be set in all servers based on the added entries.");
      genId = readGenIdFromSuffixRootEntry();
      assertTrue(genId != -1);
      waitRes=0;
      while(waitRes<100)
      {
        if (replServer1.getGenerationId(baseDN) == genId
            && replServer2.getGenerationId(baseDN) == genId
            && replServer3.getGenerationId(baseDN) == genId)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      assertEquals(replServer1.getGenerationId(baseDN), genId);
      assertEquals(replServer2.getGenerationId(baseDN), genId);
      assertEquals(replServer3.getGenerationId(baseDN), genId);
      waitForStableGenerationId(genId);
      debugInfo("Connecting broker2 to replServer3 with a good genId");
      broker2 = openReplicationSession(baseDN, server2ID, 100,
@@ -1067,18 +974,8 @@
      debugInfo("Verifying that all replservers genIds have been reset.");
      debugInfo(
      "Expect all genIds to keep their value since broker2 is still connected.");
      waitRes=0;
      while(waitRes<100)
      {
        if (replServer1.getGenerationId(baseDN) == genId
            && replServer2.getGenerationId(baseDN) == genId
            && replServer3.getGenerationId(baseDN) == genId)
          break;
        waitRes++;
        Thread.sleep(100);
      }
      debugInfo("Expect all genIds to keep their value since broker2 is still connected.");
      waitForStableGenerationId(genId);
      assertEquals(replServer1.getGenerationId(baseDN), genId);
      assertEquals(replServer2.getGenerationId(baseDN), genId);
      assertEquals(replServer3.getGenerationId(baseDN), genId);
@@ -1092,10 +989,7 @@
      debugInfo("Expecting that broker3 is in bad gen id since it has a bad genId");
      assertTrue(isDegradedDueToGenerationId(replServer1, server3ID));
      int found = testEntriesInDb();
      assertEquals(found, updatedEntries.length,
        " Entries present in DB :" + found +
        " Expected entries :" + updatedEntries.length);
      assertEquals(countUpdatedEntriesInDb(), updatedEntries.length);
      debugInfo("Connecting DS to replServer1.");
      connectServer1ToChangelog(changelog1ID);
@@ -1134,7 +1028,7 @@
      waitTaskState(taskReset, TaskState.COMPLETED_SUCCESSFULLY, null);
      debugInfo("Verifying that all replservers genIds have been reset.");
      waitRes=0;
      int waitRes = 0;
      while(waitRes<100)
      {
        readGenIdFromSuffixRootEntry();
@@ -1160,6 +1054,23 @@
    }
  }
  private void waitForStableGenerationId(long expectedGenId) throws Exception
  {
    int wait = 0;
    while (wait < 100)
    {
      if (replServer1.getGenerationId(baseDN) == expectedGenId
          && replServer2.getGenerationId(baseDN) == expectedGenId
          && replServer3.getGenerationId(baseDN) == expectedGenId)
        break;
      wait++;
      Thread.sleep(100);
    }
    assertEquals(replServer1.getGenerationId(baseDN), expectedGenId, " in replServer1");
    assertEquals(replServer2.getGenerationId(baseDN), expectedGenId, " in replServer2");
    assertEquals(replServer3.getGenerationId(baseDN), expectedGenId, " in replServer3");
  }
  private boolean isDegradedDueToGenerationId(ReplicationServer rs, int serverId)
  {
    ReplicationServerDomain domain = rs.getReplicationServerDomain(baseDN);
@@ -1169,7 +1080,7 @@
  /**
   * Disconnect broker and remove entries from the local DB
   */
  protected void postTest()
  private void postTest() throws Exception
  {
    debugInfo("Post test cleaning.");
@@ -1182,12 +1093,8 @@
    Arrays.fill(replServerPort, 0);
    debugInfo("Clearing DS backend");
    try
    {
      TestCaseUtils.initializeTestBackend(false);
    } catch (Exception ex)
    {debugInfo("postTest(): error cleaning memory backend: " + ex);}
    debugInfo("Clearing DJ backend");
    TestCaseUtils.initializeTestBackend(false);
  }
  /**
@@ -1208,8 +1115,6 @@
    try
    {
      long genId;
      replServer1 = createReplicationServer(changelog1ID, false, testCase);
      /*
@@ -1223,14 +1128,12 @@
      connectServer1ToChangelog(changelog1ID);
      debugInfo(testCase + " Expect genId attribute to be not retrievable");
      genId = readGenIdFromSuffixRootEntry();
      assertEquals(genId,-1);
      assertEquals(readGenIdFromSuffixRootEntry(), -1);
      addTestEntriesToDB(updatedEntries);
      debugInfo(testCase + " Expect genId attribute to be retrievable");
      genId = readGenIdFromSuffixRootEntry();
      assertEquals(genId, EMPTY_DN_GENID);
      assertEquals(readGenIdFromSuffixRootEntry(), EMPTY_DN_GENID);
      disconnectFromReplServer(changelog1ID);
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReplicationTestCase.java
@@ -327,33 +327,6 @@
    return broker;
  }
  /**
   * Open a replicationServer session with flow control to the local
   * ReplicationServer.
   * <p>
   * Builds a {@code ReplicationBroker} with a fresh, empty {@code ServerState},
   * starts it against {@code "localhost:" + port}, verifies the connection
   * through {@code checkConnection(30, broker, port)} and, when
   * {@code timeout} is non-zero, applies it as the broker socket timeout.
   * <p>
   * The {@code maxSendQueue} and {@code maxRcvQueue} parameters are not read by
   * this method body.
   *
   * @param baseDN
   *          the base DN handed to the broker and used to look up the
   *          generation ID
   * @param serverId
   *          the server ID handed to the broker
   * @param window_size
   *          the window size handed to the broker
   * @param port
   *          the port of the replication server on localhost
   * @param timeout
   *          the socket timeout to set on the broker; 0 leaves it unset
   * @param maxSendQueue
   *          unused by this method
   * @param maxRcvQueue
   *          unused by this method
   * @param emptyOldChanges
   *          when true, a {@code PersistentServerState} is constructed for
   *          {@code baseDN} and {@code serverId} before the broker is created
   * @return the started broker
   * @throws Exception
   *           if any of the calls made here throws
   */
  protected ReplicationBroker openReplicationSession(
      final DN baseDN, int serverId, int window_size,
      int port, int timeout, int maxSendQueue, int maxRcvQueue,
      boolean emptyOldChanges)
      throws Exception, SocketException
  {
    ServerState state = new ServerState();
    // NOTE(review): the constructed object is discarded, so this line can only
    // matter through constructor side effects - confirm that is intended.
    if (emptyOldChanges)
       new PersistentServerState(baseDN, serverId, new ServerState());
    ReplicationBroker broker = new ReplicationBroker(null,
        state, baseDN, serverId, window_size,
        getGenerationId(baseDN), 0, getReplSessionSecurity(), (byte)1, 500);
    List<String> servers = new ArrayList<String>(1);
    servers.add("localhost:" + port);
    broker.start(servers);
    checkConnection(30, broker, port);
    // A timeout of 0 means "leave the broker's socket timeout untouched".
    if (timeout != 0)
      broker.setSoTimeout(timeout);
    return broker;
  }
  protected void deleteEntry(DN dn)
  {
    try
@@ -364,10 +337,9 @@
    catch(Exception e)
    {}
    DeleteOperationBasis op;
    op = new DeleteOperationBasis(connection, InternalClientConnection
        .nextOperationID(), InternalClientConnection.nextMessageID(), null,
        dn);
    DeleteOperationBasis op = new DeleteOperationBasis(connection,
        InternalClientConnection.nextOperationID(), InternalClientConnection.nextMessageID(),
        null, dn);
    op.run();
    if ((op.getResultCode() != ResultCode.SUCCESS) &&
        (op.getResultCode() != ResultCode.NO_SUCH_OBJECT))
@@ -897,102 +869,76 @@
  }
  protected void waitTaskState(Entry taskEntry, TaskState expectedTaskState,
      Message expectedMessage)
      Message expectedMessage) throws Exception
  {
    TaskState taskState = null;
    int cpt=40;
    try
    SearchFilter filter = SearchFilter.createFilterFromString("(objectclass=*)");
    Entry resultEntry = null;
    do
    {
      SearchFilter filter =
        SearchFilter.createFilterFromString("(objectclass=*)");
      Entry resultEntry = null;
      do
      {
        InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(),
              SearchScope.BASE_OBJECT,
              filter);
        try
        {
          resultEntry = searchOperation.getSearchEntries().getFirst();
        } catch (Exception e)
        {
          fail("Task entry was not returned from the search.");
          continue;
        }
      InternalSearchOperation searchOperation =
          connection.processSearch(taskEntry.getDN(), SearchScope.BASE_OBJECT, filter);
      resultEntry = searchOperation.getSearchEntries().getFirst();
        try
        {
          // Check that the task state is as expected.
          AttributeType taskStateType =
            DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
          String stateString =
            resultEntry.getAttributeValue(taskStateType,
                DirectoryStringSyntax.DECODER);
          taskState = TaskState.fromString(stateString);
        }
        catch(Exception e)
        {
          fail("Exception"+ e.getMessage()+e.getStackTrace());
        }
        Thread.sleep(500);
        cpt--;
      }
      while ((taskState != expectedTaskState) &&
             (taskState != TaskState.STOPPED_BY_ERROR) &&
             (taskState != TaskState.COMPLETED_SUCCESSFULLY) &&
             (cpt > 0));
      // Check that the task state is as expected.
      AttributeType taskStateType =
          DirectoryServer.getAttributeType(ATTR_TASK_STATE.toLowerCase());
      String stateString =
          resultEntry.getAttributeValue(taskStateType, DirectoryStringSyntax.DECODER);
      taskState = TaskState.fromString(stateString);
      // Check that the task contains some log messages.
      AttributeType logMessagesType = DirectoryServer.getAttributeType(
          ATTR_TASK_LOG_MESSAGES.toLowerCase());
      List<String> logMessages = new ArrayList<String>();
      resultEntry.getAttributeValues(logMessagesType,
          DirectoryStringSyntax.DECODER,
          logMessages);
      Thread.sleep(500);
      cpt--;
    }
    while ((taskState != expectedTaskState)
        && (taskState != TaskState.STOPPED_BY_ERROR)
        && (taskState != TaskState.COMPLETED_SUCCESSFULLY) && (cpt > 0));
      if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
          && (taskState != TaskState.RUNNING))
      {
        if (logMessages.size() == 0)
        {
          fail("No log messages were written to the task entry on a failed task");
        }
      }
      if (logMessages.size() != 0)
      {
          TRACER.debugInfo(logMessages.get(0));
          if (expectedMessage != null)
          {
            TRACER.debugInfo(expectedMessage.toString());
            assertTrue(logMessages.get(0).indexOf(
                expectedMessage.toString())>0);
          }
      }
    // Check that the task contains some log messages.
    AttributeType logMessagesType =
        DirectoryServer.getAttributeType(ATTR_TASK_LOG_MESSAGES.toLowerCase());
    List<String> logMessages = new ArrayList<String>();
    resultEntry.getAttributeValues(logMessagesType,
        DirectoryStringSyntax.DECODER, logMessages);
      if ((expectedTaskState == TaskState.RUNNING)
          && (taskState == TaskState.COMPLETED_SUCCESSFULLY))
    if ((taskState != TaskState.COMPLETED_SUCCESSFULLY)
        && (taskState != TaskState.RUNNING))
    {
      if (logMessages.size() == 0)
      {
        // We usually wait the running state after adding the task
        // and if the task is fast enough then it may be already done
        // and we can go on.
      }
      else
      {
        assertEquals(taskState, expectedTaskState, "Task State:" + taskState +
          " Expected task state:" + expectedTaskState);
        fail("No log messages were written to the task entry on a failed task");
      }
    }
    catch(Exception e)
    if (logMessages.size() != 0)
    {
      fail("waitTaskState Exception:"+ e.getMessage() + " " + stackTraceToSingleLineString(e));
      TRACER.debugInfo(logMessages.get(0));
      if (expectedMessage != null)
      {
        TRACER.debugInfo(expectedMessage.toString());
        assertTrue(logMessages.get(0).indexOf(expectedMessage.toString()) > 0);
      }
    }
    if ((expectedTaskState == TaskState.RUNNING)
        && (taskState == TaskState.COMPLETED_SUCCESSFULLY))
    {
      // We usually wait the running state after adding the task
      // and if the task is fast enough then it may be already done
      // and we can go on.
    }
    else
    {
      assertEquals(taskState, expectedTaskState, "Task State:" + taskState
          + " Expected task state:" + expectedTaskState);
    }
  }
  /**
   * Add to the current DB the entries necessary to the test
   */
  protected void addTestEntriesToDB(String[] ldifEntries)
  protected void addTestEntriesToDB(String... ldifEntries)
  {
    try
    {
@@ -1040,10 +986,7 @@
   */
  protected String getEntryUUID(DN dn) throws Exception
  {
    Entry newEntry;
    int count = 10;
    if (count<1)
      count=1;
    String found = null;
    while ((count> 0) && (found == null))
    {
@@ -1057,14 +1000,11 @@
      try
      {
        newEntry = DirectoryServer.getEntry(dn);
        Entry newEntry = DirectoryServer.getEntry(dn);
        if (newEntry != null)
        {
          List<Attribute> tmpAttrList = newEntry.getAttribute("entryuuid");
          Attribute tmpAttr = tmpAttrList.get(0);
          for (AttributeValue val : tmpAttr)
          for (AttributeValue val : tmpAttrList.get(0))
          {
            found = val.getValue().toString();
            break;
@@ -1104,15 +1044,13 @@
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(Session session, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      ReplicationMsg replMsg = null;
      try
      {
        replMsg = session.receive();
@@ -1147,15 +1085,13 @@
   * @return The expected message if it comes in time or fails (assertion).
   */
  protected static ReplicationMsg waitForSpecificMsg(ReplicationBroker broker, String msgType) {
    ReplicationMsg replMsg = null;
    int timeOut = 5000; // 5 seconds max to wait for the desired message
    long startTime = System.currentTimeMillis();
    long curTime = startTime;
    int nMsg = 0;
    while ((curTime - startTime) <= timeOut)
    {
      ReplicationMsg replMsg = null;
      try
      {
        replMsg = broker.receive();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -251,41 +251,16 @@
       */
      unknownCSNServer1 = new CSN(time + 1, 1, 1);
      /*
       * Send and receive a Delete Msg from server 1 to server 2
       */
      DeleteMsg msg = new DeleteMsg(EXAMPLE_DN, firstCSNServer1, "uid");
      server1.publish(msg);
      ReplicationMsg msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      assertDeleteMsgBodyEquals(msg, msg2);
      sendAndReceiveDeleteMsg(server1, server2, EXAMPLE_DN, firstCSNServer1, "uid");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg(TEST_ROOT_DN, secondCSNServer1, "uid");
      server1.publish(msg);
      msg2 = server2.receive();
      server2.updateWindowAfterReplay();
      assertDeleteMsgBodyEquals(msg, msg2);
      // Send and receive a second Delete Msg
      sendAndReceiveDeleteMsg(server1, server2, TEST_ROOT_DN, secondCSNServer1, "uid");
      /*
       * Send and receive a Delete Msg from server 2 to server 1
       */
      msg = new DeleteMsg(EXAMPLE_DN, firstCSNServer2, "other-uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      assertDeleteMsgBodyEquals(msg, msg2);
      // Send and receive a Delete Msg from server 2 to server 1
      sendAndReceiveDeleteMsg(server2, server1, EXAMPLE_DN, firstCSNServer2, "other-uid");
      /*
       * Send and receive a second Delete Msg
       */
      msg = new DeleteMsg(TEST_ROOT_DN, secondCSNServer2, "uid");
      server2.publish(msg);
      msg2 = server1.receive();
      server1.updateWindowAfterReplay();
      assertDeleteMsgBodyEquals(msg, msg2);
      // Send and receive a second Delete Msg
      sendAndReceiveDeleteMsg(server2, server1, TEST_ROOT_DN, secondCSNServer2, "uid");
      debugInfo("Ending changelogBasic");
    }
@@ -295,6 +270,16 @@
    }
  }
  private void sendAndReceiveDeleteMsg(ReplicationBroker sender, ReplicationBroker receiver,
      DN dn, CSN csn, String entryUUID) throws Exception
  {
    DeleteMsg sentMsg = new DeleteMsg(dn, csn, entryUUID);
    sender.publish(sentMsg);
    ReplicationMsg receivedMsg = receiver.receive();
    receiver.updateWindowAfterReplay();
    assertDeleteMsgBodyEquals(sentMsg, receivedMsg);
  }
  private void assertDeleteMsgBodyEquals(DeleteMsg msg, ReplicationMsg msg2)
  {
    assertTrue(msg2 instanceof DeleteMsg,
@@ -305,6 +290,16 @@
        "ReplicationServer basic : incorrect message body received.");
  }
  private ServerState newServerState(CSN... csns)
  {
    ServerState state = new ServerState();
    for (CSN csn : csns)
    {
      state.update(csn);
    }
    return state;
  }
  /**
   * Test that a new client see the change that was sent in the
   * previous test.
@@ -319,9 +314,9 @@
          3, 100, replicationServerPort, 1000, false);
      assertTrue(broker.isConnected());
      ReplicationMsg msg2 = broker.receive();
      ReplicationMsg receivedMsg = broker.receive();
      broker.updateWindowAfterReplay();
      assertDeleteMsgCSNEquals(msg2, firstCSNServer1, "first");
      assertDeleteMsgCSNEquals(receivedMsg, firstCSNServer1, "first");
      debugInfo("Ending newClient");
    }
    finally
@@ -330,8 +325,6 @@
    }
  }
  /**
   * Test that a client that has already seen some changes now receive
   * the correct next change.
@@ -345,9 +338,9 @@
      broker = openReplicationSession(TEST_ROOT_DN,
          3, 100, replicationServerPort, 5000, state);
      ReplicationMsg msg2 = broker.receive();
      ReplicationMsg receivedMsg = broker.receive();
      broker.updateWindowAfterReplay();
      assertDeleteMsgCSNEquals(msg2, nextCSN, "second");
      assertDeleteMsgCSNEquals(receivedMsg, nextCSN, "second");
    }
    finally
    {
@@ -383,10 +376,7 @@
     * Create a ServerState updated with the first changes from both servers
     * done in test changelogBasic.
     */
    ServerState state = new ServerState();
    state.update(firstCSNServer1);
    state.update(firstCSNServer2);
    ServerState state = newServerState(firstCSNServer1, firstCSNServer2);
    newClientWithChanges(state, secondCSNServer1);
    debugInfo("Ending newClientWithFirstChanges");
  }
@@ -398,13 +388,7 @@
  private void newClientWithUnknownChanges() throws Exception
  {
    debugInfo("Starting newClientWithUnknownChanges");
    /*
     * Create a ServerState with wrongCSNServer1
     */
    ServerState state = new ServerState();
    state.update(unknownCSNServer1);
    state.update(secondCSNServer2);
    ServerState state = newServerState(unknownCSNServer1, secondCSNServer2);
    newClientWithChanges(state, secondCSNServer1);
    debugInfo("Ending newClientWithUnknownChanges");
  }
@@ -416,12 +400,7 @@
  private void newClientWithChangefromServer1() throws Exception
  {
    debugInfo("Starting newClientWithChangefromServer1");
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstCSNServer1);
    ServerState state = newServerState(firstCSNServer1);
    newClientWithChanges(state, firstCSNServer2);
    debugInfo("Ending newClientWithChangefromServer1");
  }
@@ -433,12 +412,7 @@
  private void newClientWithChangefromServer2() throws Exception
  {
    debugInfo("Starting newClientWithChangefromServer2");
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(firstCSNServer2);
    ServerState state = newServerState(firstCSNServer2);
    newClientWithChanges(state, firstCSNServer1);
    debugInfo("Ending newClientWithChangefromServer2");
  }
@@ -450,13 +424,7 @@
  private void newClientLateServer1() throws Exception
  {
    debugInfo("Starting newClientLateServer1");
    /*
     * Create a ServerState updated with the first change from server 1
     */
    ServerState state = new ServerState();
    state.update(secondCSNServer2);
    state.update(firstCSNServer1);
    ServerState state = newServerState(secondCSNServer2, firstCSNServer1);
    newClientWithChanges(state, secondCSNServer1);
    debugInfo("Ending newClientLateServer1");
  }
@@ -511,7 +479,7 @@
       * Open a sender session
       */
      server = openReplicationSession(TEST_ROOT_DN,
          5, 100, replicationServerPort, 100000, 1000, 0, false);
          5, 100, replicationServerPort, 100000, false);
      assertTrue(server.isConnected());
      reader = new BrokerReader(server, TOTAL_MSG);
@@ -596,7 +564,7 @@
        int serverId = 10 + i;
        CSNGenerator gen = new CSNGenerator(serverId , 0);
        broker[i] = openReplicationSession(TEST_ROOT_DN,
            serverId, 100, replicationServerPort, 3000, 1000, 0, true);
            serverId, 100, replicationServerPort, 3000, true);
        assertTrue(broker[i].isConnected());
        producer[i] = new BrokerWriter(broker[i], gen, TOTAL_MSG/THREADS);
@@ -723,9 +691,8 @@
        // - Modify
        Attribute attr1 = Attributes.create("description", "new value");
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        List<Modification> mods =
            Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
        broker1.publish(modMsg);
@@ -821,9 +788,8 @@
        // - Modify
        Attribute attr1 = Attributes.create("description", "new value");
        Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
        List<Modification> mods = new ArrayList<Modification>();
        mods.add(mod1);
        List<Modification> mods =
            Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
        ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
        broker1.publish(modMsg);
@@ -1188,9 +1154,8 @@
      debugInfo("Ending export");
    }
   private Entry createBackupTask()
   throws Exception
   {
  private Entry createBackupTask() throws Exception
  {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
@@ -1200,12 +1165,10 @@
     "ds-backup-directory-path: bak" + File.separator +
                        "replicationChanges",
     "ds-task-backup-backend-id: replicationChanges");
  }
   }
   private Entry createRestoreTask()
   throws Exception
   {
  private Entry createRestoreTask() throws Exception
  {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
@@ -1214,11 +1177,10 @@
     "ds-task-class-name: org.opends.server.tasks.RestoreTask",
     "ds-backup-directory-path: bak" + File.separator +
                        "replicationChanges");
   }
  }
   private Entry createExportAllTask()
   throws Exception
   {
  private Entry createExportAllTask() throws Exception
  {
     return TestCaseUtils.makeEntry(
     "dn: ds-task-id=" + UUID.randomUUID() + ",cn=Scheduled Tasks,cn=Tasks",
     "objectclass: top",
@@ -1228,11 +1190,10 @@
     "ds-task-export-ldif-file: " + exportLDIFAllFile,
     "ds-task-export-backend-id: replicationChanges",
     "ds-task-export-include-branch: dc=replicationChanges");
   }
  }
   private Entry createExportDomainTask(String suffix)
   throws Exception
   {
  private Entry createExportDomainTask(String suffix) throws Exception
  {
     String root = suffix.substring(suffix.indexOf('=')+1, suffix.indexOf(','));
     exportLDIFDomainFile = "exportLDIF" + root +".ldif";
     return TestCaseUtils.makeEntry(
@@ -1244,7 +1205,7 @@
     "ds-task-export-ldif-file: " + exportLDIFDomainFile,
     "ds-task-export-backend-id: replicationChanges",
     "ds-task-export-include-branch: "+suffix+",dc=replicationChanges");
   }
  }
   private List<UpdateMsg> createChanges(String suffix, int serverId) throws Exception
   {
@@ -1293,13 +1254,13 @@
       // - Modify
       Attribute attr1 = Attributes.create("description", "new value");
       Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
       Attribute attr2 = Attributes.create("modifiersName", "cn=Directory Manager,cn=Root DNs,cn=config");
       Modification mod2 = new Modification(ModificationType.REPLACE, attr2);
       Attribute attr3 = Attributes.create("modifyTimestamp", "20070917172420Z");
       Modification mod3 = new Modification(ModificationType.REPLACE, attr3);
       List<Modification> mods = Arrays.asList(mod1, mod2, mod3);
       List<Modification> mods = Arrays.asList(
           new Modification(ModificationType.REPLACE, attr1),
           new Modification(ModificationType.REPLACE, attr2),
           new Modification(ModificationType.REPLACE, attr3));
       DN dn = exampleSuffixDN;
       ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), dn, mods, "fakeuniqueid");
@@ -1629,9 +1590,8 @@
         // - Modify
         Attribute attr1 = Attributes.create("description", "new value");
         Modification mod1 = new Modification(ModificationType.REPLACE, attr1);
         List<Modification> mods = new ArrayList<Modification>();
         mods.add(mod1);
      List<Modification> mods =
          Arrays.asList(new Modification(ModificationType.REPLACE, attr1));
         ModifyMsg modMsg = new ModifyMsg(csnGen.newCSN(), EXAMPLE_DN, mods, "fakeuniqueid");
         broker1.publish(modMsg);