mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.57.2013 157717b205d4c1f957cf810e04e06f11530c619c
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -36,7 +36,7 @@
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
@@ -94,10 +94,9 @@
   */
  private String startCookie;
  /**
   * Specifies the value of the cookie before the change currently processed
   * is returned. It is updated with the change number of the change
   * currently processed (thus becoming the "current" cookie just
   * before the change is returned.
   * Specifies the value of the cookie before the change currently processed is
   * returned. It is updated with the CSN of the change currently processed
   * (thus becoming the "current" cookie just before the change is returned.
   */
  private MultiDomainServerState previousCookie = new MultiDomainServerState();
  /**
@@ -106,10 +105,10 @@
  private Set<String> excludedBaseDNs = new HashSet<String>();
  /**
   * Eligible changeNumber - only changes older or equal to eligibleCN
   * are published in the ECL.
   * Eligible CSN - only changes older or equal to eligibleCSN * are published
   * in the ECL.
   */
  private ChangeNumber eligibleCN = null;
  private CSN eligibleCSN;
  /**
   * Provides a string representation of this object.
@@ -177,7 +176,7 @@
          .append("] [rsd=").append(rsd)
          .append("] [nextMsg=").append(nextMsg).append("(")
          .append(nextMsg != null ?
          new Date(nextMsg.getChangeNumber().getTime()).toString():"")
          new Date(nextMsg.getCSN().getTime()).toString():"")
          .append(")")
          .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
          .append("] [startState=").append(startState)
@@ -188,7 +187,7 @@
    /**
     * Get the next message eligible regarding
     * the crossDomain eligible CN. Put it in the context table.
     * the crossDomain eligible CSN. Put it in the context table.
     * @param opid The operation id.
     */
    private void getNextEligibleMessageForDomain(String opid)
@@ -207,15 +206,15 @@
        if (nextNonEligibleMsg != null)
        {
          boolean hasBecomeEligible =
            (nextNonEligibleMsg.getChangeNumber().getTime()
                <= eligibleCN.getTime());
            (nextNonEligibleMsg.getCSN().getTime()
                <= eligibleCSN.getTime());
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() +
                " getNextEligibleMessageForDomain(" + opid+ ") "
              + " stored nonEligibleMsg " + nextNonEligibleMsg
              + " has now become eligible regarding "
              + " the eligibleCN ("+ eligibleCN
              + " the eligibleCSN ("+ eligibleCSN
              + " ):" + hasBecomeEligible);
          if (hasBecomeEligible)
@@ -238,7 +237,7 @@
            // to be returned in the external changelog.
            // So let's check if the chg time is older than the trim date
          } while ((newMsg!=null) &&
              (newMsg.getChangeNumber().getTime() < domainLatestTrimDate));
              (newMsg.getCSN().getTime() < domainLatestTrimDate));
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN() +
@@ -250,17 +249,17 @@
          // in non blocking mode, return null when no more msg
          if (newMsg != null)
          {
            boolean isEligible = (newMsg.getChangeNumber().getTime()
                <= eligibleCN.getTime());
            boolean isEligible = (newMsg.getCSN().getTime()
                <= eligibleCSN.getTime());
          if (debugEnabled())
              TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDN()
                + " getNextEligibleMessageForDomain(" + opid+ ") "
                + "newMsg isEligible=" + isEligible + " since "
                + "newMsg=[" + newMsg.getChangeNumber()
                + " " + new Date(newMsg.getChangeNumber().getTime())
                + "] eligibleCN=[" + eligibleCN
                + " " + new Date(eligibleCN.getTime())+"]"
                + "newMsg=[" + newMsg.getCSN()
                + " " + new Date(newMsg.getCSN().getTime())
                + "] eligibleCSN=[" + eligibleCSN
                + " " + new Date(eligibleCSN.getTime())+"]"
                + dumpState());
            if (isEligible)
@@ -601,11 +600,11 @@
    // startDraftCN provided in the request IS NOT in the DraftCNDb
    /*
     * Get the draftLimits (from the eligibleCN got at the beginning of the
     * Get the draftLimits (from the eligibleCSN got at the beginning of the
     * operation) in order to have the first and possible last DraftCN.
     */
    final int[] limits =
        replicationServer.getECLDraftCNLimits(eligibleCN, excludedBaseDNs);
        replicationServer.getECLDraftCNLimits(eligibleCSN, excludedBaseDNs);
    final int firstDraftCN = limits[0];
    final int lastDraftCN = limits[1];
@@ -712,7 +711,7 @@
        // Assign the start state for the domain
        if (isPersistent == PERSISTENT_CHANGES_ONLY)
        {
          newDomainCtxt.startState = rsd.getEligibleState(eligibleCN);
          newDomainCtxt.startState = rsd.getEligibleState(eligibleCSN);
          startStatesFromProvidedCookie.remove(rsd.getBaseDn());
        }
        else
@@ -731,10 +730,10 @@
            // what we have in the replication changelog
            if (newDomainCtxt.startState == null)
            {
              ChangeNumber latestTrimCN =
                  new ChangeNumber(newDomainCtxt.domainLatestTrimDate, 0, 0);
              CSN latestTrimCSN =
                  new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
              newDomainCtxt.startState =
                  rsd.getStartState().duplicateOnlyOlderThan(latestTrimCN);
                  rsd.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
            }
          }
          else
@@ -756,8 +755,8 @@
            }
          }
          // Set the stop state for the domain from the eligibleCN
          newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
          // Set the stop state for the domain from the eligibleCSN
          newDomainCtxt.stopState = rsd.getEligibleState(eligibleCSN);
        }
        newDomainCtxt.currentState = new ServerState();
@@ -866,9 +865,9 @@
    */
    for (int serverId : rsDomain.getStartState())
    {
      ChangeNumber dbOldestChange =
          rsDomain.getStartState().getChangeNumber(serverId);
      ChangeNumber providedChange = cookie.getChangeNumber(serverId);
      CSN dbOldestChange =
          rsDomain.getStartState().getCSN(serverId);
      CSN providedChange = cookie.getCSN(serverId);
      if (providedChange != null
          && providedChange.older(dbOldestChange))
      {
@@ -1018,7 +1017,7 @@
    excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
    replicationServer.disableEligibility(excludedBaseDNs);
    eligibleCN = replicationServer.getEligibleCN();
    eligibleCSN = replicationServer.getEligibleCSN();
    initializeChangelogSearch(startECLSessionMsg);
@@ -1056,11 +1055,11 @@
      closeInitPhase();
    }
    /* TODO: From replication changenumber
    /* TODO: From replication CSN
    //--
    if (startCLMsg.getStartMode()==2)
    {
      if (CLSearchFromProvidedExactCN(startCLMsg.getChangeNumber()))
      if (CLSearchFromProvidedExactCSN(startCLMsg.getCSN()))
        return;
    }
@@ -1069,14 +1068,14 @@
    {
      // to get the CL first and last
      initializeCLDomCtxts(null); // from start
      ChangeNumber crossDomainEligibleCN = computeCrossDomainEligibleCN();
      CSN crossDomainEligibleCSN = computeCrossDomainEligibleCSN();
      try
      {
        // to get the CL first and last
        // last rely on the crossDomainEligibleCN thus must have been
        // last rely on the crossDomainEligibleCSN thus must have been
        // computed before
        int[] limits = computeCLLimits(crossDomainEligibleCN);
        int[] limits = computeCLLimits(crossDomainEligibleCSN);
        // Send the response
        CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
        session.publish(msg);
@@ -1253,7 +1252,7 @@
        // Set and test the domain of the oldestChange see if we reached
        // the end of the phase for this domain
        oldestContext.currentState.update(
            change.getUpdateMsg().getChangeNumber());
            change.getUpdateMsg().getCSN());
        if (oldestContext.currentState.cover(oldestContext.stopState))
        {
@@ -1294,7 +1293,7 @@
          oldestContext.nextMsg = null; // clean
          oldestContext.currentState.update(
              change.getUpdateMsg().getChangeNumber());
              change.getUpdateMsg().getCSN());
          if (draftCompat)
          {
@@ -1317,12 +1316,12 @@
    {
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate updates previousCookie:"
          + oldestChange.getUpdateMsg().getChangeNumber());
          + oldestChange.getUpdateMsg().getCSN());
      // Update the current state
      previousCookie.update(
          oldestChange.getBaseDN(),
          oldestChange.getUpdateMsg().getChangeNumber());
          oldestChange.getUpdateMsg().getCSN());
      // Set the current value of global state in the returned message
      oldestChange.setCookie(previousCookie);
@@ -1357,9 +1356,8 @@
    // The following loop allows to loop until being on the same cn
    // in the 2 dbs
    // replogcn : the oldest change from the changelog db
    ChangeNumber cnFromChangelogDb =
        oldestChange.getUpdateMsg().getChangeNumber();
    // replogCSN : the oldest change from the changelog db
    CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN();
    String dnFromChangelogDb = oldestChange.getBaseDN();
    while (true)
@@ -1373,19 +1371,19 @@
      // the next change from the DraftCN db
      ChangeNumber cnFromDraftCNDb = changelogDBIter.getChangeNumber();
      CSN csnFromDraftCNDb = changelogDBIter.getCSN();
      String dnFromDraftCNDb = changelogDBIter.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("getNextECLUpdate generating draftCN "
            + " comparing the 2 db DNs :" + dnFromChangelogDb + "?="
            + cnFromChangelogDb + " timestamps:"
            + new Date(cnFromChangelogDb.getTime()) + " ?older"
            + new Date(cnFromDraftCNDb.getTime()));
            + csnFromChangelogDb + " timestamps:"
            + new Date(csnFromChangelogDb.getTime()) + " ?older"
            + new Date(csnFromDraftCNDb.getTime()));
      if (areSameChange(cnFromChangelogDb, dnFromChangelogDb,
          cnFromDraftCNDb, dnFromDraftCNDb))
      if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
          csnFromDraftCNDb, dnFromDraftCNDb))
      {
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate generating draftCN "
@@ -1397,13 +1395,13 @@
      }
      if (!cnFromDraftCNDb.older(cnFromChangelogDb))
      if (!csnFromDraftCNDb.older(csnFromChangelogDb))
      {
        // the change from the changelogDb is older
        // it should have been stored lately
        // let's continue to traverse the changelogdb
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate: will skip " + cnFromChangelogDb
          TRACER.debugInfo("getNextECLUpdate: will skip " + csnFromChangelogDb
              + " and read next from the regular changelog.");
        return false; // TO BE CHECKED
      }
@@ -1418,7 +1416,7 @@
        // found in the changelogDb.
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate generating draftCN "
              + " will skip " + cnFromDraftCNDb
              + " will skip " + csnFromDraftCNDb
              + " and read next change from the DraftCNDb.");
        isEndOfDraftCNReached = !changelogDBIter.next();
@@ -1426,7 +1424,7 @@
        if (debugEnabled())
          TRACER.debugInfo("getNextECLUpdate generating draftCN "
              + " has skipped to " + " sn=" + changelogDBIter.getDraftCN()
              + " cn=" + changelogDBIter.getChangeNumber()
              + " csn=" + changelogDBIter.getCSN()
              + " End of draftCNDb ?" + isEndOfDraftCNReached);
      }
      catch (ChangelogException e)
@@ -1441,12 +1439,11 @@
    }
  }
  private boolean areSameChange(ChangeNumber cn1, String dn1, ChangeNumber cn2,
      String dn2)
  private boolean areSameChange(CSN csn1, String dn1, CSN csn2, String dn2)
  {
    boolean sameDN = dn1.compareTo(dn2) == 0;
    boolean sameCN = cn1.compareTo(cn2) == 0;
    return sameDN && sameCN;
    boolean sameCSN = csn1.compareTo(csn2) == 0;
    return sameDN && sameCSN;
  }
  private void assignNewDraftCNAndStore(ECLUpdateMsg change)
@@ -1462,7 +1459,7 @@
        change.getDraftChangeNumber(),
        previousCookie.toString(),
        change.getBaseDN(),
        change.getUpdateMsg().getChangeNumber());
        change.getUpdateMsg().getCSN());
  }
  /**
@@ -1556,11 +1553,11 @@
  }
  /**
   * Refresh the eligibleCN by requesting the replication server.
   * Refresh the eligibleCSN by requesting the replication server.
   */
  public void refreshEligibleCN()
  public void refreshEligibleCSN()
  {
    eligibleCN = replicationServer.getEligibleCN();
    eligibleCSN = replicationServer.getEligibleCSN();
  }
}