From abe3bce25e7f6ecd0ce8b90a14036d3380739e9e Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Mon, 24 Aug 2009 16:11:46 +0000
Subject: [PATCH] Fix 4183 - ECL (draft mode): first and last ChangeNumber are 0 until first search

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java                         |  115 +++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java    |  336 +++++++++++++++++++++++----
 opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java |   22 +
 opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java  |   18 +
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                          |  133 ----------
 opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java                          |    6 
 opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java                               |   24 +-
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                   |   20 +
 opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java                             |   36 +-
 9 files changed, 479 insertions(+), 231 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java b/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
index a8a4320..0c332e2 100644
--- a/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
+++ b/opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -25,8 +25,8 @@
  *      Copyright 2009 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.common;
-import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -39,7 +39,8 @@
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.SearchOperation;
-import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.AttributeValues;
 import org.opends.server.types.ByteString;
@@ -48,6 +49,7 @@
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.ResultCode;
 import org.opends.server.types.VirtualAttributeRule;
+import org.opends.server.util.ServerConstants;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
 
@@ -63,7 +65,6 @@
        extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg>
        implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg>
 {
-  private static final DebugTracer TRACER = getTracer();
   // The current configuration for this virtual attribute provider.
   private UserDefinedVirtualAttributeCfg currentConfig;
 
@@ -137,8 +138,19 @@
       DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
       if (eclwe!=null)
       {
-        first = String.valueOf(
-            eclwe.getReplicationServer().getFirstDraftChangeNumber());
+        // Set a list of excluded domains (also exclude 'cn=changelog' itself)
+        ArrayList<String> excludedDomains =
+          MultimasterReplication.getPrivateDomains();
+        if (!excludedDomains.contains(
+            ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
+          excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
+
+        ReplicationServer rs = eclwe.getReplicationServer();
+        int[] limits = rs.getECLDraftCNLimits(
+            rs.getEligibleCN(), excludedDomains);
+
+        first = String.valueOf(limits[0]);
+
       }
     }
     catch(Exception e)
diff --git a/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java b/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
index a2a44b8..c8a068b 100644
--- a/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
+++ b/opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -27,6 +27,7 @@
 package org.opends.server.replication.common;
 import static org.opends.server.loggers.debug.DebugLogger.getTracer;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -40,6 +41,8 @@
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.SearchOperation;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.types.AttributeValue;
 import org.opends.server.types.AttributeValues;
 import org.opends.server.types.ByteString;
@@ -48,6 +51,7 @@
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.ResultCode;
 import org.opends.server.types.VirtualAttributeRule;
+import org.opends.server.util.ServerConstants;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
 
@@ -137,8 +141,18 @@
       DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
       if (eclwe!=null)
       {
-        last = String.valueOf(
-            eclwe.getReplicationServer().getLastDraftChangeNumber());
+        // Set a list of excluded domains (also exclude 'cn=changelog' itself)
+        ArrayList<String> excludedDomains =
+          MultimasterReplication.getPrivateDomains();
+        if (!excludedDomains.contains(
+            ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
+          excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
+
+        ReplicationServer rs = eclwe.getReplicationServer();
+        int[] limits = rs.getECLDraftCNLimits(
+            rs.getEligibleCN(), excludedDomains);
+
+        last = String.valueOf(limits[1]);
       }
     }
     catch(Exception e)
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
index c7bff36..5beda7e 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -315,28 +315,28 @@
      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
     {
       return "ModifyDNMsg content: " +
-        "\nprotocolVersion: " + protocolVersion +
-        "\ndn: " + dn +
-        "\nchangeNumber: " + changeNumber +
-        "\nuniqueId: " + uniqueId +
-        "\nassuredFlag: " + assuredFlag +
-        "\nnewRDN: " + newRDN +
-        "\nnewSuperior: " + newSuperior +
-        "\ndeleteOldRdn: " + deleteOldRdn;
+        " protocolVersion: " + protocolVersion +
+        " dn: " + dn +
+        " changeNumber: " + changeNumber +
+        " uniqueId: " + uniqueId +
+        " assuredFlag: " + assuredFlag +
+        " newRDN: " + newRDN +
+        " newSuperior: " + newSuperior +
+        " deleteOldRdn: " + deleteOldRdn;
     }
     if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
     {
       return "ModifyDNMsg content: " +
-        "\nprotocolVersion: " + protocolVersion +
-        "\ndn: " + dn +
-        "\nchangeNumber: " + changeNumber +
-        "\nuniqueId: " + uniqueId +
-        "\nnewRDN: " + newRDN +
-        "\nnewSuperior: " + newSuperior +
-        "\ndeleteOldRdn: " + deleteOldRdn +
-        "\nassuredFlag: " + assuredFlag +
-        "\nassuredMode: " + assuredMode +
-        "\nsafeDataLevel: " + safeDataLevel;
+        " protocolVersion: " + protocolVersion +
+        " dn: " + dn +
+        " changeNumber: " + changeNumber +
+        " uniqueId: " + uniqueId +
+        " newRDN: " + newRDN +
+        " newSuperior: " + newSuperior +
+        " deleteOldRdn: " + deleteOldRdn +
+        " assuredFlag: " + assuredFlag +
+        " assuredMode: " + assuredMode +
+        " safeDataLevel: " + safeDataLevel;
     }
     return "!!! Unknown version: " + protocolVersion + "!!!";
   }
diff --git a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
index 507461a..2b50944 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -197,22 +197,22 @@
     if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
     {
       return "ModifyMsg content: " +
-        "\nprotocolVersion: " + protocolVersion +
-        "\ndn: " + dn +
-        "\nchangeNumber: " + changeNumber +
-        "\nuniqueId: " + uniqueId +
-        "\nassuredFlag: " + assuredFlag;
+        " protocolVersion: " + protocolVersion +
+        " dn: " + dn +
+        " changeNumber: " + changeNumber +
+        " uniqueId: " + uniqueId +
+        " assuredFlag: " + assuredFlag;
     }
     if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
     {
       return "ModifyMsg content: " +
-        "\nprotocolVersion: " + protocolVersion +
-        "\ndn: " + dn +
-        "\nchangeNumber: " + changeNumber +
-        "\nuniqueId: " + uniqueId +
-        "\nassuredFlag: " + assuredFlag +
-        "\nassuredMode: " + assuredMode +
-        "\nsafeDataLevel: " + safeDataLevel;
+        " protocolVersion: " + protocolVersion +
+        " dn: " + dn +
+        " changeNumber: " + changeNumber +
+        " uniqueId: " + uniqueId +
+        " assuredFlag: " + assuredFlag +
+        " assuredMode: " + assuredMode +
+        " safeDataLevel: " + safeDataLevel;
     }
     return "!!! Unknown version: " + protocolVersion + "!!!";
   }
diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
index f884b31..13dd7cd 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -43,7 +43,7 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.server.DraftCNDB.*;
+import org.opends.server.replication.server.DraftCNDB.DraftCNDBCursor;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.InitializationException;
@@ -81,7 +81,6 @@
   private boolean shutdown = false;
   private boolean trimDone = false;
   private DirectoryThread thread = null;
-  private final Object flushLock = new Object();
   private ReplicationServer replicationServer;
 
   // The maximum number of retries in case of DatabaseDeadlock Exception.
@@ -142,7 +141,8 @@
     db.addEntry(key, value, serviceID, cn);
 
     if (debugEnabled())
-      TRACER.debugInfo("In DraftCNDbhandler.add, added: "
+      TRACER.debugInfo(
+          "In DraftCNDbhandler.add, added: "
         + " key=" + key
         + " value=" + value
         + " serviceID=" + serviceID
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index c5effda..4346763 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -105,6 +105,7 @@
    * Specifies the excluded DNs (like cn=admin, ...).
    */
   public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
+  //HashSet<String> excludedServiceIDs = new HashSet<String>();
 
   /**
    * Eligible changeNumber - only changes older or equal to eligibleCN
@@ -565,7 +566,8 @@
 
         // Get the draftLimits (from the eligibleCN got at the beginning of
         // the operation.
-        int[] limits = getECLDraftCNLimits(eligibleCN);
+        int[] limits = replicationServer.getECLDraftCNLimits(
+            eligibleCN, excludedServiceIDs);
 
         if (startDraftCN<=limits[1])
         {
@@ -630,7 +632,7 @@
             continue;
 
           // skip the excluded domains
-          if (isServiceIDExcluded(rsd.getBaseDn()))
+          if (excludedServiceIDs.contains(rsd.getBaseDn()))
             continue;
 
           // Creates the new domain context
@@ -829,11 +831,6 @@
   throws DirectoryException
   {
 
-    //
-    //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
-    //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
-    //this.setConsumerActive(true);
-
     this.operationId = startECLSessionMsg.getOperationId();
     this.setName(this.getClass().getCanonicalName()+ " " + operationId);
 
@@ -1434,126 +1431,4 @@
     eligibleCN = replicationServer.getEligibleCN();
   }
 
-  /*
-   * Get first and last DraftCN
-   * @param crossDomainEligibleCN
-   * @return
-   */
-  private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN)
-  throws DirectoryException
-  {
-    /* The content of the DraftCNdb depends on the SEARCH operations done before
-     * requesting the DraftCN. If no operations, DraftCNdb is empty.
-     * The limits we want to get are the "potential" limits if a request was
-     * done, the DraftCNdb is probably not complete to do that.
-     *
-     * The first DraftCN is :
-     *  - the first record from the DraftCNdb
-     *  - if none because DraftCNdb empty,
-     *      then
-     *        if no change in replchangelog then return 0
-     *        else return 1 (DraftCN that WILL be returned to next search)
-     *
-     * The last DraftCN is :
-     *  - initialized with the last record from the DraftCNdb (0 if none)
-     *    and consider the genState associated
-     *  - to the last DraftCN, we add the count of updates in the replchangelog
-     *     FROM that genState TO the crossDomainEligibleCN
-     *     (this diff is done domain by domain)
-     */
-
-    int firstDraftCN;
-    int lastDraftCN;
-    boolean DraftCNdbIsEmpty;
-    DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler();
-
-    ReplicationServer rs = replicationServerDomain.getReplicationServer();
-
-    // Get the first DraftCN from the DraftCNdb
-    firstDraftCN = draftCNDbH.getFirstKey();
-    HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
-    if (firstDraftCN < 1)
-    {
-      DraftCNdbIsEmpty=true;
-      firstDraftCN = 0;
-      lastDraftCN = 0;
-    }
-    else
-    {
-      DraftCNdbIsEmpty=false;
-
-      // Get the last DraftCN from the DraftCNdb
-      lastDraftCN = draftCNDbH.getLastKey();
-
-      // Get the generalized state associated with the current last DraftCN
-      // and initializes from it the startStates table
-      String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
-      if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
-      {
-        domainsServerStateForLastSeqnum = MultiDomainServerState.
-          splitGenStateToServerStates(lastSeqnumGenState);
-      }
-    }
-
-    // Domain by domain
-    Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
-    if (rsdi != null)
-    {
-      while (rsdi.hasNext())
-      {
-        // process a domain
-        ReplicationServerDomain rsd = rsdi.next();
-
-        if (isServiceIDExcluded(rsd.getBaseDn()))
-          continue;
-
-        // for this domain, have the state in the replchangelog
-        // where the last DraftCN update is
-        ServerState domainServerStateForLastSeqnum;
-        if ((domainsServerStateForLastSeqnum == null) ||
-            (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
-        {
-          domainServerStateForLastSeqnum = new ServerState();
-        }
-        else
-        {
-          domainServerStateForLastSeqnum =
-            domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
-        }
-
-        // Count the number of (eligible) changes from this place
-        // to the eligible CN (cross server)
-        long ec = rsd.getEligibleCount(
-            domainServerStateForLastSeqnum, crossDomainEligibleCN);
-
-        // ... hum ...
-        if ((ec>0) && (DraftCNdbIsEmpty==false))
-          ec--;
-
-        // cumulates on domains
-        lastDraftCN += ec;
-
-        // DraftCN is empty and there are eligible updates in the repl changelog
-        // then init first DraftCN
-        if ((ec>0) && (firstDraftCN==0))
-          firstDraftCN = 1;
-      }
-    }
-    return new int[]{firstDraftCN, lastDraftCN};
-  }
-
-  private boolean isServiceIDExcluded(String serviceID)
-  {
-    // skip the excluded domains
-    boolean excluded = false;
-    for(String excludedServiceID : this.excludedServiceIDs)
-    {
-      if (excludedServiceID.equalsIgnoreCase(serviceID))
-      {
-        excluded=true;
-        break;
-      }
-    }
-    return excluded;
-  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 0b2ae52..4c98bbe 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -70,6 +71,8 @@
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.ExternalChangeLogSession;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.ProtocolSession;
 import org.opends.server.replication.protocol.ReplServerStartMsg;
 import org.opends.server.replication.protocol.ReplSessionSecurity;
@@ -1584,4 +1587,116 @@
     return ++lastGeneratedDraftCN;
   }
 
+  /**
+   * Get first and last DraftCN.
+   * @param crossDomainEligibleCN The provided crossDomainEligibleCN used as
+   *        the upper limit for the lastDraftCN
+   * @param excludedServiceIDs The serviceIDs that are excluded from the ECL.
+   * @return The first and last draftCN.
+   * @throws DirectoryException a.
+   */
+  public int[] getECLDraftCNLimits(
+      ChangeNumber crossDomainEligibleCN,
+      ArrayList<String> excludedServiceIDs)
+  throws DirectoryException
+  {
+    /* The content of the DraftCNdb depends on the SEARCH operations done before
+     * requesting the DraftCN. If no operations, DraftCNdb is empty.
+     * The limits we want to get are the "potential" limits if a request was
+     * done, the DraftCNdb is probably not complete to do that.
+     *
+     * The first DraftCN is :
+     *  - the first record from the DraftCNdb
+     *  - if none because DraftCNdb empty,
+     *      then
+     *        if no change in replchangelog then return 0
+     *        else return 1 (DraftCN that WILL be returned to next search)
+     *
+     * The last DraftCN is :
+     *  - initialized with the last record from the DraftCNdb (0 if none)
+     *    and consider the genState associated
+     *  - to the last DraftCN, we add the count of updates in the replchangelog
+     *     FROM that genState TO the crossDomainEligibleCN
+     *     (this diff is done domain by domain)
+     */
+
+    int firstDraftCN;
+    int lastDraftCN;
+    boolean DraftCNdbIsEmpty;
+    DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler();
+
+    // Get the first DraftCN from the DraftCNdb
+    firstDraftCN = draftCNDbH.getFirstKey();
+    HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
+    if (firstDraftCN < 1)
+    {
+      DraftCNdbIsEmpty=true;
+      firstDraftCN = 0;
+      lastDraftCN = 0;
+    }
+    else
+    {
+      DraftCNdbIsEmpty=false;
+
+      // Get the last DraftCN from the DraftCNdb
+      lastDraftCN = draftCNDbH.getLastKey();
+
+      // Get the generalized state associated with the current last DraftCN
+      // and initializes from it the startStates table
+      String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
+      if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
+      {
+        domainsServerStateForLastSeqnum = MultiDomainServerState.
+          splitGenStateToServerStates(lastSeqnumGenState);
+      }
+    }
+
+    // Domain by domain
+    Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator();
+    if (rsdi != null)
+    {
+      while (rsdi.hasNext())
+      {
+        // process a domain
+        ReplicationServerDomain rsd = rsdi.next();
+
+        if (excludedServiceIDs.contains(rsd.getBaseDn()))
+          continue;
+
+        // for this domain, have the state in the replchangelog
+        // where the last DraftCN update is
+        ServerState domainServerStateForLastSeqnum;
+        if ((domainsServerStateForLastSeqnum == null) ||
+            (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
+        {
+          domainServerStateForLastSeqnum = new ServerState();
+        }
+        else
+        {
+          domainServerStateForLastSeqnum =
+            domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
+        }
+
+        // Count the number of (eligible) changes from this place
+        // to the eligible CN (cross server)
+        long ec = rsd.getEligibleCount(
+            domainServerStateForLastSeqnum, crossDomainEligibleCN);
+
+        // the state from which we started is the one BEFORE the lastdraftCN
+        // so we must decrement 1 to the EligibleCount
+        if ((ec>0) && (DraftCNdbIsEmpty==false))
+          ec--;
+
+        // cumulates on domains
+        lastDraftCN += ec;
+
+        // DraftCN Db is empty and there are eligible updates in the replication
+        // changelog then init first DraftCN
+        if ((ec>0) && (firstDraftCN==0))
+          firstDraftCN = 1;
+      }
+    }
+    return new int[]{firstDraftCN, lastDraftCN};
+  }
+
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4f844d4..46ed7dd 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -212,6 +212,7 @@
   public void put(UpdateMsg update, ServerHandler sourceHandler)
     throws IOException
   {
+
     ChangeNumber cn = update.getChangeNumber();
     short id = cn.getServerId();
     sourceHandler.updateServerState(update);
@@ -3089,8 +3090,8 @@
 
   /**
    * This methods count the changes, server by server :
-   * - from a start point (cn taken from the provided startState)
-   * - to an end point (the provided endCN).
+   * - from a serverState start point
+   * - to (inclusive) an end point (the provided endCN).
    * @param startState The provided start server state.
    * @param endCN The provided end change number.
    * @return The number of changes between startState and endCN.
@@ -3116,12 +3117,14 @@
         try
         {
           ri = h.generateIterator(startState.getMaxChangeNumber(sid));
-          startCN = ri.getChange().getChangeNumber();
+          if (ri.next()==true)
+          {
+            startCN = ri.getChange().getChangeNumber();
+          }
         }
         catch(Exception e)
         {
           TRACER.debugCaught(DebugLogLevel.ERROR, e);
-          // no change found (purge from CL)
           startCN = null;
         }
         finally
@@ -3136,19 +3139,20 @@
         if (startCN != null)
         {
           // Set on the change related to the endCN
-          ChangeNumber upperCN;
+          ChangeNumber upperCN = null;
           try
           {
             // Build a changenumber for this very server, with the timestamp
             // of the endCN
             ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid);
             ri = h.generateIterator(f);
-            upperCN = ri.getChange().getChangeNumber();
+            if (ri.next()==true)
+            {
+              upperCN = ri.getChange().getChangeNumber();
+            }
           }
           catch(Exception e)
           {
-            TRACER.debugCaught(DebugLogLevel.ERROR, e);
-            // no new change
             upperCN = h.getLastChange();
           }
           finally
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index 85ff760..a2339f1 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -200,38 +200,104 @@
   @Test(enabled=true)
   public void ECLReplicationServerTest()
   {
+    // --
+    // First set of test are in the cookie mode
+    
+    // Test that private backend is excluded from ECL
     ECLOnPrivateBackend();replicationServer.clearDb();
+    
+    // Test remote API (ECL through replication protocol) with empty ECL
     ECLRemoteEmpty();replicationServer.clearDb();
+    
+    // Test with empty changelog
     ECLEmpty();replicationServer.clearDb();
-    ECLAllOps();replicationServer.clearDb();
-    ECLRemoteNonEmpty();replicationServer.clearDb();
-    ECLTwoDomains();replicationServer.clearDb();
-    ECLPsearch(true, false);replicationServer.clearDb();
-    ECLPsearch(false, false);replicationServer.clearDb();
-    ECLSimulPsearches();replicationServer.clearDb();
+    
+    // Test all types of ops.  
+    ECLAllOps(); // Do not clean the db for the next test
 
+    // First and last should be ok whenever a request has been done or not
+    // in compat mode.
+    ECLCompatTestLimits(1,4);replicationServer.clearDb();
+
+    // Test remote API (ECL through replication protocol) with NON empty ECL
+    ECLRemoteNonEmpty();replicationServer.clearDb();
+
+    // Test with a mix of domains, a mix of DSes
+    ECLTwoDomains();replicationServer.clearDb();
+    
+    // Persistent search with changesOnly request
+    ECLPsearch(true, false);replicationServer.clearDb();
+
+    // Persistent search with init values request
+    ECLPsearch(false, false);replicationServer.clearDb();
+    
+    // Simultaneous psearches
+    ECLSimultaneousPsearches();replicationServer.clearDb();
+
+    // Test eligible count method.
+    ECLGetEligibleCountTest();replicationServer.clearDb();
+    
     // TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned
     // TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned
     // TODO:ECL Test invalid DN in cookie returns UNWILLING + message
     // TODO:ECL Test notif control returned contains the cookie
     // TODO:ECL Test the attributes list and values returned in ECL entries
     // TODO:ECL Test search -s base, -s one
+    
+    // Test directly from the java obect that the changeTimeHeartbeatState 
+    // stored are ok.
     ChangeTimeHeartbeatTest();replicationServer.clearDb();
+    
+    // Test the different forms of filter that are parsed in order to
+    // optimize the request.
     ECLFilterTest();
 
+    // --
+    // Second set of test are in the draft compat mode
+    
+    // Empty replication changelog
     ECLCompatEmpty();
+
+    // Request from an invalid draft change number
     ECLCompatBadSeqnum();
-    ECLCompatWriteReadAllOps(1);
-    ECLCompatWriteReadAllOps(5);
+    
+    // Write changes and read ECL from start
+    int ts = ECLCompatWriteReadAllOps(1);
+
+    // Write additional changes and read ECL from a provided draft change number
+    ts = ECLCompatWriteReadAllOps(5);
+    
+    // Test request from a provided change number
     ECLCompatReadFrom(6);
+
+    // Test request from a provided change number interval
     ECLCompatReadFromTo(5,7);
+
+    // Test first and last draft changenumber
     ECLCompatTestLimits(1,8);
+
+    // Test first and last draft changenumber, a dd a new change, do not 
+    // search again the ECL, but search fro first and last
+    ECLCompatTestLimitsAndAdd(1,8, ts);
+
+    // Test DraftCNDb is purged when replication change log is purged
     ECLCompatPurge();
+    
+    // Test first and last are updated
     ECLCompatTestLimits(0,0);
+
+    // Persistent search in changesOnly mode
     ECLPsearch(true, true);replicationServer.clearDb();
+
+    // Persistent search in init + changes mode
     ECLPsearch(false, true);
+    
+    // Test Filter on replication csn
+    // TODO: test with optimization when code done.
     ECLFilterOnReplicationCsn();replicationServer.clearDb();
-    ECLSimulPsearches();replicationServer.clearDb();
+    
+    // Test simultaneous persistent searches in draft compat mode.
+    ECLSimultaneousPsearches();replicationServer.clearDb();
     
   }
 
@@ -592,6 +658,21 @@
       assertTrue(entries != null);
       assertTrue(entries.size()==0);
 
+      //
+      // Test lastExternalChangelogCookie attribute of the ECL
+      //
+      /* FIXME: uncomment when fix available
+      ExternalChangeLogSessionImpl session = 
+        new ExternalChangeLogSessionImpl(replicationServer);
+      MultiDomainServerState expectedLastCookie =
+        new MultiDomainServerState("o=test:;");
+      MultiDomainServerState lastCookie = session.getLastCookie();
+      assertTrue(expectedLastCookie.equalsTo(lastCookie),
+          " ExpectedLastCookie=" + expectedLastCookie +
+          " lastCookie=" + lastCookie);
+      assertLastCookieEquals(tn, expectedLastCookie);
+      */
+      
       // Cleaning
       if (domain2 != null)
         MultimasterReplication.deleteDomain(baseDn2);
@@ -917,47 +998,8 @@
       assertTrue(expectedLastCookie.equalsTo(lastCookie),
           " ExpectedLastCookie=" + expectedLastCookie +
           " lastCookie=" + lastCookie);
-
-      //
-      LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>();
-      lastcookieattribute.add("lastExternalChangelogCookie");
-
-      searchOp = connection.processSearch(
-          ByteString.valueOf(""),
-          SearchScope.BASE_OBJECT,
-          DereferencePolicy.NEVER_DEREF_ALIASES, 
-          0, // Size limit
-          0, // Time limit
-          false, // Types only
-          LDAPFilter.decode("(objectclass=*)"),
-          lastcookieattribute,
-          NO_CONTROL,
-          null);
-
-      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
-          searchOp.getErrorMessage().toString()
-          + searchOp.getAdditionalLogMessage());
-      cookie = "";
-      entries = searchOp.getSearchEntries();
-      if (entries != null)
-      {
-        for (SearchResultEntry resultEntry : entries)
-        {
-          debugInfo(tn, "Result entry=\n" + resultEntry.toLDIFString());
-          ldifWriter.writeEntry(resultEntry);
-          try
-          {
-            List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
-            cookie = l.get(0).iterator().next().toString();
-          }
-          catch(NullPointerException e)
-          {}
-        }
-      }
-
-      assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)),
-          " Expected last cookie attribute value:" + expectedLastCookie +
-          " Read from server: " + cookie + " are equal :");
+      assertLastCookieEquals(tn, expectedLastCookie);
+      
       s1test.stop();
       s1test2.stop();
       s2test.stop();
@@ -973,6 +1015,61 @@
     debugInfo(tn, "Ending test successfully");
   }
 
+  private void assertLastCookieEquals(String tn,
+      MultiDomainServerState expectedLastCookie)
+  {
+    String cookie = "";
+    LDIFWriter ldifWriter = getLDIFWriter();
+
+    //
+    LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>();
+    lastcookieattribute.add("lastExternalChangelogCookie");
+
+    try
+    {
+    InternalSearchOperation searchOp = 
+     connection.processSearch(
+        ByteString.valueOf(""),
+        SearchScope.BASE_OBJECT,
+        DereferencePolicy.NEVER_DEREF_ALIASES, 
+        0, // Size limit
+        0, // Time limit
+        false, // Types only
+        LDAPFilter.decode("(objectclass=*)"),
+        lastcookieattribute,
+        NO_CONTROL,
+        null);
+
+    assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
+        searchOp.getErrorMessage().toString()
+        + searchOp.getAdditionalLogMessage());
+    LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
+    if (entries != null)
+    {
+      for (SearchResultEntry resultEntry : entries)
+      {
+        ldifWriter.writeEntry(resultEntry);
+        try
+        {
+          List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
+          cookie = l.get(0).iterator().next().toString();
+        }
+        catch(NullPointerException e)
+        {}
+      }
+      
+    }
+    }
+    catch(Exception e)
+    {
+      fail("Ending test " + tn + " with exception:\n"
+          +  stackTraceToSingleLineString(e));      
+    }
+    assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)),
+        " Expected last cookie attribute value:" + expectedLastCookie +
+        " Read from server: " + cookie + " are equal :");
+  }
+  
   // simple update to be received
   private void ECLAllOps()
   {
@@ -1516,9 +1613,9 @@
   /**
    * Test parallel simultaneous psearch with different filters.
    */
-  private void ECLSimulPsearches()
+  private void ECLSimultaneousPsearches()
   {
-    String tn = "ECLSimulPsearches";
+    String tn = "ECLSimultaneousPsearches";
     debugInfo(tn, "Starting test \n\n");
     Socket s1, s2, s3 = null;
     boolean compatMode = false;
@@ -2304,10 +2401,11 @@
     }
   }
 
-  private void ECLCompatWriteReadAllOps(int firstDraftChangeNumber)
+  private int ECLCompatWriteReadAllOps(int firstDraftChangeNumber)
   {
     String tn = "ECLCompatWriteReadAllOps/" + String.valueOf(firstDraftChangeNumber);
     debugInfo(tn, "Starting test\n\n");
+    int ts = 1;
 
     try
     {
@@ -2318,7 +2416,6 @@
           DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 
           100, replicationServerPort,
           1000, true);
-      int ts = 1;
 
       String user1entryUUID = "11111111-1112-1113-1114-111111111115";
       String baseUUID       = "22222222-2222-2222-2222-222222222222";
@@ -2583,6 +2680,7 @@
           +  stackTraceToSingleLineString(e));      
     }
     debugInfo(tn, "Ending test with success");
+    return ts;
   }
 
   private void ECLCompatReadFrom(int firstDraftChangeNumber)
@@ -2987,4 +3085,134 @@
     }
     debugInfo(tn, "Ending test with success");
   }
+
+  private void ECLCompatTestLimitsAndAdd(int expectedFirst, int expectedLast,
+      int ts)
+  {
+    String tn = "ECLCompatTestLimitsAndAdd";
+    debugInfo(tn, "Starting test\n\n");
+    try
+    {
+      ECLCompatTestLimits(expectedFirst, expectedLast);
+     
+      // Creates broker on o=test
+      ReplicationBroker server01 = openReplicationSession(
+          DN.decode(TEST_ROOT_DN_STRING), (short) 1201,
+          100, replicationServerPort,
+          1000, true);
+
+      String user1entryUUID = "11111111-1112-1113-1114-111111111115";
+
+      // Publish DEL
+      ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201);
+      DeleteMsg delMsg =
+        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1,
+            user1entryUUID);
+      server01.publish(delMsg);
+      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+     
+      ECLCompatTestLimits(expectedFirst, expectedLast+1);
+      
+      server01.stop();
+    }
+    catch(Exception e)
+    {
+      fail("Ending "+tn+" test with exception:\n"
+          +  stackTraceToSingleLineString(e));
+    }
+    debugInfo(tn, "Ending test with success");
+  }
+
+  private void ECLGetEligibleCountTest()
+  {
+    String tn = "ECLGetEligibleCountTest";
+    debugInfo(tn, "Starting test\n\n");
+    String user1entryUUID = "11111111-1112-1113-1114-111111111115";
+    try
+    {
+      // The replication changelog is empty
+      ReplicationServerDomain rsdtest =
+        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
+      long count = rsdtest.getEligibleCount(
+          new ServerState(), 
+          new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
+      assertEquals(count, 0);
+      
+      // Creates broker on o=test
+      ReplicationBroker server01 = openReplicationSession(
+          DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 
+          100, replicationServerPort,
+          1000, true);
+
+      // Publish 1 message
+      ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), 1, (short)1201);
+      DeleteMsg delMsg =
+        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1, 
+            user1entryUUID);
+      server01.publish(delMsg);
+      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+      sleep(300);
+
+      count = rsdtest.getEligibleCount(
+          new ServerState(), 
+          new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
+      assertEquals(count, 1);
+      
+      // Publish 1 message
+      ChangeNumber cn2 = new ChangeNumber(TimeThread.getTime(), 2, (short)1201);
+      delMsg =
+        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn2, 
+            user1entryUUID);
+      server01.publish(delMsg);
+      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+      sleep(300);
+
+      count = rsdtest.getEligibleCount(
+          new ServerState(), 
+          new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
+      assertEquals(count, 2);
+
+      count = rsdtest.getEligibleCount(
+          new ServerState(),  cn1);
+      assertEquals(count, 1);
+
+      ServerState ss = new ServerState();
+      ss.update(cn1);
+      count = rsdtest.getEligibleCount(ss, cn1);
+      assertEquals(count, 0);
+
+      count = rsdtest.getEligibleCount(ss, cn2);
+      assertEquals(count, 1);
+      
+      ss.update(cn2);
+      count = rsdtest.getEligibleCount(ss, 
+          new ChangeNumber(TimeThread.getTime(), 4, (short)1201));
+      assertEquals(count, 0);
+
+      // Publish 1 message
+      ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 3, (short)1201);
+      delMsg =
+        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn3, 
+            user1entryUUID);
+      server01.publish(delMsg);
+      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+      sleep(300);
+
+      ss.update(cn2);
+      count = rsdtest.getEligibleCount(ss, 
+          new ChangeNumber(TimeThread.getTime(), 4, (short)1201));
+      assertEquals(count, 1);
+      
+      
+      server01.stop();
+      
+    }
+    catch(Exception e)
+    {
+      fail("Ending "+tn+" test with exception:\n"
+          +  stackTraceToSingleLineString(e));
+    }
+    debugInfo(tn, "Ending test with success");
+  }
+
 }
\ No newline at end of file

--
Gitblit v1.10.0