mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -107,15 +107,13 @@
   */
  private class ScanSearchListener implements InternalSearchListener
  {
    private final ChangeNumber startingChangeNumber;
    private final ChangeNumber endChangeNumber;
    private final CSN startCSN;
    private final CSN endCSN;
    public ScanSearchListener(
        ChangeNumber startingChangeNumber,
        ChangeNumber endChangeNumber)
    public ScanSearchListener(CSN startCSN, CSN endCSN)
    {
      this.startingChangeNumber = startingChangeNumber;
      this.endChangeNumber = endChangeNumber;
      this.startCSN = startCSN;
      this.endCSN = endCSN;
    }
    @Override
@@ -123,20 +121,19 @@
        InternalSearchOperation searchOperation, SearchResultEntry searchEntry)
        throws DirectoryException
    {
      // Build the list of Operations that happened on this entry
      // after startingChangeNumber and before endChangeNumber and
      // add them to the replayOperations list
      // Build the list of Operations that happened on this entry after startCSN
      // and before endCSN and add them to the replayOperations list
      Iterable<FakeOperation> updates =
        EntryHistorical.generateFakeOperations(searchEntry);
      for (FakeOperation op : updates)
      {
        ChangeNumber cn = op.getChangeNumber();
        if ((cn.newer(startingChangeNumber)) && (cn.older(endChangeNumber)))
        CSN csn = op.getCSN();
        if (csn.newer(startCSN) && csn.older(endCSN))
        {
          synchronized (replayOperations)
          {
            replayOperations.put(cn, op);
            replayOperations.put(csn, op);
          }
        }
      }
@@ -184,7 +181,7 @@
  private volatile long generationId = -1;
  private volatile boolean generationIdSavedStatus = false;
  private final ChangeNumberGenerator generator;
  private final CSNGenerator generator;
  /**
   * This object is used to store the list of update currently being
@@ -222,8 +219,8 @@
   * This list is used to temporary store operations that needs to be replayed
   * at session establishment time.
   */
  private final SortedMap<ChangeNumber, FakeOperation> replayOperations =
    new TreeMap<ChangeNumber, FakeOperation>();
  private final SortedMap<CSN, FakeOperation> replayOperations =
    new TreeMap<CSN, FakeOperation>();
  /**
   * The isolation policy that this domain is going to use.
@@ -338,25 +335,25 @@
  /**
   * This configuration boolean indicates if this ReplicationDomain should log
   * ChangeNumbers.
   * CSNs.
   */
  private boolean logChangeNumber = false;
  private boolean logCSN = false;
  /**
   * This configuration integer indicates the time the domain keeps the
   * historical information necessary to solve conflicts.<br>
   * When a change stored in the historical part of the user entry has a date
   * (from its replication ChangeNumber) older than this delay, it is candidate
   * to be purged.
   * (from its replication CSN) older than this delay, it is candidate to be
   * purged.
   */
  private long histPurgeDelayInMilliSec = 0;
  /**
   * The last change number purged in this domain. Allows to have a continuous
   * purging process from one purge processing (task run) to the next one.
   * Values 0 when the server starts.
   * The last CSN purged in this domain. Allows to have a continuous purging
   * process from one purge processing (task run) to the next one. Values 0 when
   * the server starts.
   */
  private ChangeNumber lastChangeNumberPurgedFromHist = new ChangeNumber(0,0,0);
  private CSN lastCSNPurgedFromHist = new CSN(0,0,0);
  /**
   * The thread that periodically saves the ServerState of this
@@ -410,16 +407,14 @@
   */
  private class RSUpdater extends DirectoryThread
  {
    private final ChangeNumber startChangeNumber;
    private final CSN startCSN;
    protected RSUpdater(ChangeNumber replServerMaxChangeNumber)
    protected RSUpdater(CSN replServerMaxCSN)
    {
      super("Replica DS(" + serverId
          + ") missing change publisher for domain \""
          + baseDn.toString() + "\"");
      this.startChangeNumber = replServerMaxChangeNumber;
      this.startCSN = replServerMaxCSN;
    }
    /**
@@ -439,7 +434,7 @@
       */
      try
      {
        if (buildAndPublishMissingChanges(startChangeNumber, broker))
        if (buildAndPublishMissingChanges(startCSN, broker))
        {
          message = DEBUG_CHANGES_SENT.get();
          logError(message);
@@ -511,7 +506,7 @@
    this.isolationPolicy = configuration.getIsolationPolicy();
    this.configDn = configuration.dn();
    this.logChangeNumber = configuration.isLogChangenumber();
    this.logCSN = configuration.isLogChangenumber();
    this.updateToReplayQueue = updateToReplayQueue;
    this.histPurgeDelayInMilliSec =
      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
@@ -549,17 +544,17 @@
    /*
     * Create a new Persistent Server State that will be used to store
     * the last ChangeNumber seen from all LDAP servers in the topology.
     * the last CSN seen from all LDAP servers in the topology.
     */
    state = new PersistentServerState(baseDn, serverId, getServerState());
    flushThread = new ServerStateFlush();
    /*
     * ChangeNumberGenerator is used to create new unique ChangeNumbers
     * for each operation done on this replication domain.
     * CSNGenerator is used to create new unique CSNs for each operation done on
     * this replication domain.
     *
     * The generator time is adjusted to the time of the last CN received from
     * The generator time is adjusted to the time of the last CSN received from
     * remote other servers.
     */
    generator = getGenerator();
@@ -1766,9 +1761,9 @@
    {
      // There is no replication context attached to the operation
      // so this is not a replication operation.
      ChangeNumber changeNumber = generateChangeNumber(deleteOperation);
      CSN csn = generateCSN(deleteOperation);
      String modifiedEntryUUID = EntryHistorical.getEntryUUID(deletedEntry);
      ctx = new DeleteContext(changeNumber, modifiedEntryUUID);
      ctx = new DeleteContext(csn, modifiedEntryUUID);
      deleteOperation.setAttachment(SYNCHROCONTEXT, ctx);
      synchronized (replayOperations)
@@ -1779,10 +1774,10 @@
          replayOperations.remove(replayOperations.firstKey());
        }
        replayOperations.put(
            changeNumber,
            csn,
            new FakeDelOperation(
                deleteOperation.getEntryDN().toString(),
                changeNumber,modifiedEntryUUID ));
                csn, modifiedEntryUUID));
      }
    }
@@ -2017,7 +2012,7 @@
       */
      EntryHistorical hist = EntryHistorical.newInstanceFromEntry(
          modifyDNOperation.getOriginalEntry());
      if (hist.addedOrRenamedAfter(ctx.getChangeNumber()))
      if (hist.addedOrRenamedAfter(ctx.getCSN()))
      {
        return new SynchronizationProviderResult.StopProcessing(
            ResultCode.NO_OPERATION, null);
@@ -2027,7 +2022,7 @@
    {
      // There is no replication context attached to the operation
      // so this is not a replication operation.
      ChangeNumber changeNumber = generateChangeNumber(modifyDNOperation);
      CSN csn = generateCSN(modifyDNOperation);
      String newParentId = null;
      if (modifyDNOperation.getNewSuperior() != null)
      {
@@ -2036,7 +2031,7 @@
      Entry modifiedEntry = modifyDNOperation.getOriginalEntry();
      String modifiedEntryUUID = EntryHistorical.getEntryUUID(modifiedEntry);
      ctx = new ModifyDnContext(changeNumber, modifiedEntryUUID, newParentId);
      ctx = new ModifyDnContext(csn, modifiedEntryUUID, newParentId);
      modifyDNOperation.setAttachment(SYNCHROCONTEXT, ctx);
    }
    return new SynchronizationProviderResult.ContinueProcessing();
@@ -2052,8 +2047,7 @@
  public SynchronizationProviderResult handleConflictResolution(
         PreOperationModifyOperation modifyOperation)
  {
    if ((!modifyOperation.isSynchronizationOperation())
        && (!brokerIsConnected()))
    if (!modifyOperation.isSynchronizationOperation() && !brokerIsConnected())
    {
      Message msg = ERR_REPLICATION_COULD_NOT_CONNECT.get(baseDn.toString());
      return new SynchronizationProviderResult.StopProcessing(
@@ -2112,12 +2106,12 @@
    if (ctx == null)
    {
      // No replication ctx attached => not a replicated operation
      // - create a ctx with : changeNumber, entryUUID
      // - create a ctx with : CSN, entryUUID
      // - attach the context to the op
      ChangeNumber changeNumber = generateChangeNumber(modifyOperation);
      CSN csn = generateCSN(modifyOperation);
      String modifiedEntryUUID = EntryHistorical.getEntryUUID(modifiedEntry);
      ctx = new ModifyContext(changeNumber, modifiedEntryUUID);
      ctx = new ModifyContext(csn, modifiedEntryUUID);
      modifyOperation.setAttachment(SYNCHROCONTEXT, ctx);
    }
@@ -2172,7 +2166,7 @@
   */
  public void doPreOperation(PreOperationAddOperation addOperation)
  {
    AddContext ctx = new AddContext(generateChangeNumber(addOperation),
    AddContext ctx = new AddContext(generateCSN(addOperation),
        EntryHistorical.getEntryUUID(addOperation),
        findEntryUUID(addOperation.getEntryDN().getParentDNInSuffix()));
@@ -2189,11 +2183,11 @@
    ResultCode result = op.getResultCode();
    // Note that a failed non-replication operation might not have a change
    // number.
    ChangeNumber curChangeNumber = OperationContext.getChangeNumber(op);
    if ((curChangeNumber != null) && (logChangeNumber))
    CSN curCSN = OperationContext.getCSN(op);
    if (curCSN != null && logCSN)
    {
      op.addAdditionalLogItem(AdditionalLogItem.unquotedKeyValue(getClass(),
          "replicationCN", curChangeNumber));
          "replicationCSN", curCSN));
    }
    if (result == ResultCode.SUCCESS)
@@ -2203,12 +2197,12 @@
        numReplayedPostOpCalled++;
        try
        {
          remotePendingChanges.commit(curChangeNumber);
          remotePendingChanges.commit(curCSN);
        }
        catch  (NoSuchElementException e)
        {
          Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get(
              op.toString(), curChangeNumber.toString());
              op.toString(), curCSN.toString());
          logError(message);
          return;
        }
@@ -2225,7 +2219,7 @@
          * This is an operation type that we do not know about
          * It should never happen.
          */
          pendingChanges.remove(curChangeNumber);
          pendingChanges.remove(curCSN);
          Message message =
              ERR_UNKNOWN_TYPE.get(op.getOperationType().toString());
          logError(message);
@@ -2247,7 +2241,7 @@
        try
        {
          msg.encode();
          pendingChanges.commitAndPushCommittedChanges(curChangeNumber, msg);
          pendingChanges.commitAndPushCommittedChanges(curCSN, msg);
        } catch (UnsupportedEncodingException e)
        {
          // will be caught at publish time.
@@ -2255,7 +2249,7 @@
        catch  (NoSuchElementException e)
        {
          Message message = ERR_OPERATION_NOT_FOUND_IN_PENDING.get(
              op.toString(), curChangeNumber.toString());
              op.toString(), curCSN.toString());
          logError(message);
          return;
        }
@@ -2293,9 +2287,9 @@
    {
      // Remove an unsuccessful non-replication operation from the pending
      // changes list.
      if (curChangeNumber != null)
      if (curCSN != null)
      {
        pendingChanges.remove(curChangeNumber);
        pendingChanges.remove(curCSN);
        pendingChanges.pushCommittedChanges();
      }
    }
@@ -2352,20 +2346,20 @@
       attrs, null);
     Entry entryToRename = null;
     ChangeNumber entryToRenameCN = null;
     CSN entryToRenameCSN = null;
     for (SearchResultEntry entry : searchOp.getSearchEntries())
     {
       EntryHistorical history = EntryHistorical.newInstanceFromEntry(entry);
       if (entryToRename == null)
       {
         entryToRename = entry;
         entryToRenameCN = history.getDNDate();
         entryToRenameCSN = history.getDNDate();
       }
       else if (!history.addedOrRenamedAfter(entryToRenameCN))
       else if (!history.addedOrRenamedAfter(entryToRenameCSN))
       {
         // this conflict is older than the previous, keep it.
         entryToRename = entry;
         entryToRenameCN = history.getDNDate();
         entryToRenameCSN = history.getDNDate();
       }
     }
@@ -2519,7 +2513,7 @@
    Operation op = null;
    boolean replayDone = false;
    boolean dependency = false;
    ChangeNumber changeNumber = null;
    CSN csn = null;
    int retryCount = 10;
    // Try replay the operation, then flush (replaying) any pending operation
@@ -2547,7 +2541,7 @@
          // are processed locally.
          op.addRequestControl(new LDAPControl(OID_MANAGE_DSAIT_CONTROL));
          changeNumber = OperationContext.getChangeNumber(op);
          csn = OperationContext.getCSN(op);
          op.run();
          ResultCode result = op.getResultCode();
@@ -2616,7 +2610,7 @@
              // the update became a dummy update and the result
              // of the conflict resolution phase is to do nothing.
              // however we still need to push this change to the serverState
              updateError(changeNumber);
              updateError(csn);
            }
            else
            {
@@ -2645,7 +2639,7 @@
          logError(message);
          numUnresolvedNamingConflicts.incrementAndGet();
          replayErrorMsg = message.toString();
          updateError(changeNumber);
          updateError(csn);
        }
      } catch (ASN1Exception e)
      {
@@ -2658,7 +2652,7 @@
        replayErrorMsg = logDecodingOperationError(msg, e);
      } catch (Exception e)
      {
        if (changeNumber != null)
        if (csn != null)
        {
          /*
           * An Exception happened during the replay process.
@@ -2670,7 +2664,7 @@
            stackTraceToSingleLineString(e), op.toString());
          logError(message);
          replayErrorMsg = message.toString();
          updateError(changeNumber);
          updateError(csn);
        } else
        {
          replayErrorMsg = logDecodingOperationError(msg, e);
@@ -2691,7 +2685,7 @@
      // Prepare restart of loop
      replayDone = false;
      dependency = false;
      changeNumber = null;
      csn = null;
      retryCount = 10;
    } while (msg != null);
@@ -2711,13 +2705,13 @@
   * It is necessary because the postOperation does not always get
   * called when error or Exceptions happen during the operation replay.
   *
   * @param changeNumber the ChangeNumber of the operation with error.
   * @param csn the CSN of the operation with error.
   */
  public void updateError(ChangeNumber changeNumber)
  public void updateError(CSN csn)
  {
    try
    {
      remotePendingChanges.commit(changeNumber);
      remotePendingChanges.commit(csn);
    }
    catch (NoSuchElementException e)
    {
@@ -2726,21 +2720,20 @@
      if (debugEnabled())
      {
        TRACER.debugInfo(
                "LDAPReplicationDomain.updateError: Unable to find remote "
                    + "pending change for change number %s",
                changeNumber);
            "LDAPReplicationDomain.updateError: Unable to find remote "
                + "pending change for CSN %s", csn);
      }
    }
  }
  /**
   * Generate a new change number and insert it in the pending list.
   * Generate a new CSN and insert it in the pending list.
   *
   * @param operation The operation for which the change number must be
   *                  generated.
   * @return The new change number.
   * @param operation
   *          The operation for which the CSN must be generated.
   * @return The new CSN.
   */
  private ChangeNumber generateChangeNumber(PluginOperation operation)
  private CSN generateCSN(PluginOperation operation)
  {
    return pendingChanges.putLocalOperation(operation);
  }
@@ -2909,7 +2902,7 @@
      // The other type of errors can not be caused by naming conflicts.
      // Log a message for the repair tool.
      Message message = ERR_ERROR_REPLAYING_OPERATION.get(
          op.toString(), ctx.getChangeNumber().toString(),
          op.toString(), ctx.getCSN().toString(),
          result.toString(), op.getErrorMessage().toString());
      logError(message);
      return true;
@@ -2982,7 +2975,7 @@
     // The other type of errors can not be caused by naming conflicts.
     // Log a message for the repair tool.
     Message message = ERR_ERROR_REPLAYING_OPERATION.get(
         op.toString(), ctx.getChangeNumber().toString(),
         op.toString(), ctx.getCSN().toString(),
         result.toString(), op.getErrorMessage().toString());
     logError(message);
     return true;
@@ -3103,7 +3096,7 @@
    // The other type of errors can not be caused by naming conflicts.
    // Log a message for the repair tool.
    Message message = ERR_ERROR_REPLAYING_OPERATION.get(
        op.toString(), ctx.getChangeNumber().toString(),
        op.toString(), ctx.getCSN().toString(),
        result.toString(), op.getErrorMessage().toString());
    logError(message);
    return true;
@@ -3202,7 +3195,7 @@
      // The other type of errors can not be caused by naming conflicts.
      // log a message for the repair tool.
      Message message = ERR_ERROR_REPLAYING_OPERATION.get(
          op.toString(), ctx.getChangeNumber().toString(),
          op.toString(), ctx.getCSN().toString(),
          result.toString(), op.getErrorMessage().toString());
      logError(message);
      return true;
@@ -3493,7 +3486,7 @@
    state.clearInMemory();
    state.loadState();
    generator.adjust(state.getMaxChangeNumber(serverId));
    generator.adjust(state.getMaxCSN(serverId));
    // Retrieves the generation ID associated with the data imported
    generationId = loadGenerationId();
@@ -3909,8 +3902,7 @@
    {
      String includeAttributeStrings[] =
        {"objectclass", "sn", "cn", "entryuuid"};
      HashSet<AttributeType> includeAttributes;
      includeAttributes = new HashSet<AttributeType>();
      Set<AttributeType> includeAttributes = new HashSet<AttributeType>();
      for (String attrName : includeAttributeStrings)
      {
        AttributeType attrType  = DirectoryServer.getAttributeType(attrName);
@@ -4213,13 +4205,13 @@
  /**
   * Push the modifications contained in the given parameter as
   * a modification that would happen on a local server.
   * The modifications are not applied to the local database,
   * historical information is not updated but a ChangeNumber
   * is generated and the ServerState associated to this domain is
   * updated.
   * @param modifications The modification to push
   * Push the modifications contained in the given parameter as a modification
   * that would happen on a local server. The modifications are not applied to
   * the local database, historical information is not updated but a CSN is
   * generated and the ServerState associated to this domain is updated.
   *
   * @param modifications
   *          The modification to push
   */
  public void synchronizeModifications(List<Modification> modifications)
  {
@@ -4231,8 +4223,8 @@
                          modifications);
    LocalBackendModifyOperation localOp = new LocalBackendModifyOperation(op);
    ChangeNumber cn = generateChangeNumber(localOp);
    OperationContext ctx = new ModifyContext(cn, "schema");
    CSN csn = generateCSN(localOp);
    OperationContext ctx = new ModifyContext(csn, "schema");
    localOp.setAttachment(SYNCHROCONTEXT, ctx);
    localOp.setResultCode(ResultCode.SUCCESS);
    synchronize(localOp);
@@ -4290,7 +4282,7 @@
         ReplicationDomainCfg configuration)
  {
    isolationPolicy = configuration.getIsolationPolicy();
    logChangeNumber = configuration.isLogChangenumber();
    logCSN = configuration.isLogChangenumber();
    histPurgeDelayInMilliSec =
      configuration.getConflictsHistoricalPurgeDelay()*60*1000;
@@ -4400,9 +4392,7 @@
  {
    try
    {
      DN eclConfigEntryDN = DN.decode(
          "cn=external changeLog," + configDn);
      DN eclConfigEntryDN = DN.decode("cn=external changeLog," + configDn);
      if (DirectoryServer.getConfigHandler().entryExists(eclConfigEntryDN))
      {
        DirectoryServer.getConfigHandler().deleteEntry(eclConfigEntryDN, null);
@@ -4558,8 +4548,7 @@
       * Check that the ReplicationServer has seen all our previous
       * changes.
       */
      ChangeNumber replServerMaxChangeNumber =
        replicationServerState.getChangeNumber(serverId);
      CSN replServerMaxCSN = replicationServerState.getCSN(serverId);
      // we don't want to update from here (a DS) an empty RS because
      // normally the RS should have been updated by other RSes except for
@@ -4567,17 +4556,14 @@
      // ... hence the RS we are connected to should not be empty
      // ... or if it is empty, it is due to a voluntary reset
      // and we don't want to update it with our changes that could be huge.
      if ((replServerMaxChangeNumber != null) &&
          (replServerMaxChangeNumber.getSeqnum()!=0))
      if (replServerMaxCSN != null && replServerMaxCSN.getSeqnum() != 0)
      {
        ChangeNumber ourMaxChangeNumber = state.getMaxChangeNumber(serverId);
        if ((ourMaxChangeNumber != null) &&
            (!ourMaxChangeNumber.olderOrEqual(replServerMaxChangeNumber)))
        CSN ourMaxCSN = state.getMaxCSN(serverId);
        if (ourMaxCSN != null && !ourMaxCSN.olderOrEqual(replServerMaxCSN))
        {
          pendingChanges.setRecovering(true);
          broker.setRecoveryRequired(true);
          new RSUpdater(replServerMaxChangeNumber).start();
          new RSUpdater(replServerMaxCSN).start();
        }
      }
    } catch (Exception e)
@@ -4590,31 +4576,27 @@
  }
  /**
   * Build the list of changes that have been processed by this server
   * after the ChangeNumber given as a parameter and publish them
   * using the given session.
   * Build the list of changes that have been processed by this server after the
   * CSN given as a parameter and publish them using the given session.
   *
   * @param startingChangeNumber  The ChangeNumber where we need to start the
   *                              search
   * @param session               The session to use to publish the changes
   *
   * @return                      A boolean indicating he success of the
   *                              operation.
   * @throws Exception            if an Exception happens during the search.
   * @param startCSN
   *          The CSN where we need to start the search
   * @param session
   *          The session to use to publish the changes
   * @return A boolean indicating he success of the operation.
   * @throws Exception
   *           if an Exception happens during the search.
   */
  public boolean buildAndPublishMissingChanges(
      ChangeNumber startingChangeNumber,
      ReplicationBroker session)
      throws Exception
  public boolean buildAndPublishMissingChanges(CSN startCSN,
      ReplicationBroker session) throws Exception
  {
    // Trim the changes in replayOperations that are older than
    // the startingChangeNumber.
    // Trim the changes in replayOperations that are older than the startCSN.
    synchronized (replayOperations)
    {
      Iterator<ChangeNumber> it = replayOperations.keySet().iterator();
      Iterator<CSN> it = replayOperations.keySet().iterator();
      while (it.hasNext())
      {
        if (it.next().olderOrEqual(startingChangeNumber))
        if (it.next().olderOrEqual(startCSN))
        {
          it.remove();
        }
@@ -4625,9 +4607,9 @@
      }
    }
    ChangeNumber lastRetrievedChange;
    CSN lastRetrievedChange;
    InternalSearchOperation op;
    ChangeNumber currentStartChangeNumber = startingChangeNumber;
    CSN currentStartCSN = startCSN;
    do
    {
      lastRetrievedChange = null;
@@ -4637,18 +4619,15 @@
      // So we search by interval of 10 seconds
      // and store the results in the replayOperations list
      // so that they are sorted before sending them.
      long missingChangesDelta = currentStartChangeNumber.getTime() + 10000;
      ChangeNumber endChangeNumber =
        new ChangeNumber(
            missingChangesDelta, 0xffffffff, serverId);
      long missingChangesDelta = currentStartCSN.getTime() + 10000;
      CSN endCSN = new CSN(missingChangesDelta, 0xffffffff, serverId);
      ScanSearchListener listener =
        new ScanSearchListener(currentStartChangeNumber, endChangeNumber);
      op = searchForChangedEntries(
          baseDn, currentStartChangeNumber, endChangeNumber, listener);
        new ScanSearchListener(currentStartCSN, endCSN);
      op = searchForChangedEntries(baseDn, currentStartCSN, endCSN, listener);
      // Publish and remove all the changes from the replayOperations list
      // that are older than the endChangeNumber.
      // that are older than the endCSN.
      List<FakeOperation> opsToSend = new LinkedList<FakeOperation>();
      synchronized (replayOperations)
      {
@@ -4656,10 +4635,10 @@
        while (itOp.hasNext())
        {
          FakeOperation fakeOp = itOp.next();
          if ((fakeOp.getChangeNumber().olderOrEqual(endChangeNumber))
              && state.cover(fakeOp.getChangeNumber()))
          if ((fakeOp.getCSN().olderOrEqual(endCSN))
              && state.cover(fakeOp.getCSN()))
          {
            lastRetrievedChange = fakeOp.getChangeNumber();
            lastRetrievedChange = fakeOp.getCSN();
            opsToSend.add(fakeOp);
            itOp.remove();
          }
@@ -4677,11 +4656,11 @@
      opsToSend.clear();
      if (lastRetrievedChange != null)
      {
        currentStartChangeNumber = lastRetrievedChange;
        currentStartCSN = lastRetrievedChange;
      }
      else
      {
        currentStartChangeNumber = endChangeNumber;
        currentStartCSN = endCSN;
      }
    } while (pendingChanges.recoveryUntil(lastRetrievedChange) &&
@@ -4692,47 +4671,48 @@
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The ChangeNumber from which we want the changes
   * @param lastChangeNumber The max ChangeNumber that the search should return
   * @param resultListener   The listener that will process the entries returned
   * Search for the changes that happened since fromCSN based on the historical
   * attribute. The only changes that will be send will be the one generated on
   * the serverId provided in fromCSN.
   *
   * @param baseDn
   *          the base DN
   * @param fromCSN
   *          The CSN from which we want the changes
   * @param lastCSN
   *          The max CSN that the search should return
   * @param resultListener
   *          The listener that will process the entries returned
   * @return the internal search operation
   * @throws Exception when raised.
   * @throws Exception
   *           when raised.
   */
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    ChangeNumber lastChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  public static InternalSearchOperation searchForChangedEntries(DN baseDn,
      CSN fromCSN, CSN lastCSN, InternalSearchListener resultListener)
      throws Exception
  {
    InternalClientConnection conn =
      InternalClientConnection.getRootConnection();
    Integer serverId = fromChangeNumber.getServerId();
    Integer serverId = fromCSN.getServerId();
    String maxValueForId;
    if (lastChangeNumber == null)
    if (lastCSN == null)
    {
      maxValueForId = "ffffffffffffffff" + String.format("%04x", serverId)
                      + "ffffffff";
    }
    else
    {
      maxValueForId = lastChangeNumber.toString();
      maxValueForId = lastCSN.toString();
    }
    LDAPFilter filter = LDAPFilter.decode(
       "(&(" + EntryHistorical.HISTORICAL_ATTRIBUTE_NAME + ">=dummy:"
       + fromChangeNumber + ")(" + EntryHistorical.HISTORICAL_ATTRIBUTE_NAME +
       "<=dummy:" + maxValueForId + "))");
        "(&(" + HISTORICAL_ATTRIBUTE_NAME + ">=dummy:" + fromCSN + ")" +
          "(" + HISTORICAL_ATTRIBUTE_NAME + "<=dummy:" + maxValueForId + "))");
    Set<String> attrs = new LinkedHashSet<String>(1);
    attrs.add(EntryHistorical.HISTORICAL_ATTRIBUTE_NAME);
    attrs.add(EntryHistorical.ENTRYUUID_ATTRIBUTE_NAME);
    Set<String> attrs = new LinkedHashSet<String>(3);
    attrs.add(HISTORICAL_ATTRIBUTE_NAME);
    attrs.add(ENTRYUUID_ATTRIBUTE_NAME);
    attrs.add("*");
    return conn.processSearch(
      ByteString.valueOf(baseDn.toString()),
@@ -4744,24 +4724,24 @@
  }
  /**
   * Search for the changes that happened since fromChangeNumber
   * based on the historical attribute. The only changes that will
   * be send will be the one generated on the serverId provided in
   * fromChangeNumber.
   * @param baseDn the base DN
   * @param fromChangeNumber The change number from which we want the changes
   * @param resultListener that will process the entries returned.
   * Search for the changes that happened since fromCSN based on the historical
   * attribute. The only changes that will be send will be the one generated on
   * the serverId provided in fromCSN.
   *
   * @param baseDn
   *          the base DN
   * @param fromCSN
   *          The CSN from which we want the changes
   * @param resultListener
   *          that will process the entries returned.
   * @return the internal search operation
   * @throws Exception when raised.
   * @throws Exception
   *           when raised.
   */
  public static InternalSearchOperation searchForChangedEntries(
    DN baseDn,
    ChangeNumber fromChangeNumber,
    InternalSearchListener resultListener)
    throws Exception
  public static InternalSearchOperation searchForChangedEntries(DN baseDn,
      CSN fromCSN, InternalSearchListener resultListener) throws Exception
  {
    return searchForChangedEntries(
        baseDn, fromChangeNumber, null, resultListener);
    return searchForChangedEntries(baseDn, fromCSN, null, resultListener);
  }
@@ -4778,7 +4758,6 @@
  public long countEntries() throws DirectoryException
  {
    Backend backend = retrievesBackend(baseDn);
    if (!backend.supportsLDIFExport())
    {
      Message message = ERR_INIT_EXPORT_NOT_SUPPORTED.get(
@@ -5515,24 +5494,24 @@
     TRACER.debugInfo("[PURGE] purgeConflictsHistorical "
         + "on domain: " + baseDn
         + "endDate:" + new Date(endDate)
         + "lastChangeNumberPurgedFromHist: "
         + lastChangeNumberPurgedFromHist.toStringUI());
         + "lastCSNPurgedFromHist: "
         + lastCSNPurgedFromHist.toStringUI());
     LDAPFilter filter = null;
     try
     {
       filter = LDAPFilter.decode(
         "(" + EntryHistorical.HISTORICAL_ATTRIBUTE_NAME + ">=dummy:"
         + lastChangeNumberPurgedFromHist + ")");
         + lastCSNPurgedFromHist + ")");
     } catch (LDAPException e)
     {
       // Not possible. We know the filter just above is correct.
     }
     Set<String> attrs = new LinkedHashSet<String>(1);
     attrs.add(EntryHistorical.HISTORICAL_ATTRIBUTE_NAME);
     attrs.add(EntryHistorical.ENTRYUUID_ATTRIBUTE_NAME);
     Set<String> attrs = new LinkedHashSet<String>(3);
     attrs.add(HISTORICAL_ATTRIBUTE_NAME);
     attrs.add(ENTRYUUID_ATTRIBUTE_NAME);
     attrs.add("*");
     InternalSearchOperation searchOp =  conn.processSearch(
         ByteString.valueOf(baseDn.toString()),
@@ -5544,7 +5523,7 @@
     int count = 0;
     if (task != null)
       task.setProgressStats(lastChangeNumberPurgedFromHist, count);
       task.setProgressStats(lastCSNPurgedFromHist, count);
     for (SearchResultEntry entry : searchOp.getSearchEntries())
     {
@@ -5556,7 +5535,7 @@
       }
       EntryHistorical entryHist = EntryHistorical.newInstanceFromEntry(entry);
       lastChangeNumberPurgedFromHist = entryHist.getOldestCN();
       lastCSNPurgedFromHist = entryHist.getOldestCSN();
       entryHist.setPurgeDelay(this.histPurgeDelayInMilliSec);
       Attribute attr = entryHist.encodeAndPurge();
       count += entryHist.getLastPurgedValuesCount();
@@ -5583,7 +5562,7 @@
       }
       else if (task != null)
       {
         task.setProgressStats(lastChangeNumberPurgedFromHist, count);
         task.setProgressStats(lastCSNPurgedFromHist, count);
       }
     }
  }