From 831a5f40c6cea9e2a0719c3bf9f176df3e71e4f5 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 09 Dec 2009 09:24:11 +0000
Subject: [PATCH] Fix #4361 ECL - draft mode: temporary fake lastChangeNumber after thousands of updates

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java                              |   37 ++++++
 opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java                     |    3 
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java |  171 ++++++++++++++++++++++++++--
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                |  100 ++++------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java                       |    9 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                       |   20 +-
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java  |   16 ++
 7 files changed, 255 insertions(+), 101 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
index 590ac3f..b28b7b8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -289,6 +289,43 @@
   }
 
   /**
+   * Return the number of changes between 2 provided change numbers.
+   * @param from The lower (older) change number.
+   * @param to   The upper (newer) change number.
+   * @return The computed number of changes.
+   */
+  public int getCount(ChangeNumber from, ChangeNumber to)
+  {
+    int count = 0;
+    flush();
+    ReplServerDBCursor cursor = null;
+    try
+    {
+      try
+      {
+        cursor = db.openReadCursor(from);
+      }
+      catch(Exception e)
+      {
+        return 0;
+      }
+      ChangeNumber curr = null;
+      while ((curr = cursor.nextChangeNumber())!=null)
+      {
+        if (curr.newer(to))
+          break;
+        count++;
+      }
+    }
+    finally
+    {
+      if (cursor != null)
+        cursor.abort();
+    }
+    return count;
+  }
+
+  /**
    * Removes the provided number of messages from the beginning of the msgQueue.
    *
    * @param number the number of changes to be removed.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index 0c067bc..0e1629c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -320,8 +320,8 @@
   /**
    * Clear the changes from this DB (from both memory cache and DB storage)
    * for the provided serviceID.
-   * @param serviceIDToClear The serviceID for which we want to remove the
-   *         all records from the DraftCNDb.
+   * @param serviceIDToClear The serviceID for which we want to remove
+   *         all records from the DraftCNDb - null means all.
    * @throws DatabaseException When an exception occurs while removing the
    * changes from the DB.
    * @throws Exception When an exception occurs while accessing a resource
@@ -339,6 +339,7 @@
     boolean finished = false;
     boolean done = false;
 
+    ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
     // In case of deadlock detection by the Database, this thread can
     // by aborted by a DeadlockException. This is a transient error and
     // the transaction should be attempted again.
@@ -385,8 +386,8 @@
             {
               // let's get the eligible part of the domain
               ServerState startSS = domain.getStartState();
-              ServerState endSS   = domain.getEligibleState(
-                  replicationServer.getEligibleCN());
+              ServerState endSS= domain.getEligibleState(crossDomainEligibleCN);
+
               ChangeNumber fcn = startSS.getMaxChangeNumber(cn.getServerId());
               ChangeNumber lcn = endSS.getMaxChangeNumber(cn.getServerId());
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 5bb3046..dad999b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -251,7 +251,7 @@
             boolean isEligible = (newMsg.getChangeNumber().getTime()
                 <= eligibleCN.getTime());
 
-            if (debugEnabled())
+          if (debugEnabled())
               TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId()
                 + " getNextEligibleMessageForDomain(" + opid+ ") "
                 + "newMsg isEligible=" + isEligible + " since "
@@ -568,7 +568,6 @@
   throws DirectoryException
   {
     String crossDomainStartState;
-
     try
     {
       draftCompat = true;
@@ -611,7 +610,7 @@
           // startDraftCN (from the request filter) is present in the draftCnDb
           // Get an iterator to traverse the draftCNDb
           draftCNDbIter =
-            draftCNDb.generateIterator(draftCNDb.getFirstKey());
+            draftCNDb.generateIterator(startDraftCN);
         }
         else
         {
@@ -739,7 +738,10 @@
             newDomainCtxt.startState = startStates.remove(rsd.getBaseDn());
             if ((providedCookie==null)||(providedCookie.length()==0)
                 ||allowUnknownDomains)
-              newDomainCtxt.startState = new ServerState();
+            {
+              if (newDomainCtxt.startState == null)
+                newDomainCtxt.startState = new ServerState();
+            }
             else
               if (newDomainCtxt.startState == null)
                 throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
@@ -1121,12 +1123,6 @@
     try
     {
 
-      // Search / no DraftCN / not persistent
-      // -----------------------------------
-      //  init: all domain are candidate
-      //    get one msg from each
-      //       no (null) msg returned: should not happen since we go to a state
-      //       that is computed/expected
       //  getMessage:
       //    get the oldest msg:
       //    after:
@@ -1137,7 +1133,6 @@
       //       get one msg from that domain
       //       no (null) msg returned: should not happen since we go to a state
       //       that is computed/expected
-      //  step 2: send DoneMsg
 
       // Persistent:
       // ----------
@@ -1245,12 +1240,14 @@
                     {
                       // let's traverse the DraftCNdb searching for the change
                       // found in the changelogDb.
+                      if (debugEnabled())
                       TRACER.debugInfo("getNextECLUpdate generating draftCN "
                           + " will skip " + cnFromDraftCNDb
                           + " and read next change from the DraftCNDb.");
 
                       isEndOfDraftCNReached = (draftCNDbIter.next()==false);
 
+                      if (debugEnabled())
                       TRACER.debugInfo("getNextECLUpdate generating draftCN "
                           + " has skiped to "
                           + " sn=" + draftCNDbIter.getDraftCN()
@@ -1290,6 +1287,7 @@
                     // the change from the changelogDb is older
                     // it should have been stored lately
                     // let's continue to traverse the changelogdb
+                    if (debugEnabled())
                     TRACER.debugInfo("getNextECLUpdate: will skip "
                         + cnFromChangelogDb
                         + " and read next from the regular changelog.");
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index d3d0aa5..dc1b8c7 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -1402,7 +1402,7 @@
    * for the provided server Id.
    */
   public ReplicationIterator getChangelogIterator(int serverId,
-    ChangeNumber changeNumber)
+      ChangeNumber changeNumber)
   {
     DbHandler handler = sourceDbHandlers.get(serverId);
     if (handler == null)
@@ -1423,6 +1423,25 @@
     }
   }
 
+ /**
+  * Count the number of changes in the replication changelog for the provided
+  * serverID, between 2 provided changenumbers.
+  * @param serverId Identifier of the server for which the iterator is created.
+  * @param from lower limit changenumber.
+  * @param to   upper limit changenumber.
+  * @return the number of changes.
+  *
+  */
+  public int getCount(int serverId,
+      ChangeNumber from, ChangeNumber to)
+  {
+    DbHandler handler = sourceDbHandlers.get(serverId);
+    if (handler == null)
+      return 0;
+
+    return handler.getCount(from, to);
+  }
+
   /**
    * Creates and returns an iterator.
    * When the iterator is not used anymore, the caller MUST call the
@@ -3358,81 +3377,18 @@
   public long getEligibleCount(ServerState startState, ChangeNumber endCN)
   {
     long res = 0;
-    ReplicationIterator ri=null;
 
     // Parses the dbState of the domain , server by server
     ServerState dbState = this.getDbServerState();
-    Iterator<Integer> it = dbState.iterator();
-    while (it.hasNext())
+    Iterator<Integer> serverIDIterator = dbState.iterator();
+    while (serverIDIterator.hasNext())
     {
-      // for each server
-      int sid = it.next();
-      DbHandler h = sourceDbHandlers.get(sid);
-
-      try
-      {
-        // Set on the change related to the startState
-        ChangeNumber startCN = null;
-        try
-        {
-          ri = h.generateIterator(startState.getMaxChangeNumber(sid));
-          if (ri.next()==true)
-          {
-            startCN = ri.getChange().getChangeNumber();
-          }
-        }
-        catch(Exception e)
-        {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-          startCN = null;
-        }
-        finally
-        {
-          if (ri!=null)
-          {
-            ri.releaseCursor();
-            ri = null;
-          }
-        }
-
-        if (startCN != null)
-        {
-          // Set on the change related to the endCN
-          ChangeNumber upperCN = null;
-          try
-          {
-            // Build a changenumber for this very server, with the timestamp
-            // of the endCN
-            ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid);
-            ri = h.generateIterator(f);
-            if (ri.next()==true)
-            {
-              upperCN = ri.getChange().getChangeNumber();
-            }
-          }
-          catch(Exception e)
-          {
-            upperCN = h.getLastChange();
-          }
-          finally
-          {
-            if (ri!=null)
-            {
-              ri.releaseCursor();
-              ri = null;
-            }
-          }
-
-          long diff = upperCN.getSeqnum() - startCN.getSeqnum() + 1;
-
-          res += diff;
-        }
-        // TODO:ECL We should compute if changenumber.seqnum has turned !
-      }
-      catch(Exception e)
-      {
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      }
+      // process one sid
+      int sid = serverIDIterator.next();
+      ChangeNumber startCN = null;
+      if (startState.getMaxChangeNumber(sid) != null)
+        startCN = startState.getMaxChangeNumber(sid);
+      res += getCount(sid, startCN, endCN);
     }
     return res;
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
index 24fc481..91ec9ea 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2564,7 +2564,8 @@
     }
     else
     {
-      TRACER.debugInfo(this +
+      if (debugEnabled())
+        TRACER.debugInfo(this +
           " is not configured to send CN heartbeat interval");
     }
   }
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index baea982..6db04d7 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -132,6 +132,7 @@
 import org.opends.server.types.SearchResultEntry;
 import org.opends.server.types.SearchScope;
 import org.opends.server.util.LDIFWriter;
+import org.opends.server.util.ServerConstants;
 import org.opends.server.util.TimeThread;
 import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -171,6 +172,8 @@
   List<Control> NO_CONTROL = null;
 
   private int brokerSessionTimeout = 5000;
+  
+  private int maxWindow = 100;
   /**
    * Set up the environment for performing the tests in this Class.
    * Replication
@@ -200,7 +203,7 @@
     ReplServerFakeConfiguration conf1 =
       new ReplServerFakeConfiguration(
           replicationServerPort, "ExternalChangeLogTestDb",
-          0, 71, 0, 100, null);
+          0, 71, 0, maxWindow, null);
 
     replicationServer = new ReplicationServer(conf1);;
     debugInfo("configure", "ReplicationServer created"+replicationServer);
@@ -250,7 +253,7 @@
     ts = ECLCompatWriteReadAllOps(5);replicationServer.clearDb();
 
     ECLIncludeAttributes();replicationServer.clearDb();
-    
+
     ChangeTimeHeartbeatTest();replicationServer.clearDb();
 
   }
@@ -3313,8 +3316,11 @@
 
       // search on 'cn=changelog'
       LinkedHashSet<String> attributes = new LinkedHashSet<String>();
-      attributes.add("*");
-      attributes.add("+");
+      if (expectedFirst>0)
+        attributes.add("firstchangenumber");
+      attributes.add("lastchangenumber");
+      attributes.add("changelog");
+      attributes.add("lastExternalChangelogCookie");
 
       debugInfo(tn, " Search: rootDSE");
       InternalSearchOperation searchOp =
@@ -3344,8 +3350,9 @@
           ldifWriter.writeEntry(resultEntry);
           if (eclEnabled)
           {
-            checkValue(resultEntry,"firstchangenumber",
-              String.valueOf(expectedFirst));
+            if (expectedFirst>0)
+              checkValue(resultEntry,"firstchangenumber",
+                String.valueOf(expectedFirst));
             checkValue(resultEntry,"lastchangenumber",
               String.valueOf(expectedLast));
             checkValue(resultEntry,"changelog",
@@ -3353,7 +3360,8 @@
           }
           else
           {
-            assertEquals(getAttributeValue(resultEntry, "firstchangenumber"),
+            if (expectedFirst>0)
+              assertEquals(getAttributeValue(resultEntry, "firstchangenumber"),
                 null);
             assertEquals(getAttributeValue(resultEntry, "lastchangenumber"),
                 null);
@@ -3419,9 +3427,10 @@
     String user1entryUUID = "11111111-1112-1113-1114-111111111115";
     try
     {
-      // The replication changelog is empty
       ReplicationServerDomain rsdtest =
         replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
+
+      // The replication changelog is empty
       long count = rsdtest.getEligibleCount(
           new ServerState(),
           new ChangeNumber(TimeThread.getTime(), 1, 1201));
@@ -3430,10 +3439,10 @@
       // Creates broker on o=test
       ReplicationBroker server01 = openReplicationSession(
           DN.decode(TEST_ROOT_DN_STRING),  1201,
-          100, replicationServerPort,
+          1000, replicationServerPort,
           brokerSessionTimeout, true);
 
-      // Publish 1 message
+      // Publish one first message
       ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), 1, 1201);
       DeleteMsg delMsg =
         new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1,
@@ -3442,12 +3451,13 @@
       debugInfo(tn, " publishes " + delMsg.getChangeNumber());
       sleep(300);
 
+      // From begin to now : 1 change
       count = rsdtest.getEligibleCount(
           new ServerState(),
           new ChangeNumber(TimeThread.getTime(), 1, 1201));
       assertEquals(count, 1);
 
-      // Publish 1 message
+      // Publish one second message
       ChangeNumber cn2 = new ChangeNumber(TimeThread.getTime(), 2, 1201);
       delMsg =
         new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn2,
@@ -3456,29 +3466,36 @@
       debugInfo(tn, " publishes " + delMsg.getChangeNumber());
       sleep(300);
 
+      // From begin to now : 2 changes
       count = rsdtest.getEligibleCount(
           new ServerState(),
           new ChangeNumber(TimeThread.getTime(), 1, 1201));
       assertEquals(count, 2);
 
+      // From begin to first change (inclusive) : 1 change = cn1
       count = rsdtest.getEligibleCount(
           new ServerState(),  cn1);
       assertEquals(count, 1);
 
       ServerState ss = new ServerState();
       ss.update(cn1);
+      
+      // From state/cn1(exclusive) to cn1 (inclusive) : 0 change
       count = rsdtest.getEligibleCount(ss, cn1);
       assertEquals(count, 0);
 
+      // From state/cn1(exclusive) to cn2 (inclusive) : 1 change = cn2
       count = rsdtest.getEligibleCount(ss, cn2);
       assertEquals(count, 1);
 
       ss.update(cn2);
+
+      // From state/cn2(exclusive) to now (inclusive) : 0 change
       count = rsdtest.getEligibleCount(ss,
           new ChangeNumber(TimeThread.getTime(), 4, 1201));
       assertEquals(count, 0);
 
-      // Publish 1 message
+      // Publish one third message
       ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 3, 1201);
       delMsg =
         new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn3,
@@ -3488,11 +3505,129 @@
       sleep(300);
 
       ss.update(cn2);
+
+      // From state/cn2(exclusive) to now : 1 change = cn3
       count = rsdtest.getEligibleCount(ss,
           new ChangeNumber(TimeThread.getTime(), 4, 1201));
       assertEquals(count, 1);
 
+      boolean perfs=false;
+      if (perfs)
+      {
+      
+      // number of msgs used by the test
+      int maxMsg = 999999;
 
+      // We need an RS configured with a window size bigger than the number
+      // of msg used by the test.
+      assertTrue(maxMsg<maxWindow);
+      debugInfo(tn, "Perf test in compat mode - will generate " + maxMsg + " msgs.");
+      for (int i=4; i<=maxMsg; i++)
+      {
+        ChangeNumber cnx = new ChangeNumber(TimeThread.getTime(), i, 1201);
+        delMsg =
+          new DeleteMsg("uid="+tn+i+"," + TEST_ROOT_DN_STRING, cnx,
+              user1entryUUID);
+        server01.publish(delMsg);  
+      }
+      sleep(1000);
+      debugInfo(tn, "Perfs test in compat - search lastChangeNumber");
+      ArrayList<String> excludedDomains =
+        MultimasterReplication.getECLDisabledDomains();
+      if (!excludedDomains.contains(
+          ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
+        excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
+
+      ECLWorkflowElement eclwe = (ECLWorkflowElement)
+      DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
+      ReplicationServer rs = eclwe.getReplicationServer();
+      rs.disableEligibility(excludedDomains);
+      long t1 = TimeThread.getTime();
+      int[] limitss = replicationServer.getECLDraftCNLimits(
+          replicationServer.getEligibleCN(), excludedDomains);
+      assertEquals(limitss[1], maxMsg);
+      long t2 = TimeThread.getTime();
+      debugInfo(tn, "Perfs - " + maxMsg + " counted in (ms):" + (t2 - t1));
+      
+      try
+      {
+        // search on 'cn=changelog'
+        LinkedHashSet<String> attributes = new LinkedHashSet<String>();
+        attributes.add("+");
+        attributes.add("*");
+
+        String filter = "(changenumber>="+maxMsg+")";
+        debugInfo(tn, " Search: " + filter);
+        InternalSearchOperation searchOp =
+          connection.processSearch(
+              ByteString.valueOf("cn=changelog"),
+              SearchScope.WHOLE_SUBTREE,
+              DereferencePolicy.NEVER_DEREF_ALIASES,
+              0, // Size limit
+              0, // Time limit
+              false, // Types only
+              LDAPFilter.decode(filter),
+              attributes,
+              NO_CONTROL,
+              null);
+        waitOpResult(searchOp, ResultCode.SUCCESS);
+        long t3 = TimeThread.getTime();
+        assertEquals(searchOp.getSearchEntries().size(), 1);
+        debugInfo(tn, "Perfs - last change searched in (ms):" + (t3 - t2));
+
+        filter = "(changenumber>="+maxMsg+")";
+        debugInfo(tn, " Search: " + filter);
+        searchOp =
+          connection.processSearch(
+              ByteString.valueOf("cn=changelog"),
+              SearchScope.WHOLE_SUBTREE,
+              DereferencePolicy.NEVER_DEREF_ALIASES,
+              0, // Size limit
+              0, // Time limit
+              false, // Types only
+              LDAPFilter.decode(filter),
+              attributes,
+              NO_CONTROL,
+              null);
+        waitOpResult(searchOp, ResultCode.SUCCESS);
+        long t4 = TimeThread.getTime();
+        assertEquals(searchOp.getSearchEntries().size(), 1);
+        debugInfo(tn, "Perfs - last change searched in (ms):" + (t4 - t3));
+
+        filter = "(changenumber>="+(maxMsg-2)+")";
+        debugInfo(tn, " Search: " + filter);
+        searchOp =
+          connection.processSearch(
+              ByteString.valueOf("cn=changelog"),
+              SearchScope.WHOLE_SUBTREE,
+              DereferencePolicy.NEVER_DEREF_ALIASES,
+              0, // Size limit
+              0, // Time limit
+              false, // Types only
+              LDAPFilter.decode(filter),
+              attributes,
+              NO_CONTROL,
+              null);
+        waitOpResult(searchOp, ResultCode.SUCCESS);
+        long t5 = TimeThread.getTime();
+        assertEquals(searchOp.getSearchEntries().size(), 3);
+        debugInfo(tn, "Perfs - last 3 changes searched in (ms):" + (t5 - t4));
+        if (searchOp.getSearchEntries() != null)
+        {
+          int i=0;
+          for (SearchResultEntry resultEntry : searchOp.getSearchEntries())
+          {
+            i++;
+            debugInfo(tn, "Result entry returned:" + resultEntry.toLDIFString());
+          }
+        }
+      }
+      catch(Exception e)
+      {
+        fail("Ending test "+tn+" with exception:\n"
+            +  stackTraceToSingleLineString(e));
+      }
+      }
       server01.stop();
 
     }
@@ -3539,6 +3674,10 @@
       ExternalChangelogDomainFakeCfg eclCfg = 
         new ExternalChangelogDomainFakeCfg(true, eclInclude);
       domainConf.setExternalChangelogDomain(eclCfg);
+      // Set a Changetime heartbeat interval low enough (less than default
+      // value that is 1000 ms) for the test to be sure to consider all changes
+      // as eligible.
+      domainConf.setChangetimeHeartbeatInterval(10);
       domain2 = MultimasterReplication.createNewDomain(domainConf);
       domain2.start();
 
@@ -3552,6 +3691,10 @@
       eclCfg = 
         new ExternalChangelogDomainFakeCfg(true, eclInclude);
       domainConf.setExternalChangelogDomain(eclCfg);
+      // Set a Changetime heartbeat interval low enough (less than default
+      // value that is 1000 ms) for the test to be sure to consider all changes
+      // as eligible.
+      domainConf.setChangetimeHeartbeatInterval(10);
       domain3 = MultimasterReplication.createNewDomain(domainConf);
       domain3.start();
 
@@ -3562,6 +3705,10 @@
       eclCfg = 
         new ExternalChangelogDomainFakeCfg(true, eclInclude);
       domainConf.setExternalChangelogDomain(eclCfg);
+      // Set a Changetime heartbeat interval low enough (less than default
+      // value that is 1000 ms) for the test to be sure to consider all changes
+      // as eligible.
+      domainConf.setChangetimeHeartbeatInterval(10);
       domain21 = MultimasterReplication.createNewDomain(domainConf);
       domain21.start();
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
index ef10079..b53f774 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -54,6 +54,12 @@
   private int serverId;
   private SortedSet<String> replicationServers;
   private long heartbeatInterval = 1000;
+
+  // By default changeTimeHeartbeatInterval is set to 0 in order to disable
+  // this feature and not kill the tests that expect to receive special
+  // messages.
+  private long changeTimeHeartbeatInterval = 0;
+
   private IsolationPolicy policy = IsolationPolicy.REJECT_ALL_UPDATES;
 
   // Is assured mode enabled or not ?
@@ -197,7 +203,15 @@
    */
   public long getChangetimeHeartbeatInterval()
   {
-    return 0;
+    return changeTimeHeartbeatInterval;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void setChangetimeHeartbeatInterval(long changeTimeHeartbeatInterval)
+  {
+    this.changeTimeHeartbeatInterval = changeTimeHeartbeatInterval;
   }
 
   /**

--
Gitblit v1.10.0