From 5f803832687ee9d56deec9946d6be7f3772e7688 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Dec 2013 11:08:12 +0000
Subject: [PATCH] OPENDJ-1231 (CR-2724) Make the Medium Consistency Point support replica heartbeats and replicas shutting down
---
opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java | 15 +++
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 24 +++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 20 ++++-
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 39 ++++++---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 62 +-------------
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 36 +++-----
6 files changed, 99 insertions(+), 97 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index cfed222..f6273b6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -209,6 +209,19 @@
}
/**
+ * Returns the ServerState associated to the provided replication domain's
+ * baseDN.
+ *
+ * @param baseDN
+ * the replication domain's baseDN
+ * @return the associated ServerState, may be null if baseDN has no state
+ */
+ public ServerState getServerState(DN baseDN)
+ {
+ return list.get(baseDN);
+ }
+
+ /**
* Returns the CSN associated to the provided replication domain's baseDN and
* serverId.
*
@@ -216,7 +229,7 @@
* the replication domain's baseDN
* @param serverId
* the serverId
- * @return the associated ServerState
+ * @return the associated CSN
*/
public CSN getCSN(DN baseDN, int serverId)
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 91e799a..70ee7cd 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -167,8 +167,6 @@
*/
private int assuredTimeoutTimerPurgeCounter = 0;
- private ServerState ctHeartbeatState;
-
/**
* Creates a new ReplicationServerDomain associated to the baseDN.
*
@@ -2558,19 +2556,6 @@
}
/**
- * Return the state that contain for each server the time of eligibility.
- * @return the state.
- */
- public ServerState getChangeTimeHeartbeatState()
- {
- if (ctHeartbeatState == null)
- {
- ctHeartbeatState = getLatestServerState().duplicate();
- }
- return ctHeartbeatState;
- }
-
- /**
* Returns the oldest known state for the domain, made of the oldest CSN
* stored for each serverId.
* <p>
@@ -2593,31 +2578,13 @@
*
* @return the eligible CSN.
*/
- public CSN getEligibleCSN()
+ CSN getEligibleCSN()
{
CSN eligibleCSN = null;
-
- final ServerState newestCSNs = domainDB.getDomainNewestCSNs(baseDN);
- for (final CSN replicaNewestCSN : newestCSNs)
+ for (final CSN lastAliveCSN : domainDB.getDomainLastAliveCSNs(baseDN))
{
// Should it be considered for eligibility ?
- int serverId = replicaNewestCSN.getServerId();
- CSN heartbeatLastCSN = getChangeTimeHeartbeatState().getCSN(serverId);
-
- // If the most recent UpdateMsg or CLHeartbeatMsg received is very old
- // then the domain is considered down and not considered for eligibility
- /*
- if ((heartbeatLastDN != null) &&
- (TimeThread.getTime()- heartbeatLastDN.getTime() > 5000))
- {
- if (debugEnabled())
- TRACER.debugInfo("In " + this.getName() +
- " Server " + sid
- + " is not considered for eligibility ... potentially down");
- continue;
- }
- */
-
+ final int serverId = lastAliveCSN.getServerId();
if (!isServerConnected(serverId))
{
if (debugEnabled())
@@ -2628,13 +2595,9 @@
continue;
}
- if (eligibleCSN == null || replicaNewestCSN.isNewerThan(eligibleCSN))
+ if (eligibleCSN == null || lastAliveCSN.isNewerThan(eligibleCSN))
{
- eligibleCSN = replicaNewestCSN;
- }
- if (heartbeatLastCSN != null && heartbeatLastCSN.isNewerThan(eligibleCSN))
- {
- eligibleCSN = heartbeatLastCSN;
+ eligibleCSN = lastAliveCSN;
}
}
@@ -2671,7 +2634,7 @@
* @param msg The message to process.
*/
public void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
- ChangeTimeHeartbeatMsg msg )
+ ChangeTimeHeartbeatMsg msg)
{
try
{
@@ -2689,7 +2652,7 @@
try
{
- storeReceivedCTHeartbeat(msg.getCSN());
+ domainDB.replicaHeartbeat(baseDN, msg.getCSN());
if (senderHandler.isDataServer())
{
// If we are the first replication server warned,
@@ -2722,17 +2685,6 @@
}
/**
- * Store a change time value received from a data server.
- * @param csn The provided change time.
- */
- public void storeReceivedCTHeartbeat(CSN csn)
- {
- // TODO:Maybe we can spare processing by only storing CSN (timestamp)
- // instead of a server state.
- getChangeTimeHeartbeatState().update(csn);
- }
-
- /**
* This methods count the changes, server by server :
* - from a serverState start point
* - to (inclusive) an end point (the provided endCSN).
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index ed3b7db..7fe4069 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -47,8 +47,8 @@
long getDomainChangesCount(DN baseDN);
/**
- * Returns the oldest {@link CSN}s of each serverId for the specified
- * replication domain.
+ * Returns the oldest {@link CSN}s from the replicaDBs for each serverId in
+ * the specified replication domain.
*
* @param baseDN
* the replication domain baseDN
@@ -59,8 +59,8 @@
ServerState getDomainOldestCSNs(DN baseDN);
/**
- * Returns the newest {@link CSN}s of each serverId for the specified
- * replication domain.
+ * Returns the newest {@link CSN}s from the replicaDBs for each serverId in
+ * the specified replication domain.
*
* @param baseDN
* the replication domain baseDN
@@ -71,6 +71,18 @@
ServerState getDomainNewestCSNs(DN baseDN);
/**
+ * Returns the last time each serverId was seen alive for the specified
+ * replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @return a new ServerState object holding the {serverId => CSN} Map. Can be
+ * null if the config that computes change numbers is set to false or
+ * if domain is not replicated.
+ */
+ ServerState getDomainLastAliveCSNs(DN baseDN);
+
+ /**
* Retrieves the latest trim date for the specified replication domain.
* <p>
* FIXME will be removed when ECLServerHandler will not be responsible anymore
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 4c46759..40cccc9 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -101,12 +101,13 @@
private volatile CSN mediumConsistencyCSN;
/**
- * Holds the most recent changes or heartbeats received for each serverIds
- * cross domain. changes are stored in the replicaDBs and hence persistent,
- * heartbeats are transient because they are easily constructed on normal
- * operations.
+ * Holds the last time each replica was seen alive, whether via updates or
+ * heartbeats received. Data is held for each serverId cross domain.
+ * <p>
+ * Updates are persistent and stored in the replicaDBs; heartbeats are
+ * transient because they are easily constructed during normal operations.
*/
- private final MultiDomainServerState lastSeenUpdates =
+ private final MultiDomainServerState lastAliveCSNs =
new MultiDomainServerState();
private final MultiDomainServerState replicasOffline =
new MultiDomainServerState();
@@ -174,7 +175,7 @@
*/
public void publishHeartbeat(DN baseDN, CSN heartbeatCSN)
{
- lastSeenUpdates.update(baseDN, heartbeatCSN);
+ lastAliveCSNs.update(baseDN, heartbeatCSN);
tryNotify(baseDN);
}
@@ -192,13 +193,27 @@
throws ChangelogException
{
final CSN csn = updateMsg.getCSN();
- lastSeenUpdates.update(baseDN, csn);
+ lastAliveCSNs.update(baseDN, csn);
// only keep the oldest CSN that will be the new cursor's starting point
newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
tryNotify(baseDN);
}
/**
+ * Returns the last time each serverId was seen alive for the specified
+ * replication domain.
+ *
+ * @param baseDN
+ * the replication domain baseDN
+ * @return the indexer's own ServerState (not a copy) holding the
+ * {serverId => CSN} Map. Can be null if domain is not replicated.
+ */
+ public ServerState getDomainLastAliveCSNs(DN baseDN)
+ {
+ return lastAliveCSNs.getServerState(baseDN);
+ }
+
+ /**
* Signals a replica went offline.
*
* @param baseDN
@@ -208,7 +223,7 @@
*/
public void replicaOffline(DN baseDN, CSN offlineCSN)
{
- lastSeenUpdates.update(baseDN, offlineCSN);
+ lastAliveCSNs.update(baseDN, offlineCSN);
replicasOffline.update(baseDN, offlineCSN);
tryNotify(baseDN);
}
@@ -234,8 +249,8 @@
if (mcCSN != null)
{
final int serverId = mcCSN.getServerId();
- final CSN lastSeenSameServerId = lastSeenUpdates.getCSN(baseDN, serverId);
- return mcCSN.isOlderThan(lastSeenSameServerId);
+ CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(baseDN, serverId);
+ return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
}
return true;
}
@@ -269,7 +284,7 @@
}
ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
- lastSeenUpdates.update(baseDN, latestKnownState);
+ lastAliveCSNs.update(baseDN, latestKnownState);
}
resetNextChangeForInsertDBCursor();
@@ -491,7 +506,7 @@
if (offlineCSN != null
&& offlineCSN.isOlderThan(mediumConsistencyCSN)
// If no new updates has been seen for this replica
- && lastSeenUpdates.removeCSN(baseDN, offlineCSN))
+ && lastAliveCSNs.removeCSN(baseDN, offlineCSN))
{
removeCursor(baseDN, csn);
replicasOffline.removeCSN(baseDN, offlineCSN);
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 61991fc..7a52433 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -539,6 +539,23 @@
/** {@inheritDoc} */
@Override
+ public ServerState getDomainLastAliveCSNs(DN baseDN)
+ {
+ final ChangeNumberIndexer indexer = this.cnIndexer.get();
+ if (indexer != null)
+ {
+ final ServerState results = indexer.getDomainLastAliveCSNs(baseDN);
+ if (results != null)
+ {
+ // return a copy to protect against concurrent modifications
+ return results.duplicate();
+ }
+ }
+ return new ServerState(); // empty, not null: callers iterate over it
+ }
+
+ /** {@inheritDoc} */
+ @Override
public void removeDomain(DN baseDN) throws ChangelogException
{
// Remember the first exception because :
@@ -791,8 +808,11 @@
@Override
public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
{
- // TODO implement this when the changelogDB will be responsible for
- // maintaining the medium consistency point
+ final ChangeNumberIndexer indexer = cnIndexer.get();
+ if (indexer != null)
+ {
+ indexer.publishHeartbeat(baseDN, heartbeatCSN);
+ }
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index e347da3..beb510a 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -552,9 +552,8 @@
debugInfo(tn, "Starting test\n\n");
// root entry returned
- final InternalSearchOperation op = searchOnChangelog(
- "(objectclass=*)", Collections.<String>emptySet(), createControls(""), 1, tn);
- waitOpResult(op, ResultCode.SUCCESS);
+ searchOnChangelog("(objectclass=*)", Collections.<String> emptySet(), createControls(""),
+ 1, ResultCode.SUCCESS, tn);
debugInfo(tn, "Ending test successfully");
}
@@ -585,8 +584,7 @@
/** Add an entry in the database */
private void addEntry(Entry entry) throws Exception
{
- AddOperation addOp = connection.processAdd(entry);
- waitOpResult(addOp, ResultCode.SUCCESS);
+ waitOpResult(connection.processAdd(entry), ResultCode.SUCCESS);
assertNotNull(getEntry(entry.getDN(), 1000, true));
}
@@ -868,10 +866,8 @@
throws Exception
{
debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]");
- final InternalSearchOperation searchOp = searchOnChangelog(
- filterString, ALL_ATTRIBUTES, createControls(cookie), expectedNbEntries, testName);
- waitOpResult(searchOp, expectedResultCode);
- return searchOp;
+ return searchOnChangelog(filterString, ALL_ATTRIBUTES, createControls(cookie),
+ expectedNbEntries, expectedResultCode, testName);
}
private InternalSearchOperation searchOnChangelog(String filterString,
@@ -879,15 +875,13 @@
throws Exception
{
debugInfo(testName, " Search: " + filterString);
- final InternalSearchOperation searchOp = searchOnChangelog(
- filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, testName);
- waitOpResult(searchOp, expectedResultCode);
- return searchOp;
+ return searchOnChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL,
+ expectedNbEntries, expectedResultCode, testName);
}
private InternalSearchOperation searchOnChangelog(String filterString,
Set<String> attributes, List<Control> controls, int expectedNbEntries,
- String testName) throws Exception
+ ResultCode expectedResultCode, String testName) throws Exception
{
InternalSearchOperation op = null;
int cnt = 0;
@@ -912,6 +906,7 @@
final List<SearchResultEntry> entries = op.getSearchEntries();
assertThat(entries).hasSize(expectedNbEntries);
debugAndWriteEntries(getLDIFWriter(), entries, testName);
+ waitOpResult(op, expectedResultCode);
return op;
}
@@ -2104,7 +2099,6 @@
ReplicationServerDomain rsd1 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN);
debugInfo(tn, rsd1.getBaseDN()
+ " LatestServerState=" + rsd1.getLatestServerState()
- + " ChangeTimeHeartBeatState=" + rsd1.getChangeTimeHeartbeatState()
+ " eligibleCSN=" + rsd1.getEligibleCSN()
+ " rs eligibleCSN=" + replicationServer.getEligibleCSN(null));
// FIXME:ECL Enable this test by adding an assert on the right value
@@ -2112,7 +2106,6 @@
ReplicationServerDomain rsd2 = replicationServer.getReplicationServerDomain(TEST_ROOT_DN2);
debugInfo(tn, rsd2.getBaseDN()
+ " LatestServerState=" + rsd2.getLatestServerState()
- + " ChangeTimeHeartBeatState=" + rsd2.getChangeTimeHeartbeatState()
+ " eligibleCSN=" + rsd2.getEligibleCSN()
+ " rs eligibleCSN=" + replicationServer.getEligibleCSN(null));
// FIXME:ECL Enable this test by adding an assert on the right value
@@ -2882,13 +2875,10 @@
}
finally
{
- final DeleteOperation delOp1 = connection.processDelete(
- DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2));
- waitOpResult(delOp1, ResultCode.SUCCESS);
- final DeleteOperation delOp2 = connection.processDelete(TEST_ROOT_DN2);
- waitOpResult(delOp2, ResultCode.SUCCESS);
- final DeleteOperation delOp3 = connection.processDelete(baseDN3);
- waitOpResult(delOp3, ResultCode.SUCCESS);
+ final DN fionaDN = DN.decode("cn=Fiona Jensen," + TEST_ROOT_DN_STRING2);
+ waitOpResult(connection.processDelete(fionaDN), ResultCode.SUCCESS);
+ waitOpResult(connection.processDelete(TEST_ROOT_DN2), ResultCode.SUCCESS);
+ waitOpResult(connection.processDelete(baseDN3), ResultCode.SUCCESS);
remove(domain21, domain2, domain3);
removeTestBackend(backend2, backend3);
--
Gitblit v1.10.0