mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
24.11.2009 abe3bce25e7f6ecd0ce8b90a14036d3380739e9e
Fix 4183 - ECL (draft mode): first and last ChangeNumber are 0 until first search
9 files modified
710 ■■■■■ changed files
opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java 22 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java 18 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java 36 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java 24 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java 6 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 133 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 115 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 20 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 336 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/FirstChangeNumberVirtualAttributeProvider.java
@@ -25,8 +25,8 @@
 *      Copyright 2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.common;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -39,7 +39,8 @@
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.AttributeValues;
import org.opends.server.types.ByteString;
@@ -48,6 +49,7 @@
import org.opends.server.types.InitializationException;
import org.opends.server.types.ResultCode;
import org.opends.server.types.VirtualAttributeRule;
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -63,7 +65,6 @@
       extends VirtualAttributeProvider<UserDefinedVirtualAttributeCfg>
       implements ConfigurationChangeListener<UserDefinedVirtualAttributeCfg>
{
  private static final DebugTracer TRACER = getTracer();
  // The current configuration for this virtual attribute provider.
  private UserDefinedVirtualAttributeCfg currentConfig;
@@ -137,8 +138,19 @@
      DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
      if (eclwe!=null)
      {
        first = String.valueOf(
            eclwe.getReplicationServer().getFirstDraftChangeNumber());
        // Set a list of excluded domains (also exclude 'cn=changelog' itself)
        ArrayList<String> excludedDomains =
          MultimasterReplication.getPrivateDomains();
        if (!excludedDomains.contains(
            ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
          excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
        ReplicationServer rs = eclwe.getReplicationServer();
        int[] limits = rs.getECLDraftCNLimits(
            rs.getEligibleCN(), excludedDomains);
        first = String.valueOf(limits[0]);
      }
    }
    catch(Exception e)
opends/src/server/org/opends/server/replication/common/LastChangeNumberVirtualAttributeProvider.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.common;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -40,6 +41,8 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.AttributeValues;
import org.opends.server.types.ByteString;
@@ -48,6 +51,7 @@
import org.opends.server.types.InitializationException;
import org.opends.server.types.ResultCode;
import org.opends.server.types.VirtualAttributeRule;
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
@@ -137,8 +141,18 @@
      DirectoryServer.getWorkflowElement("EXTERNAL CHANGE LOG");
      if (eclwe!=null)
      {
        last = String.valueOf(
            eclwe.getReplicationServer().getLastDraftChangeNumber());
        // Set a list of excluded domains (also exclude 'cn=changelog' itself)
        ArrayList<String> excludedDomains =
          MultimasterReplication.getPrivateDomains();
        if (!excludedDomains.contains(
            ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT))
          excludedDomains.add(ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT);
        ReplicationServer rs = eclwe.getReplicationServer();
        int[] limits = rs.getECLDraftCNLimits(
            rs.getEligibleCN(), excludedDomains);
        last = String.valueOf(limits[1]);
      }
    }
    catch(Exception e)
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -315,28 +315,28 @@
     if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      return "ModifyDNMsg content: " +
        "\nprotocolVersion: " + protocolVersion +
        "\ndn: " + dn +
        "\nchangeNumber: " + changeNumber +
        "\nuniqueId: " + uniqueId +
        "\nassuredFlag: " + assuredFlag +
        "\nnewRDN: " + newRDN +
        "\nnewSuperior: " + newSuperior +
        "\ndeleteOldRdn: " + deleteOldRdn;
        " protocolVersion: " + protocolVersion +
        " dn: " + dn +
        " changeNumber: " + changeNumber +
        " uniqueId: " + uniqueId +
        " assuredFlag: " + assuredFlag +
        " newRDN: " + newRDN +
        " newSuperior: " + newSuperior +
        " deleteOldRdn: " + deleteOldRdn;
    }
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
    {
      return "ModifyDNMsg content: " +
        "\nprotocolVersion: " + protocolVersion +
        "\ndn: " + dn +
        "\nchangeNumber: " + changeNumber +
        "\nuniqueId: " + uniqueId +
        "\nnewRDN: " + newRDN +
        "\nnewSuperior: " + newSuperior +
        "\ndeleteOldRdn: " + deleteOldRdn +
        "\nassuredFlag: " + assuredFlag +
        "\nassuredMode: " + assuredMode +
        "\nsafeDataLevel: " + safeDataLevel;
        " protocolVersion: " + protocolVersion +
        " dn: " + dn +
        " changeNumber: " + changeNumber +
        " uniqueId: " + uniqueId +
        " newRDN: " + newRDN +
        " newSuperior: " + newSuperior +
        " deleteOldRdn: " + deleteOldRdn +
        " assuredFlag: " + assuredFlag +
        " assuredMode: " + assuredMode +
        " safeDataLevel: " + safeDataLevel;
    }
    return "!!! Unknown version: " + protocolVersion + "!!!";
  }
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -197,22 +197,22 @@
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      return "ModifyMsg content: " +
        "\nprotocolVersion: " + protocolVersion +
        "\ndn: " + dn +
        "\nchangeNumber: " + changeNumber +
        "\nuniqueId: " + uniqueId +
        "\nassuredFlag: " + assuredFlag;
        " protocolVersion: " + protocolVersion +
        " dn: " + dn +
        " changeNumber: " + changeNumber +
        " uniqueId: " + uniqueId +
        " assuredFlag: " + assuredFlag;
    }
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V2)
    {
      return "ModifyMsg content: " +
        "\nprotocolVersion: " + protocolVersion +
        "\ndn: " + dn +
        "\nchangeNumber: " + changeNumber +
        "\nuniqueId: " + uniqueId +
        "\nassuredFlag: " + assuredFlag +
        "\nassuredMode: " + assuredMode +
        "\nsafeDataLevel: " + safeDataLevel;
        " protocolVersion: " + protocolVersion +
        " dn: " + dn +
        " changeNumber: " + changeNumber +
        " uniqueId: " + uniqueId +
        " assuredFlag: " + assuredFlag +
        " assuredMode: " + assuredMode +
        " safeDataLevel: " + safeDataLevel;
    }
    return "!!! Unknown version: " + protocolVersion + "!!!";
  }
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -43,7 +43,7 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.server.DraftCNDB.*;
import org.opends.server.replication.server.DraftCNDB.DraftCNDBCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
@@ -81,7 +81,6 @@
  private boolean shutdown = false;
  private boolean trimDone = false;
  private DirectoryThread thread = null;
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
@@ -142,7 +141,8 @@
    db.addEntry(key, value, serviceID, cn);
    if (debugEnabled())
      TRACER.debugInfo("In DraftCNDbhandler.add, added: "
      TRACER.debugInfo(
          "In DraftCNDbhandler.add, added: "
        + " key=" + key
        + " value=" + value
        + " serviceID=" + serviceID
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -105,6 +105,7 @@
   * Specifies the excluded DNs (like cn=admin, ...).
   */
  public ArrayList<String> excludedServiceIDs = new ArrayList<String>();
  //HashSet<String> excludedServiceIDs = new HashSet<String>();
  /**
   * Eligible changeNumber - only changes older or equal to eligibleCN
@@ -565,7 +566,8 @@
        // Get the draftLimits (from the eligibleCN got at the beginning of
        // the operation.
        int[] limits = getECLDraftCNLimits(eligibleCN);
        int[] limits = replicationServer.getECLDraftCNLimits(
            eligibleCN, excludedServiceIDs);
        if (startDraftCN<=limits[1])
        {
@@ -630,7 +632,7 @@
            continue;
          // skip the excluded domains
          if (isServiceIDExcluded(rsd.getBaseDn()))
          if (excludedServiceIDs.contains(rsd.getBaseDn()))
            continue;
          // Creates the new domain context
@@ -829,11 +831,6 @@
  throws DirectoryException
  {
    //
    //this.following = false; // FIXME:ECL makes no sense for ECLServerHandler ?
    //this.lateQueue.clear(); // FIXME:ECL makes no sense for ECLServerHandler ?
    //this.setConsumerActive(true);
    this.operationId = startECLSessionMsg.getOperationId();
    this.setName(this.getClass().getCanonicalName()+ " " + operationId);
@@ -1434,126 +1431,4 @@
    eligibleCN = replicationServer.getEligibleCN();
  }
  /*
   * Get first and last DraftCN
   * @param crossDomainEligibleCN
   * @return
   */
  private int[] getECLDraftCNLimits(ChangeNumber crossDomainEligibleCN)
  throws DirectoryException
  {
    /* The content of the DraftCNdb depends on the SEARCH operations done before
     * requesting the DraftCN. If no operations, DraftCNdb is empty.
     * The limits we want to get are the "potential" limits if a request was
     * done, the DraftCNdb is probably not complete to do that.
     *
     * The first DraftCN is :
     *  - the first record from the DraftCNdb
     *  - if none because DraftCNdb empty,
     *      then
     *        if no change in replchangelog then return 0
     *        else return 1 (DraftCN that WILL be returned to next search)
     *
     * The last DraftCN is :
     *  - initialized with the last record from the DraftCNdb (0 if none)
     *    and consider the genState associated
     *  - to the last DraftCN, we add the count of updates in the replchangelog
     *     FROM that genState TO the crossDomainEligibleCN
     *     (this diff is done domain by domain)
     */
    int firstDraftCN;
    int lastDraftCN;
    boolean DraftCNdbIsEmpty;
    DraftCNDbHandler draftCNDbH = replicationServer.getDraftCNDbHandler();
    ReplicationServer rs = replicationServerDomain.getReplicationServer();
    // Get the first DraftCN from the DraftCNdb
    firstDraftCN = draftCNDbH.getFirstKey();
    HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
    if (firstDraftCN < 1)
    {
      DraftCNdbIsEmpty=true;
      firstDraftCN = 0;
      lastDraftCN = 0;
    }
    else
    {
      DraftCNdbIsEmpty=false;
      // Get the last DraftCN from the DraftCNdb
      lastDraftCN = draftCNDbH.getLastKey();
      // Get the generalized state associated with the current last DraftCN
      // and initializes from it the startStates table
      String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
      if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
      {
        domainsServerStateForLastSeqnum = MultiDomainServerState.
          splitGenStateToServerStates(lastSeqnumGenState);
      }
    }
    // Domain by domain
    Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        // process a domain
        ReplicationServerDomain rsd = rsdi.next();
        if (isServiceIDExcluded(rsd.getBaseDn()))
          continue;
        // for this domain, have the state in the replchangelog
        // where the last DraftCN update is
        ServerState domainServerStateForLastSeqnum;
        if ((domainsServerStateForLastSeqnum == null) ||
            (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
        {
          domainServerStateForLastSeqnum = new ServerState();
        }
        else
        {
          domainServerStateForLastSeqnum =
            domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
        }
        // Count the number of (eligible) changes from this place
        // to the eligible CN (cross server)
        long ec = rsd.getEligibleCount(
            domainServerStateForLastSeqnum, crossDomainEligibleCN);
        // ... hum ...
        if ((ec>0) && (DraftCNdbIsEmpty==false))
          ec--;
        // cumulates on domains
        lastDraftCN += ec;
        // DraftCN is empty and there are eligible updates in the repl changelog
        // then init first DraftCN
        if ((ec>0) && (firstDraftCN==0))
          firstDraftCN = 1;
      }
    }
    return new int[]{firstDraftCN, lastDraftCN};
  }
  private boolean isServiceIDExcluded(String serviceID)
  {
    // skip the excluded domains
    boolean excluded = false;
    for(String excludedServiceID : this.excludedServiceIDs)
    {
      if (excludedServiceID.equalsIgnoreCase(serviceID))
      {
        excluded=true;
        break;
      }
    }
    return excluded;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -43,6 +43,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -70,6 +71,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.ExternalChangeLogSession;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.ProtocolSession;
import org.opends.server.replication.protocol.ReplServerStartMsg;
import org.opends.server.replication.protocol.ReplSessionSecurity;
@@ -1584,4 +1587,116 @@
    return ++lastGeneratedDraftCN;
  }
  /**
   * Get first and last DraftCN.
   * @param crossDomainEligibleCN The provided crossDomainEligibleCN used as
   *        the upper limit for the lastDraftCN
   * @param excludedServiceIDs The serviceIDs that are excluded from the ECL.
   * @return The first and last draftCN.
   * @throws DirectoryException a.
   */
  public int[] getECLDraftCNLimits(
      ChangeNumber crossDomainEligibleCN,
      ArrayList<String> excludedServiceIDs)
  throws DirectoryException
  {
    /* The content of the DraftCNdb depends on the SEARCH operations done before
     * requesting the DraftCN. If no operations, DraftCNdb is empty.
     * The limits we want to get are the "potential" limits if a request was
     * done, the DraftCNdb is probably not complete to do that.
     *
     * The first DraftCN is :
     *  - the first record from the DraftCNdb
     *  - if none because DraftCNdb empty,
     *      then
     *        if no change in replchangelog then return 0
     *        else return 1 (DraftCN that WILL be returned to next search)
     *
     * The last DraftCN is :
     *  - initialized with the last record from the DraftCNdb (0 if none)
     *    and consider the genState associated
     *  - to the last DraftCN, we add the count of updates in the replchangelog
     *     FROM that genState TO the crossDomainEligibleCN
     *     (this diff is done domain by domain)
     */
    int firstDraftCN;
    int lastDraftCN;
    boolean DraftCNdbIsEmpty;
    DraftCNDbHandler draftCNDbH = this.getDraftCNDbHandler();
    // Get the first DraftCN from the DraftCNdb
    firstDraftCN = draftCNDbH.getFirstKey();
    HashMap<String,ServerState> domainsServerStateForLastSeqnum = null;
    if (firstDraftCN < 1)
    {
      DraftCNdbIsEmpty=true;
      firstDraftCN = 0;
      lastDraftCN = 0;
    }
    else
    {
      DraftCNdbIsEmpty=false;
      // Get the last DraftCN from the DraftCNdb
      lastDraftCN = draftCNDbH.getLastKey();
      // Get the generalized state associated with the current last DraftCN
      // and initializes from it the startStates table
      String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
      if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
      {
        domainsServerStateForLastSeqnum = MultiDomainServerState.
          splitGenStateToServerStates(lastSeqnumGenState);
      }
    }
    // Domain by domain
    Iterator<ReplicationServerDomain> rsdi = this.getDomainIterator();
    if (rsdi != null)
    {
      while (rsdi.hasNext())
      {
        // process a domain
        ReplicationServerDomain rsd = rsdi.next();
        if (excludedServiceIDs.contains(rsd.getBaseDn()))
          continue;
        // for this domain, have the state in the replchangelog
        // where the last DraftCN update is
        ServerState domainServerStateForLastSeqnum;
        if ((domainsServerStateForLastSeqnum == null) ||
            (domainsServerStateForLastSeqnum.get(rsd.getBaseDn())==null))
        {
          domainServerStateForLastSeqnum = new ServerState();
        }
        else
        {
          domainServerStateForLastSeqnum =
            domainsServerStateForLastSeqnum.get(rsd.getBaseDn());
        }
        // Count the number of (eligible) changes from this place
        // to the eligible CN (cross server)
        long ec = rsd.getEligibleCount(
            domainServerStateForLastSeqnum, crossDomainEligibleCN);
        // the state from which we started is the one BEFORE the lastdraftCN
        // so we must decrement 1 to the EligibleCount
        if ((ec>0) && (DraftCNdbIsEmpty==false))
          ec--;
        // cumulates on domains
        lastDraftCN += ec;
        // DraftCN Db is empty and there are eligible updates in the replication
        // changelog then init first DraftCN
        if ((ec>0) && (firstDraftCN==0))
          firstDraftCN = 1;
      }
    }
    return new int[]{firstDraftCN, lastDraftCN};
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -212,6 +212,7 @@
  public void put(UpdateMsg update, ServerHandler sourceHandler)
    throws IOException
  {
    ChangeNumber cn = update.getChangeNumber();
    short id = cn.getServerId();
    sourceHandler.updateServerState(update);
@@ -3089,8 +3090,8 @@
  /**
   * This methods count the changes, server by server :
   * - from a start point (cn taken from the provided startState)
   * - to an end point (the provided endCN).
   * - from a serverState start point
   * - to (inclusive) an end point (the provided endCN).
   * @param startState The provided start server state.
   * @param endCN The provided end change number.
   * @return The number of changes between startState and endCN.
@@ -3116,12 +3117,14 @@
        try
        {
          ri = h.generateIterator(startState.getMaxChangeNumber(sid));
          startCN = ri.getChange().getChangeNumber();
          if (ri.next()==true)
          {
            startCN = ri.getChange().getChangeNumber();
          }
        }
        catch(Exception e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
          // no change found (purge from CL)
          startCN = null;
        }
        finally
@@ -3136,19 +3139,20 @@
        if (startCN != null)
        {
          // Set on the change related to the endCN
          ChangeNumber upperCN;
          ChangeNumber upperCN = null;
          try
          {
            // Build a changenumber for this very server, with the timestamp
            // of the endCN
            ChangeNumber f = new ChangeNumber(endCN.getTime(), 0, sid);
            ri = h.generateIterator(f);
            upperCN = ri.getChange().getChangeNumber();
            if (ri.next()==true)
            {
              upperCN = ri.getChange().getChangeNumber();
            }
          }
          catch(Exception e)
          {
            TRACER.debugCaught(DebugLogLevel.ERROR, e);
            // no new change
            upperCN = h.getLastChange();
          }
          finally
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -200,38 +200,104 @@
  @Test(enabled=true)
  public void ECLReplicationServerTest()
  {
    // --
    // First set of test are in the cookie mode
    // Test that private backend is excluded from ECL
    ECLOnPrivateBackend();replicationServer.clearDb();
    // Test remote API (ECL through replication protocol) with empty ECL
    ECLRemoteEmpty();replicationServer.clearDb();
    // Test with empty changelog
    ECLEmpty();replicationServer.clearDb();
    ECLAllOps();replicationServer.clearDb();
    ECLRemoteNonEmpty();replicationServer.clearDb();
    ECLTwoDomains();replicationServer.clearDb();
    ECLPsearch(true, false);replicationServer.clearDb();
    ECLPsearch(false, false);replicationServer.clearDb();
    ECLSimulPsearches();replicationServer.clearDb();
    // Test all types of ops.
    ECLAllOps(); // Do not clean the db for the next test
    // First and last should be ok whenever a request has been done or not
    // in compat mode.
    ECLCompatTestLimits(1,4);replicationServer.clearDb();
    // Test remote API (ECL through replication protocol) with NON empty ECL
    ECLRemoteNonEmpty();replicationServer.clearDb();
    // Test with a mix of domains, a mix of DSes
    ECLTwoDomains();replicationServer.clearDb();
    // Persistent search with changesOnly request
    ECLPsearch(true, false);replicationServer.clearDb();
    // Persistent search with init values request
    ECLPsearch(false, false);replicationServer.clearDb();
    // Simultaneous psearches
    ECLSimultaneousPsearches();replicationServer.clearDb();
    // Test eligible count method.
    ECLGetEligibleCountTest();replicationServer.clearDb();
    // TODO:ECL Test SEARCH abandon and check everything shutdown and cleaned
    // TODO:ECL Test PSEARCH abandon and check everything shutdown and cleaned
    // TODO:ECL Test invalid DN in cookie returns UNWILLING + message
    // TODO:ECL Test notif control returned contains the cookie
    // TODO:ECL Test the attributes list and values returned in ECL entries
    // TODO:ECL Test search -s base, -s one
    // Test directly from the java obect that the changeTimeHeartbeatState
    // stored are ok.
    ChangeTimeHeartbeatTest();replicationServer.clearDb();
    // Test the different forms of filter that are parsed in order to
    // optimize the request.
    ECLFilterTest();
    // --
    // Second set of test are in the draft compat mode
    // Empty replication changelog
    ECLCompatEmpty();
    // Request from an invalid draft change number
    ECLCompatBadSeqnum();
    ECLCompatWriteReadAllOps(1);
    ECLCompatWriteReadAllOps(5);
    // Write changes and read ECL from start
    int ts = ECLCompatWriteReadAllOps(1);
    // Write additional changes and read ECL from a provided draft change number
    ts = ECLCompatWriteReadAllOps(5);
    // Test request from a provided change number
    ECLCompatReadFrom(6);
    // Test request from a provided change number interval
    ECLCompatReadFromTo(5,7);
    // Test first and last draft changenumber
    ECLCompatTestLimits(1,8);
    // Test first and last draft changenumber, a dd a new change, do not
    // search again the ECL, but search fro first and last
    ECLCompatTestLimitsAndAdd(1,8, ts);
    // Test DraftCNDb is purged when replication change log is purged
    ECLCompatPurge();
    // Test first and last are updated
    ECLCompatTestLimits(0,0);
    // Persistent search in changesOnly mode
    ECLPsearch(true, true);replicationServer.clearDb();
    // Persistent search in init + changes mode
    ECLPsearch(false, true);
    // Test Filter on replication csn
    // TODO: test with optimization when code done.
    ECLFilterOnReplicationCsn();replicationServer.clearDb();
    ECLSimulPsearches();replicationServer.clearDb();
    // Test simultaneous persistent searches in draft compat mode.
    ECLSimultaneousPsearches();replicationServer.clearDb();
    
  }
@@ -592,6 +658,21 @@
      assertTrue(entries != null);
      assertTrue(entries.size()==0);
      //
      // Test lastExternalChangelogCookie attribute of the ECL
      //
      /* FIXME: uncomment when fix available
      ExternalChangeLogSessionImpl session =
        new ExternalChangeLogSessionImpl(replicationServer);
      MultiDomainServerState expectedLastCookie =
        new MultiDomainServerState("o=test:;");
      MultiDomainServerState lastCookie = session.getLastCookie();
      assertTrue(expectedLastCookie.equalsTo(lastCookie),
          " ExpectedLastCookie=" + expectedLastCookie +
          " lastCookie=" + lastCookie);
      assertLastCookieEquals(tn, expectedLastCookie);
      */
      // Cleaning
      if (domain2 != null)
        MultimasterReplication.deleteDomain(baseDn2);
@@ -917,47 +998,8 @@
      assertTrue(expectedLastCookie.equalsTo(lastCookie),
          " ExpectedLastCookie=" + expectedLastCookie +
          " lastCookie=" + lastCookie);
      //
      LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>();
      lastcookieattribute.add("lastExternalChangelogCookie");
      searchOp = connection.processSearch(
          ByteString.valueOf(""),
          SearchScope.BASE_OBJECT,
          DereferencePolicy.NEVER_DEREF_ALIASES,
          0, // Size limit
          0, // Time limit
          false, // Types only
          LDAPFilter.decode("(objectclass=*)"),
          lastcookieattribute,
          NO_CONTROL,
          null);
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString()
          + searchOp.getAdditionalLogMessage());
      cookie = "";
      entries = searchOp.getSearchEntries();
      if (entries != null)
      {
        for (SearchResultEntry resultEntry : entries)
        {
          debugInfo(tn, "Result entry=\n" + resultEntry.toLDIFString());
          ldifWriter.writeEntry(resultEntry);
          try
          {
            List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
            cookie = l.get(0).iterator().next().toString();
          }
          catch(NullPointerException e)
          {}
        }
      }
      assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)),
          " Expected last cookie attribute value:" + expectedLastCookie +
          " Read from server: " + cookie + " are equal :");
      assertLastCookieEquals(tn, expectedLastCookie);
      s1test.stop();
      s1test2.stop();
      s2test.stop();
@@ -973,6 +1015,61 @@
    debugInfo(tn, "Ending test successfully");
  }
  private void assertLastCookieEquals(String tn,
      MultiDomainServerState expectedLastCookie)
  {
    String cookie = "";
    LDIFWriter ldifWriter = getLDIFWriter();
    //
    LinkedHashSet<String> lastcookieattribute = new LinkedHashSet<String>();
    lastcookieattribute.add("lastExternalChangelogCookie");
    try
    {
    InternalSearchOperation searchOp =
     connection.processSearch(
        ByteString.valueOf(""),
        SearchScope.BASE_OBJECT,
        DereferencePolicy.NEVER_DEREF_ALIASES,
        0, // Size limit
        0, // Time limit
        false, // Types only
        LDAPFilter.decode("(objectclass=*)"),
        lastcookieattribute,
        NO_CONTROL,
        null);
    assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
        searchOp.getErrorMessage().toString()
        + searchOp.getAdditionalLogMessage());
    LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
    if (entries != null)
    {
      for (SearchResultEntry resultEntry : entries)
      {
        ldifWriter.writeEntry(resultEntry);
        try
        {
          List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
          cookie = l.get(0).iterator().next().toString();
        }
        catch(NullPointerException e)
        {}
      }
    }
    }
    catch(Exception e)
    {
      fail("Ending test " + tn + " with exception:\n"
          +  stackTraceToSingleLineString(e));
    }
    assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)),
        " Expected last cookie attribute value:" + expectedLastCookie +
        " Read from server: " + cookie + " are equal :");
  }
  // simple update to be received
  private void ECLAllOps()
  {
@@ -1516,9 +1613,9 @@
  /**
   * Test parallel simultaneous psearch with different filters.
   */
  private void ECLSimulPsearches()
  private void ECLSimultaneousPsearches()
  {
    String tn = "ECLSimulPsearches";
    String tn = "ECLSimultaneousPsearches";
    debugInfo(tn, "Starting test \n\n");
    Socket s1, s2, s3 = null;
    boolean compatMode = false;
@@ -2304,10 +2401,11 @@
    }
  }
  private void ECLCompatWriteReadAllOps(int firstDraftChangeNumber)
  private int ECLCompatWriteReadAllOps(int firstDraftChangeNumber)
  {
    String tn = "ECLCompatWriteReadAllOps/" + String.valueOf(firstDraftChangeNumber);
    debugInfo(tn, "Starting test\n\n");
    int ts = 1;
    try
    {
@@ -2318,7 +2416,6 @@
          DN.decode(TEST_ROOT_DN_STRING), (short) 1201, 
          100, replicationServerPort,
          1000, true);
      int ts = 1;
      String user1entryUUID = "11111111-1112-1113-1114-111111111115";
      String baseUUID       = "22222222-2222-2222-2222-222222222222";
@@ -2583,6 +2680,7 @@
          +  stackTraceToSingleLineString(e));      
    }
    debugInfo(tn, "Ending test with success");
    return ts;
  }
  private void ECLCompatReadFrom(int firstDraftChangeNumber)
@@ -2987,4 +3085,134 @@
    }
    debugInfo(tn, "Ending test with success");
  }
  private void ECLCompatTestLimitsAndAdd(int expectedFirst, int expectedLast,
      int ts)
  {
    String tn = "ECLCompatTestLimitsAndAdd";
    debugInfo(tn, "Starting test\n\n");
    try
    {
      ECLCompatTestLimits(expectedFirst, expectedLast);
      // Creates broker on o=test
      ReplicationBroker server01 = openReplicationSession(
          DN.decode(TEST_ROOT_DN_STRING), (short) 1201,
          100, replicationServerPort,
          1000, true);
      String user1entryUUID = "11111111-1112-1113-1114-111111111115";
      // Publish DEL
      ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), ts++, (short)1201);
      DeleteMsg delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1,
            user1entryUUID);
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      ECLCompatTestLimits(expectedFirst, expectedLast+1);
      server01.stop();
    }
    catch(Exception e)
    {
      fail("Ending "+tn+" test with exception:\n"
          +  stackTraceToSingleLineString(e));
    }
    debugInfo(tn, "Ending test with success");
  }
  private void ECLGetEligibleCountTest()
  {
    String tn = "ECLGetEligibleCountTest";
    debugInfo(tn, "Starting test\n\n");
    String user1entryUUID = "11111111-1112-1113-1114-111111111115";
    try
    {
      // The replication changelog is empty
      ReplicationServerDomain rsdtest =
        replicationServer.getReplicationServerDomain(TEST_ROOT_DN_STRING, false);
      long count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
      assertEquals(count, 0);
      // Creates broker on o=test
      ReplicationBroker server01 = openReplicationSession(
          DN.decode(TEST_ROOT_DN_STRING), (short) 1201,
          100, replicationServerPort,
          1000, true);
      // Publish 1 message
      ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), 1, (short)1201);
      DeleteMsg delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1,
            user1entryUUID);
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      sleep(300);
      count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
      assertEquals(count, 1);
      // Publish 1 message
      ChangeNumber cn2 = new ChangeNumber(TimeThread.getTime(), 2, (short)1201);
      delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn2,
            user1entryUUID);
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      sleep(300);
      count = rsdtest.getEligibleCount(
          new ServerState(),
          new ChangeNumber(TimeThread.getTime(), 1, (short)1201));
      assertEquals(count, 2);
      count = rsdtest.getEligibleCount(
          new ServerState(),  cn1);
      assertEquals(count, 1);
      ServerState ss = new ServerState();
      ss.update(cn1);
      count = rsdtest.getEligibleCount(ss, cn1);
      assertEquals(count, 0);
      count = rsdtest.getEligibleCount(ss, cn2);
      assertEquals(count, 1);
      ss.update(cn2);
      count = rsdtest.getEligibleCount(ss,
          new ChangeNumber(TimeThread.getTime(), 4, (short)1201));
      assertEquals(count, 0);
      // Publish 1 message
      ChangeNumber cn3 = new ChangeNumber(TimeThread.getTime(), 3, (short)1201);
      delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn3,
            user1entryUUID);
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      sleep(300);
      ss.update(cn2);
      count = rsdtest.getEligibleCount(ss,
          new ChangeNumber(TimeThread.getTime(), 4, (short)1201));
      assertEquals(count, 1);
      server01.stop();
    }
    catch(Exception e)
    {
      fail("Ending "+tn+" test with exception:\n"
          +  stackTraceToSingleLineString(e));
    }
    debugInfo(tn, "Ending test with success");
  }
}