From 4fe72a4bef946169b0f50bc05bd9dc3b4b1131d3 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Fri, 14 Aug 2009 12:37:19 +0000
Subject: [PATCH] Support for External change log compatible with draft-good-ldap-changelog-04.txt , March 2003

---
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java | 1510 +++++++++++++++++++++++++++++----------------------------
 1 files changed, 761 insertions(+), 749 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index d0565d6..5272f50 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -27,12 +27,14 @@
 package org.opends.server.replication.server;
 
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -41,7 +43,6 @@
 import org.opends.messages.Category;
 import org.opends.messages.Message;
 import org.opends.messages.Severity;
-import org.opends.server.api.DirectoryThread;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
@@ -52,7 +53,7 @@
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.ResultCode;
-import org.opends.server.util.TimeThread;
+import org.opends.server.util.ServerConstants;
 
 /**
  * This class defines a server handler, which handles all interaction with a
@@ -61,128 +62,234 @@
 public class ECLServerHandler extends ServerHandler
 {
 
-  // Properties filled only if remote server is a RS
-  private String serverAddressURL;
-
+  // This is a string identifying the operation, provided by the client part
+  // of the ECL, used to help interpretation of messages logged.
   String operationId;
 
+  // Iterator on the draftCN database.
+  private DraftCNDbIterator draftCNDbIter = null;
+
+  boolean draftCompat = false;
   /**
-   * CLDomainContext : contains the state properties for the search
-   * currently being processed, by replication domain.
+   * Specifies the last draft changer number (seqnum) requested.
    */
-  private class CLDomainContext
+  public int lastDraftCN = 0;
+  /**
+   * Specifies whether the draft change number (seqnum) db has been read until
+   * its end.
+   */
+  public boolean isEndOfDraftCNReached = false;
+  /**
+   * Specifies whether the current search has been requested to be persistent
+   * or not.
+   */
+  public short isPersistent;
+  /**
+   * Specifies the current search phase : INIT or PERSISTENT.
+   */
+  public int searchPhase = INIT_PHASE;
+  /**
+   * Specifies the cookie contained in the request, specifying where
+   * to start serving the ECL.
+   */
+  public String startCookie;
+  /**
+   * Specifies the value of the cookie before the change currently processed
+   * is returned. It is updated with the change number of the change
+   * currently processed (thus becoming the "current" cookie just
+   * before the change is returned.
+   */
+  public MultiDomainServerState previousCookie =
+    new MultiDomainServerState();
+  /**
+   * Specifies the excluded DNs (like cn=admin, ...).
+   */
+  public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
+
+  /**
+   * Eligible changeNumber - only changes older or equal to eligibleCN
+   * are published in the ECL.
+   */
+  public ChangeNumber eligibleCN = null;
+
+  /**
+   * Provides a string representation of this object.
+   * @return the string representation.
+   */
+  public String dumpState()
   {
-    ReplicationServerDomain rsd; // the repl server domain
-    boolean active;              // is the domain still active
-    MessageHandler mh;           // the message handler associated
-    UpdateMsg nextMsg;
-    UpdateMsg nonElligiblemsg;
+    return new String(
+        this.getClass().getCanonicalName() +
+        "[" +
+        "[draftCompat=" + draftCompat +
+        "] [persistent=" + isPersistent +
+        "] [lastDraftCN=" + lastDraftCN +
+        "] [isEndOfDraftCNReached=" + isEndOfDraftCNReached +
+        "] [searchPhase=" + searchPhase +
+        "] [startCookie=" + startCookie +
+        "] [previousCookie=" + previousCookie +
+    "]]");
+  }
+
+  /**
+   * Class that manages the 'by domain' state variables for the search being
+   * currently processed on the ECL.
+   * For example :
+   * if search on 'cn=changelog' is being processed when 2 replicated domains
+   * dc=us and dc=europe are configured, then there will be 2 DomainContext
+   * used, one for ds=us, and one for dc=europe.
+   */
+  private class DomainContext
+  {
+    ReplicationServerDomain rsd;
+
+    boolean active;              // active when there are still changes
+    // supposed eligible for the ECL.
+
+    MessageHandler mh;           // the message handler from which are read
+    // the changes for this domain
+    private UpdateMsg nextMsg;
+    private UpdateMsg nextNonEligibleMsg;
     ServerState startState;
     ServerState currentState;
     ServerState stopState;
 
     /**
-     * Add to the provider buffer a string representation of this object.
+     * {@inheritDoc}
      */
-    public void toString(StringBuilder buffer, int i)
+    @Override
+    public String toString()
     {
-      CLDomainContext xx = clDomCtxts[i];
+      StringBuilder buffer = new StringBuilder();
+      toString(buffer);
+      return buffer.toString();
+    }
+    /**
+     * Provide a string representation of this object for debug purpose..
+     */
+    public void toString(StringBuilder buffer)
+    {
       buffer.append(
-          " clDomCtxts(" + i + ") [act=" + xx.active +
-          " rsd=" + rsd +
-          " nextMsg=" + nextMsg + "(" +
+          "[ [active=" + active +
+          "] [rsd=" + rsd +
+          "] [nextMsg=" + nextMsg + "(" +
           (nextMsg != null?
               new Date(nextMsg.getChangeNumber().getTime()).toString():"")
               + ")" +
-          " nextNonEligibleMsg="      + nonElligiblemsg +
-          " startState=" + startState +
-          " stopState= " + stopState +
-          " currState= " + currentState + "]");
+              "] [nextNonEligibleMsg="      + nextNonEligibleMsg +
+              "] [startState=" + startState +
+              "] [stopState= " + stopState +
+              "] [currentState= " + currentState + "]]");
+    }
+
+    /**
+     * Get the next message elligible regarding
+     * the crossDomain elligible CN. Put it in the context table.
+     * @param opid The operation id.
+     */
+    private void getNextEligibleMessageForDomain(String opid)
+    {
+      if (debugEnabled())
+        TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
+          " getNextEligibleMessageForDomain(" + opid+ ") "
+          + "ctxt=" + toString());
+
+      assert(nextMsg == null);
+      try
+      {
+        // Before get a new message from the domain, evaluate in priority
+        // a message that has not been published to the ECL because it was
+        // not eligible
+        if (nextNonEligibleMsg != null)
+        {
+          boolean hasBecomeEligible =
+            (nextNonEligibleMsg.getChangeNumber().getTime()
+                <= eligibleCN.getTime());
+
+          if (debugEnabled())
+            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
+                " getNextEligibleMessageForDomain(" + opid+ ") "
+              + " stored nonEligibleMsg " + nextNonEligibleMsg
+              + " has now become eligible regarding "
+              + " the eligibleCN ("+ eligibleCN
+              + " ):" + hasBecomeEligible);
+
+          if (hasBecomeEligible)
+          {
+            // it is now elligible
+            nextMsg = nextNonEligibleMsg;
+            nextNonEligibleMsg = null;
+          }
+          else
+          {
+            // the oldest is still not elligible - let's wait next
+          }
+        }
+        else
+        {
+          // Here comes a new message !!!
+          // non blocking
+          UpdateMsg newMsg = mh.getnextMessage(false);
+
+          if (debugEnabled())
+            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
+                " getNextEligibleMessageForDomain(" + opid+ ") "
+                + " got new message : "
+                +  " serviceId=[" + mh.getServiceId()
+                + "] [newMsg=" + newMsg + "]" + dumpState());
+
+          // in non blocking mode, return null when no more msg
+          if (newMsg != null)
+          {
+            boolean isEligible = (newMsg.getChangeNumber().getTime()
+                <= eligibleCN.getTime());
+
+            if (debugEnabled())
+              TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId()
+                + " getNextEligibleMessageForDomain(" + opid+ ") "
+                + "newMsg isEligible=" + isEligible + " since "
+                + "newMsg=[" + newMsg.getChangeNumber()
+                + " " + new Date(newMsg.getChangeNumber().getTime()).toString()
+                + "] eligibleCN=[" + eligibleCN
+                + " " + new Date(eligibleCN.getTime()).toString()+"]"
+                + dumpState());
+
+            if (isEligible)
+            {
+              nextMsg = newMsg;
+            }
+            else
+            {
+              nextNonEligibleMsg = newMsg;
+            }
+          }
+        }
+      }
+      catch(Exception e)
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
     }
   }
 
-  // The list of contexts by domain for the current search
-  CLDomainContext[] clDomCtxts = new CLDomainContext[0];
+  // The global list of contexts by domain for the search currently processed.
+  DomainContext[] domainCtxts = new DomainContext[0];
 
-  private void clDomCtxtsToString(String msg)
+  private String clDomCtxtsToString(String msg)
   {
     StringBuilder buffer = new StringBuilder();
     buffer.append(msg+"\n");
-    for (int i=0;i<clDomCtxts.length;i++)
+    for (int i=0;i<domainCtxts.length;i++)
     {
-      clDomCtxts[i].toString(buffer, i);
+      domainCtxts[i].toString(buffer);
       buffer.append("\n");
     }
-    TRACER.debugInfo(
-        "In " + this.getName() + " clDomCtxts: " + buffer.toString());
+    return buffer.toString();
   }
 
-  /**
-   * Class that manages the state variables for the current search on the ECL.
-   */
-  private class CLTraverseCtxt
-  {
-    /**
-     * Specifies the next changer number (seqnum), -1 when not.
-     */
-    public int nextSeqnum;
-    /**
-     * Specifies whether the current search has been requested to be persistent
-     * or not.
-     */
-    public short isPersistent;
-    /**
-     * Specifies the last changer number (seqnum) requested.
-     */
-    public int stopSeqnum;
-    /**
-     * Specifies whether the change number (seqnum) db has been read until
-     * its end.
-     */
-    public boolean endOfSeqnumdbReached = false;
-    /**
-     * Specifies the current search phase.
-     * 1 = init
-     * 2 = persistent
-     */
-    public int searchPhase = 1;
-    /**
-     * Specifies the cookie contained in the request, specifying where
-     * to start serving the ECL.
-     */
-    public String generalizedStartState;
-    /**
-     * Specifies the current cookie value.
-     */
-    public MultiDomainServerState currentCookie =
-      new MultiDomainServerState();
-    /**
-     * Specifies the excluded DNs.
-     */
-    public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
-
-    /**
-     * Provides a string representation of this object.
-     * @return the string representation.
-     */
-    public String toString()
-    {
-      return new String(
-        this.getClass().getCanonicalName() +
-        ":[" +
-        " nextSeqnum=" + nextSeqnum +
-        " persistent=" + isPersistent +
-        " stopSeqnum" + stopSeqnum +
-        " endOfSeqnumdbReached=" + endOfSeqnumdbReached +
-        " searchPhase=" + searchPhase +
-        " generalizedStartState=" + generalizedStartState +
-        "]");
-    }
-
-  }
-
-  // The context of the current search
-  private CLTraverseCtxt cLSearchCtxt = new CLTraverseCtxt();
+  static int UNDEFINED_PHASE = 0;
+  static int INIT_PHASE = 1;
+  static int PERSISTENT_PHASE = 2;
 
   /**
    * Starts this handler based on a start message received from remote server.
@@ -199,10 +306,6 @@
           inECLStartMsg.getVersion());
       generationId = inECLStartMsg.getGenerationId();
       serverURL = inECLStartMsg.getServerURL();
-      int separator = serverURL.lastIndexOf(':');
-      serverAddressURL =
-        session.getRemoteAddress() + ":" + serverURL.substring(separator +
-            1);
       setInitialServerState(inECLStartMsg.getServerState());
       setSendWindowSize(inECLStartMsg.getWindowSize());
       if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -211,8 +314,6 @@
         // Only V2 protocol has the group id in repl server start message
         this.groupId = inECLStartMsg.getGroupId();
       }
-      // FIXME:ECL Any generationID must be removed, it makes no sense here.
-      oldGenerationId = -100;
     }
     catch(Exception e)
     {
@@ -243,7 +344,7 @@
         replicationServer, rcvWindowSize);
     try
     {
-      setServiceIdAndDomain("cn=changelog");
+      setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
     }
     catch(DirectoryException de)
     {
@@ -268,12 +369,12 @@
       StartECLSessionMsg startECLSessionMsg)
   throws DirectoryException
   {
-    // FIXME:ECL queueSize is hard coded to 1 else Handler hangs for some reason
+    // queueSize is hard coded to 1 else super class hangs for some reason
     super(null, 1, replicationServerURL, replicationServerId,
         replicationServer, 0);
     try
     {
-      setServiceIdAndDomain("cn=changelog");
+      setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
     }
     catch(DirectoryException de)
     {
@@ -369,115 +470,153 @@
 
   /**
    * Initialize the handler from a provided cookie value.
-   * @param providedGeneralizedStartState The provided cookie value.
+   * @param crossDomainStartState The provided cookie value.
    * @throws DirectoryException When an error is raised.
    */
-  public void initializeCLSearchFromGenState(
-      String providedGeneralizedStartState)
+  public void initializeCLSearchFromGenState(String crossDomainStartState)
   throws DirectoryException
   {
-    this.cLSearchCtxt.nextSeqnum = -1; // will not generate seqnum
-    initializeCLDomCtxts(providedGeneralizedStartState);
+    initializeCLDomCtxts(crossDomainStartState);
+  }
+
+  /**
+   * Initialize the handler from a provided draft first change number.
+   * @param startDraftCN The provided draft first change number.
+   * @throws DirectoryException When an error is raised.
+   */
+  public void initializeCLSearchFromDraftCN(int startDraftCN)
+  throws DirectoryException
+  {
+    String crossDomainStartState;
+
+    draftCompat = true;
+
+    DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler();
+    if (startDraftCN < 0)
+    {
+      // Request filter does not contain any firstDraftCN
+      // So we'll generate from the beginning of what we have stored here.
+
+      // Get the first DraftCN from DraftCNdb
+      if (draftCNDb.count() == 0)
+      {
+        // db is empty
+        isEndOfDraftCNReached = true;
+        crossDomainStartState = null;
+      }
+      else
+      {
+        // get the generalizedServerState related to the start of the draftDb
+        crossDomainStartState = draftCNDb.getValue(draftCNDb.getFirstKey());
+
+        // Get an iterator to traverse the draftCNDb
+        try
+        {
+          draftCNDbIter =
+            draftCNDb.generateIterator(draftCNDb.getFirstKey());
+        }
+        catch(Exception e)
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+
+          if (draftCNDbIter != null)
+            draftCNDbIter.releaseCursor();
+
+          throw new DirectoryException(
+              ResultCode.OPERATIONS_ERROR,
+              Message.raw(Category.SYNC,
+                  Severity.FATAL_ERROR,"Server Error."));
+        }
+      }
+    }
+    else
+    {
+      // Request filter does contain a startDraftCN
+
+      // Read the draftCNDb to see whether it contains startDraftCN
+      crossDomainStartState = draftCNDb.getValue(startDraftCN);
+
+      if (crossDomainStartState != null)
+      {
+        // startDraftCN is present in the draftCnDb
+        // Get an iterator to traverse the draftCNDb
+        try
+        {
+          draftCNDbIter =
+            draftCNDb.generateIterator(draftCNDb.getFirstKey());
+        }
+        catch(Exception e)
+        {
+          TRACER.debugCaught(DebugLogLevel.ERROR, e);
+
+          if (draftCNDbIter != null)
+            draftCNDbIter.releaseCursor();
+
+          throw new DirectoryException(
+              ResultCode.OPERATIONS_ERROR,
+              Message.raw(Category.SYNC,
+                  Severity.FATAL_ERROR,"Server Error."));
+        }
+      }
+      else
+      {
+        // startDraftCN provided in the request is not present in the draftCnDb
+        // Is the provided startDraftCN <= the potential last DraftCNdb
+
+        // Get the draftLimits (from the eligibleCN got at the beginning of
+        // the operation.
+        int[] limits = getECLDraftCNLimits(eligibleCN);
+
+        if (startDraftCN<=limits[1])
+        {
+          // startDraftCN is between first and last and has never been
+          // returned yet
+          crossDomainStartState = draftCNDb.getValue(draftCNDb.getLastKey());
+          // FIXME:ECL ... ok we'll start from the end of the draftCNDb BUT ...
+          // this is NOT the request of the client !!!!
+        }
+        else
+        {
+          throw new DirectoryException(
+              ResultCode.SUCCESS,
+              Message.raw(Category.SYNC,
+                  Severity.INFORMATION,"Bad value provided for change number "
+                  + " Failed to match a replication state to "+startDraftCN));
+        }
+      }
+    }
+    this.draftCompat = true;
+
+    initializeCLDomCtxts(crossDomainStartState);
+
   }
 
   /**
    * Initialize the context for each domain.
-   * @param providedGeneralizedStartState the provided generalized state
+   * @param providedCookie the provided generalized state
    * @throws DirectoryException When an error occurs.
    */
-  public void initializeCLDomCtxts(String providedGeneralizedStartState)
+  public void initializeCLDomCtxts(String providedCookie)
   throws DirectoryException
   {
     HashMap<String,ServerState> startStates = new HashMap<String,ServerState>();
 
     ReplicationServer rs = replicationServerDomain.getReplicationServer();
 
-    try
-    {
-      // Initialize start state for  all running domains with empty state
-      Iterator<ReplicationServerDomain> rsdk = rs.getCacheIterator();
-      if (rsdk != null)
-      {
-        while (rsdk.hasNext())
-        {
-          // process a domain
-          ReplicationServerDomain rsd = rsdk.next();
-          // skip the changelog domain
-          if (rsd == this.replicationServerDomain)
-            continue;
-          startStates.put(rsd.getBaseDn(), new ServerState());
-        }
-      }
-
-      // Overwrite start state from the cookie provided in the request
-      if ((providedGeneralizedStartState != null) &&
-          (providedGeneralizedStartState.length()>0))
-      {
-        String[] domains = providedGeneralizedStartState.split(";");
-        for (String domainState : domains)
-        {
-          // Split baseDN and serverState
-          String[] fields = domainState.split(":");
-
-          // BaseDN - Check it
-          String domainBaseDNReceived = fields[0];
-          if (!startStates.containsKey(domainBaseDNReceived))
-            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-                ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
-                  "unknown " + domainBaseDNReceived));
-
-          // ServerState
-          ServerState domainServerState = new ServerState();
-          if (fields.length>1)
-          {
-            String strState = fields[1];
-            String[] strCN = strState.split(" ");
-            for (String sr : strCN)
-            {
-              ChangeNumber fromChangeNumber = new ChangeNumber(sr);
-              domainServerState.update(fromChangeNumber);
-            }
-          }
-          startStates.put(domainBaseDNReceived, domainServerState);
-
-          // FIXME: ECL first cookie value check
-          // ECL For each of the provided state, it this state is older
-          // than the older change stored in the replication changelog ....
-          // then a purge occured since the time the cookie was published
-          // it is recommended to do a full resync
-          ReplicationServerDomain rsd =
-            rs.getReplicationServerDomain(domainBaseDNReceived, false);
-          ServerState domainStartState = rsd.getStartState();
-          if (!domainServerState.cover(domainStartState))
-          {
-            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-                ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
-                  "too old cookie provided " + providedGeneralizedStartState
-                  + " first acceptable change for " + rsd.getBaseDn()
-                  + " is " + rsd.getStartState()));
-          }
-        }
-      }
-    }
-    catch(DirectoryException de)
-    {
-      throw de;
-    }
-    catch(Exception e)
-    {
-      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-          ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
-            "Exception raised: " + e.getMessage()));
-    }
+    // Parse the provided cookie and overwrite startState from it.
+    if ((providedCookie != null) && (providedCookie.length()!=0))
+      startStates =
+        MultiDomainServerState.splitGenStateToServerStates(providedCookie);
 
     try
     {
-      // Now traverse all domains and build the initial changelog context
-      Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator();
+      // Now traverse all domains and build all the initial contexts :
+      // - the global one : dumpState()
+      // - the domain by domain ones : domainCtxts
+      Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
 
       // Creates the table that will contain the real-time info by domain.
-      clDomCtxts = new CLDomainContext[rs.getCacheSize()-1
-                         -this.cLSearchCtxt.excludedServiceIDs.size()];
+      HashSet<DomainContext> tmpSet = new HashSet<DomainContext>();
       int i =0;
       if (rsdi != null)
       {
@@ -491,73 +630,87 @@
             continue;
 
           // skip the excluded domains
-          boolean excluded = false;
-          for(String excludedServiceID : this.cLSearchCtxt.excludedServiceIDs)
-          {
-            if (excludedServiceID.equalsIgnoreCase(rsd.getBaseDn()))
-            {
-              excluded=true;
-              break;
-            }
-          }
-          if (excluded)
+          if (isServiceIDExcluded(rsd.getBaseDn()))
             continue;
 
-          // Creates the context record
-          CLDomainContext newContext = new CLDomainContext();
-          newContext.active = true;
-          newContext.rsd = rsd;
+          // Creates the new domain context
+          DomainContext newDomainCtxt = new DomainContext();
+          newDomainCtxt.active = true;
+          newDomainCtxt.rsd = rsd;
 
-          if (this.cLSearchCtxt.isPersistent ==
+          // Assign the start state for the domain
+          if (isPersistent ==
             StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
           {
-            newContext.startState = rsd.getCLElligibleState();
+            newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
           }
           else
           {
-            newContext.startState = startStates.get(rsd.getBaseDn());
-            newContext.stopState = rsd.getCLElligibleState();
+            newDomainCtxt.startState = startStates.remove(rsd.getBaseDn());
+            if ((providedCookie==null)||(providedCookie.isEmpty()))
+              newDomainCtxt.startState = new ServerState();
+            else
+              if (newDomainCtxt.startState == null)
+                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+                    ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
+                        "missing " + rsd.getBaseDn()));
+            newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
           }
-          newContext.currentState = new ServerState();
+          newDomainCtxt.currentState = new ServerState();
 
-          // Creates an unconnected SH
+          // Creates an unconnected SH for the domain
           MessageHandler mh = new MessageHandler(maxQueueSize,
               replicationServerURL, replicationServerId, replicationServer);
           // set initial state
-          mh.setInitialServerState(newContext.startState);
+          mh.setInitialServerState(newDomainCtxt.startState);
           // set serviceID and domain
           mh.setServiceIdAndDomain(rsd.getBaseDn());
-          // register into domain
+          // register the unconnected into the domain
           rsd.registerHandler(mh);
-          newContext.mh = mh;
+          newDomainCtxt.mh = mh;
+
+          previousCookie.update(
+              newDomainCtxt.rsd.getBaseDn(),
+              newDomainCtxt.startState);
 
           // store the new context
-          clDomCtxts[i] = newContext;
+          tmpSet.add(newDomainCtxt);
           i++;
         }
       }
-
-      // the next record from the seqnumdb should be the one
-      cLSearchCtxt.endOfSeqnumdbReached = false;
-      cLSearchCtxt.generalizedStartState = providedGeneralizedStartState;
-
-      // Initializes all domain with the next elligible message
-      for (int j=0; j<clDomCtxts.length; j++)
+      if (!startStates.isEmpty())
       {
-        this.getNextElligibleMessage(j);
-        if (clDomCtxts[j].nextMsg == null)
-          clDomCtxts[j].active = false;
+        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+            ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
+                "unknown " + startStates.toString()));
+      }
+      domainCtxts = tmpSet.toArray(new DomainContext[0]);
+
+      // the next record from the DraftCNdb should be the one
+      startCookie = providedCookie;
+
+      // Initializes all domain with the next(first) elligible message
+      for (int j=0; j<domainCtxts.length; j++)
+      {
+        domainCtxts[j].getNextEligibleMessageForDomain(operationId);
+
+        if (domainCtxts[j].nextMsg == null)
+          domainCtxts[j].active = false;
       }
     }
     catch(Exception e)
     {
       TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      // FIXME:ECL do not publish internal exception plumb to the client
       throw new DirectoryException(
           ResultCode.OPERATIONS_ERROR,
           Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: " +
-              e.getLocalizedMessage()),
-          e);
+              e),
+              e);
     }
+    if (debugEnabled())
+      TRACER.debugInfo(
+        " initializeCLDomCtxts ends with " + " " + dumpState());
   }
 
   /**
@@ -569,21 +722,22 @@
   }
 
   /**
-   * Shutdown this handler ServerHandler.
+   * Shutdown this handler.
    */
   public void shutdown()
   {
-    for (int i=0;i<clDomCtxts.length;i++)
+    for (int i=0;i<domainCtxts.length;i++)
     {
-      if (!clDomCtxts[i].rsd.unRegisterHandler(clDomCtxts[i].mh))
+      if (!domainCtxts[i].rsd.unRegisterHandler(domainCtxts[i].mh))
       {
-        TRACER.debugInfo(this +" shutdown() Internal error " +
-                " when unregistering "+ clDomCtxts[i].mh);
+        logError(Message.raw(Category.SYNC, Severity.NOTICE,
+            this +" shutdown() - error when unregistering handler "
+            + domainCtxts[i].mh));
       }
-      clDomCtxts[i].rsd.stopServer(clDomCtxts[i].mh);
+      domainCtxts[i].rsd.stopServer(domainCtxts[i].mh);
     }
     super.shutdown();
-    clDomCtxts = null;
+    domainCtxts = null;
   }
 
   /**
@@ -629,7 +783,7 @@
     attributes.add(Attributes.create("External-Changelog-Server",
         serverURL));
 
-    // FIXME:ECL No monitoring exist for ECL.
+    // TODO:ECL No monitoring exist for ECL.
     return attributes;
   }
   /**
@@ -640,8 +794,9 @@
   {
     String localString;
     localString = "External changelog Server ";
-    if (this.cLSearchCtxt==null)
-      localString += serverId + " " + serverURL + " " + getServiceId();
+    if (this.serverId != 0)
+      localString += serverId + " " + serverURL + " " + getServiceId()
+       + " " + this.getOperationId();
     else
       localString += this.getName();
     return localString;
@@ -652,18 +807,9 @@
    */
   public ServerStatus getStatus()
   {
-    // FIXME:ECL Sould ECLServerHandler manage a ServerStatus ?
-    return ServerStatus.INVALID_STATUS;
-  }
-  /**
-   * Retrieves the Address URL for this server handler.
-   *
-   * @return  The Address URL for this server handler,
-   *          in the form of an IP address and port separated by a colon.
-   */
-  public String getServerAddressURL()
-  {
-    return serverAddressURL;
+    // There is no other status possible for the ECL Server Handler to
+    // be normally connected.
+    return ServerStatus.NORMAL_STATUS;
   }
   /**
    * {@inheritDoc}
@@ -684,29 +830,34 @@
   {
 
     //
-    this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
-    this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
-    this.setConsumerActive(true);
-    this.cLSearchCtxt.searchPhase = 1;
+    //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
+    //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
+    //this.setConsumerActive(true);
+
     this.operationId = startECLSessionMsg.getOperationId();
     this.setName(this.getClass().getCanonicalName()+ " " + operationId);
 
-    if (eligibleCNComputerThread==null)
-      eligibleCNComputerThread = new ECLEligibleCNComputerThread();
-
-    cLSearchCtxt.isPersistent  = startECLSessionMsg.isPersistent();
-    cLSearchCtxt.stopSeqnum    = startECLSessionMsg.getLastDraftChangeNumber();
-    cLSearchCtxt.searchPhase   = 1;
-    cLSearchCtxt.currentCookie = new MultiDomainServerState(
+    isPersistent  = startECLSessionMsg.isPersistent();
+    lastDraftCN   = startECLSessionMsg.getLastDraftChangeNumber();
+    searchPhase   = INIT_PHASE;
+    previousCookie = new MultiDomainServerState(
         startECLSessionMsg.getCrossDomainServerState());
-    cLSearchCtxt.excludedServiceIDs=startECLSessionMsg.getExcludedServiceIDs();
+    excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
+    replicationServer.disableEligibility(excludedServiceIDs);
+    eligibleCN = replicationServer.getEligibleCN();
 
-    //--
-    if (startECLSessionMsg.getECLRequestType()==0)
+    if (startECLSessionMsg.getECLRequestType()==
+      StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE)
     {
       initializeCLSearchFromGenState(
           startECLSessionMsg.getCrossDomainServerState());
     }
+    else if (startECLSessionMsg.getECLRequestType()==
+      StartECLSessionMsg.REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER)
+    {
+      initializeCLSearchFromDraftCN(
+          startECLSessionMsg.getFirstDraftChangeNumber());
+    }
 
     if (session != null)
     {
@@ -735,21 +886,15 @@
       // Resume the writer
       ((ECLServerWriter)writer).resumeWriter();
 
-      // FIXME:ECL Potential race condition if writer not yet resumed here
+      // TODO:ECL Potential race condition if writer not yet resumed here
     }
 
-    if (cLSearchCtxt.isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
+    if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
     {
-      closePhase1();
+      closeInitPhase();
     }
 
-    /* TODO: Good Draft Compat
-    //--
-    if (startCLMsg.getStartMode()==1)
-    {
-      initializeCLSearchFromProvidedSeqnum(startCLMsg.getSequenceNumber());
-    }
-
+    /* TODO: From replication changenumber
     //--
     if (startCLMsg.getStartMode()==2)
     {
@@ -762,14 +907,14 @@
     {
       // to get the CL first and last
       initializeCLDomCtxts(null); // from start
-      ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN();
+      ChangeNumber crossDomainEligibleCN = computeCrossDomainEligibleCN();
 
       try
       {
         // to get the CL first and last
-        // last rely on the crossDomainElligibleCN thhus must have been
+        // last rely on the crossDomainEligibleCN thhus must have been
         // computed before
-        int[] limits = computeCLLimits(crossDomainElligibleCN);
+        int[] limits = computeCLLimits(crossDomainEligibleCN);
         // Send the response
         CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
         session.publish(msg);
@@ -793,11 +938,17 @@
       }
       return;
     }
-    Good Draft Compat */
+     */
 
     // Store into domain
     registerIntoDomain();
 
+    if (debugEnabled())
+      TRACER.debugInfo(
+          this.getName() + " initialized: " +
+          " " + dumpState() + " " +
+          " " + clDomCtxtsToString(""));
+
   }
 
   /**
@@ -812,30 +963,12 @@
   throws DirectoryException
   {
     boolean interrupted = true;
-    ECLUpdateMsg msg = getnextUpdate();
+    ECLUpdateMsg msg = getNextECLUpdate();
 
-    // FIXME:ECL We should refactor so that a SH always have a session
+    // TODO:ECL We should refactor so that a SH always have a session
     if (session == null)
       return msg;
 
-    /*
-     * When we remove a message from the queue we need to check if another
-     * server is waiting in flow control because this queue was too long.
-     * This check might cause a performance penalty an therefore it
-     * is not done for every message removed but only every few messages.
-     */
-    /** FIXME:ECL checkAllSaturation makes no sense for ECLServerHandler ?
-    if (++saturationCount > 10)
-    {
-      saturationCount = 0;
-      try
-      {
-        replicationServerDomain.checkAllSaturation();
-      } catch (IOException e)
-      {
-      }
-    }
-    */
     boolean acquired = false;
     do
     {
@@ -857,16 +990,17 @@
 
   /**
    * Get the next message - non blocking - null when none.
-   *
-   * @param synchronous - not used - always non blocking.
-   * @return the next message - null when none.
+   * This method is currently not used but we don't want to keep the mother
+   * class method since it make no sense for ECLServerHandler.
+   * @param synchronous - not used
+   * @return the next message
    */
   protected UpdateMsg getnextMessage(boolean synchronous)
   {
     UpdateMsg msg = null;
     try
     {
-      ECLUpdateMsg eclMsg = getnextUpdate();
+      ECLUpdateMsg eclMsg = getNextECLUpdate();
       if (eclMsg!=null)
         msg = eclMsg.getUpdateMsg();
     }
@@ -878,60 +1012,24 @@
   }
 
   /**
-   * Get the next external changelog update.
-   *
-   * @return    The ECL update, null when none.
-   * @exception DirectoryException when any problem occurs.
+   * Returns the next update message for the External Changelog (ECL).
+   * @return the ECL update message, null when there aren't anymore.
+   * @throws DirectoryException when an error occurs.
    */
-  protected ECLUpdateMsg getnextUpdate()
+  public ECLUpdateMsg getNextECLUpdate()
   throws DirectoryException
   {
-    return getGeneralizedNextECLUpdate(this.cLSearchCtxt);
-  }
+    ECLUpdateMsg oldestChange = null;
 
-  /**
-   * Computes the cross domain eligible message (non blocking).
-   * Return null when search is covered
-   */
-  private ECLUpdateMsg getGeneralizedNextECLUpdate(CLTraverseCtxt cLSearchCtxt)
-  throws DirectoryException
-  {
-    ECLUpdateMsg theOldestChange = null;
-    try
-    {
+    if (debugEnabled())
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
           getMonitorInstanceName() + "," + this +
-          " getGeneralizedNextECLUpdate starts with ctxt="
-          + cLSearchCtxt);
+          " getNextECLUpdate starts: " + dumpState());
 
-      if ((cLSearchCtxt.nextSeqnum != -1) &&
-          (!cLSearchCtxt.endOfSeqnumdbReached))
-      {
-        /* TODO:ECL G Good changelog draft compat.
-        // First time , initialise the cursor to traverse the seqnumdb
-        if (seqnumDbReadIterator == null)
-        {
-          try
-          {
-          seqnumDbReadIterator = replicationServerDomain.getReplicationServer().
-            getSeqnumDbHandler().generateIterator(cLSearchCtxt.nextSeqnum);
-            TRACER.debugInfo("getGeneralizedNextMessage(): "
-                + " creates seqnumDbReadIterator from nextSeqnum="
-                + cLSearchCtxt.nextSeqnum
-                + " 1rst=" + seqnumDbReadIterator.getSeqnum()
-                + " CN=" + seqnumDbReadIterator.getChangeNumber()
-                + cLSearchCtxt);
-          }
-          catch(Exception e)
-          {
-            cLSearchCtxt.endOfSeqnumdbReached = true;
-          }
-        }
-        */
-      }
+    try
+    {
 
-
-      // Search / no seqnum / not persistent
+      // Search / no DraftCN / not persistent
       // -----------------------------------
       //  init: all domain are candidate
       //    get one msg from each
@@ -958,136 +1056,136 @@
       //    if one domain has no msg, still is candidate
 
       int iDom = 0;
-      boolean nextclchange = true;
-      while ((nextclchange) && (cLSearchCtxt.searchPhase==1))
+      boolean continueLooping = true;
+      while ((continueLooping) && (searchPhase == INIT_PHASE))
       {
-
         // Step 1 & 2
-        if (cLSearchCtxt.searchPhase==1)
+        if (searchPhase == INIT_PHASE)
         {
-          if (debugEnabled())
-            clDomCtxtsToString("In getGeneralizedNextMessage : " +
-              "looking for the generalized oldest change");
+          // Normally we whould not loop .. except ...
+          continueLooping = false;
 
-          // Retrieves the index in the subx table of the
-          // generalizedOldestChange
-          iDom = getGeneralizedOldestChange();
+          iDom = getOldestChangeFromDomainCtxts();
 
-          // idomain != -1 means that we got one
-          // generalized oldest change to process
-          if (iDom==-1)
+          // iDom == -1 means that there is no oldest change to process
+          if (iDom == -1)
           {
-            closePhase1();
+            closeInitPhase();
 
-            // signify end of phase 1 to the caller
+            // signals end of phase 1 to the caller
             return null;
           }
 
-          // idomain != -1 means that we got one
-          // generalized oldest change to process
-          String suffix = this.clDomCtxts[iDom].rsd.getBaseDn();
-          theOldestChange = new ECLUpdateMsg(
-              (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg,
-              null, // set later
-              suffix);
+          // Build the ECLUpdateMsg to be returned
+          oldestChange = new ECLUpdateMsg(
+              (LDAPUpdateMsg)domainCtxts[iDom].nextMsg,
+              null, // cookie will be set later
+              domainCtxts[iDom].rsd.getBaseDn(),
+              0); //  draftChangeNumber may be set later
+          domainCtxts[iDom].nextMsg = null;
 
-          nextclchange = false;
-
-          /* TODO:ECL G Good change log draft compat.
-          if (cLSearchCtxt.nextSeqnum!=-1)
+          if (draftCompat)
           {
-            // Should either retrieve or generate a seqnum
-            // we also need to check if the seqnumdb is acccurate reagrding
+            // either retrieve a draftCN from the draftCNDb
+            // or assign a new draftCN and store in the db
+
+            DraftCNDbHandler draftCNDb=replicationServer.getDraftCNDbHandler();
+
+            // We also need to check if the draftCNdb is consistent with
             // the changelogdb.
             // if not, 2 potential reasons
-            // -1 : purge from the changelog .. let's traverse the seqnumdb
-            // -2 : changelog is late .. let's traverse the changelog
+            // a/ : changelog has been purged (trim)let's traverse the draftCNDb
+            // b/ : changelog is late .. let's traverse the changelogDb
+            // The following loop allows to loop until being on the same cn
+            // in the 2 dbs
 
             // replogcn : the oldest change from the changelog db
-            ChangeNumber replogcn = theOldestChange.getChangeNumber();
-            DN replogReplDomDN = clDomCtxts[iDom].rsd.getBaseDn();
+            ChangeNumber cnFromChangelogDb =
+              oldestChange.getUpdateMsg().getChangeNumber();
+            String dnFromChangelogDb = domainCtxts[iDom].rsd.getBaseDn();
 
             while (true)
             {
-              if (!cLSearchCtxt.endOfSeqnumdbReached)
+              if (!isEndOfDraftCNReached)
               {
-                // we did not reach yet the end of the seqnumdb
+                // we did not reach yet the end of the DraftCNdb
 
-                // seqnumcn : the next change from from the seqnum db
-                ChangeNumber seqnumcn = seqnumDbReadIterator.getChangeNumber();
+                // the next change from the DraftCN db
+                ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber();
+                String dnFromDraftCNDb = draftCNDbIter.getServiceID();
 
-                // are replogcn and seqnumcn should be the same change ?
-                int cmp = replogcn.compareTo(seqnumcn);
-                DN seqnumReplDomDN=DN.decode(seqnumDbReadIterator.
-                getDomainDN());
+                // are replogcn and DraftCNcn should be the same change ?
+                int areCNEqual = cnFromChangelogDb.compareTo(cnFromDraftCNDb);
+                int areDNEqual = dnFromChangelogDb.compareTo(dnFromDraftCNDb);
 
-                TRACER.debugInfo("seqnumgen: comparing the 2 db "
-                    + " changelogdb:" + replogReplDomDN + "=" + replogcn
-                    + " ts=" +
-                    new Date(replogcn.getTime()).toString()
-                    + "## seqnumdb:" + seqnumReplDomDN + "=" + seqnumcn
-                    + " ts=" +
-                    new Date(seqnumcn.getTime()).toString()
-                    + " sn older=" + seqnumcn.older(replogcn));
+                if (debugEnabled())
+                  TRACER.debugInfo("getNextECLUpdate generating draftCN "
+                    + " comparing the 2 db DNs :"
+                    + dnFromChangelogDb + "?=" + cnFromChangelogDb
+                    + " timestamps:" + new Date(cnFromChangelogDb.getTime())
+                    + " ?older" +   new Date(cnFromDraftCNDb.getTime()));
 
-                if ((replogReplDomDN.compareTo(seqnumReplDomDN)==0) && (cmp==0))
+                if ((areDNEqual==0) && (areCNEqual==0))
                 {
                   // same domain and same CN => same change
 
-                  // assign the seqnum from the seqnumdb
-                  // to the change from the changelogdb
+                  // assign the DraftCN found to the change from the changelogdb
+                  if (debugEnabled())
+                    TRACER.debugInfo("getNextECLUpdate generating draftCN "
+                      + " assigning draftCN=" + draftCNDbIter.getDraftCN()
+                      + " to change=" + oldestChange);
 
-                  TRACER.debugInfo("seqnumgen: assigning seqnum="
-                      + seqnumDbReadIterator.getSeqnum()
-                      + " to change=" + theOldestChange);
-                  theOldestChange.setSeqnum(seqnumDbReadIterator.getSeqnum());
+                  oldestChange.setDraftChangeNumber(
+                      draftCNDbIter.getDraftCN());
 
-                  // prepare the next seqnum for the potential next change added
-                  // to the seqnumDb
-                  cLSearchCtxt.nextSeqnum = seqnumDbReadIterator.getSeqnum()
-                  + 1;
                   break;
                 }
                 else
                 {
-                  // replogcn and seqnumcn are NOT the same change
-                  if (seqnumcn.older(replogcn))
+                  // replogcn and DraftCNcn are NOT on the same change
+                  if (cnFromDraftCNDb.older(cnFromChangelogDb))
                   {
-                    // the change from the seqnumDb is older
+                    // the change from the DraftCNDb is older
                     // that means that the change has been purged from the
-                    // changelog
+                    // changelogDb (and DraftCNdb not yet been trimed)
+
                     try
                     {
-                      // let's traverse the seqnumdb searching for the change
+                      // let's traverse the DraftCNdb searching for the change
                       // found in the changelogDb.
-                      TRACER.debugInfo("seqnumgen: will skip "
-                          + seqnumcn  + " and next from  the seqnum");
-                      cLSearchCtxt.endOfSeqnumdbReached =
-                        (seqnumDbReadIterator.next()==false);
-                      TRACER.debugInfo("seqnumgen: has nexted cr to "
-                          + " sn=" + seqnumDbReadIterator.getSeqnum()
-                          + " cn=" + seqnumDbReadIterator.getChangeNumber()
-                          + " and reached end "
-                          + " of seqnumdb:"+cLSearchCtxt.endOfSeqnumdbReached);
-                      if (cLSearchCtxt.endOfSeqnumdbReached)
+                      TRACER.debugInfo("getNextECLUpdate generating draftCN "
+                          + " will skip " + cnFromDraftCNDb
+                          + " and read next change from the DraftCNDb.");
+
+                      isEndOfDraftCNReached = (draftCNDbIter.next()==false);
+
+                      TRACER.debugInfo("getNextECLUpdate generating draftCN "
+                          + " has skiped to "
+                          + " sn=" + draftCNDbIter.getDraftCN()
+                          + " cn=" + draftCNDbIter.getChangeNumber()
+                          + " End of draftCNDb ?"+isEndOfDraftCNReached);
+
+                      if (isEndOfDraftCNReached)
                       {
-                        // we are at the end of the seqnumdb in the append mode
-                        // store in seqnumdb the pair
-                        // seqnum of the cur change,state before this change)
-                        replicationServerDomain.addSeqnum(
-                            cLSearchCtxt.nextSeqnum,
-                            getGenState(),
-                      clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
-                            theOldestChange.getChangeNumber());
-                        theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
-                        cLSearchCtxt.nextSeqnum++;
+                        // we are at the end of the DraftCNdb in the append mode
+
+                        // generate a new draftCN and assign to this change
+                        oldestChange.setDraftChangeNumber(
+                            replicationServer.getNewDraftCN());
+
+                        // store in DraftCNdb the pair
+                        // (draftCN_of_the_cur_change, state_before_this_change)
+                        draftCNDb.add(
+                            oldestChange.getDraftChangeNumber(),
+                            previousCookie.toString(),
+                            oldestChange.getServiceId(),
+                            oldestChange.getUpdateMsg().getChangeNumber());
+
                         break;
                       }
                       else
                       {
-                        // next change from seqnumdb
-                        cLSearchCtxt.nextSeqnum =
-                          seqnumDbReadIterator.getSeqnum() + 1;
+                        // let's go to test this new change fro the DraftCNdb
                         continue;
                       }
                     }
@@ -1100,108 +1198,99 @@
                     // the change from the changelogDb is older
                     // it should have been stored lately
                     // let's continue to traverse the changelogdb
-                    TRACER.debugInfo("seqnumgen: will skip "
-                        + replogcn  + " and next from  the CL");
-                    nextclchange = true;
+                    TRACER.debugInfo("getNextECLUpdate: will skip "
+                        + cnFromChangelogDb
+                        + " and read next from the regular changelog.");
+                    continueLooping = true;
                     break; // TO BE CHECKED
                   }
                 }
               }
               else
               {
-                // we are at the end of the seqnumdb in the append mode
-                // store in seqnumdb the pair
-                // (seqnum of the current change, state before this change)
-                replicationServerDomain.addSeqnum(
-                    cLSearchCtxt.nextSeqnum,
-                    getGenState(),
-                    clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
-                    theOldestChange.getChangeNumber());
-                theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
-                cLSearchCtxt.nextSeqnum++;
+                // we are at the end of the DraftCNdb in the append mode
+                // store in DraftCNdb the pair
+                // (DraftCN of the current change, state before this change)
+                oldestChange.setDraftChangeNumber(
+                    replicationServer.getNewDraftCN());
+
+                draftCNDb.add(
+                    oldestChange.getDraftChangeNumber(),
+                    this.previousCookie.toString(),
+                    domainCtxts[iDom].rsd.getBaseDn(),
+                    oldestChange.getUpdateMsg().getChangeNumber());
+
                 break;
               }
-            } // while seqnum
-          } // nextseqnum !- -1
-          */
+            } // while DraftCN
+          }
 
-          // here we have the right oldest change and in the seqnum case we
-          // have its seqnum
+          // here we have the right oldest change
+          // and in the draft case, we have its draft changenumber
 
           // Set and test the domain of the oldestChange see if we reached
           // the end of the phase for this domain
-          clDomCtxts[iDom].currentState.update(
-              theOldestChange.getUpdateMsg().getChangeNumber());
+          domainCtxts[iDom].currentState.update(
+              oldestChange.getUpdateMsg().getChangeNumber());
 
-          if (clDomCtxts[iDom].currentState.cover(clDomCtxts[iDom].stopState))
+          if (domainCtxts[iDom].currentState.cover(domainCtxts[iDom].stopState))
           {
-            clDomCtxts[iDom].active = false;
+            domainCtxts[iDom].active = false;
           }
 
-          // Test the seqnum of the oldestChange see if we reached
-          // the end of operation
-          /* TODO:ECL G Good changelog draft compat. Not yet implemented
-          if ((cLSearchCtxt.stopSeqnum>0) &&
-              (theOldestChange.getSeqnum()>=cLSearchCtxt.stopSeqnum))
-          {
-            closePhase1();
-
-            // means end of phase 1 to the calling writer
-            return null;
-          }
-          */
-
-          if (clDomCtxts[iDom].active)
+          if (domainCtxts[iDom].active)
           {
             // populates the table with the next eligible msg from idomain
             // in non blocking mode, return null when no more eligible msg
-            getNextElligibleMessage(iDom);
+            domainCtxts[iDom].getNextEligibleMessageForDomain(operationId);
           }
-        } // phase ==1
-      } // while (nextclchange)
+        } // phase == INIT_PHASE
+      } // while (...)
 
-      if (cLSearchCtxt.searchPhase==2)
+      if (searchPhase == PERSISTENT_PHASE)
       {
-        clDomCtxtsToString("In getGeneralizedNextMessage (persistent): " +
-        "looking for the generalized oldest change");
+        if (debugEnabled())
+          clDomCtxtsToString("In getNextECLUpdate (persistent): " +
+          "looking for the generalized oldest change");
 
-        for (int ido=0; ido<clDomCtxts.length; ido++)
+        for (int ido=0; ido<domainCtxts.length; ido++)
         {
           // get next msg
-          getNextElligibleMessage(ido);
+          domainCtxts[ido].getNextEligibleMessageForDomain(operationId);
         }
 
         // take the oldest one
-        iDom = getGeneralizedOldestChange();
+        iDom = getOldestChangeFromDomainCtxts();
 
         if (iDom != -1)
         {
-          String suffix = this.clDomCtxts[iDom].rsd.getBaseDn();
+          String suffix = this.domainCtxts[iDom].rsd.getBaseDn();
 
-          theOldestChange = new ECLUpdateMsg(
-              (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg,
+          oldestChange = new ECLUpdateMsg(
+              (LDAPUpdateMsg)domainCtxts[iDom].nextMsg,
               null, // set later
-              suffix);
+              suffix, 0);
+          domainCtxts[iDom].nextMsg = null; // clean
 
-          clDomCtxts[iDom].currentState.update(
-              theOldestChange.getUpdateMsg().getChangeNumber());
+          domainCtxts[iDom].currentState.update(
+              oldestChange.getUpdateMsg().getChangeNumber());
 
-          /* TODO:ECL G Good changelog draft compat.
-          if (cLSearchCtxt.nextSeqnum!=-1)
+          if (draftCompat)
           {
-            // should generate seqnum
+            // should generate DraftCN
+            DraftCNDbHandler draftCNDb =replicationServer.getDraftCNDbHandler();
 
-            // store in seqnumdb the pair
-            // (seqnum of the current change, state before this change)
-            replicationServerDomain.addSeqnum(
-                cLSearchCtxt.nextSeqnum,
-                getGenState(),
-                clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
-                theOldestChange.getChangeNumber());
-            theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
-            cLSearchCtxt.nextSeqnum++;
+            oldestChange.setDraftChangeNumber(
+                replicationServer.getNewDraftCN());
+
+            // store in DraftCNdb the pair
+            // (DraftCN of the current change, state before this change)
+            draftCNDb.add(
+                oldestChange.getDraftChangeNumber(),
+                this.previousCookie.toString(),
+                domainCtxts[iDom].rsd.getBaseDn(),
+                oldestChange.getUpdateMsg().getChangeNumber());
           }
-          */
         }
       }
     }
@@ -1214,39 +1303,48 @@
           e);
     }
 
-    if (theOldestChange != null)
+    if (oldestChange != null)
     {
+      if (debugEnabled())
+        TRACER.debugInfo("getNextECLUpdate updates previousCookie:"
+          + oldestChange.getUpdateMsg().getChangeNumber());
+
       // Update the current state
-      this.cLSearchCtxt.currentCookie.update(
-          theOldestChange.getServiceId(),
-          theOldestChange.getUpdateMsg().getChangeNumber());
+      previousCookie.update(
+          oldestChange.getServiceId(),
+          oldestChange.getUpdateMsg().getChangeNumber());
 
       // Set the current value of global state in the returned message
-      theOldestChange.setCookie(this.cLSearchCtxt.currentCookie);
+      oldestChange.setCookie(previousCookie);
+
+      if (debugEnabled())
+        TRACER.debugInfo("getNextECLUpdate returns result oldest change =" +
+                oldestChange);
+
     }
-    return theOldestChange;
+    return oldestChange;
   }
 
   /**
    * Terminates the first (non persistent) phase of the search on the ECL.
    */
-  private void closePhase1()
+  private void closeInitPhase()
   {
     // starvation of changelog messages
     // all domain have been unactived means are covered
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-        getMonitorInstanceName() + "," + this + " closePhase1()"
-        + " searchCtxt=" + cLSearchCtxt);
+          getMonitorInstanceName() + "," + this + " closeInitPhase(): "
+          + dumpState());
 
     // go to persistent phase if one
-    for (int i=0; i<clDomCtxts.length; i++)
-      clDomCtxts[i].active = true;
+    for (int i=0; i<domainCtxts.length; i++)
+      domainCtxts[i].active = true;
 
-    if (this.cLSearchCtxt.isPersistent != StartECLSessionMsg.NON_PERSISTENT)
+    if (this.isPersistent != StartECLSessionMsg.NON_PERSISTENT)
     {
-      // phase = 1 done AND persistent search => goto phase 2
-      cLSearchCtxt.searchPhase=2;
+      // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
+      searchPhase = PERSISTENT_PHASE;
 
       if (writer ==null)
       {
@@ -1257,269 +1355,53 @@
     }
     else
     {
-      // phase = 1 done AND !persistent search => reinit to phase 0
-      cLSearchCtxt.searchPhase=0;
+      // INIT_PHASE is done AND search is not persistent => reinit
+      searchPhase = UNDEFINED_PHASE;
     }
 
-    /* TODO:ECL G Good changelog draft compat.
-    if (seqnumDbReadIterator!=null)
+    if (draftCNDbIter!=null)
     {
-      // End of phase 1 => always release the seqnum iterator
-      seqnumDbReadIterator.releaseCursor();
-      seqnumDbReadIterator = null;
+      // End of INIT_PHASE => always release the iterator
+      draftCNDbIter.releaseCursor();
+      draftCNDbIter = null;
     }
-     */
   }
 
   /**
-   * Get the oldest change contained in the subx table.
-   * The subx table should be populated before
-   * @return the oldest change.
+   * Get the index in the domainCtxt table of the domain with the oldest change.
+   * @return the index of the domain with the oldest change, -1 when none.
    */
-  private int getGeneralizedOldestChange()
+  private int getOldestChangeFromDomainCtxts()
   {
     int oldest = -1;
-    for (int i=0; i<clDomCtxts.length; i++)
+    for (int i=0; i<domainCtxts.length; i++)
     {
-      if ((clDomCtxts[i].active))
+      if ((domainCtxts[i].active))
       {
         // on the first loop, oldest==-1
         // .msg is null when the previous (non blocking) nextMessage did
         // not have any eligible msg to return
-        if (clDomCtxts[i].nextMsg != null)
+        if (domainCtxts[i].nextMsg != null)
         {
           if ((oldest==-1) ||
-              (clDomCtxts[i].nextMsg.compareTo(clDomCtxts[oldest].nextMsg)<0))
+              (domainCtxts[i].nextMsg.compareTo(domainCtxts[oldest].nextMsg)<0))
           {
             oldest = i;
           }
         }
       }
     }
+
     if (debugEnabled())
       TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
-        getMonitorInstanceName()
-        + "," + this + " getGeneralizedOldestChange() " +
-        " returns " +
-        ((oldest!=-1)?clDomCtxts[oldest].nextMsg:""));
+          getMonitorInstanceName()
+          + "," + this + " getOldestChangeFromDomainCtxts() returns " +
+          ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1"));
 
     return oldest;
   }
 
   /**
-   * Get from the provided domain, the next message elligible regarding
-   * the crossDomain elligible CN. Put it in the context table.
-   * @param idomain the provided domain.
-   */
-  private void getNextElligibleMessage(int idomain)
-  {
-    ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN();
-    try
-    {
-      if (clDomCtxts[idomain].nonElligiblemsg != null)
-      {
-        TRACER.debugInfo("getNextElligibleMessage tests if the already " +
-            " stored nonElligibleMsg has becoem elligible regarding " +
-            " the crossDomainElligibleCN ("+crossDomainElligibleCN +
-            " ) " +
-            clDomCtxts[idomain].nonElligiblemsg.getChangeNumber().
-            older(crossDomainElligibleCN));
-        // we already got the oldest msg and it was not elligible
-        if (clDomCtxts[idomain].nonElligiblemsg.getChangeNumber().
-            older(crossDomainElligibleCN))
-        {
-          // it is now elligible
-          clDomCtxts[idomain].nextMsg = clDomCtxts[idomain].nonElligiblemsg;
-          clDomCtxts[idomain].nonElligiblemsg = null;
-        }
-        else
-        {
-          // the oldest is still not elligible - let's wait next
-        }
-      }
-      else
-      {
-        // non blocking
-        UpdateMsg newMsg = clDomCtxts[idomain].mh.getnextMessage(false);
-        if (debugEnabled())
-          TRACER.debugInfo(this +
-            " getNextElligibleMessage got the next changelogmsg "
-            + " from " + clDomCtxts[idomain].mh.getServiceId()
-            + " newCLMsg=" + newMsg);
-        clDomCtxts[idomain].nextMsg =
-          clDomCtxts[idomain].nonElligiblemsg = null;
-        // in non blocking mode, return null when no more msg
-        if (newMsg != null)
-        {
-          /* TODO:ECL Take into account eligibility.
-          TRACER.debugInfo("getNextElligibleMessage is "
-              + newMsg.getChangeNumber()
-              + new Date(newMsg.getChangeNumber().getTime()).toString()
-              + " elligible "
-              + newMsg.getChangeNumber().older(crossDomainElligibleCN));
-          if (newMsg.getChangeNumber().older(crossDomainElligibleCN))
-          {
-            // is elligible
-            clDomCtxts[idomain].nextMsg = newMsg;
-          }
-          else
-          {
-            // is not elligible
-            clDomCtxts[idomain].nonElligiblemsg = newMsg;
-          }
-          */
-          clDomCtxts[idomain].nextMsg = newMsg;
-        }
-      }
-    }
-    catch(Exception e)
-    {
-      TRACER.debugCaught(DebugLogLevel.ERROR, e);
-    }
-  }
-
-  /*
-   */
-  ECLEligibleCNComputerThread eligibleCNComputerThread = null;
-  ChangeNumber liveecn;
-  private ChangeNumber computeCrossDomainElligibleCN()
-  {
-    return liveecn;
-  }
-
-
-  /**
-   * This class specifies the thread that computes periodically
-   * the cross domain eligible CN.
-   */
-  private final class ECLEligibleCNComputerThread
-    extends DirectoryThread
-  {
-    /**
-     * The tracer object for the debug logger.
-     */
-    private boolean shutdown = false;
-
-    private ECLEligibleCNComputerThread()
-    {
-      super("ECL eligible CN computer thread");
-      start();
-    }
-
-    public void run()
-    {
-      while (shutdown == false)
-      {
-        try {
-          synchronized (this)
-          {
-            liveecn = computeNewCrossDomainElligibleCN();
-            try
-            {
-              this.wait(1000);
-            } catch (InterruptedException e)
-            { }
-          }
-        } catch (Exception end)
-        {
-          break;
-        }
-      }
-    }
-
-    private ChangeNumber computeNewCrossDomainElligibleCN()
-    {
-      ChangeNumber computedCrossDomainElligibleCN = null;
-      String s = "=> ";
-
-      ReplicationServer rs = replicationServerDomain.getReplicationServer();
-
-      if (debugEnabled())
-        TRACER.debugInfo("ECLSH.computeNewCrossDomainElligibleCN() "
-          + " periodic starts rs="+rs);
-
-      Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator();
-      if (rsdi != null)
-      {
-        while (rsdi.hasNext())
-        {
-          ReplicationServerDomain domain = rsdi.next();
-          if (domain.getBaseDn().equalsIgnoreCase("cn=changelog"))
-            continue;
-
-          ChangeNumber domainElligibleCN = computeEligibleCN(domain);
-          if (domainElligibleCN==null)
-            continue;
-          if ((computedCrossDomainElligibleCN == null) ||
-              (domainElligibleCN.older(computedCrossDomainElligibleCN)))
-          {
-            computedCrossDomainElligibleCN = domainElligibleCN;
-          }
-          s += "\n DN:" + domain.getBaseDn()
-          + "\t\t domainElligibleCN :" + domainElligibleCN
-          + "/" +
-          new Date(domainElligibleCN.getTime()).toString();
-        }
-      }
-
-      if (debugEnabled())
-        TRACER.debugInfo("SH.computeNewCrossDomainElligibleCN() periodic " +
-          " ends with " +
-          " the following domainElligibleCN for each domain :" + s +
-          "\n thus CrossDomainElligibleCN=" + computedCrossDomainElligibleCN +
-          "  ts=" +
-          new Date(computedCrossDomainElligibleCN.getTime()).toString());
-
-      return computedCrossDomainElligibleCN;
-    }
-  }
-
-  /**
-   * Compute the eligible CN.
-   * @param rsd The provided replication server domain for which we want
-   * to retrieve the eligible date.
-   * @return null if the domain does not play in eligibility.
-   */
-  public ChangeNumber computeEligibleCN(ReplicationServerDomain rsd)
-  {
-    ChangeNumber elligibleCN = null;
-    ServerState heartbeatState = rsd.getHeartbeatState();
-    if (heartbeatState==null)
-      return null;
-
-    // compute elligible CN
-    ServerState hbState = heartbeatState.duplicate();
-
-    Iterator<Short> it = hbState.iterator();
-    while (it.hasNext())
-    {
-      short sid = it.next();
-      ChangeNumber storedCN = hbState.getMaxChangeNumber(sid);
-
-      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
-      // then the server is considered down and not considered for eligibility
-      if (TimeThread.getTime()-storedCN.getTime()>2000)
-      {
-        if (debugEnabled())
-          TRACER.debugInfo(
-            "For RSD." + rsd.getBaseDn() + " Server " + sid
-            + " is not considered for eligibility ... potentially down");
-        continue;
-      }
-
-      if ((elligibleCN == null) || (storedCN.older(elligibleCN)))
-      {
-        elligibleCN = storedCN;
-      }
-    }
-
-    if (debugEnabled())
-      TRACER.debugInfo(
-        "For RSD." + rsd.getBaseDn() + " ElligibleCN()=" + elligibleCN);
-    return elligibleCN;
-  }
-
-  /**
    * Returns the client operation id.
    * @return The client operation id.
    */
@@ -1533,7 +1415,7 @@
    * @return Whether the current search is persistent or not.
    */
   public short isPersistent() {
-    return this.cLSearchCtxt.isPersistent;
+    return this.isPersistent;
   }
 
   /**
@@ -1541,7 +1423,137 @@
    * @return Whether the current search is persistent or not.
    */
   public int getSearchPhase() {
-    return this.cLSearchCtxt.searchPhase;
+    return this.searchPhase;
   }
 
+  /**
+   * Refresh the eligibleCN by requesting the replication server.
+   */
+  public void refreshEligibleCN()
+  {
+    eligibleCN = replicationServer.getEligibleCN();
+  }
+
+  /*
+   * Get first and last DraftCN
+   * @param crossDomainEligibleCN
+   * @return
+   */
+  private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN)
+  throws DirectoryException
+  {
+    /* The content of the DraftCNdb depends on the SEARCH operations done before
+     * requesting the DraftCN. If no operations, DraftCNdb is empty.
+     * The limits we want to get are the "potential" limits if a request was
+     * done, the DraftCNdb is probably not complete to do that.
+     *
+     * The first DraftCN is :
+     *  - the first record from the DraftCNdb
+     *  - if none because DraftCNdb empty,
+     *      then
+     *        if no change in replchangelog then return 0
+     *        else return 1 (DraftCN that WILL be returned to next search)
+     *
+     * The last DraftCN is :
+     *  - initialized with the last record from the DraftCNdb (0 if none)
+     *    and consider the genState associated
+     *  - to the last DraftCN, we add the count of updates in the replchangelog
+     *     FROM that genState TO the crossDomainEligibleCN
+     *     (this diff is done domain by domain)
+     */
+
+    int firstDraftCN;
+    int lastDraftCN;
+    boolean DraftCNdbIsEmpty;
+    DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler();
+
+    ReplicationServer rs = replicationServerDomain.getReplicationServer();
+
+    // Get the first DraftCN from the DraftCNdb
+    firstDraftCN = draftCNDbH.getFirstKey();
+    HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
+    if (firstDraftCN < 1)
+    {
+      DraftCNdbIsEmpty=true;
+      firstDraftCN = 0;
+      lastDraftCN = 0;
+    }
+    else
+    {
+      DraftCNdbIsEmpty=false;
+
+      // Get the last DraftCN from the DraftCNdb
+      lastDraftCN = draftCNDbH.getLastKey();
+
+      // Get the generalized state associated with the current last DraftCN
+      // and initializes from it the startStates table
+      String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
+      if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
+      {
+        domainsServerStateForLastSeqnum = MultiDomainServerState.
+          splitGenStateToServerStates(lastSeqnumGenState);
+      }
+    }
+
+    // Domain by domain
+    Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
+    if (rsdi != null)
+    {
+      while (rsdi.hasNext())
+      {
+        // process a domain
+        ReplicationServerDomain rsd = rsdi.next();
+
+        if (isServiceIDExcluded(rsd.getBaseDn()))
+          continue;
+
+        // for this domain, have the state in the replchangelog
+        // where the last DraftCN update is
+        ServerState domainServerStateForLastSeqnum;
+        if ((domainsServerStateForLastSeqnum == null) ||
+            (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
+        {
+          domainServerStateForLastSeqnum = new ServerState();
+        }
+        else
+        {
+          domainServerStateForLastSeqnum =
+            domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
+        }
+
+        // Count the number of (eligible) changes from this place
+        // to the eligible CN (cross server)
+        long ec = rsd.getEligibleCount(
+            domainServerStateForLastSeqnum, crossDomainEligibleCN);
+
+        // ... hum ...
+        if ((ec>0) && (DraftCNdbIsEmpty==false))
+          ec--;
+
+        // cumulates on domains
+        lastDraftCN += ec;
+
+        // DraftCN is empty and there are eligible updates in the repl changelog
+        // then init first DraftCN
+        if ((ec>0) && (firstDraftCN==0))
+          firstDraftCN = 1;
+      }
+    }
+    return new int[]{firstDraftCN, lastDraftCN};
+  }
+
+  private boolean isServiceIDExcluded(String serviceID)
+  {
+    // skip the excluded domains
+    boolean excluded = false;
+    for(String excludedServiceID : this.excludedServiceIDs)
+    {
+      if (excludedServiceID.equalsIgnoreCase(serviceID))
+      {
+        excluded=true;
+        break;
+      }
+    }
+    return excluded;
+  }
 }

--
Gitblit v1.10.0