mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
04.43.2013 fe9b2994e042be96cb148e52fc58653fcf09aa9d
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -69,14 +69,13 @@
  private boolean draftCompat = false;
  /**
   * Specifies the last draft changer number (seqnum) requested.
   * Specifies the last changer number requested.
   */
  private int lastDraftCN = 0;
  private int lastChangeNumber = 0;
  /**
   * Specifies whether the draft change number (seqnum) db has been read until
   * its end.
   * Specifies whether the change number db has been read until its end.
   */
  private boolean isEndOfDraftCNReached = false;
  private boolean isEndOfCNIndexDBReached = false;
  /**
   * Specifies whether the current search has been requested to be persistent
   * or not.
@@ -118,8 +117,8 @@
           "[" +
           "[draftCompat=" + draftCompat +
           "] [persistent=" + isPersistent +
           "] [lastDraftCN=" + lastDraftCN +
           "] [isEndOfDraftCNReached=" + isEndOfDraftCNReached +
           "] [startChangeNumber=" + lastChangeNumber +
           "] [isEndOfDraftCNReached=" + isEndOfCNIndexDBReached +
           "] [searchPhase=" + searchPhase +
           "] [startCookie=" + startCookie +
           "] [previousCookie=" + previousCookie +
@@ -516,18 +515,21 @@
  }
  /**
   * Initialize the handler from a provided draft first change number.
   * @param startDraftCN The provided draft first change number.
   * @throws DirectoryException When an error is raised.
   * Initialize the handler from a provided first change number.
   *
   * @param startChangeNumber
   *          The provided first change number.
   * @throws DirectoryException
   *           When an error is raised.
   */
  private void initializeCLSearchFromDraftCN(int startDraftCN)
  private void initializeCLSearchFromChangeNumber(int startChangeNumber)
      throws DirectoryException
  {
    try
    {
      this.draftCompat = true;
      final String providedCookie = findCookie(startDraftCN);
      final String providedCookie = findCookie(startChangeNumber);
      initializeChangelogDomainCtxts(providedCookie, true);
    }
    catch(DirectoryException de)
@@ -548,91 +550,93 @@
  }
  /**
   * Finds in the draft changelog DB the cookie corresponding to the passed in
   * startDraftCN.
   * Finds in the {@link ChangeNumberIndexDB} the cookie corresponding to the
   * passed in startChangeNumber.
   *
   * @param startDraftCN
   *          the start draftCN coming from the request filter.
   * @return the cookie corresponding to the passed in startDraftCN.
   * @param startChangeNumber
   *          the start change number coming from the request filter.
   * @return the cookie corresponding to the passed in startChangeNumber.
   * @throws Exception
   *           if a database problem occurred
   * @throws DirectoryException
   *           if a database problem occurred
   */
  private String findCookie(final int startDraftCN) throws ChangelogException,
  private String findCookie(final int startChangeNumber)
      throws ChangelogException,
      DirectoryException
  {
    final ChangeNumberIndexDB cnIndexDB =
        replicationServer.getChangeNumberIndexDB();
    if (startDraftCN <= 1)
    if (startChangeNumber <= 1)
    {
      // Request filter DOES NOT contain any firstDraftCN
      // So we'll generate from the first DraftCN in the DraftCNdb
      // Request filter DOES NOT contain any first change number
      // So we'll generate from the first change number in the DraftCNdb
      if (cnIndexDB.isEmpty())
      {
        // FIXME JNR if we find a way to make draftCNDb.isEmpty() a non costly
        // operation, then I think we can move this check to the top of this
        // method
        isEndOfDraftCNReached = true;
        isEndOfCNIndexDBReached = true;
        return null;
      }
      final int firstDraftCN = cnIndexDB.getFirstDraftCN();
      final int firstChangeNumber = cnIndexDB.getFirstChangeNumber();
      final String crossDomainStartState =
          cnIndexDB.getPreviousCookie(firstDraftCN);
      cnIndexDBCursor = cnIndexDB.getCursorFrom(firstDraftCN);
          cnIndexDB.getPreviousCookie(firstChangeNumber);
      cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
      return crossDomainStartState;
    }
    // Request filter DOES contain a startDraftCN
    // Request filter DOES contain a startChangeNumber
    // Read the draftCNDb to see whether it contains startDraftCN
    String crossDomainStartState = cnIndexDB.getPreviousCookie(startDraftCN);
    // Read the draftCNDb to see whether it contains startChangeNumber
    String crossDomainStartState =
        cnIndexDB.getPreviousCookie(startChangeNumber);
    if (crossDomainStartState != null)
    {
      // found the provided startDraftCN, let's return it
      cnIndexDBCursor = cnIndexDB.getCursorFrom(startDraftCN);
      // found the provided startChangeNumber, let's return it
      cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber);
      return crossDomainStartState;
    }
    // startDraftCN provided in the request IS NOT in the DraftCNDb
    // startChangeNumber provided in the request IS NOT in the DraftCNDb
    /*
     * Get the draftLimits (from the eligibleCSN got at the beginning of the
     * operation) in order to have the first and possible last DraftCN.
     * operation) in order to have the first and possible last change number.
     */
    final int[] limits =
        replicationServer.getECLDraftCNLimits(eligibleCSN, excludedBaseDNs);
    final int firstDraftCN = limits[0];
    final int lastDraftCN = limits[1];
    final int[] limits = replicationServer.getECLChangeNumberLimits(
        eligibleCSN, excludedBaseDNs);
    final int firstChangeNumber = limits[0];
    final int lastChangeNumber = limits[1];
    // If the startDraftCN provided is lower than the first Draft CN in
    // If the startChangeNumber provided is lower than the firstChangeNumber in
    // the DB, let's use the lower limit.
    if (startDraftCN < firstDraftCN)
    if (startChangeNumber < firstChangeNumber)
    {
      crossDomainStartState = cnIndexDB.getPreviousCookie(firstDraftCN);
      crossDomainStartState = cnIndexDB.getPreviousCookie(firstChangeNumber);
      if (crossDomainStartState != null)
      {
        cnIndexDBCursor = cnIndexDB.getCursorFrom(firstDraftCN);
        cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
        return crossDomainStartState;
      }
      // This should not happen
      isEndOfDraftCNReached = true;
      isEndOfCNIndexDBReached = true;
      return null;
    }
    else if (startDraftCN <= lastDraftCN)
    else if (startChangeNumber <= lastChangeNumber)
    {
      // startDraftCN is between first and potential last and has never
      // startChangeNumber is between first and potential last and has never
      // been returned yet
      if (cnIndexDB.isEmpty())
      {
        isEndOfDraftCNReached = true;
        isEndOfCNIndexDBReached = true;
        return null;
      }
      final int lastKey = cnIndexDB.getLastDraftCN();
      final int lastKey = cnIndexDB.getLastChangeNumber();
      crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey);
      cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
      return crossDomainStartState;
@@ -641,7 +645,7 @@
      // this may be very long. Work on perf improvement here.
    }
    // startDraftCN is greater than the potential last DraftCN
    // startChangeNumber is greater than the potential lastChangeNumber
    throw new DirectoryException(ResultCode.SUCCESS, Message.raw(""));
  }
@@ -999,7 +1003,7 @@
    this.operationId = startECLSessionMsg.getOperationId();
    isPersistent  = startECLSessionMsg.isPersistent();
    lastDraftCN   = startECLSessionMsg.getLastDraftChangeNumber();
    lastChangeNumber = startECLSessionMsg.getLastChangeNumber();
    searchPhase   = INIT_PHASE;
    try
    {
@@ -1117,9 +1121,9 @@
    {
      initializeCLSearchFromGenState(msg.getCrossDomainServerState());
    }
    else if (requestType == REQUEST_TYPE_FROM_DRAFT_CHANGE_NUMBER)
    else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER)
    {
      initializeCLSearchFromDraftCN(msg.getFirstDraftChangeNumber());
      initializeCLSearchFromChangeNumber(msg.getFirstChangeNumber());
    }
  }
@@ -1131,8 +1135,7 @@
   *         ServerHandler.
   * @exception DirectoryException when an error occurs.
   */
  public ECLUpdateMsg takeECLUpdate()
  throws DirectoryException
  public ECLUpdateMsg takeECLUpdate() throws DirectoryException
  {
    ECLUpdateMsg msg = getNextECLUpdate();
@@ -1235,18 +1238,18 @@
            (LDAPUpdateMsg) oldestContext.nextMsg,
            null, // cookie will be set later
            oldestContext.rsd.getBaseDn(),
            0); //  draftChangeNumber may be set later
            0); // changeNumber may be set later
        oldestContext.nextMsg = null;
        // Default is not to loop, with one exception
        continueLooping = false;
        if (draftCompat)
        {
          continueLooping = !assignDraftCN(change);
          continueLooping = !assignChangeNumber(change);
        }
        // here we have the right oldest change
        // and in the draft case, we have its draft changenumber
        // and in the draft case, we have its change number
        // Set and test the domain of the oldestChange see if we reached
        // the end of the phase for this domain
@@ -1257,8 +1260,9 @@
        {
          oldestContext.active = false;
        }
        if (draftCompat && (lastDraftCN>0) &&
            (change.getDraftChangeNumber()>lastDraftCN))
        if (draftCompat
            && lastChangeNumber > 0
            && change.getChangeNumber() > lastChangeNumber)
        {
          oldestContext.active = false;
        }
@@ -1334,17 +1338,17 @@
  }
  /**
   * Either retrieves a draftCN from the draftCNDb, or assign a new draftCN and
   * store in the db.
   * Either retrieves a change number from the DB, or assign a new change number
   * and store in the DB.
   *
   * @param oldestChange
   *          the oldestChange where to assign the draftCN
   * @return <code>true</code> if a draftCN has been assigned to the provided
   *         oldestChange, <code>false</code> otherwise
   *          the oldestChange where to assign the change number
   * @return <code>true</code> if a change number has been assigned to the
   *         provided oldestChange, <code>false</code> otherwise
   * @throws DirectoryException
   *           if any problem occur
   */
  private boolean assignDraftCN(final ECLUpdateMsg oldestChange)
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
      throws DirectoryException
  {
    // We also need to check if the draftCNdb is consistent with
@@ -1361,7 +1365,7 @@
    while (true)
    {
      if (isEndOfDraftCNReached)
      if (isEndOfCNIndexDBReached)
      {
        // we are at the end of the DraftCNdb in the append mode
        assignNewDraftCNAndStore(oldestChange);
@@ -1369,12 +1373,12 @@
      }
      // the next change from the DraftCN db
      // the next change from the CNIndexDB
      CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN();
      String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate generating draftCN "
        TRACER.debugInfo("assignChangeNumber() generating change number "
            + " comparing the 2 db DNs :" + dnFromChangelogDb + "?="
            + csnFromChangelogDb + " timestamps:"
            + new Date(csnFromChangelogDb.getTime()) + " ?older"
@@ -1385,11 +1389,11 @@
          csnFromDraftCNDb, dnFromDraftCNDb))
      {
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate generating draftCN "
              + " assigning draftCN=" + cnIndexDBCursor.getDraftCN()
          TRACER.debugInfo("assignChangeNumber() generating change number "
              + " assigning changeNumber=" + cnIndexDBCursor.getChangeNumber()
              + " to change=" + oldestChange);
        oldestChange.setDraftChangeNumber(cnIndexDBCursor.getDraftCN());
        oldestChange.setChangeNumber(cnIndexDBCursor.getChangeNumber());
        return true;
      }
@@ -1400,7 +1404,8 @@
        // it should have been stored lately
        // let's continue to traverse the changelogdb
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate: will skip " + csnFromChangelogDb
          TRACER.debugInfo("assignChangeNumber(): will skip "
              + csnFromChangelogDb
              + " and read next from the regular changelog.");
        return false; // TO BE CHECKED
      }
@@ -1414,17 +1419,17 @@
        // let's traverse the DraftCNdb searching for the change
        // found in the changelogDb.
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate generating draftCN "
          TRACER.debugInfo("assignChangeNumber() generating change number "
              + " will skip " + csnFromDraftCNDb
              + " and read next change from the DraftCNDb.");
              + " and read next change from the CNIndexDB.");
        isEndOfDraftCNReached = !cnIndexDBCursor.next();
        isEndOfCNIndexDBReached = !cnIndexDBCursor.next();
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate generating draftCN "
              + " has skipped to " + " sn=" + cnIndexDBCursor.getDraftCN()
              + " csn=" + cnIndexDBCursor.getCSN()
              + " End of draftCNDb ?" + isEndOfDraftCNReached);
          TRACER.debugInfo("assignChangeNumber() generating change number has"
              + "skipped to  changeNumber=" + cnIndexDBCursor.getChangeNumber()
              + " csn=" + cnIndexDBCursor.getCSN() + " End of CNIndexDB ?"
              + isEndOfCNIndexDBReached);
      }
      catch (ChangelogException e)
      {
@@ -1448,13 +1453,13 @@
  private void assignNewDraftCNAndStore(ECLUpdateMsg change)
      throws DirectoryException
  {
    // generate a new draftCN and assign to this change
    change.setDraftChangeNumber(replicationServer.getNewDraftCN());
    // generate a new change number and assign to this change
    change.setChangeNumber(replicationServer.getNewChangeNumber());
    // store in changelogDB the pair
    // (DraftCN of the current change, state before this change)
    // store in CNIndexDB the pair
    // (change number of the current change, state before this change)
    replicationServer.getChangeNumberIndexDB().add(
        change.getDraftChangeNumber(),
        change.getChangeNumber(),
        previousCookie.toString(),
        change.getBaseDN(),
        change.getUpdateMsg().getCSN());