mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
16.05.2014 3f27a7ede5ca9df06137254aa32d41d023ac105d
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -48,14 +48,18 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * Thread responsible for inserting replicated changes into the ChangeNumber
 * Index DB (CNIndexDB for short). Only changes older than the medium
 * consistency point are inserted in the CNIndexDB. As a consequence this class
 * is also responsible for maintaining the medium consistency point.
 * Index DB (CNIndexDB for short).
 * <p>
 * Only changes older than the medium consistency point are inserted in the
 * CNIndexDB. As a consequence this class is also responsible for maintaining
 * the medium consistency point (indirectly through an
 * {@code ECLMultiDomainDBCursor}).
 */
public class ChangeNumberIndexer extends DirectoryThread
{
@@ -78,27 +82,10 @@
  /*
   * The following MultiDomainServerState fields must be thread safe, because
   * 1) initialization can happen while the replication server starts receiving
   * updates 2) many updates can happen concurrently.
   * updates
   * 2) many updates can happen concurrently.
   */
  /**
   * Holds the cross domain medium consistency Replication Update Vector for the
   * current replication server, also known as the previous cookie.
   * <p>
   * Stores the value of the cookie before the change currently processed is
   * inserted in the DB. After insert, it is updated with the CSN of the change
   * currently processed (thus becoming the "current" cookie just before the
   * change is returned.
   * <p>
   * Note: This object is only updated by changes/updates.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
   * >OpenDJ Domain Names - medium consistency RUV</a>
   */
  private final MultiDomainServerState mediumConsistencyRUV =
      new MultiDomainServerState();
  /**
   * Holds the last time each replica was seen alive, whether via updates or
   * heartbeat notifications, or offline notifications. Data is held for each
   * serverId cross domain.
@@ -108,11 +95,10 @@
   * <p>
   * Note: This object is updated by both heartbeats and changes/updates.
   */
  private final MultiDomainServerState lastAliveCSNs =
      new MultiDomainServerState();
  private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
  /** Note: This object is updated by replica offline messages. */
  private final MultiDomainServerState replicasOffline =
      new MultiDomainServerState();
  private final MultiDomainServerState replicasOffline = new MultiDomainServerState();
  /**
   * Cursor across all the replicaDBs for all the replication domains. It is
@@ -315,25 +301,50 @@
  }
  /**
   * Restores in memory data needed to build the CNIndexDB, including the medium
   * consistency point.
   * Restores in memory data needed to build the CNIndexDB. In particular,
   * initializes the changes cursor to the medium consistency point.
   */
  private void initialize() throws ChangelogException, DirectoryException
  {
    final ChangeNumberIndexRecord newestRecord =
        changelogDB.getChangeNumberIndexDB().getNewestRecord();
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
    initializeLastAliveCSNs(domainDB);
    initializeNextChangeCursor(domainDB);
    initializeOfflineReplicas();
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
  private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
  {
    final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN();
    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint =
        domainDB.getCursorFrom(cookieWithNewestCSN, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
    nextChangeForInsertDBCursor.next();
  }
  /** Returns a cookie initialised with the newest CSN for each replica. */
  private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException
  {
    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
    final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState();
    if (newestRecord != null)
    {
      // restore the mediumConsistencyRUV from DB
      mediumConsistencyRUV.update(
          new MultiDomainServerState(newestRecord.getPreviousCookie()));
      // Do not update with the newestRecord CSN
      // as it will be used for a sanity check later in the same method
      final CSN newestCsn = newestRecord.getCSN();
      for (DN baseDN : changelogState.getDomainToServerIds().keySet())
      {
        cookieWithNewestCSN.update(baseDN, newestCsn);
      }
    }
    return cookieWithNewestCSN;
  }
    // initialize the DB cursor and the last seen updates
    // to ensure the medium consistency CSN can move forward
    final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
  private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
  {
    for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      final DN baseDN = entry.getKey();
@@ -352,34 +363,10 @@
        lastAliveCSNs.update(baseDN, latestKnownState);
      }
    }
  }
    nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate,
        domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY));
    nextChangeForInsertDBCursor.next();
    if (newestRecord != null)
    {
      // restore the "previousCookie" state before shutdown
      UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
      if (record instanceof ReplicaOfflineMsg)
      {
        // ignore: replica offline messages are never stored in the CNIndexDB
        nextChangeForInsertDBCursor.next();
        record = nextChangeForInsertDBCursor.getRecord();
      }
      // sanity check: ensure that when initializing the cursors at the previous
      // cookie, the next change we find is the newest record in the CNIndexDB
      if (!record.getCSN().equals(newestRecord.getCSN()))
      {
        throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(
            newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI()));
      }
      // Now we can update the mediumConsistencyRUV
      mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
      nextChangeForInsertDBCursor.next();
    }
  private void initializeOfflineReplicas()
  {
    final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
    for (DN baseDN : offlineReplicas)
    {
@@ -394,9 +381,6 @@
        }
      }
    }
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
  private CSN oldestPossibleCSN(int serverId)
@@ -491,10 +475,10 @@
          // OK, the oldest change is older than the medium consistency point
          // let's publish it to the CNIndexDB.
          final String previousCookie = mediumConsistencyRUV.toString();
          final long changeNumber = changelogDB.getChangeNumberIndexDB().addRecord(
              new ChangeNumberIndexRecord(previousCookie, baseDN, csn));
          notifyEntryAddedToChangelog(baseDN, changeNumber, previousCookie, msg);
          final long changeNumber = changelogDB.getChangeNumberIndexDB()
              .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
          MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie();
          notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
          moveForwardMediumConsistencyPoint(csn, baseDN);
        }
        catch (InterruptedException ignored)
@@ -532,18 +516,18 @@
   *          the change number of the newly added entry. It will be greater
   *          than zero for entries added to the change number index and less
   *          than or equal to zero for entries added to any replica DB
   * @param cookieString
   *          a string representing the cookie of the newly added entry. This is
   *          only meaningful for entries added to the change number index
   * @param cookie
   *          the cookie of the newly added entry. This is only meaningful for
   *          entries added to the change number index
   * @param msg
   *          the update message of the newly added entry
   * @throws ChangelogException
   *           If a problem occurs while notifying of the newly added entry.
   */
  protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
      String cookieString, UpdateMsg msg) throws ChangelogException
      MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException
  {
    ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookieString, msg);
    ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg);
  }
  /**
@@ -561,12 +545,8 @@
    TRACER.debugError(msg.toString());
  }
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -587,7 +567,6 @@
         * from the medium consistency RUV).
         */
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
    }