From 5eb7b26eabf15d047fd913597aa508bb78c1d7e7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 20 May 2014 10:14:29 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3563) Make the Medium Consistency Point support replicas temporarily leaving the topology
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 77 +++++++++++++++++++++++++++-----------
1 files changed, 54 insertions(+), 23 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index c93ca98..628ca99 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -271,16 +271,32 @@
}
// ensure that all initial replicas alive information have been updated
// with CSNs that are acceptable for moving the medium consistency forward
- return allInitialReplicasArePastOldestPossibleCSN();
+ return allInitialReplicasAreOfflineOrAlive();
}
- private boolean allInitialReplicasArePastOldestPossibleCSN()
+ /**
+ * Returns true only if every initial replica known from the changelog state
+ * DB is either:
+ * <ul>
+ * <li>offline, so there is no need to wait for it in order to compute medium
+ * consistency</li>
+ * <li>alive, because we received heartbeats or changes from it (so its last
+ * alive CSN has been updated to something past the oldest possible CSN)
+ * </li>
+ * </ul>
+ * In this case, we have enough information to compute medium consistency
+ * without waiting any more.
+ */
+ private boolean allInitialReplicasAreOfflineOrAlive()
{
for (DN baseDN : lastAliveCSNs)
{
for (CSN csn : lastAliveCSNs.getServerState(baseDN))
{
- if (csn.getTime() == 0)
+ if (// oldest possible CSN?
+ csn.getTime() == 0
+ // replica is not offline
+ && replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
{
return false;
}
@@ -363,6 +379,9 @@
if (isECLEnabledDomain(baseDN))
{
replicasOffline.update(baseDN, offlineCSN);
+ // the offline message may be the last thing we hear from this replica,
+ // so also record its CSN as the replica's last alive CSN
+ lastAliveCSNs.update(baseDN, offlineCSN);
}
}
}
@@ -528,10 +547,6 @@
new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
changelogDB.getChangeNumberIndexDB().addRecord(record);
moveForwardMediumConsistencyPoint(csn, baseDN);
-
- // advance the cursor we just read from,
- // success/failure will be checked later
- nextChangeForInsertDBCursor.next();
}
catch (InterruptedException ignored)
{
@@ -571,6 +586,7 @@
private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
final DN mcBaseDN) throws ChangelogException
{
+ boolean callNextOnCursor = true;
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(mcBaseDN, mcCSN);
mediumConsistency = Pair.of(mcBaseDN, mcCSN);
@@ -586,17 +602,34 @@
}
else if (offlineCSN.isOlderThan(mcCSN))
{
- /*
- * replica is not back online and Medium consistency point has gone past
- * its last offline time: remove everything known about it: cursor,
- * offlineCSN from lastAliveCSN and remove all knowledge of this replica
- * from the medium consistency RUV.
- */
- removeCursor(mcBaseDN, mcCSN);
- lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+ Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
+ pair = getCursor(mcBaseDN, mcCSN.getServerId());
+ Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
+ if (iter != null && !iter.hasNext())
+ {
+ /*
+ * replica is not back online, medium consistency point has gone past its
+ * last offline time, and (intended) its cursor has no more changes after
+ * the offline CSN: remove its cursor, its offlineCSN from lastAliveCSNs
+ * and its entry in the medium consistency RUV. NOTE(review): iter walks
+ * the serverId-to-cursor map, not the cursor's changes - confirm check.
+ */
+ iter.remove();
+ StaticUtils.close(pair.getFirst());
+ resetNextChangeForInsertDBCursor();
+ callNextOnCursor = false;
+ lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+ }
}
}
+
+ if (callNextOnCursor)
+ {
+ // advance the cursor we just read from,
+ // success/failure will be checked later
+ nextChangeForInsertDBCursor.next();
+ }
}
private void removeAllCursors()
@@ -614,8 +647,8 @@
newCursors.clear();
}
- private void removeCursor(final DN baseDN, final CSN csn)
- throws ChangelogException
+ private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
+ getCursor(final DN baseDN, final int serverId) throws ChangelogException
{
for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
: allCursors.entrySet())
@@ -626,16 +659,14 @@
entry1.getValue().entrySet().iterator(); iter.hasNext();)
{
final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
- if (csn.getServerId() == entry2.getKey())
+ if (serverId == entry2.getKey())
{
- iter.remove();
- StaticUtils.close(entry2.getValue());
- resetNextChangeForInsertDBCursor();
- return;
+ return Pair.of(entry2.getValue(), iter);
}
}
}
}
+ return Pair.empty();
}
private boolean recycleExhaustedCursors() throws ChangelogException
--
Gitblit v1.10.0