mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.51.2013 77b37b615c18a119334e626b3425d62ebfd2cd54
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -58,13 +58,33 @@
public final class ECLServerHandler extends ServerHandler
{
  /**
   * Marks the end of the search on the External Change Log.
   */
  private static int UNDEFINED_PHASE = 0;
  /**
   * Constant used to indicate the handler is in the ECL initialization phase.
   * The External Change Log initialization phase currently reads the changes
   * from all the ReplicaDBs from oldest to newest and then:
   * <ul>
   * <li>Matches each ReplicaDBs change with the corresponding record in
   * {@link ChangeNumberIndexDB}, then assign its changeNumber in memory before
   * returning the change to the client</li>
   * <li>Once it reaches the end of the {@link ChangeNumberIndexDB}, it inserts
   * each ReplicaDBs change in the {@link ChangeNumberIndexDB} and gets back and
   * assign the changeNumber in memory to the ReplicaDBs change.</li>
   * </ul>
   * Once this phase is over the current search moves to the
   * {@link #UNDEFINED_PHASE} or the {@link #PERSISTENT_PHASE} depending on the
   * search type.
   *
   * @see #getSearchPhase()
   */
  public static int INIT_PHASE = 1;
  /**
   * The persistent phase is only used for persistent searches on the External
   * ChangeLog. It comes after the {@link #INIT_PHASE} and sends back to the
   * client newly added changes to the {@link ChangeNumberIndexDB}.
   */
  private static int PERSISTENT_PHASE = 2;
  /**
@@ -111,8 +131,8 @@
  private Set<String> excludedBaseDNs = new HashSet<String>();
  /**
   * Eligible CSN - only changes older or equal to eligibleCSN * are published
   * in the ECL.
   * Eligible CSN - only changes older or equal to eligibleCSN are published in
   * the ECL.
   */
  private CSN eligibleCSN;
@@ -1203,6 +1223,9 @@
        if (draftCompat)
        {
          continueLooping = !assignChangeNumber(change);
          // if we encounter a change that has been trimmed from the replicaDBs,
          // we will skip it and loop to the next oldest change from the
          // replicaDBs
        }
        // here we have the right oldest change
@@ -1288,34 +1311,34 @@
   * Either retrieves a change number from the DB, or assign a new change number
   * and store in the DB.
   *
   * @param oldestChange
   *          the oldestChange where to assign the change number
   * @param replicaDBChange
   *          the replica DB change to find in the {@link ChangeNumberIndexDB}
   *          where to assign the change number
   * @return <code>true</code> if a change number has been assigned to the
   *         provided oldestChange, <code>false</code> otherwise
   *         provided replicaDBChange, <code>false</code> otherwise
   * @throws DirectoryException
   *           if any problem occur
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
  private boolean assignChangeNumber(final ECLUpdateMsg replicaDBChange)
      throws ChangelogException
  {
    // We also need to check if the CNIndexDB is consistent with the
    // changelogDB. If not, 2 potential reasons:
    // a/ changelog has been purged (trim) let's traverse the CNIndexDB
    // We also need to check if the CNIndexDB is consistent with the replicaDBs.
    // If not, 2 potential reasons:
    // a/ replicaDBs have been purged (trim) let's traverse the CNIndexDB
    // b/ changelog is late ... let's traverse the changelogDb
    // The following loop allows to loop until being on the same cn in the 2 dbs
    // replogCSN : the oldest change from the changelog db
    CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN();
    DN dnFromChangelogDb = oldestChange.getBaseDN();
    CSN csnFromReplicaDB = replicaDBChange.getUpdateMsg().getCSN();
    DN dnFromReplicaDB = replicaDBChange.getBaseDN();
    while (true)
    {
      if (isEndOfCNIndexDBReached)
      {
        // we are at the end of the CNIndexDB in the append mode
        assignNewChangeNumberAndStore(oldestChange);
        assignNewChangeNumberAndStore(replicaDBChange);
        return true;
      }
@@ -1324,43 +1347,44 @@
      final DN dnFromCNIndexDB = currentRecord.getBaseDN();
      if (debugEnabled())
        TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :"
            + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:"
            + asDate(csnFromChangelogDb) + " ?older"
        TRACER.debugInfo("assignChangeNumber() comparing the replicaDB's and"
            + " CNIndexDB's DNs :" + dnFromReplicaDB + "?=" + dnFromCNIndexDB
            + " timestamps:" + asDate(csnFromReplicaDB) + " ?older"
            + asDate(csnFromCNIndexDB));
      if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
          csnFromCNIndexDB, dnFromCNIndexDB))
      if (areSameChange(csnFromReplicaDB, dnFromReplicaDB, csnFromCNIndexDB,
          dnFromCNIndexDB))
      {
        // We matched the ReplicaDB change with a record in the CNIndexDB
        // => set the changeNumber in memory and return the change to the client
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() assigning changeNumber="
              + currentRecord.getChangeNumber() + " to change=" + oldestChange);
              + currentRecord.getChangeNumber() + " to change="
              + replicaDBChange);
        oldestChange.setChangeNumber(currentRecord.getChangeNumber());
        replicaDBChange.setChangeNumber(currentRecord.getChangeNumber());
        return true;
      }
      if (!csnFromCNIndexDB.isOlderThan(csnFromChangelogDb))
      if (!csnFromCNIndexDB.isOlderThan(csnFromReplicaDB))
      {
        // the change from the changelogDb is older
        // the change from the replicaDB is older
        // it should have been stored lately
        // let's continue to traverse the changelogDB
        // let's continue to traverse the replicaDBs
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() will skip "
              + csnFromChangelogDb
          TRACER.debugInfo("assignChangeNumber() will skip " + csnFromReplicaDB
              + " and read next change from the regular changelog.");
        return false; // TO BE CHECKED
      }
      // the change from the CNIndexDB is older
      // that means that the change has been purged from the
      // changelogDb (and CNIndexDB not yet been trimmed)
      // The change from the CNIndexDB is older.
      // It means that the CNIndexDB change has been purged from the replicaDB
      // and CNIndexDB has not been trimmed yet.
      try
      {
        // let's traverse the CNIndexDB searching for the change
        // found in the changelogDB
        // keep traversing the CNIndexDB searching for the replicaDB change
        if (debugEnabled())
          TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB
              + " and read next change from the CNIndexDB.");
@@ -1380,7 +1404,7 @@
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        // FIXME There is an opportunity for an infinite loop here if the DB
        // continuously throws DatabaseExceptions
        // continuously throws ChangelogExceptions
      }
    }
  }