From 60b4019ba512ad303ab5f0dbd06c3203ba53e940 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 05 Jun 2014 10:46:07 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3667) Change time heart beat change numbers should be synced with updates
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 99 ++++++++++++++++++++++++++++++++-----------------
1 files changed, 64 insertions(+), 35 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 2419dc4..8a068f6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -84,6 +84,8 @@
* inserted in the DB. After insert, it is updated with the CSN of the change
* currently processed (thus becoming the "current" cookie just before the
* change is returned.
+ * <p>
+ * Note: This object is only updated by changes/updates.
*
* @see <a href=
* "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
@@ -91,15 +93,6 @@
*/
private final MultiDomainServerState mediumConsistencyRUV =
new MultiDomainServerState();
- /**
- * Holds the cross domain medium consistency baseDN and CSN for the current
- * replication server.
- *
- * @see <a href=
- * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
- * >OpenDJ Domain Names - medium consistency CSN</a>
- */
- private volatile Pair<DN, CSN> mediumConsistency;
/**
* Holds the last time each replica was seen alive, whether via updates or
@@ -108,9 +101,12 @@
* <p>
* Updates are persistent and stored in the replicaDBs, heartbeats are
* transient and are easily constructed on normal operations.
+ * <p>
+ * Note: This object is updated by both heartbeats and changes/updates.
*/
private final MultiDomainServerState lastAliveCSNs =
new MultiDomainServerState();
+ /** Note: This object is updated by replica offline messages. */
private final MultiDomainServerState replicasOffline =
new MultiDomainServerState();
@@ -119,8 +115,12 @@
* positioned on the next change that needs to be inserted in the CNIndexDB.
* <p>
* Note: it is only accessed from the {@link #run()} method.
+ *
+ * @NonNull
*/
- private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
+ @SuppressWarnings("unchecked")
+ private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
+ new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
/**
* New cursors for this Map must be created from the {@link #run()} method,
@@ -182,8 +182,9 @@
return;
}
+ final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, heartbeatCSN);
- tryNotify();
+ tryNotify(oldestCSNBefore);
}
/**
@@ -222,8 +223,9 @@
final CSN csn = updateMsg.getCSN();
// only keep the oldest CSN that will be the new cursor's starting point
newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
+ final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, csn);
- tryNotify();
+ tryNotify(oldestCSNBefore);
}
/**
@@ -254,18 +256,29 @@
*/
public void replicaOffline(DN baseDN, CSN offlineCSN)
{
+ if (!isECLEnabledDomain(baseDN))
+ {
+ return;
+ }
+
replicasOffline.update(baseDN, offlineCSN);
+ final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, offlineCSN);
- tryNotify();
+ tryNotify(oldestCSNBefore);
+ }
+
+ private CSN getOldestLastAliveCSN()
+ {
+ return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond();
}
/**
* Notifies the Change number indexer thread if it will be able to do some
* work.
*/
- private void tryNotify()
+ private void tryNotify(final CSN oldestCSNBefore)
{
- if (canMoveForwardMediumConsistencyPoint())
+ if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
{
synchronized (this)
{
@@ -274,19 +287,32 @@
}
}
- private boolean canMoveForwardMediumConsistencyPoint()
+ /**
+ * Used for waking up the {@link ChangeNumberIndexer} thread because it might
+ * have some work to do.
+ */
+ private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
{
- final Pair<DN, CSN> mc = mediumConsistency;
- if (mc != null)
- {
- final CSN mcCSN = mc.getSecond();
- final CSN lastTimeSameReplicaSeenAlive =
- lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
- return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
- }
+ final CSN oldestCSNAfter = getOldestLastAliveCSN();
// ensure that all initial replicas alive information have been updated
// with CSNs that are acceptable for moving the medium consistency forward
- return allInitialReplicasAreOfflineOrAlive();
+ return allInitialReplicasAreOfflineOrAlive()
+ && oldestCSNBefore != null // then oldestCSNAfter cannot be null
+ // has the oldest CSN changed?
+ && oldestCSNBefore.isOlderThan(oldestCSNAfter);
+ }
+
+ /**
+ * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN
+ * must be persisted to the change number index DB.
+ */
+ private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
+ {
+ // ensure that all initial replicas' alive information has been updated
+ // with CSNs that are acceptable for moving the medium consistency forward
+ return allInitialReplicasAreOfflineOrAlive()
+ // can we persist the next CSN?
+ && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
}
/**
@@ -299,8 +325,8 @@
* CSN has been updated to something past the oldest possible CSN), we have
* enough info to compute medium consistency</li>
* </ul>
- * In this case, we have enough information to compute medium consistency
- * without waiting any more.
+ * In both cases, we have enough information to compute medium consistency
+ * without waiting any further.
*/
private boolean allInitialReplicasAreOfflineOrAlive()
{
@@ -308,11 +334,11 @@
{
for (CSN csn : lastAliveCSNs.getServerState(baseDN))
{
- if (// oldest possible CSN?
- csn.getTime() == 0
- // replica is not offline
+ if (csn.getTime() == 0
&& replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
{
+ // this is the oldest possible CSN, but the replica is not offline:
+ // we must wait for more up-to-date information from this replica
return false;
}
}
@@ -537,16 +563,19 @@
final DN baseDN = nextChangeForInsertDBCursor.getData();
// FIXME problem: what if the serverId is not part of the ServerState?
// right now, change number will be blocked
- if (!canMoveForwardMediumConsistencyPoint())
+ if (!canMoveForwardMediumConsistencyPoint(csn))
{
// the oldest record to insert is newer than the medium consistency
// point. Let's wait for a change that can be published.
synchronized (this)
{
// double check to protect against a missed call to notify()
- if (!isShutdownInitiated()
- && !canMoveForwardMediumConsistencyPoint())
+ if (!canMoveForwardMediumConsistencyPoint(csn))
{
+ if (isShutdownInitiated())
+ {
+ return;
+ }
wait();
// loop to check if changes older than the medium consistency
// point have been added to the ReplicaDBs
@@ -601,10 +630,10 @@
private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
final DN mcBaseDN) throws ChangelogException
{
- boolean callNextOnCursor = true;
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(mcBaseDN, mcCSN);
- mediumConsistency = Pair.of(mcBaseDN, mcCSN);
+
+ boolean callNextOnCursor = true;
final int mcServerId = mcCSN.getServerId();
final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
--
Gitblit v1.10.0