mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.44.2014 6aa4fa5b4f71e830dba55f3ea3f9530737db2d8b
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -92,14 +92,14 @@
  private final MultiDomainServerState mediumConsistencyRUV =
      new MultiDomainServerState();
  /**
   * Holds the cross domain medium consistency CSN for the current replication
   * server.
   * Holds the cross domain medium consistency baseDN and CSN for the current
   * replication server.
   *
   * @see <a href=
   * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
   * >OpenDJ Domain Names - medium consistency CSN</a>
   */
  private volatile CSN mediumConsistencyCSN;
  private volatile Pair<DN, CSN> mediumConsistency;
  /**
   * Holds the last time each replica was seen alive, whether via updates or
@@ -182,7 +182,7 @@
    }
    lastAliveCSNs.update(baseDN, heartbeatCSN);
    tryNotify(baseDN);
    tryNotify();
  }
  /**
@@ -207,7 +207,7 @@
    // only keep the oldest CSN that will be the new cursor's starting point
    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
    lastAliveCSNs.update(baseDN, csn);
    tryNotify(baseDN);
    tryNotify();
  }
  /**
@@ -239,16 +239,16 @@
  {
    replicasOffline.update(baseDN, offlineCSN);
    lastAliveCSNs.update(baseDN, offlineCSN);
    tryNotify(baseDN);
    tryNotify();
  }
  /**
   * Notifies the Change number indexer thread if it will be able to do some
   * work.
   */
  private void tryNotify(DN baseDN)
  private void tryNotify()
  {
    if (canMoveForwardMediumConsistencyPoint(baseDN))
    if (canMoveForwardMediumConsistencyPoint())
    {
      synchronized (this)
      {
@@ -257,13 +257,14 @@
    }
  }
  private boolean canMoveForwardMediumConsistencyPoint(DN baseDN)
  private boolean canMoveForwardMediumConsistencyPoint()
  {
    final CSN mcCSN = mediumConsistencyCSN;
    if (mcCSN != null)
    final Pair<DN, CSN> mc = mediumConsistency;
    if (mc != null)
    {
      final int serverId = mcCSN.getServerId();
      CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
      final CSN mcCSN = mc.getSecond();
      final CSN lastTimeSameReplicaSeenAlive =
          lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
      return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
    }
    return true;
@@ -441,7 +442,8 @@
              }
              wait();
            }
            // advance cursor, success/failure will be checked later
            // try to recycle the exhausted cursors,
            // success/failure will be checked later
            nextChangeForInsertDBCursor.next();
            // loop to check whether new changes have been added to the
            // ReplicaDBs
@@ -452,7 +454,7 @@
          final DN baseDN = nextChangeForInsertDBCursor.getData();
          // FIXME problem: what if the serverId is not part of the ServerState?
          // right now, change number will be blocked
          if (!canMoveForwardMediumConsistencyPoint(baseDN))
          if (!canMoveForwardMediumConsistencyPoint())
          {
            // the oldest record to insert is newer than the medium consistency
            // point. Let's wait for a change that can be published.
@@ -460,7 +462,7 @@
            {
              // double check to protect against a missed call to notify()
              if (!isShutdownInitiated()
                  && !canMoveForwardMediumConsistencyPoint(baseDN))
                  && !canMoveForwardMediumConsistencyPoint())
              {
                wait();
                // loop to check if changes older than the medium consistency
@@ -479,7 +481,8 @@
          changelogDB.getChangeNumberIndexDB().addRecord(record);
          moveForwardMediumConsistencyPoint(csn, baseDN);
          // advance cursor, success/failure will be checked later
          // advance the cursor we just read from,
          // success/failure will be checked later
          nextChangeForInsertDBCursor.next();
        }
        catch (InterruptedException ignored)
@@ -517,20 +520,21 @@
    }
  }
  private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN)
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN)
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(baseDN, csn);
    mediumConsistencyCSN = csn;
    final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
    if (offlineCSN != null
        && offlineCSN.isOlderThan(mediumConsistencyCSN)
        && offlineCSN.isOlderThan(mcCSN)
        // If no new updates has been seen for this replica
        && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
        && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
    {
      removeCursor(baseDN, csn);
      replicasOffline.removeCSN(baseDN, offlineCSN);
      mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
      removeCursor(mcBaseDN, mcCSN);
      replicasOffline.removeCSN(mcBaseDN, offlineCSN);
      mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
    }
  }