mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
05.46.2014 60b4019ba512ad303ab5f0dbd06c3203ba53e940
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -84,6 +84,8 @@
   * inserted in the DB. After insert, it is updated with the CSN of the change
   * currently processed (thus becoming the "current" cookie just before the
   * change is returned.
   * <p>
   * Note: This object is only updated by changes/updates.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
@@ -91,15 +93,6 @@
   */
  private final MultiDomainServerState mediumConsistencyRUV =
      new MultiDomainServerState();
  /**
   * Holds the cross domain medium consistency baseDN and CSN for the current
   * replication server.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
   * >OpenDJ Domain Names - medium consistency CSN</a>
   */
  private volatile Pair<DN, CSN> mediumConsistency;
  /**
   * Holds the last time each replica was seen alive, whether via updates or
@@ -108,9 +101,12 @@
   * <p>
   * Updates are persistent and stored in the replicaDBs, heartbeats are
   * transient and are easily constructed on normal operations.
   * <p>
   * Note: This object is updated by both heartbeats and changes/updates.
   */
  private final MultiDomainServerState lastAliveCSNs =
      new MultiDomainServerState();
  /** Note: This object is updated by replica offline messages. */
  private final MultiDomainServerState replicasOffline =
      new MultiDomainServerState();
@@ -119,8 +115,12 @@
   * positioned on the next change that needs to be inserted in the CNIndexDB.
   * <p>
   * Note: it is only accessed from the {@link #run()} method.
   *
   * @NonNull
   */
  private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
  @SuppressWarnings("unchecked")
  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
  /**
   * New cursors for this Map must be created from the {@link #run()} method,
@@ -182,8 +182,9 @@
      return;
    }
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, heartbeatCSN);
    tryNotify();
    tryNotify(oldestCSNBefore);
  }
  /**
@@ -222,8 +223,9 @@
    final CSN csn = updateMsg.getCSN();
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, csn);
    tryNotify();
    tryNotify(oldestCSNBefore);
  }
  /**
@@ -254,18 +256,29 @@
   */
  public void replicaOffline(DN baseDN, CSN offlineCSN)
  {
    if (!isECLEnabledDomain(baseDN))
    {
      return;
    }
    replicasOffline.update(baseDN, offlineCSN);
    final CSN oldestCSNBefore = getOldestLastAliveCSN();
    lastAliveCSNs.update(baseDN, offlineCSN);
    tryNotify();
    tryNotify(oldestCSNBefore);
  }
  private CSN getOldestLastAliveCSN()
  {
    return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
  }
  /**
   * Notifies the Change number indexer thread if it will be able to do some
   * work.
   */
  private void tryNotify()
  private void tryNotify(final CSN oldestCSNBefore)
  {
    if (canMoveForwardMediumConsistencyPoint())
    if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
    {
      synchronized (this)
      {
@@ -274,19 +287,32 @@
    }
  }
  private boolean canMoveForwardMediumConsistencyPoint()
  /**
   * Used for waking up the {@link ChangeNumberIndexer} thread because it might
   * have some work to do.
   */
  private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
  {
    final Pair<DN, CSN> mc = mediumConsistency;
    if (mc != null)
    {
      final CSN mcCSN = mc.getSecond();
      final CSN lastTimeSameReplicaSeenAlive =
          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
    }
    final CSN oldestCSNAfter = getOldestLastAliveCSN();
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasAreOfflineOrAlive();
    return allInitialReplicasAreOfflineOrAlive()
        && oldestCSNBefore != null // then oldestCSNAfter cannot be null
        // has the oldest CSN changed?
        && oldestCSNBefore.isOlderThan(oldestCSNAfter);
  }
  /**
   * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
   * must be persisted to the change number index DB.
   */
  private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
  {
    // ensure that all initial replicas alive information have been updated
    // with CSNs that are acceptable for moving the medium consistency forward
    return allInitialReplicasAreOfflineOrAlive()
        // can we persist the next CSN?
        && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
  }
  /**
@@ -299,8 +325,8 @@
   * CSN has been updated to something past the oldest possible CSN), we have
   * enough info to compute medium consistency</li>
   * </ul>
   * In this case, we have enough information to compute medium consistency
   * without waiting any more.
   * In both cases, we have enough information to compute medium consistency
   * without waiting any further.
   */
  private boolean allInitialReplicasAreOfflineOrAlive()
  {
@@ -308,11 +334,11 @@
    {
      for (CSN csn : lastAliveCSNs.getServerState(baseDN))
      {
        if (// oldest possible CSN?
            csn.getTime() == 0
            // replica is not offline
        if (csn.getTime() == 0
            && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
        {
          // this is the oldest possible CSN, but the replica is not offline
          // we must wait for more up to date information from this replica
          return false;
        }
      }
@@ -537,16 +563,19 @@
          final DN baseDN = nextChangeForInsertDBCursor.getData();
          // FIXME problem: what if the serverId is not part of the ServerState?
          // right now, change number will be blocked
          if (!canMoveForwardMediumConsistencyPoint())
          if (!canMoveForwardMediumConsistencyPoint(csn))
          {
            // the oldest record to insert is newer than the medium consistency
            // point. Let's wait for a change that can be published.
            synchronized (this)
            {
              // double check to protect against a missed call to notify()
              if (!isShutdownInitiated()
                  && !canMoveForwardMediumConsistencyPoint())
              if (!canMoveForwardMediumConsistencyPoint(csn))
              {
                if (isShutdownInitiated())
                {
                  return;
                }
                wait();
                // loop to check if changes older than the medium consistency
                // point have been added to the ReplicaDBs
@@ -601,10 +630,10 @@
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN) throws ChangelogException
  {
    boolean callNextOnCursor = true;
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
    boolean callNextOnCursor = true;
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);