From 6aa4fa5b4f71e830dba55f3ea3f9530737db2d8b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 17 Apr 2014 12:44:17 +0000
Subject: [PATCH] OPENDJ-1439 Change number stops progressing with cross domain replication
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 56 ++++++++++++++++++++++++++++++--------------------------
1 files changed, 30 insertions(+), 26 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 5179986..1fe38e0 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -92,14 +92,14 @@
private final MultiDomainServerState mediumConsistencyRUV =
new MultiDomainServerState();
/**
- * Holds the cross domain medium consistency CSN for the current replication
- * server.
+ * Holds the cross domain medium consistency baseDN and CSN for the current
+ * replication server.
*
* @see <a href=
* "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
* >OpenDJ Domain Names - medium consistency CSN</a>
*/
- private volatile CSN mediumConsistencyCSN;
+ private volatile Pair<DN, CSN> mediumConsistency;
/**
* Holds the last time each replica was seen alive, whether via updates or
@@ -182,7 +182,7 @@
}
lastAliveCSNs.update(baseDN, heartbeatCSN);
- tryNotify(baseDN);
+ tryNotify();
}
/**
@@ -207,7 +207,7 @@
// only keep the oldest CSN that will be the new cursor's starting point
newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
lastAliveCSNs.update(baseDN, csn);
- tryNotify(baseDN);
+ tryNotify();
}
/**
@@ -239,16 +239,16 @@
{
replicasOffline.update(baseDN, offlineCSN);
lastAliveCSNs.update(baseDN, offlineCSN);
- tryNotify(baseDN);
+ tryNotify();
}
/**
* Notifies the Change number indexer thread if it will be able to do some
* work.
*/
- private void tryNotify(DN baseDN)
+ private void tryNotify()
{
- if (canMoveForwardMediumConsistencyPoint(baseDN))
+ if (canMoveForwardMediumConsistencyPoint())
{
synchronized (this)
{
@@ -257,13 +257,14 @@
}
}
- private boolean canMoveForwardMediumConsistencyPoint(DN baseDN)
+ private boolean canMoveForwardMediumConsistencyPoint()
{
- final CSN mcCSN = mediumConsistencyCSN;
- if (mcCSN != null)
+ final Pair<DN, CSN> mc = mediumConsistency;
+ if (mc != null)
{
- final int serverId = mcCSN.getServerId();
- CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
+ final CSN mcCSN = mc.getSecond();
+ final CSN lastTimeSameReplicaSeenAlive =
+ lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
}
return true;
@@ -441,7 +442,8 @@
}
wait();
}
- // advance cursor, success/failure will be checked later
+ // try to recycle the exhausted cursors,
+ // success/failure will be checked later
nextChangeForInsertDBCursor.next();
// loop to check whether new changes have been added to the
// ReplicaDBs
@@ -452,7 +454,7 @@
final DN baseDN = nextChangeForInsertDBCursor.getData();
// FIXME problem: what if the serverId is not part of the ServerState?
// right now, change number will be blocked
- if (!canMoveForwardMediumConsistencyPoint(baseDN))
+ if (!canMoveForwardMediumConsistencyPoint())
{
// the oldest record to insert is newer than the medium consistency
// point. Let's wait for a change that can be published.
@@ -460,7 +462,7 @@
{
// double check to protect against a missed call to notify()
if (!isShutdownInitiated()
- && !canMoveForwardMediumConsistencyPoint(baseDN))
+ && !canMoveForwardMediumConsistencyPoint())
{
wait();
// loop to check if changes older than the medium consistency
@@ -479,7 +481,8 @@
changelogDB.getChangeNumberIndexDB().addRecord(record);
moveForwardMediumConsistencyPoint(csn, baseDN);
- // advance cursor, success/failure will be checked later
+ // advance the cursor we just read from,
+ // success/failure will be checked later
nextChangeForInsertDBCursor.next();
}
catch (InterruptedException ignored)
@@ -517,20 +520,21 @@
}
}
- private void moveForwardMediumConsistencyPoint(final CSN csn, final DN baseDN)
+ private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
+ final DN mcBaseDN)
{
// update, so it becomes the previous cookie for the next change
- mediumConsistencyRUV.update(baseDN, csn);
- mediumConsistencyCSN = csn;
- final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
+ mediumConsistencyRUV.update(mcBaseDN, mcCSN);
+ mediumConsistency = Pair.of(mcBaseDN, mcCSN);
+ final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
if (offlineCSN != null
- && offlineCSN.isOlderThan(mediumConsistencyCSN)
+ && offlineCSN.isOlderThan(mcCSN)
// If no new updates has been seen for this replica
- && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
+ && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
{
- removeCursor(baseDN, csn);
- replicasOffline.removeCSN(baseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
+ removeCursor(mcBaseDN, mcCSN);
+ replicasOffline.removeCSN(mcBaseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
}
}
--
Gitblit v1.10.0