From 9737dd7d653611c9d9ee38640685a68d43abbca4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 17 Apr 2014 15:05:17 +0000
Subject: [PATCH] OPENDJ-1440 On startup, change number can progress without waiting for any heartbeat from known replicas
---
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 30 ++++++++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 39 +++++++++----------
2 files changed, 48 insertions(+), 21 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 1fe38e0..99870ed 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -267,6 +267,23 @@
lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
}
+ // ensure that all initial replicas alive information have been updated
+ // with CSNs that are acceptable for moving the medium consistency forward
+ return allInitialReplicasArePastOldestPossibleCSN();
+ }
+
+ private boolean allInitialReplicasArePastOldestPossibleCSN()
+ {
+ for (DN baseDN : lastAliveCSNs)
+ {
+ for (CSN csn : lastAliveCSNs.getServerState(baseDN))
+ {
+ if (csn.getTime() == 0)
+ {
+ return false;
+ }
+ }
+ }
return true;
}
@@ -301,8 +318,14 @@
for (Integer serverId : entry.getValue())
{
- final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
+ /*
+ * initialize with the oldest possible CSN in order for medium
+ * consistency to wait for all replicas to be alive before moving
+ * forward
+ */
+ lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
// start after the actual CSN when initializing from the previous cookie
+ final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
ensureCursorExists(baseDN, serverId, csn);
}
@@ -332,6 +355,11 @@
this.changelogState = null;
}
+ private CSN oldestPossibleCSN(int serverId)
+ {
+ return new CSN(0, 0, serverId);
+ }
+
private void resetNextChangeForInsertDBCursor() throws ChangelogException
{
final Map<DBCursor<UpdateMsg>, DN> cursors =
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index c60411c..49e1994 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -150,7 +150,7 @@
@Test
public void emptyDBNoDS() throws Exception
{
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
assertExternalChangelogContent();
}
@@ -158,7 +158,7 @@
public void emptyDBOneInitialDS() throws Exception
{
addReplica(BASE_DN1, serverId1);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
@@ -171,7 +171,7 @@
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
addReplica(BASE_DN1, serverId1);
setDBInitialRecords(msg1);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
publishUpdateMsg(msg2);
@@ -183,11 +183,15 @@
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
+ // simulate messages received out of order
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
- publishUpdateMsg(msg2, msg1);
+ publishUpdateMsg(msg2);
+ // do not start publishing to the changelog until we hear from serverId1
+ assertExternalChangelogContent();
+ publishUpdateMsg(msg1);
assertExternalChangelogContent(msg1);
}
@@ -196,7 +200,7 @@
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN2, serverId2);
- startCNIndexer();
+ startCNIndexer(BASE_DN1, BASE_DN2);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN2, serverId2, 2);
@@ -215,7 +219,7 @@
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
setDBInitialRecords(msg1, msg2);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
@@ -236,7 +240,7 @@
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
final ReplicatedUpdateMsg msg1Sid2 = msg(BASE_DN1, serverId2, 1);
final ReplicatedUpdateMsg emptySid2 = emptyCursor(BASE_DN1, serverId2);
@@ -253,7 +257,7 @@
addReplica(ADMIN_DATA_DN, serverId1);
addReplica(BASE_DN1, serverId2);
addReplica(BASE_DN1, serverId3);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
// cn=admin data will does not participate in the external changelog
// so it cannot add to it
@@ -271,7 +275,7 @@
public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
{
addReplica(BASE_DN1, serverId1);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
@@ -291,7 +295,7 @@
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -307,7 +311,7 @@
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
- startCNIndexer();
+ startCNIndexer(BASE_DN1);
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -335,14 +339,15 @@
initialState.addServerIdToDomain(serverId, baseDN);
}
- private void startCNIndexer()
+ private void startCNIndexer(DN... eclEnabledDomains)
{
+ final List<DN> eclEnabledDomainList = Arrays.asList(eclEnabledDomains);
cnIndexer = new ChangeNumberIndexer(changelogDB, initialState)
{
@Override
protected boolean isECLEnabledDomain(DN baseDN)
{
- return BASE_DN1.equals(baseDN) || BASE_DN2.equals(baseDN);
+ return eclEnabledDomainList.contains(baseDN);
}
};
cnIndexer.start();
@@ -447,12 +452,6 @@
private void assertExternalChangelogContent(ReplicatedUpdateMsg... expectedMsgs)
throws Exception
{
- if (expectedMsgs.length == 0)
- {
- verify(cnIndexDB, never()).addRecord(any(ChangeNumberIndexRecord.class));
- return;
- }
-
final ArgumentCaptor<ChangeNumberIndexRecord> arg =
ArgumentCaptor.forClass(ChangeNumberIndexRecord.class);
verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
--
Gitblit v1.10.0