mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
14.37.2009 5ec0cb08889c9f1a24fd4cc8b139dcdb942dd92a
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -27,12 +27,14 @@
package org.opends.server.replication.server;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -41,7 +43,6 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
@@ -52,7 +53,7 @@
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
import org.opends.server.util.TimeThread;
import org.opends.server.util.ServerConstants;
/**
 * This class defines a server handler, which handles all interaction with a
@@ -61,128 +62,234 @@
public class ECLServerHandler extends ServerHandler
{
  // Properties filled only if remote server is a RS
  private String serverAddressURL;
  // This is a string identifying the operation, provided by the client part
  // of the ECL, used to help interpretation of messages logged.
  String operationId;
  // Iterator on the draftCN database.
  private DraftCNDbIterator draftCNDbIter = null;
  boolean draftCompat = false;
  /**
   * CLDomainContext : contains the state properties for the search
   * currently being processed, by replication domain.
   * Specifies the last draft changer number (seqnum) requested.
   */
  private class CLDomainContext
  public int lastDraftCN = 0;
  /**
   * Specifies whether the draft change number (seqnum) db has been read until
   * its end.
   */
  public boolean isEndOfDraftCNReached = false;
  /**
   * Specifies whether the current search has been requested to be persistent
   * or not.
   */
  public short isPersistent;
  /**
   * Specifies the current search phase : INIT or PERSISTENT.
   */
  public int searchPhase = INIT_PHASE;
  /**
   * Specifies the cookie contained in the request, specifying where
   * to start serving the ECL.
   */
  public String startCookie;
  /**
   * Specifies the value of the cookie before the change currently processed
   * is returned. It is updated with the change number of the change
   * currently processed (thus becoming the "current" cookie just
   * before the change is returned.
   */
  public MultiDomainServerState previousCookie =
    new MultiDomainServerState();
  /**
   * Specifies the excluded DNs (like cn=admin, ...).
   */
  public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
  /**
   * Eligible changeNumber - only changes older or equal to eligibleCN
   * are published in the ECL.
   */
  public ChangeNumber eligibleCN = null;
  /**
   * Provides a string representation of this object.
   * @return the string representation.
   */
  public String dumpState()
  {
    ReplicationServerDomain rsd; // the repl server domain
    boolean active;              // is the domain still active
    MessageHandler mh;           // the message handler associated
    UpdateMsg nextMsg;
    UpdateMsg nonElligiblemsg;
    return new String(
        this.getClass().getCanonicalName() +
        "[" +
        "[draftCompat=" + draftCompat +
        "] [persistent=" + isPersistent +
        "] [lastDraftCN=" + lastDraftCN +
        "] [isEndOfDraftCNReached=" + isEndOfDraftCNReached +
        "] [searchPhase=" + searchPhase +
        "] [startCookie=" + startCookie +
        "] [previousCookie=" + previousCookie +
    "]]");
  }
  /**
   * Class that manages the 'by domain' state variables for the search being
   * currently processed on the ECL.
   * For example :
   * if search on 'cn=changelog' is being processed when 2 replicated domains
   * dc=us and dc=europe are configured, then there will be 2 DomainContext
   * used, one for ds=us, and one for dc=europe.
   */
  private class DomainContext
  {
    ReplicationServerDomain rsd;
    boolean active;              // active when there are still changes
    // supposed eligible for the ECL.
    MessageHandler mh;           // the message handler from which are read
    // the changes for this domain
    private UpdateMsg nextMsg;
    private UpdateMsg nextNonEligibleMsg;
    ServerState startState;
    ServerState currentState;
    ServerState stopState;
    /**
     * Add to the provider buffer a string representation of this object.
     * {@inheritDoc}
     */
    public void toString(StringBuilder buffer, int i)
    @Override
    public String toString()
    {
      CLDomainContext xx = clDomCtxts[i];
      StringBuilder buffer = new StringBuilder();
      toString(buffer);
      return buffer.toString();
    }
    /**
     * Provide a string representation of this object for debug purpose..
     */
    public void toString(StringBuilder buffer)
    {
      buffer.append(
          " clDomCtxts(" + i + ") [act=" + xx.active +
          " rsd=" + rsd +
          " nextMsg=" + nextMsg + "(" +
          "[ [active=" + active +
          "] [rsd=" + rsd +
          "] [nextMsg=" + nextMsg + "(" +
          (nextMsg != null?
              new Date(nextMsg.getChangeNumber().getTime()).toString():"")
              + ")" +
          " nextNonEligibleMsg="      + nonElligiblemsg +
          " startState=" + startState +
          " stopState= " + stopState +
          " currState= " + currentState + "]");
              "] [nextNonEligibleMsg="      + nextNonEligibleMsg +
              "] [startState=" + startState +
              "] [stopState= " + stopState +
              "] [currentState= " + currentState + "]]");
    }
    /**
     * Get the next message elligible regarding
     * the crossDomain elligible CN. Put it in the context table.
     * @param opid The operation id.
     */
    private void getNextEligibleMessageForDomain(String opid)
    {
      if (debugEnabled())
        TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
          " getNextEligibleMessageForDomain(" + opid+ ") "
          + "ctxt=" + toString());
      assert(nextMsg == null);
      try
      {
        // Before get a new message from the domain, evaluate in priority
        // a message that has not been published to the ECL because it was
        // not eligible
        if (nextNonEligibleMsg != null)
        {
          boolean hasBecomeEligible =
            (nextNonEligibleMsg.getChangeNumber().getTime()
                <= eligibleCN.getTime());
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
                " getNextEligibleMessageForDomain(" + opid+ ") "
              + " stored nonEligibleMsg " + nextNonEligibleMsg
              + " has now become eligible regarding "
              + " the eligibleCN ("+ eligibleCN
              + " ):" + hasBecomeEligible);
          if (hasBecomeEligible)
          {
            // it is now elligible
            nextMsg = nextNonEligibleMsg;
            nextNonEligibleMsg = null;
          }
          else
          {
            // the oldest is still not elligible - let's wait next
          }
        }
        else
        {
          // Here comes a new message !!!
          // non blocking
          UpdateMsg newMsg = mh.getnextMessage(false);
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
                " getNextEligibleMessageForDomain(" + opid+ ") "
                + " got new message : "
                +  " serviceId=[" + mh.getServiceId()
                + "] [newMsg=" + newMsg + "]" + dumpState());
          // in non blocking mode, return null when no more msg
          if (newMsg != null)
          {
            boolean isEligible = (newMsg.getChangeNumber().getTime()
                <= eligibleCN.getTime());
            if (debugEnabled())
              TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId()
                + " getNextEligibleMessageForDomain(" + opid+ ") "
                + "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + newMsg.getChangeNumber()
                + " " + new Date(newMsg.getChangeNumber().getTime()).toString()
                + "] eligibleCN=[" + eligibleCN
                + " " + new Date(eligibleCN.getTime()).toString()+"]"
                + dumpState());
            if (isEligible)
            {
              nextMsg = newMsg;
            }
            else
            {
              nextNonEligibleMsg = newMsg;
            }
          }
        }
      }
      catch(Exception e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
  }
  // The list of contexts by domain for the current search
  CLDomainContext[] clDomCtxts = new CLDomainContext[0];
  // The global list of contexts by domain for the search currently processed.
  DomainContext[] domainCtxts = new DomainContext[0];
  private void clDomCtxtsToString(String msg)
  private String clDomCtxtsToString(String msg)
  {
    StringBuilder buffer = new StringBuilder();
    buffer.append(msg+"\n");
    for (int i=0;i<clDomCtxts.length;i++)
    for (int i=0;i<domainCtxts.length;i++)
    {
      clDomCtxts[i].toString(buffer, i);
      domainCtxts[i].toString(buffer);
      buffer.append("\n");
    }
    TRACER.debugInfo(
        "In " + this.getName() + " clDomCtxts: " + buffer.toString());
    return buffer.toString();
  }
  /**
   * Class that manages the state variables for the current search on the ECL.
   */
  private class CLTraverseCtxt
  {
    /**
     * Specifies the next changer number (seqnum), -1 when not.
     */
    public int nextSeqnum;
    /**
     * Specifies whether the current search has been requested to be persistent
     * or not.
     */
    public short isPersistent;
    /**
     * Specifies the last changer number (seqnum) requested.
     */
    public int stopSeqnum;
    /**
     * Specifies whether the change number (seqnum) db has been read until
     * its end.
     */
    public boolean endOfSeqnumdbReached = false;
    /**
     * Specifies the current search phase.
     * 1 = init
     * 2 = persistent
     */
    public int searchPhase = 1;
    /**
     * Specifies the cookie contained in the request, specifying where
     * to start serving the ECL.
     */
    public String generalizedStartState;
    /**
     * Specifies the current cookie value.
     */
    public MultiDomainServerState currentCookie =
      new MultiDomainServerState();
    /**
     * Specifies the excluded DNs.
     */
    public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
    /**
     * Provides a string representation of this object.
     * @return the string representation.
     */
    public String toString()
    {
      return new String(
        this.getClass().getCanonicalName() +
        ":[" +
        " nextSeqnum=" + nextSeqnum +
        " persistent=" + isPersistent +
        " stopSeqnum" + stopSeqnum +
        " endOfSeqnumdbReached=" + endOfSeqnumdbReached +
        " searchPhase=" + searchPhase +
        " generalizedStartState=" + generalizedStartState +
        "]");
    }
  }
  // The context of the current search
  private CLTraverseCtxt cLSearchCtxt = new CLTraverseCtxt();
  static int UNDEFINED_PHASE = 0;
  static int INIT_PHASE = 1;
  static int PERSISTENT_PHASE = 2;
  /**
   * Starts this handler based on a start message received from remote server.
@@ -199,10 +306,6 @@
          inECLStartMsg.getVersion());
      generationId = inECLStartMsg.getGenerationId();
      serverURL = inECLStartMsg.getServerURL();
      int separator = serverURL.lastIndexOf(':');
      serverAddressURL =
        session.getRemoteAddress() + ":" + serverURL.substring(separator +
            1);
      setInitialServerState(inECLStartMsg.getServerState());
      setSendWindowSize(inECLStartMsg.getWindowSize());
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -211,8 +314,6 @@
        // Only V2 protocol has the group id in repl server start message
        this.groupId = inECLStartMsg.getGroupId();
      }
      // FIXME:ECL Any generationID must be removed, it makes no sense here.
      oldGenerationId = -100;
    }
    catch(Exception e)
    {
@@ -243,7 +344,7 @@
        replicationServer, rcvWindowSize);
    try
    {
      setServiceIdAndDomain("cn=changelog");
      setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
    }
    catch(DirectoryException de)
    {
@@ -268,12 +369,12 @@
      StartECLSessionMsg startECLSessionMsg)
  throws DirectoryException
  {
    // FIXME:ECL queueSize is hard coded to 1 else Handler hangs for some reason
    // queueSize is hard coded to 1 else super class hangs for some reason
    super(null, 1, replicationServerURL, replicationServerId,
        replicationServer, 0);
    try
    {
      setServiceIdAndDomain("cn=changelog");
      setServiceIdAndDomain(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
    }
    catch(DirectoryException de)
    {
@@ -369,115 +470,153 @@
  /**
   * Initialize the handler from a provided cookie value.
   * @param providedGeneralizedStartState The provided cookie value.
   * @param crossDomainStartState The provided cookie value.
   * @throws DirectoryException When an error is raised.
   */
  public void initializeCLSearchFromGenState(
      String providedGeneralizedStartState)
  public void initializeCLSearchFromGenState(String crossDomainStartState)
  throws DirectoryException
  {
    this.cLSearchCtxt.nextSeqnum = -1; // will not generate seqnum
    initializeCLDomCtxts(providedGeneralizedStartState);
    initializeCLDomCtxts(crossDomainStartState);
  }
  /**
   * Initialize the handler from a provided draft first change number.
   * @param startDraftCN The provided draft first change number.
   * @throws DirectoryException When an error is raised.
   */
  public void initializeCLSearchFromDraftCN(int startDraftCN)
  throws DirectoryException
  {
    String crossDomainStartState;
    draftCompat = true;
    DraftCNDbHandler draftCNDb = replicationServer.getDraftCNDbHandler();
    if (startDraftCN < 0)
    {
      // Request filter does not contain any firstDraftCN
      // So we'll generate from the beginning of what we have stored here.
      // Get the first DraftCN from DraftCNdb
      if (draftCNDb.count() == 0)
      {
        // db is empty
        isEndOfDraftCNReached = true;
        crossDomainStartState = null;
      }
      else
      {
        // get the generalizedServerState related to the start of the draftDb
        crossDomainStartState = draftCNDb.getValue(draftCNDb.getFirstKey());
        // Get an iterator to traverse the draftCNDb
        try
        {
          draftCNDbIter =
            draftCNDb.generateIterator(draftCNDb.getFirstKey());
        }
        catch(Exception e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          if (draftCNDbIter != null)
            draftCNDbIter.releaseCursor();
          throw new DirectoryException(
              ResultCode.OPERATIONS_ERROR,
              Message.raw(Category.SYNC,
                  Severity.FATAL_ERROR,"Server Error."));
        }
      }
    }
    else
    {
      // Request filter does contain a startDraftCN
      // Read the draftCNDb to see whether it contains startDraftCN
      crossDomainStartState = draftCNDb.getValue(startDraftCN);
      if (crossDomainStartState != null)
      {
        // startDraftCN is present in the draftCnDb
        // Get an iterator to traverse the draftCNDb
        try
        {
          draftCNDbIter =
            draftCNDb.generateIterator(draftCNDb.getFirstKey());
        }
        catch(Exception e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          if (draftCNDbIter != null)
            draftCNDbIter.releaseCursor();
          throw new DirectoryException(
              ResultCode.OPERATIONS_ERROR,
              Message.raw(Category.SYNC,
                  Severity.FATAL_ERROR,"Server Error."));
        }
      }
      else
      {
        // startDraftCN provided in the request is not present in the draftCnDb
        // Is the provided startDraftCN <= the potential last DraftCNdb
        // Get the draftLimits (from the eligibleCN got at the beginning of
        // the operation.
        int[] limits = getECLDraftCNLimits(eligibleCN);
        if (startDraftCN<=limits[1])
        {
          // startDraftCN is between first and last and has never been
          // returned yet
          crossDomainStartState = draftCNDb.getValue(draftCNDb.getLastKey());
          // FIXME:ECL ... ok we'll start from the end of the draftCNDb BUT ...
          // this is NOT the request of the client !!!!
        }
        else
        {
          throw new DirectoryException(
              ResultCode.SUCCESS,
              Message.raw(Category.SYNC,
                  Severity.INFORMATION,"Bad value provided for change number "
                  + " Failed to match a replication state to "+startDraftCN));
        }
      }
    }
    this.draftCompat = true;
    initializeCLDomCtxts(crossDomainStartState);
  }
  /**
   * Initialize the context for each domain.
   * @param providedGeneralizedStartState the provided generalized state
   * @param providedCookie the provided generalized state
   * @throws DirectoryException When an error occurs.
   */
  public void initializeCLDomCtxts(String providedGeneralizedStartState)
  public void initializeCLDomCtxts(String providedCookie)
  throws DirectoryException
  {
    HashMap<String,ServerState> startStates = new HashMap<String,ServerState>();
    ReplicationServer rs = replicationServerDomain.getReplicationServer();
    try
    {
      // Initialize start state for  all running domains with empty state
      Iterator<ReplicationServerDomain> rsdk = rs.getCacheIterator();
      if (rsdk != null)
      {
        while (rsdk.hasNext())
        {
          // process a domain
          ReplicationServerDomain rsd = rsdk.next();
          // skip the changelog domain
          if (rsd == this.replicationServerDomain)
            continue;
          startStates.put(rsd.getBaseDn(), new ServerState());
        }
      }
      // Overwrite start state from the cookie provided in the request
      if ((providedGeneralizedStartState != null) &&
          (providedGeneralizedStartState.length()>0))
      {
        String[] domains = providedGeneralizedStartState.split(";");
        for (String domainState : domains)
        {
          // Split baseDN and serverState
          String[] fields = domainState.split(":");
          // BaseDN - Check it
          String domainBaseDNReceived = fields[0];
          if (!startStates.containsKey(domainBaseDNReceived))
            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
                  "unknown " + domainBaseDNReceived));
          // ServerState
          ServerState domainServerState = new ServerState();
          if (fields.length>1)
          {
            String strState = fields[1];
            String[] strCN = strState.split(" ");
            for (String sr : strCN)
            {
              ChangeNumber fromChangeNumber = new ChangeNumber(sr);
              domainServerState.update(fromChangeNumber);
            }
          }
          startStates.put(domainBaseDNReceived, domainServerState);
          // FIXME: ECL first cookie value check
          // ECL For each of the provided state, it this state is older
          // than the older change stored in the replication changelog ....
          // then a purge occured since the time the cookie was published
          // it is recommended to do a full resync
          ReplicationServerDomain rsd =
            rs.getReplicationServerDomain(domainBaseDNReceived, false);
          ServerState domainStartState = rsd.getStartState();
          if (!domainServerState.cover(domainStartState))
          {
            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
                  "too old cookie provided " + providedGeneralizedStartState
                  + " first acceptable change for " + rsd.getBaseDn()
                  + " is " + rsd.getStartState()));
          }
        }
      }
    }
    catch(DirectoryException de)
    {
      throw de;
    }
    catch(Exception e)
    {
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
            "Exception raised: " + e.getMessage()));
    }
    // Parse the provided cookie and overwrite startState from it.
    if ((providedCookie != null) && (providedCookie.length()!=0))
      startStates =
        MultiDomainServerState.splitGenStateToServerStates(providedCookie);
    try
    {
      // Now traverse all domains and build the initial changelog context
      Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator();
      // Now traverse all domains and build all the initial contexts :
      // - the global one : dumpState()
      // - the domain by domain ones : domainCtxts
      Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
      // Creates the table that will contain the real-time info by domain.
      clDomCtxts = new CLDomainContext[rs.getCacheSize()-1
                         -this.cLSearchCtxt.excludedServiceIDs.size()];
      HashSet<DomainContext> tmpSet = new HashSet<DomainContext>();
      int i =0;
      if (rsdi != null)
      {
@@ -491,73 +630,87 @@
            continue;
          // skip the excluded domains
          boolean excluded = false;
          for(String excludedServiceID : this.cLSearchCtxt.excludedServiceIDs)
          {
            if (excludedServiceID.equalsIgnoreCase(rsd.getBaseDn()))
            {
              excluded=true;
              break;
            }
          }
          if (excluded)
          if (isServiceIDExcluded(rsd.getBaseDn()))
            continue;
          // Creates the context record
          CLDomainContext newContext = new CLDomainContext();
          newContext.active = true;
          newContext.rsd = rsd;
          // Creates the new domain context
          DomainContext newDomainCtxt = new DomainContext();
          newDomainCtxt.active = true;
          newDomainCtxt.rsd = rsd;
          if (this.cLSearchCtxt.isPersistent ==
          // Assign the start state for the domain
          if (isPersistent ==
            StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
          {
            newContext.startState = rsd.getCLElligibleState();
            newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
          }
          else
          {
            newContext.startState = startStates.get(rsd.getBaseDn());
            newContext.stopState = rsd.getCLElligibleState();
            newDomainCtxt.startState = startStates.remove(rsd.getBaseDn());
            if ((providedCookie==null)||(providedCookie.isEmpty()))
              newDomainCtxt.startState = new ServerState();
            else
              if (newDomainCtxt.startState == null)
                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                    ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
                        "missing " + rsd.getBaseDn()));
            newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
          }
          newContext.currentState = new ServerState();
          newDomainCtxt.currentState = new ServerState();
          // Creates an unconnected SH
          // Creates an unconnected SH for the domain
          MessageHandler mh = new MessageHandler(maxQueueSize,
              replicationServerURL, replicationServerId, replicationServer);
          // set initial state
          mh.setInitialServerState(newContext.startState);
          mh.setInitialServerState(newDomainCtxt.startState);
          // set serviceID and domain
          mh.setServiceIdAndDomain(rsd.getBaseDn());
          // register into domain
          // register the unconnected into the domain
          rsd.registerHandler(mh);
          newContext.mh = mh;
          newDomainCtxt.mh = mh;
          previousCookie.update(
              newDomainCtxt.rsd.getBaseDn(),
              newDomainCtxt.startState);
          // store the new context
          clDomCtxts[i] = newContext;
          tmpSet.add(newDomainCtxt);
          i++;
        }
      }
      // the next record from the seqnumdb should be the one
      cLSearchCtxt.endOfSeqnumdbReached = false;
      cLSearchCtxt.generalizedStartState = providedGeneralizedStartState;
      // Initializes all domain with the next elligible message
      for (int j=0; j<clDomCtxts.length; j++)
      if (!startStates.isEmpty())
      {
        this.getNextElligibleMessage(j);
        if (clDomCtxts[j].nextMsg == null)
          clDomCtxts[j].active = false;
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
            ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
                "unknown " + startStates.toString()));
      }
      domainCtxts = tmpSet.toArray(new DomainContext[0]);
      // the next record from the DraftCNdb should be the one
      startCookie = providedCookie;
      // Initializes all domain with the next(first) elligible message
      for (int j=0; j<domainCtxts.length; j++)
      {
        domainCtxts[j].getNextEligibleMessageForDomain(operationId);
        if (domainCtxts[j].nextMsg == null)
          domainCtxts[j].active = false;
      }
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      // FIXME:ECL do not publish internal exception plumb to the client
      throw new DirectoryException(
          ResultCode.OPERATIONS_ERROR,
          Message.raw(Category.SYNC, Severity.INFORMATION,"Exception raised: " +
              e.getLocalizedMessage()),
          e);
              e),
              e);
    }
    if (debugEnabled())
      TRACER.debugInfo(
        " initializeCLDomCtxts ends with " + " " + dumpState());
  }
  /**
@@ -569,21 +722,22 @@
  }
  /**
   * Shutdown this handler ServerHandler.
   * Shutdown this handler.
   */
  public void shutdown()
  {
    for (int i=0;i<clDomCtxts.length;i++)
    for (int i=0;i<domainCtxts.length;i++)
    {
      if (!clDomCtxts[i].rsd.unRegisterHandler(clDomCtxts[i].mh))
      if (!domainCtxts[i].rsd.unRegisterHandler(domainCtxts[i].mh))
      {
        TRACER.debugInfo(this +" shutdown() Internal error " +
                " when unregistering "+ clDomCtxts[i].mh);
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
            this +" shutdown() - error when unregistering handler "
            + domainCtxts[i].mh));
      }
      clDomCtxts[i].rsd.stopServer(clDomCtxts[i].mh);
      domainCtxts[i].rsd.stopServer(domainCtxts[i].mh);
    }
    super.shutdown();
    clDomCtxts = null;
    domainCtxts = null;
  }
  /**
@@ -629,7 +783,7 @@
    attributes.add(Attributes.create("External-Changelog-Server",
        serverURL));
    // FIXME:ECL No monitoring exist for ECL.
    // TODO:ECL No monitoring exist for ECL.
    return attributes;
  }
  /**
@@ -640,8 +794,9 @@
  {
    String localString;
    localString = "External changelog Server ";
    if (this.cLSearchCtxt==null)
      localString += serverId + " " + serverURL + " " + getServiceId();
    if (this.serverId != 0)
      localString += serverId + " " + serverURL + " " + getServiceId()
       + " " + this.getOperationId();
    else
      localString += this.getName();
    return localString;
@@ -652,18 +807,9 @@
   */
  public ServerStatus getStatus()
  {
    // FIXME:ECL Sould ECLServerHandler manage a ServerStatus ?
    return ServerStatus.INVALID_STATUS;
  }
  /**
   * Retrieves the Address URL for this server handler.
   *
   * @return  The Address URL for this server handler,
   *          in the form of an IP address and port separated by a colon.
   */
  public String getServerAddressURL()
  {
    return serverAddressURL;
    // There is no other status possible for the ECL Server Handler to
    // be normally connected.
    return ServerStatus.NORMAL_STATUS;
  }
  /**
   * {@inheritDoc}
@@ -684,29 +830,34 @@
  {
    //
    this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
    this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
    this.setConsumerActive(true);
    this.cLSearchCtxt.searchPhase = 1;
    //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
    //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
    //this.setConsumerActive(true);
    this.operationId = startECLSessionMsg.getOperationId();
    this.setName(this.getClass().getCanonicalName()+ " " + operationId);
    if (eligibleCNComputerThread==null)
      eligibleCNComputerThread = new ECLEligibleCNComputerThread();
    cLSearchCtxt.isPersistent  = startECLSessionMsg.isPersistent();
    cLSearchCtxt.stopSeqnum    = startECLSessionMsg.getLastDraftChangeNumber();
    cLSearchCtxt.searchPhase   = 1;
    cLSearchCtxt.currentCookie = new MultiDomainServerState(
    isPersistent  = startECLSessionMsg.isPersistent();
    lastDraftCN   = startECLSessionMsg.getLastDraftChangeNumber();
    searchPhase   = INIT_PHASE;
    previousCookie = new MultiDomainServerState(
        startECLSessionMsg.getCrossDomainServerState());
    cLSearchCtxt.excludedServiceIDs=startECLSessionMsg.getExcludedServiceIDs();
    excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
    replicationServer.disableEligibility(excludedServiceIDs);
    eligibleCN = replicationServer.getEligibleCN();
    //--
    if (startECLSessionMsg.getECLRequestType()==0)
    if (startECLSessionMsg.getECLRequestType()==
      StartECLSessionMsg.REQUEST_TYPE_FROM_COOKIE)
    {
      initializeCLSearchFromGenState(
          startECLSessionMsg.getCrossDomainServerState());
    }
    else if (startECLSessionMsg.getECLRequestType()==
      StartECLSessionMsg.REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER)
    {
      initializeCLSearchFromDraftCN(
          startECLSessionMsg.getFirstDraftChangeNumber());
    }
    if (session != null)
    {
@@ -735,21 +886,15 @@
      // Resume the writer
      ((ECLServerWriter)writer).resumeWriter();
      // FIXME:ECL Potential race condition if writer not yet resumed here
      // TODO:ECL Potential race condition if writer not yet resumed here
    }
    if (cLSearchCtxt.isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
    if (isPersistent == StartECLSessionMsg.PERSISTENT_CHANGES_ONLY)
    {
      closePhase1();
      closeInitPhase();
    }
    /* TODO: Good Draft Compat
    //--
    if (startCLMsg.getStartMode()==1)
    {
      initializeCLSearchFromProvidedSeqnum(startCLMsg.getSequenceNumber());
    }
    /* TODO: From replication changenumber
    //--
    if (startCLMsg.getStartMode()==2)
    {
@@ -762,14 +907,14 @@
    {
      // to get the CL first and last
      initializeCLDomCtxts(null); // from start
      ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN();
      ChangeNumber crossDomainEligibleCN = computeCrossDomainEligibleCN();
      try
      {
        // to get the CL first and last
        // last rely on the crossDomainElligibleCN thhus must have been
        // last rely on the crossDomainEligibleCN thhus must have been
        // computed before
        int[] limits = computeCLLimits(crossDomainElligibleCN);
        int[] limits = computeCLLimits(crossDomainEligibleCN);
        // Send the response
        CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
        session.publish(msg);
@@ -793,11 +938,17 @@
      }
      return;
    }
    Good Draft Compat */
     */
    // Store into domain
    registerIntoDomain();
    if (debugEnabled())
      TRACER.debugInfo(
          this.getName() + " initialized: " +
          " " + dumpState() + " " +
          " " + clDomCtxtsToString(""));
  }
  /**
@@ -812,30 +963,12 @@
  throws DirectoryException
  {
    boolean interrupted = true;
    ECLUpdateMsg msg = getnextUpdate();
    ECLUpdateMsg msg = getNextECLUpdate();
    // FIXME:ECL We should refactor so that a SH always have a session
    // TODO:ECL We should refactor so that a SH always have a session
    if (session == null)
      return msg;
    /*
     * When we remove a message from the queue we need to check if another
     * server is waiting in flow control because this queue was too long.
     * This check might cause a performance penalty an therefore it
     * is not done for every message removed but only every few messages.
     */
    /** FIXME:ECL checkAllSaturation makes no sense for ECLServerHandler ?
    if (++saturationCount > 10)
    {
      saturationCount = 0;
      try
      {
        replicationServerDomain.checkAllSaturation();
      } catch (IOException e)
      {
      }
    }
    */
    boolean acquired = false;
    do
    {
@@ -857,16 +990,17 @@
  /**
   * Get the next message - non blocking - null when none.
   *
   * @param synchronous - not used - always non blocking.
   * @return the next message - null when none.
   * This method is currently not used but we don't want to keep the mother
   * class method since it make no sense for ECLServerHandler.
   * @param synchronous - not used
   * @return the next message
   */
  protected UpdateMsg getnextMessage(boolean synchronous)
  {
    UpdateMsg msg = null;
    try
    {
      ECLUpdateMsg eclMsg = getnextUpdate();
      ECLUpdateMsg eclMsg = getNextECLUpdate();
      if (eclMsg!=null)
        msg = eclMsg.getUpdateMsg();
    }
@@ -878,60 +1012,24 @@
  }
  /**
   * Get the next external changelog update.
   *
   * @return    The ECL update, null when none.
   * @exception DirectoryException when any problem occurs.
   * Returns the next update message for the External Changelog (ECL).
   * @return the ECL update message, null when there aren't anymore.
   * @throws DirectoryException when an error occurs.
   */
  protected ECLUpdateMsg getnextUpdate()
  public ECLUpdateMsg getNextECLUpdate()
  throws DirectoryException
  {
    return getGeneralizedNextECLUpdate(this.cLSearchCtxt);
  }
    ECLUpdateMsg oldestChange = null;
  /**
   * Computes the cross domain eligible message (non blocking).
   * Return null when search is covered
   */
  private ECLUpdateMsg getGeneralizedNextECLUpdate(CLTraverseCtxt cLSearchCtxt)
  throws DirectoryException
  {
    ECLUpdateMsg theOldestChange = null;
    try
    {
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
          getMonitorInstanceName() + "," + this +
          " getGeneralizedNextECLUpdate starts with ctxt="
          + cLSearchCtxt);
          " getNextECLUpdate starts: " + dumpState());
      if ((cLSearchCtxt.nextSeqnum != -1) &&
          (!cLSearchCtxt.endOfSeqnumdbReached))
      {
        /* TODO:ECL G Good changelog draft compat.
        // First time , initialise the cursor to traverse the seqnumdb
        if (seqnumDbReadIterator == null)
        {
          try
          {
          seqnumDbReadIterator = replicationServerDomain.getReplicationServer().
            getSeqnumDbHandler().generateIterator(cLSearchCtxt.nextSeqnum);
            TRACER.debugInfo("getGeneralizedNextMessage(): "
                + " creates seqnumDbReadIterator from nextSeqnum="
                + cLSearchCtxt.nextSeqnum
                + " 1rst=" + seqnumDbReadIterator.getSeqnum()
                + " CN=" + seqnumDbReadIterator.getChangeNumber()
                + cLSearchCtxt);
          }
          catch(Exception e)
          {
            cLSearchCtxt.endOfSeqnumdbReached = true;
          }
        }
        */
      }
    try
    {
      // Search / no seqnum / not persistent
      // Search / no DraftCN / not persistent
      // -----------------------------------
      //  init: all domain are candidate
      //    get one msg from each
@@ -958,136 +1056,136 @@
      //    if one domain has no msg, still is candidate
      int iDom = 0;
      boolean nextclchange = true;
      while ((nextclchange) && (cLSearchCtxt.searchPhase==1))
      boolean continueLooping = true;
      while ((continueLooping) && (searchPhase == INIT_PHASE))
      {
        // Step 1 & 2
        if (cLSearchCtxt.searchPhase==1)
        if (searchPhase == INIT_PHASE)
        {
          if (debugEnabled())
            clDomCtxtsToString("In getGeneralizedNextMessage : " +
              "looking for the generalized oldest change");
          // Normally we whould not loop .. except ...
          continueLooping = false;
          // Retrieves the index in the subx table of the
          // generalizedOldestChange
          iDom = getGeneralizedOldestChange();
          iDom = getOldestChangeFromDomainCtxts();
          // idomain != -1 means that we got one
          // generalized oldest change to process
          if (iDom==-1)
          // iDom == -1 means that there is no oldest change to process
          if (iDom == -1)
          {
            closePhase1();
            closeInitPhase();
            // signify end of phase 1 to the caller
            // signals end of phase 1 to the caller
            return null;
          }
          // idomain != -1 means that we got one
          // generalized oldest change to process
          String suffix = this.clDomCtxts[iDom].rsd.getBaseDn();
          theOldestChange = new ECLUpdateMsg(
              (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg,
              null, // set later
              suffix);
          // Build the ECLUpdateMsg to be returned
          oldestChange = new ECLUpdateMsg(
              (LDAPUpdateMsg)domainCtxts[iDom].nextMsg,
              null, // cookie will be set later
              domainCtxts[iDom].rsd.getBaseDn(),
              0); //  draftChangeNumber may be set later
          domainCtxts[iDom].nextMsg = null;
          nextclchange = false;
          /* TODO:ECL G Good change log draft compat.
          if (cLSearchCtxt.nextSeqnum!=-1)
          if (draftCompat)
          {
            // Should either retrieve or generate a seqnum
            // we also need to check if the seqnumdb is acccurate reagrding
            // either retrieve a draftCN from the draftCNDb
            // or assign a new draftCN and store in the db
            DraftCNDbHandler draftCNDb=replicationServer.getDraftCNDbHandler();
            // We also need to check if the draftCNdb is consistent with
            // the changelogdb.
            // if not, 2 potential reasons
            // -1 : purge from the changelog .. let's traverse the seqnumdb
            // -2 : changelog is late .. let's traverse the changelog
            // a/ : changelog has been purged (trim)let's traverse the draftCNDb
            // b/ : changelog is late .. let's traverse the changelogDb
            // The following loop allows to loop until being on the same cn
            // in the 2 dbs
            // replogcn : the oldest change from the changelog db
            ChangeNumber replogcn = theOldestChange.getChangeNumber();
            DN replogReplDomDN = clDomCtxts[iDom].rsd.getBaseDn();
            ChangeNumber cnFromChangelogDb =
              oldestChange.getUpdateMsg().getChangeNumber();
            String dnFromChangelogDb = domainCtxts[iDom].rsd.getBaseDn();
            while (true)
            {
              if (!cLSearchCtxt.endOfSeqnumdbReached)
              if (!isEndOfDraftCNReached)
              {
                // we did not reach yet the end of the seqnumdb
                // we did not reach yet the end of the DraftCNdb
                // seqnumcn : the next change from from the seqnum db
                ChangeNumber seqnumcn = seqnumDbReadIterator.getChangeNumber();
                // the next change from the DraftCN db
                ChangeNumber cnFromDraftCNDb = draftCNDbIter.getChangeNumber();
                String dnFromDraftCNDb = draftCNDbIter.getServiceID();
                // are replogcn and seqnumcn should be the same change ?
                int cmp = replogcn.compareTo(seqnumcn);
                DN seqnumReplDomDN=DN.decode(seqnumDbReadIterator.
                getDomainDN());
                // are replogcn and DraftCNcn should be the same change ?
                int areCNEqual = cnFromChangelogDb.compareTo(cnFromDraftCNDb);
                int areDNEqual = dnFromChangelogDb.compareTo(dnFromDraftCNDb);
                TRACER.debugInfo("seqnumgen: comparing the 2 db "
                    + " changelogdb:" + replogReplDomDN + "=" + replogcn
                    + " ts=" +
                    new Date(replogcn.getTime()).toString()
                    + "## seqnumdb:" + seqnumReplDomDN + "=" + seqnumcn
                    + " ts=" +
                    new Date(seqnumcn.getTime()).toString()
                    + " sn older=" + seqnumcn.older(replogcn));
                if (debugEnabled())
                  TRACER.debugInfo("getNextECLUpdate generating draftCN "
                    + " comparing the 2 db DNs :"
                    + dnFromChangelogDb + "?=" + cnFromChangelogDb
                    + " timestamps:" + new Date(cnFromChangelogDb.getTime())
                    + " ?older" +   new Date(cnFromDraftCNDb.getTime()));
                if ((replogReplDomDN.compareTo(seqnumReplDomDN)==0) && (cmp==0))
                if ((areDNEqual==0) && (areCNEqual==0))
                {
                  // same domain and same CN => same change
                  // assign the seqnum from the seqnumdb
                  // to the change from the changelogdb
                  // assign the DraftCN found to the change from the changelogdb
                  if (debugEnabled())
                    TRACER.debugInfo("getNextECLUpdate generating draftCN "
                      + " assigning draftCN=" + draftCNDbIter.getDraftCN()
                      + " to change=" + oldestChange);
                  TRACER.debugInfo("seqnumgen: assigning seqnum="
                      + seqnumDbReadIterator.getSeqnum()
                      + " to change=" + theOldestChange);
                  theOldestChange.setSeqnum(seqnumDbReadIterator.getSeqnum());
                  oldestChange.setDraftChangeNumber(
                      draftCNDbIter.getDraftCN());
                  // prepare the next seqnum for the potential next change added
                  // to the seqnumDb
                  cLSearchCtxt.nextSeqnum = seqnumDbReadIterator.getSeqnum()
                  + 1;
                  break;
                }
                else
                {
                  // replogcn and seqnumcn are NOT the same change
                  if (seqnumcn.older(replogcn))
                  // replogcn and DraftCNcn are NOT on the same change
                  if (cnFromDraftCNDb.older(cnFromChangelogDb))
                  {
                    // the change from the seqnumDb is older
                    // the change from the DraftCNDb is older
                    // that means that the change has been purged from the
                    // changelog
                    // changelogDb (and DraftCNdb not yet been trimed)
                    try
                    {
                      // let's traverse the seqnumdb searching for the change
                      // let's traverse the DraftCNdb searching for the change
                      // found in the changelogDb.
                      TRACER.debugInfo("seqnumgen: will skip "
                          + seqnumcn  + " and next from  the seqnum");
                      cLSearchCtxt.endOfSeqnumdbReached =
                        (seqnumDbReadIterator.next()==false);
                      TRACER.debugInfo("seqnumgen: has nexted cr to "
                          + " sn=" + seqnumDbReadIterator.getSeqnum()
                          + " cn=" + seqnumDbReadIterator.getChangeNumber()
                          + " and reached end "
                          + " of seqnumdb:"+cLSearchCtxt.endOfSeqnumdbReached);
                      if (cLSearchCtxt.endOfSeqnumdbReached)
                      TRACER.debugInfo("getNextECLUpdate generating draftCN "
                          + " will skip " + cnFromDraftCNDb
                          + " and read next change from the DraftCNDb.");
                      isEndOfDraftCNReached = (draftCNDbIter.next()==false);
                      TRACER.debugInfo("getNextECLUpdate generating draftCN "
                          + " has skiped to "
                          + " sn=" + draftCNDbIter.getDraftCN()
                          + " cn=" + draftCNDbIter.getChangeNumber()
                          + " End of draftCNDb ?"+isEndOfDraftCNReached);
                      if (isEndOfDraftCNReached)
                      {
                        // we are at the end of the seqnumdb in the append mode
                        // store in seqnumdb the pair
                        // seqnum of the cur change,state before this change)
                        replicationServerDomain.addSeqnum(
                            cLSearchCtxt.nextSeqnum,
                            getGenState(),
                      clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
                            theOldestChange.getChangeNumber());
                        theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
                        cLSearchCtxt.nextSeqnum++;
                        // we are at the end of the DraftCNdb in the append mode
                        // generate a new draftCN and assign to this change
                        oldestChange.setDraftChangeNumber(
                            replicationServer.getNewDraftCN());
                        // store in DraftCNdb the pair
                        // (draftCN_of_the_cur_change, state_before_this_change)
                        draftCNDb.add(
                            oldestChange.getDraftChangeNumber(),
                            previousCookie.toString(),
                            oldestChange.getServiceId(),
                            oldestChange.getUpdateMsg().getChangeNumber());
                        break;
                      }
                      else
                      {
                        // next change from seqnumdb
                        cLSearchCtxt.nextSeqnum =
                          seqnumDbReadIterator.getSeqnum() + 1;
                        // let's go to test this new change fro the DraftCNdb
                        continue;
                      }
                    }
@@ -1100,108 +1198,99 @@
                    // the change from the changelogDb is older
                    // it should have been stored lately
                    // let's continue to traverse the changelogdb
                    TRACER.debugInfo("seqnumgen: will skip "
                        + replogcn  + " and next from  the CL");
                    nextclchange = true;
                    TRACER.debugInfo("getNextECLUpdate: will skip "
                        + cnFromChangelogDb
                        + " and read next from the regular changelog.");
                    continueLooping = true;
                    break; // TO BE CHECKED
                  }
                }
              }
              else
              {
                // we are at the end of the seqnumdb in the append mode
                // store in seqnumdb the pair
                // (seqnum of the current change, state before this change)
                replicationServerDomain.addSeqnum(
                    cLSearchCtxt.nextSeqnum,
                    getGenState(),
                    clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
                    theOldestChange.getChangeNumber());
                theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
                cLSearchCtxt.nextSeqnum++;
                // we are at the end of the DraftCNdb in the append mode
                // store in DraftCNdb the pair
                // (DraftCN of the current change, state before this change)
                oldestChange.setDraftChangeNumber(
                    replicationServer.getNewDraftCN());
                draftCNDb.add(
                    oldestChange.getDraftChangeNumber(),
                    this.previousCookie.toString(),
                    domainCtxts[iDom].rsd.getBaseDn(),
                    oldestChange.getUpdateMsg().getChangeNumber());
                break;
              }
            } // while seqnum
          } // nextseqnum !- -1
          */
            } // while DraftCN
          }
          // here we have the right oldest change and in the seqnum case we
          // have its seqnum
          // here we have the right oldest change
          // and in the draft case, we have its draft changenumber
          // Set and test the domain of the oldestChange see if we reached
          // the end of the phase for this domain
          clDomCtxts[iDom].currentState.update(
              theOldestChange.getUpdateMsg().getChangeNumber());
          domainCtxts[iDom].currentState.update(
              oldestChange.getUpdateMsg().getChangeNumber());
          if (clDomCtxts[iDom].currentState.cover(clDomCtxts[iDom].stopState))
          if (domainCtxts[iDom].currentState.cover(domainCtxts[iDom].stopState))
          {
            clDomCtxts[iDom].active = false;
            domainCtxts[iDom].active = false;
          }
          // Test the seqnum of the oldestChange see if we reached
          // the end of operation
          /* TODO:ECL G Good changelog draft compat. Not yet implemented
          if ((cLSearchCtxt.stopSeqnum>0) &&
              (theOldestChange.getSeqnum()>=cLSearchCtxt.stopSeqnum))
          {
            closePhase1();
            // means end of phase 1 to the calling writer
            return null;
          }
          */
          if (clDomCtxts[iDom].active)
          if (domainCtxts[iDom].active)
          {
            // populates the table with the next eligible msg from idomain
            // in non blocking mode, return null when no more eligible msg
            getNextElligibleMessage(iDom);
            domainCtxts[iDom].getNextEligibleMessageForDomain(operationId);
          }
        } // phase ==1
      } // while (nextclchange)
        } // phase == INIT_PHASE
      } // while (...)
      if (cLSearchCtxt.searchPhase==2)
      if (searchPhase == PERSISTENT_PHASE)
      {
        clDomCtxtsToString("In getGeneralizedNextMessage (persistent): " +
        "looking for the generalized oldest change");
        if (debugEnabled())
          clDomCtxtsToString("In getNextECLUpdate (persistent): " +
          "looking for the generalized oldest change");
        for (int ido=0; ido<clDomCtxts.length; ido++)
        for (int ido=0; ido<domainCtxts.length; ido++)
        {
          // get next msg
          getNextElligibleMessage(ido);
          domainCtxts[ido].getNextEligibleMessageForDomain(operationId);
        }
        // take the oldest one
        iDom = getGeneralizedOldestChange();
        iDom = getOldestChangeFromDomainCtxts();
        if (iDom != -1)
        {
          String suffix = this.clDomCtxts[iDom].rsd.getBaseDn();
          String suffix = this.domainCtxts[iDom].rsd.getBaseDn();
          theOldestChange = new ECLUpdateMsg(
              (LDAPUpdateMsg)clDomCtxts[iDom].nextMsg,
          oldestChange = new ECLUpdateMsg(
              (LDAPUpdateMsg)domainCtxts[iDom].nextMsg,
              null, // set later
              suffix);
              suffix, 0);
          domainCtxts[iDom].nextMsg = null; // clean
          clDomCtxts[iDom].currentState.update(
              theOldestChange.getUpdateMsg().getChangeNumber());
          domainCtxts[iDom].currentState.update(
              oldestChange.getUpdateMsg().getChangeNumber());
          /* TODO:ECL G Good changelog draft compat.
          if (cLSearchCtxt.nextSeqnum!=-1)
          if (draftCompat)
          {
            // should generate seqnum
            // should generate DraftCN
            DraftCNDbHandler draftCNDb =replicationServer.getDraftCNDbHandler();
            // store in seqnumdb the pair
            // (seqnum of the current change, state before this change)
            replicationServerDomain.addSeqnum(
                cLSearchCtxt.nextSeqnum,
                getGenState(),
                clDomCtxts[iDom].rsd.getBaseDn().toNormalizedString(),
                theOldestChange.getChangeNumber());
            theOldestChange.setSeqnum(cLSearchCtxt.nextSeqnum);
            cLSearchCtxt.nextSeqnum++;
            oldestChange.setDraftChangeNumber(
                replicationServer.getNewDraftCN());
            // store in DraftCNdb the pair
            // (DraftCN of the current change, state before this change)
            draftCNDb.add(
                oldestChange.getDraftChangeNumber(),
                this.previousCookie.toString(),
                domainCtxts[iDom].rsd.getBaseDn(),
                oldestChange.getUpdateMsg().getChangeNumber());
          }
          */
        }
      }
    }
@@ -1214,39 +1303,48 @@
          e);
    }
    if (theOldestChange != null)
    if (oldestChange != null)
    {
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate updates previousCookie:"
          + oldestChange.getUpdateMsg().getChangeNumber());
      // Update the current state
      this.cLSearchCtxt.currentCookie.update(
          theOldestChange.getServiceId(),
          theOldestChange.getUpdateMsg().getChangeNumber());
      previousCookie.update(
          oldestChange.getServiceId(),
          oldestChange.getUpdateMsg().getChangeNumber());
      // Set the current value of global state in the returned message
      theOldestChange.setCookie(this.cLSearchCtxt.currentCookie);
      oldestChange.setCookie(previousCookie);
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate returns result oldest change =" +
                oldestChange);
    }
    return theOldestChange;
    return oldestChange;
  }
  /**
   * Terminates the first (non persistent) phase of the search on the ECL.
   */
  private void closePhase1()
  private void closeInitPhase()
  {
    // starvation of changelog messages
    // all domain have been unactived means are covered
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName() + "," + this + " closePhase1()"
        + " searchCtxt=" + cLSearchCtxt);
          getMonitorInstanceName() + "," + this + " closeInitPhase(): "
          + dumpState());
    // go to persistent phase if one
    for (int i=0; i<clDomCtxts.length; i++)
      clDomCtxts[i].active = true;
    for (int i=0; i<domainCtxts.length; i++)
      domainCtxts[i].active = true;
    if (this.cLSearchCtxt.isPersistent != StartECLSessionMsg.NON_PERSISTENT)
    if (this.isPersistent != StartECLSessionMsg.NON_PERSISTENT)
    {
      // phase = 1 done AND persistent search => goto phase 2
      cLSearchCtxt.searchPhase=2;
      // INIT_PHASE is done AND search is persistent => goto PERSISTENT_PHASE
      searchPhase = PERSISTENT_PHASE;
      if (writer ==null)
      {
@@ -1257,269 +1355,53 @@
    }
    else
    {
      // phase = 1 done AND !persistent search => reinit to phase 0
      cLSearchCtxt.searchPhase=0;
      // INIT_PHASE is done AND search is not persistent => reinit
      searchPhase = UNDEFINED_PHASE;
    }
    /* TODO:ECL G Good changelog draft compat.
    if (seqnumDbReadIterator!=null)
    if (draftCNDbIter!=null)
    {
      // End of phase 1 => always release the seqnum iterator
      seqnumDbReadIterator.releaseCursor();
      seqnumDbReadIterator = null;
      // End of INIT_PHASE => always release the iterator
      draftCNDbIter.releaseCursor();
      draftCNDbIter = null;
    }
     */
  }
  /**
   * Get the oldest change contained in the subx table.
   * The subx table should be populated before
   * @return the oldest change.
   * Get the index in the domainCtxt table of the domain with the oldest change.
   * @return the index of the domain with the oldest change, -1 when none.
   */
  private int getGeneralizedOldestChange()
  private int getOldestChangeFromDomainCtxts()
  {
    int oldest = -1;
    for (int i=0; i<clDomCtxts.length; i++)
    for (int i=0; i<domainCtxts.length; i++)
    {
      if ((clDomCtxts[i].active))
      if ((domainCtxts[i].active))
      {
        // on the first loop, oldest==-1
        // .msg is null when the previous (non blocking) nextMessage did
        // not have any eligible msg to return
        if (clDomCtxts[i].nextMsg != null)
        if (domainCtxts[i].nextMsg != null)
        {
          if ((oldest==-1) ||
              (clDomCtxts[i].nextMsg.compareTo(clDomCtxts[oldest].nextMsg)<0))
              (domainCtxts[i].nextMsg.compareTo(domainCtxts[oldest].nextMsg)<0))
          {
            oldest = i;
          }
        }
      }
    }
    if (debugEnabled())
      TRACER.debugInfo("In " + replicationServerDomain.getReplicationServer().
        getMonitorInstanceName()
        + "," + this + " getGeneralizedOldestChange() " +
        " returns " +
        ((oldest!=-1)?clDomCtxts[oldest].nextMsg:""));
          getMonitorInstanceName()
          + "," + this + " getOldestChangeFromDomainCtxts() returns " +
          ((oldest!=-1)?domainCtxts[oldest].nextMsg:"-1"));
    return oldest;
  }
  /**
   * Get from the provided domain, the next message elligible regarding
   * the crossDomain elligible CN. Put it in the context table.
   * @param idomain the provided domain.
   */
  private void getNextElligibleMessage(int idomain)
  {
    ChangeNumber crossDomainElligibleCN = computeCrossDomainElligibleCN();
    try
    {
      if (clDomCtxts[idomain].nonElligiblemsg != null)
      {
        TRACER.debugInfo("getNextElligibleMessage tests if the already " +
            " stored nonElligibleMsg has becoem elligible regarding " +
            " the crossDomainElligibleCN ("+crossDomainElligibleCN +
            " ) " +
            clDomCtxts[idomain].nonElligiblemsg.getChangeNumber().
            older(crossDomainElligibleCN));
        // we already got the oldest msg and it was not elligible
        if (clDomCtxts[idomain].nonElligiblemsg.getChangeNumber().
            older(crossDomainElligibleCN))
        {
          // it is now elligible
          clDomCtxts[idomain].nextMsg = clDomCtxts[idomain].nonElligiblemsg;
          clDomCtxts[idomain].nonElligiblemsg = null;
        }
        else
        {
          // the oldest is still not elligible - let's wait next
        }
      }
      else
      {
        // non blocking
        UpdateMsg newMsg = clDomCtxts[idomain].mh.getnextMessage(false);
        if (debugEnabled())
          TRACER.debugInfo(this +
            " getNextElligibleMessage got the next changelogmsg "
            + " from " + clDomCtxts[idomain].mh.getServiceId()
            + " newCLMsg=" + newMsg);
        clDomCtxts[idomain].nextMsg =
          clDomCtxts[idomain].nonElligiblemsg = null;
        // in non blocking mode, return null when no more msg
        if (newMsg != null)
        {
          /* TODO:ECL Take into account eligibility.
          TRACER.debugInfo("getNextElligibleMessage is "
              + newMsg.getChangeNumber()
              + new Date(newMsg.getChangeNumber().getTime()).toString()
              + " elligible "
              + newMsg.getChangeNumber().older(crossDomainElligibleCN));
          if (newMsg.getChangeNumber().older(crossDomainElligibleCN))
          {
            // is elligible
            clDomCtxts[idomain].nextMsg = newMsg;
          }
          else
          {
            // is not elligible
            clDomCtxts[idomain].nonElligiblemsg = newMsg;
          }
          */
          clDomCtxts[idomain].nextMsg = newMsg;
        }
      }
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
    }
  }
  /*
   */
  ECLEligibleCNComputerThread eligibleCNComputerThread = null;
  ChangeNumber liveecn;
  private ChangeNumber computeCrossDomainElligibleCN()
  {
    return liveecn;
  }
  /**
   * This class specifies the thread that computes periodically
   * the cross domain eligible CN.
   */
  private final class ECLEligibleCNComputerThread
    extends DirectoryThread
  {
    /**
     * The tracer object for the debug logger.
     */
    private boolean shutdown = false;
    private ECLEligibleCNComputerThread()
    {
      super("ECL eligible CN computer thread");
      start();
    }
    public void run()
    {
      while (shutdown == false)
      {
        try {
          synchronized (this)
          {
            liveecn = computeNewCrossDomainElligibleCN();
            try
            {
              this.wait(1000);
            } catch (InterruptedException e)
            { }
          }
        } catch (Exception end)
        {
          break;
        }
      }
    }
    private ChangeNumber computeNewCrossDomainElligibleCN()
    {
      ChangeNumber computedCrossDomainElligibleCN = null;
      String s = "=> ";
      ReplicationServer rs = replicationServerDomain.getReplicationServer();
      if (debugEnabled())
        TRACER.debugInfo("ECLSH.computeNewCrossDomainElligibleCN() "
          + " periodic starts rs="+rs);
      Iterator<ReplicationServerDomain> rsdi = rs.getCacheIterator();
      if (rsdi != null)
      {
        while (rsdi.hasNext())
        {
          ReplicationServerDomain domain = rsdi.next();
          if (domain.getBaseDn().equalsIgnoreCase("cn=changelog"))
            continue;
          ChangeNumber domainElligibleCN = computeEligibleCN(domain);
          if (domainElligibleCN==null)
            continue;
          if ((computedCrossDomainElligibleCN == null) ||
              (domainElligibleCN.older(computedCrossDomainElligibleCN)))
          {
            computedCrossDomainElligibleCN = domainElligibleCN;
          }
          s += "\n DN:" + domain.getBaseDn()
          + "\t\t domainElligibleCN :" + domainElligibleCN
          + "/" +
          new Date(domainElligibleCN.getTime()).toString();
        }
      }
      if (debugEnabled())
        TRACER.debugInfo("SH.computeNewCrossDomainElligibleCN() periodic " +
          " ends with " +
          " the following domainElligibleCN for each domain :" + s +
          "\n thus CrossDomainElligibleCN=" + computedCrossDomainElligibleCN +
          "  ts=" +
          new Date(computedCrossDomainElligibleCN.getTime()).toString());
      return computedCrossDomainElligibleCN;
    }
  }
  /**
   * Compute the eligible CN.
   * @param rsd The provided replication server domain for which we want
   * to retrieve the eligible date.
   * @return null if the domain does not play in eligibility.
   */
  public ChangeNumber computeEligibleCN(ReplicationServerDomain rsd)
  {
    ChangeNumber elligibleCN = null;
    ServerState heartbeatState = rsd.getHeartbeatState();
    if (heartbeatState==null)
      return null;
    // compute elligible CN
    ServerState hbState = heartbeatState.duplicate();
    Iterator<Short> it = hbState.iterator();
    while (it.hasNext())
    {
      short sid = it.next();
      ChangeNumber storedCN = hbState.getMaxChangeNumber(sid);
      // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
      // then the server is considered down and not considered for eligibility
      if (TimeThread.getTime()-storedCN.getTime()>2000)
      {
        if (debugEnabled())
          TRACER.debugInfo(
            "For RSD." + rsd.getBaseDn() + " Server " + sid
            + " is not considered for eligibility ... potentially down");
        continue;
      }
      if ((elligibleCN == null) || (storedCN.older(elligibleCN)))
      {
        elligibleCN = storedCN;
      }
    }
    if (debugEnabled())
      TRACER.debugInfo(
        "For RSD." + rsd.getBaseDn() + " ElligibleCN()=" + elligibleCN);
    return elligibleCN;
  }
  /**
   * Returns the client operation id.
   * @return The client operation id.
   */
@@ -1533,7 +1415,7 @@
   * @return Whether the current search is persistent or not.
   */
  public short isPersistent() {
    return this.cLSearchCtxt.isPersistent;
    return this.isPersistent;
  }
  /**
@@ -1541,7 +1423,137 @@
   * @return Whether the current search is persistent or not.
   */
  public int getSearchPhase() {
    return this.cLSearchCtxt.searchPhase;
    return this.searchPhase;
  }
  /**
   * Refresh the eligibleCN by requesting the replication server.
   */
  public void refreshEligibleCN()
  {
    eligibleCN = replicationServer.getEligibleCN();
  }
  /*
   * Get first and last DraftCN
   * @param crossDomainEligibleCN
   * @return
   */
  private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN)
  throws DirectoryException
  {
    /* The content of the DraftCNdb depends on the SEARCH operations done before
     * requesting the DraftCN. If no operations, DraftCNdb is empty.
     * The limits we want to get are the "potential" limits if a request was
     * done, the DraftCNdb is probably not complete to do that.
     *
     * The first DraftCN is :
     *  - the first record from the DraftCNdb
     *  - if none because DraftCNdb empty,
     *      then
     *        if no change in replchangelog then return 0
     *        else return 1 (DraftCN that WILL be returned to next search)
     *
     * The last DraftCN is :
     *  - initialized with the last record from the DraftCNdb (0 if none)
     *    and consider the genState associated
     *  - to the last DraftCN, we add the count of updates in the replchangelog
     *     FROM that genState TO the crossDomainEligibleCN
     *     (this diff is done domain by domain)
     */
    int firstDraftCN;
    int lastDraftCN;
    boolean DraftCNdbIsEmpty;
    DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler();
    ReplicationServer rs = replicationServerDomain.getReplicationServer();
    // Get the first DraftCN from the DraftCNdb
    firstDraftCN = draftCNDbH.getFirstKey();
    HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
    if (firstDraftCN < 1)
    {
      DraftCNdbIsEmpty=true;
      firstDraftCN = 0;
      lastDraftCN = 0;
    }
    else
    {
      DraftCNdbIsEmpty=false;
      // Get the last DraftCN from the DraftCNdb
      lastDraftCN = draftCNDbH.getLastKey();
      // Get the generalized state associated with the current last DraftCN
      // and initializes from it the startStates table
      String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
      if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
      {
        domainsServerStateForLastSeqnum = MultiDomainServerState.
          splitGenStateToServerStates(lastSeqnumGenState);
      }
    }
    // Domain by domain
    Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        // process a domain
        ReplicationServerDomain rsd = rsdi.next();
        if (isServiceIDExcluded(rsd.getBaseDn()))
          continue;
        // for this domain, have the state in the replchangelog
        // where the last DraftCN update is
        ServerState domainServerStateForLastSeqnum;
        if ((domainsServerStateForLastSeqnum == null) ||
            (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
        {
          domainServerStateForLastSeqnum = new ServerState();
        }
        else
        {
          domainServerStateForLastSeqnum =
            domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
        }
        // Count the number of (eligible) changes from this place
        // to the eligible CN (cross server)
        long ec = rsd.getEligibleCount(
            domainServerStateForLastSeqnum, crossDomainEligibleCN);
        // ... hum ...
        if ((ec>0) && (DraftCNdbIsEmpty==false))
          ec--;
        // cumulates on domains
        lastDraftCN += ec;
        // DraftCN is empty and there are eligible updates in the repl changelog
        // then init first DraftCN
        if ((ec>0) && (firstDraftCN==0))
          firstDraftCN = 1;
      }
    }
    return new int[]{firstDraftCN, lastDraftCN};
  }
  private boolean isServiceIDExcluded(String serviceID)
  {
    // skip the excluded domains
    boolean excluded = false;
    for(String excludedServiceID : this.excludedServiceIDs)
    {
      if (excludedServiceID.equalsIgnoreCase(serviceID))
      {
        excluded=true;
        break;
      }
    }
    return excluded;
  }
}