mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.51.2013 84cf626ebcae1b535abe9efd3eed5cdf78bdd319
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -41,7 +41,10 @@
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.api.CNIndexData;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
@@ -71,7 +74,7 @@
  /**
   * Specifies the last changer number requested.
   */
  private int lastChangeNumber = 0;
  private long lastChangeNumber = 0;
  /**
   * Specifies whether the change number db has been read until its end.
   */
@@ -522,7 +525,7 @@
   * @throws DirectoryException
   *           When an error is raised.
   */
  private void initializeCLSearchFromChangeNumber(int startChangeNumber)
  private void initializeCLSearchFromChangeNumber(long startChangeNumber)
      throws DirectoryException
  {
    try
@@ -535,13 +538,13 @@
    catch(DirectoryException de)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, de);
      releaseIterator();
      releaseCursor();
      throw de;
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      releaseIterator();
      releaseCursor();
      throw new DirectoryException(
          ResultCode.OPERATIONS_ERROR,
          Message.raw(Category.SYNC,
@@ -561,9 +564,8 @@
   * @throws DirectoryException
   *           if a database problem occurred
   */
  private String findCookie(final int startChangeNumber)
      throws ChangelogException,
      DirectoryException
  private String findCookie(final long startChangeNumber)
      throws ChangelogException, DirectoryException
  {
    final ChangeNumberIndexDB cnIndexDB =
        replicationServer.getChangeNumberIndexDB();
@@ -581,9 +583,9 @@
        return null;
      }
      final long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
      final String crossDomainStartState =
          cnIndexDB.getPreviousCookie(firstChangeNumber);
      final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData();
      final long firstChangeNumber = firstCNData.getChangeNumber();
      final String crossDomainStartState = firstCNData.getPreviousCookie();
      cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
      return crossDomainStartState;
    }
@@ -591,11 +593,11 @@
    // Request filter DOES contain a startChangeNumber
    // Read the draftCNDb to see whether it contains startChangeNumber
    String crossDomainStartState =
        cnIndexDB.getPreviousCookie(startChangeNumber);
    if (crossDomainStartState != null)
    CNIndexData startCNData = cnIndexDB.getCNIndexData(startChangeNumber);
    if (startCNData != null)
    {
      // found the provided startChangeNumber, let's return it
      final String crossDomainStartState = startCNData.getPreviousCookie();
      cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber);
      return crossDomainStartState;
    }
@@ -615,9 +617,10 @@
    // the DB, let's use the lower limit.
    if (startChangeNumber < firstChangeNumber)
    {
      crossDomainStartState = cnIndexDB.getPreviousCookie(firstChangeNumber);
      if (crossDomainStartState != null)
      CNIndexData firstCNData = cnIndexDB.getCNIndexData(firstChangeNumber);
      if (firstCNData != null)
      {
        final String crossDomainStartState = firstCNData.getPreviousCookie();
        cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
        return crossDomainStartState;
      }
@@ -636,8 +639,9 @@
        return null;
      }
      final long lastKey = cnIndexDB.getLastChangeNumber();
      crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey);
      final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData();
      final long lastKey = lastCNData.getChangeNumber();
      final String crossDomainStartState = lastCNData.getPreviousCookie();
      cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
      return crossDomainStartState;
@@ -897,7 +901,7 @@
  {
    if (debugEnabled())
      TRACER.debugInfo(this + " shutdown()");
    releaseIterator();
    releaseCursor();
    for (DomainContext domainCtxt : domainCtxts) {
      if (!domainCtxt.unRegisterHandler()) {
        logError(Message.raw(Category.SYNC, Severity.NOTICE,
@@ -910,7 +914,7 @@
    domainCtxts = null;
  }
  private void releaseIterator()
  private void releaseCursor()
  {
    if (this.cnIndexDBCursor != null)
    {
@@ -1256,13 +1260,10 @@
        oldestContext.currentState.update(
            change.getUpdateMsg().getCSN());
        if (oldestContext.currentState.cover(oldestContext.stopState))
        {
          oldestContext.active = false;
        }
        if (draftCompat
            && lastChangeNumber > 0
            && change.getChangeNumber() > lastChangeNumber)
        if (oldestContext.currentState.cover(oldestContext.stopState)
            || (draftCompat
                && lastChangeNumber > 0
                && change.getChangeNumber() > lastChangeNumber))
        {
          oldestContext.active = false;
        }
@@ -1278,8 +1279,9 @@
      if (searchPhase == PERSISTENT_PHASE)
      {
        if (debugEnabled())
          clDomCtxtsToString("In getNextECLUpdate (persistent): " +
          "looking for the generalized oldest change");
          TRACER.debugInfo(clDomCtxtsToString(
              "In getNextECLUpdate (persistent): "
                  + "looking for the generalized oldest change"));
        for (DomainContext domainCtxt : domainCtxts) {
          domainCtxt.getNextEligibleMessageForDomain(operationId);
@@ -1300,7 +1302,7 @@
          if (draftCompat)
          {
            assignNewDraftCNAndStore(change);
            assignNewChangeNumberAndStore(change);
          }
          oldestChange = change;
        }
@@ -1317,21 +1319,19 @@
    if (oldestChange != null)
    {
      final CSN csn = oldestChange.getUpdateMsg().getCSN();
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate updates previousCookie:"
          + oldestChange.getUpdateMsg().getCSN());
        TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
      // Update the current state
      previousCookie.update(
          oldestChange.getBaseDN(),
          oldestChange.getUpdateMsg().getCSN());
      previousCookie.update(oldestChange.getBaseDN(), csn);
      // Set the current value of global state in the returned message
      oldestChange.setCookie(previousCookie);
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate returns result oldest change =" +
                oldestChange);
        TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
            + oldestChange);
    }
    return oldestChange;
@@ -1370,14 +1370,15 @@
      if (isEndOfCNIndexDBReached)
      {
        // we are at the end of the DraftCNdb in the append mode
        assignNewDraftCNAndStore(oldestChange);
        assignNewChangeNumberAndStore(oldestChange);
        return true;
      }
      // the next change from the CNIndexDB
      CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN();
      String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN();
      final CNIndexData cnIndexData = cnIndexDBCursor.getCNIndexData();
      final CSN csnFromDraftCNDb = cnIndexData.getCSN();
      final String dnFromDraftCNDb = cnIndexData.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("assignChangeNumber() generating change number "
@@ -1392,10 +1393,10 @@
      {
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number "
              + " assigning changeNumber=" + cnIndexDBCursor.getChangeNumber()
              + " assigning changeNumber=" + cnIndexData.getChangeNumber()
              + " to change=" + oldestChange);
        oldestChange.setChangeNumber(cnIndexDBCursor.getChangeNumber());
        oldestChange.setChangeNumber(cnIndexData.getChangeNumber());
        return true;
      }
@@ -1429,8 +1430,8 @@
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() generating change number has"
              + "skipped to  changeNumber=" + cnIndexDBCursor.getChangeNumber()
              + " csn=" + cnIndexDBCursor.getCSN() + " End of CNIndexDB ?"
              + "skipped to  changeNumber=" + cnIndexData.getChangeNumber()
              + " csn=" + cnIndexData.getCSN() + " End of CNIndexDB ?"
              + isEndOfCNIndexDBReached);
      }
      catch (ChangelogException e)
@@ -1452,7 +1453,7 @@
    return sameDN && sameCSN;
  }
  private void assignNewDraftCNAndStore(ECLUpdateMsg change)
  private void assignNewChangeNumberAndStore(ECLUpdateMsg change)
      throws DirectoryException, ChangelogException
  {
    // generate a new change number and assign to this change
@@ -1460,11 +1461,11 @@
    // store in CNIndexDB the pair
    // (change number of the current change, state before this change)
    replicationServer.getChangeNumberIndexDB().add(
    replicationServer.getChangeNumberIndexDB().add(new CNIndexData(
        change.getChangeNumber(),
        previousCookie.toString(),
        change.getBaseDN(),
        change.getUpdateMsg().getCSN());
        change.getUpdateMsg().getCSN()));
  }
  /**
@@ -1499,7 +1500,7 @@
    }
    // End of INIT_PHASE => always release the iterator
    releaseIterator();
    releaseCursor();
  }
  /**