From 9a13d05fcb1b17c52c7b91b8445d334bce3f9e28 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 22 Nov 2013 09:24:10 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 55 +++++++++++++++++++++++++++++++++++++++++++++++++++----
1 files changed, 51 insertions(+), 4 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index dbb68f7..93b8cd3 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -64,8 +64,7 @@
/*
* mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
* 1) initialization can happen while the replication server starts receiving
- * updates 2) many updates can happen concurrently. This solution also avoids
- * using a queue that could fill up before we have consumed all its content.
+ * updates 2) many updates can happen concurrently.
*/
/**
* Holds the cross domain medium consistency Replication Update Vector for the
@@ -98,6 +97,8 @@
*/
private final MultiDomainServerState lastSeenUpdates =
new MultiDomainServerState();
+ private final MultiDomainServerState replicasOffline =
+ new MultiDomainServerState();
/**
* Composite cursor across all the replicaDBs for all the replication domains.
@@ -168,6 +169,21 @@
}
/**
+ * Signals a replica went offline.
+ *
+ * @param baseDN
+ * the replica's replication domain
+ * @param offlineCSN
+ * the serverId and time of the replica that went offline
+ */
+ public void replicaOffline(DN baseDN, CSN offlineCSN)
+ {
+ lastSeenUpdates.update(baseDN, offlineCSN);
+ replicasOffline.update(baseDN, offlineCSN);
+ tryNotify(baseDN);
+ }
+
+ /**
* Notifies the Change number indexer thread if it will be able to do some
* work.
*/
@@ -187,8 +203,8 @@
final CSN mcCSN = mediumConsistencyCSN;
if (mcCSN != null)
{
- final CSN lastSeenSameServerId =
- lastSeenUpdates.getCSN(baseDN, mcCSN.getServerId());
+ final int serverId = mcCSN.getServerId();
+ final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
return mcCSN.isOlderThan(lastSeenSameServerId);
}
return true;
@@ -374,6 +390,37 @@
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(baseDN, csn);
mediumConsistencyCSN = csn;
+ final CSN offlineCSN = replicasOffline.getCSN(baseDN, csn.getServerId());
+ if (offlineCSN != null
+ && offlineCSN.isOlderThan(mediumConsistencyCSN)
+ // If no new updates has been seen for this replica
+ && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
+ {
+ removeCursor(baseDN, csn);
+ replicasOffline.removeCSN(baseDN, offlineCSN);
+ mediumConsistencyRUV.removeCSN(baseDN, offlineCSN);
+ }
+ }
+
+ private void removeCursor(final DN baseDN, final CSN csn)
+ {
+ for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
+ .entrySet())
+ {
+ if (baseDN.equals(entry.getKey()))
+ {
+ final Set<Integer> serverIds = entry.getValue().keySet();
+ for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
+ {
+ final int serverId = iter.next();
+ if (csn.getServerId() == serverId)
+ {
+ iter.remove();
+ return;
+ }
+ }
+ }
+ }
}
private void createNewCursors() throws ChangelogException
--
Gitblit v1.10.0