From 8c0b9cbba08dee4c4ad6fe00357a018cdb54e280 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 05 Jun 2014 10:46:07 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3667) Change time heart beat change numbers should be synced with updates
---
opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java | 53 +++++++-
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 99 ++++++++++-----
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java | 109 ++++++++++++++----
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 46 +++++-
4 files changed, 229 insertions(+), 78 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
index f6273b6..f451cab 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.common;
@@ -40,6 +40,8 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.ResultCode;
+import com.forgerock.opendj.util.Pair;
+
import static org.opends.messages.ReplicationMessages.*;
/**
@@ -98,7 +100,9 @@
public boolean update(DN baseDN, CSN csn)
{
if (csn == null)
+ {
return false;
+ }
ServerState serverState = list.get(baseDN);
if (serverState == null)
@@ -242,6 +246,45 @@
}
/**
+ * Returns the oldest {@code Pair<DN, CSN>} held in the current object,
+ * excluding the provided CSNs. In other words, the value returned is the
+ * oldest {@code Pair<DN, CSN>} included in the current object that is not
+ * part of the excludedCSNs.
+ *
+ * @param excludedCSNs
+ * the CSNs that cannot be returned, {@code null} excludes nothing
+ * @return the oldest {@code Pair<DN, CSN>} included in the current object
+ * that is not part of the excludedCSNs, or {@link Pair#EMPTY} if no
+ * such CSN exists.
+ */
+ public Pair<DN, CSN> getOldestCSNExcluding(MultiDomainServerState excludedCSNs)
+ {
+   // Pair.EMPTY doubles as the "nothing found yet" marker
+   Pair<DN, CSN> result = Pair.empty();
+   for (Entry<DN, ServerState> domainEntry : list.entrySet())
+   {
+     final DN domainDN = domainEntry.getKey();
+     for (CSN candidate : domainEntry.getValue().getServerIdToCSNMap().values())
+     {
+       final boolean allowed = !isReplicaExcluded(excludedCSNs, domainDN, candidate);
+       if (allowed
+           && (result == Pair.EMPTY || candidate.isOlderThan(result.getSecond())))
+       {
+         result = Pair.of(domainDN, candidate);
+       }
+     }
+   }
+   return result;
+ }
+
+ private boolean isReplicaExcluded(MultiDomainServerState excluded, DN baseDN, CSN csn)
+ {
+   final CSN excludedCSN =
+       excluded != null ? excluded.getCSN(baseDN, csn.getServerId()) : null;
+   return excluded != null && csn.equals(excludedCSN);
+ }
+
+ /**
* Removes the mapping to the provided CSN if it is present in this
* MultiDomainServerState.
*
@@ -253,12 +296,8 @@
*/
public boolean removeCSN(DN baseDN, CSN expectedCSN)
{
- ServerState ss = list.get(baseDN);
- if (ss != null)
- {
- return ss.removeCSN(expectedCSN);
- }
- return false;
+ final ServerState ss = list.get(baseDN);
+ return ss != null && ss.removeCSN(expectedCSN);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 2419dc4..8a068f6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -84,6 +84,8 @@
* inserted in the DB. After insert, it is updated with the CSN of the change
* currently processed (thus becoming the "current" cookie just before the
* change is returned.
+ * <p>
+ * Note: This object is only updated by changes/updates.
*
* @see <a href=
* "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
@@ -91,15 +93,6 @@
*/
private final MultiDomainServerState mediumConsistencyRUV =
new MultiDomainServerState();
- /**
- * Holds the cross domain medium consistency baseDN and CSN for the current
- * replication server.
- *
- * @see <a href=
- * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
- * >OpenDJ Domain Names - medium consistency CSN</a>
- */
- private volatile Pair<DN, CSN> mediumConsistency;
/**
* Holds the last time each replica was seen alive, whether via updates or
@@ -108,9 +101,12 @@
* <p>
* Updates are persistent and stored in the replicaDBs, heartbeats are
* transient and are easily constructed on normal operations.
+ * <p>
+ * Note: This object is updated by both heartbeats and changes/updates.
*/
private final MultiDomainServerState lastAliveCSNs =
new MultiDomainServerState();
+ /** Note: This object is updated by replica offline messages. */
private final MultiDomainServerState replicasOffline =
new MultiDomainServerState();
@@ -119,8 +115,12 @@
* positioned on the next change that needs to be inserted in the CNIndexDB.
* <p>
* Note: it is only accessed from the {@link #run()} method.
+ *
+ * @NonNull
*/
- private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
+ @SuppressWarnings("unchecked")
+ private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
+ new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
/**
* New cursors for this Map must be created from the {@link #run()} method,
@@ -182,8 +182,9 @@
return;
}
+ final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, heartbeatCSN);
- tryNotify();
+ tryNotify(oldestCSNBefore);
}
/**
@@ -222,8 +223,9 @@
final CSN csn = updateMsg.getCSN();
// only keep the oldest CSN that will be the new cursor's starting point
newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
+ final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, csn);
- tryNotify();
+ tryNotify(oldestCSNBefore);
}
/**
@@ -254,18 +256,29 @@
*/
public void replicaOffline(DN baseDN, CSN offlineCSN)
{
+ if (!isECLEnabledDomain(baseDN))
+ {
+ return;
+ }
+
replicasOffline.update(baseDN, offlineCSN);
+ final CSN oldestCSNBefore = getOldestLastAliveCSN();
lastAliveCSNs.update(baseDN, offlineCSN);
- tryNotify();
+ tryNotify(oldestCSNBefore);
+ }
+
+ private CSN getOldestLastAliveCSN()
+ { // oldest CSN held in lastAliveCSNs, skipping CSNs also recorded in replicasOffline
+ return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond(); // NOTE(review): presumably null when nothing qualifies - confirm
}
/**
* Notifies the Change number indexer thread if it will be able to do some
* work.
*/
- private void tryNotify()
+ private void tryNotify(final CSN oldestCSNBefore)
{
- if (canMoveForwardMediumConsistencyPoint())
+ if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore))
{
synchronized (this)
{
@@ -274,19 +287,32 @@
}
}
- private boolean canMoveForwardMediumConsistencyPoint()
+ /**
+ * Used for waking up the {@link ChangeNumberIndexer} thread because it might
+ * have some work to do.
+ */
+ private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore)
{
- final Pair<DN, CSN> mc = mediumConsistency;
- if (mc != null)
- {
- final CSN mcCSN = mc.getSecond();
- final CSN lastTimeSameReplicaSeenAlive =
- lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId());
- return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive);
- }
+ final CSN oldestCSNAfter = getOldestLastAliveCSN();
// ensure that all initial replicas alive information have been updated
// with CSNs that are acceptable for moving the medium consistency forward
- return allInitialReplicasAreOfflineOrAlive();
+ return allInitialReplicasAreOfflineOrAlive()
+ && oldestCSNBefore != null // then oldestCSNAfter cannot be null
+ // has the oldest CSN changed?
+ && oldestCSNBefore.isOlderThan(oldestCSNAfter);
+ }
+
+ /**
+  * Tells the {@link ChangeNumberIndexer} thread whether the provided CSN may
+  * now be persisted to the change number index DB.
+  */
+ private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist)
+ {
+   // every initial replica must first have reported acceptable alive info;
+   // only then is the candidate compared with the oldest last alive CSN
+   final boolean replicasKnown = allInitialReplicasAreOfflineOrAlive();
+   return replicasKnown
+       && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN());
}
/**
@@ -299,8 +325,8 @@
* CSN has been updated to something past the oldest possible CSN), we have
* enough info to compute medium consistency</li>
* </ul>
- * In this case, we have enough information to compute medium consistency
- * without waiting any more.
+ * In both cases, we have enough information to compute medium consistency
+ * without waiting any further.
*/
private boolean allInitialReplicasAreOfflineOrAlive()
{
@@ -308,11 +334,11 @@
{
for (CSN csn : lastAliveCSNs.getServerState(baseDN))
{
- if (// oldest possible CSN?
- csn.getTime() == 0
- // replica is not offline
+ if (csn.getTime() == 0
&& replicasOffline.getCSN(baseDN, csn.getServerId()) == null)
{
+ // this is the oldest possible CSN, but the replica is not offline:
+ // we must wait for more up-to-date information from this replica
return false;
}
}
@@ -537,16 +563,19 @@
final DN baseDN = nextChangeForInsertDBCursor.getData();
// FIXME problem: what if the serverId is not part of the ServerState?
// right now, change number will be blocked
- if (!canMoveForwardMediumConsistencyPoint())
+ if (!canMoveForwardMediumConsistencyPoint(csn))
{
// the oldest record to insert is newer than the medium consistency
// point. Let's wait for a change that can be published.
synchronized (this)
{
// double check to protect against a missed call to notify()
- if (!isShutdownInitiated()
- && !canMoveForwardMediumConsistencyPoint())
+ if (!canMoveForwardMediumConsistencyPoint(csn))
{
+ if (isShutdownInitiated())
+ {
+ return;
+ }
wait();
// loop to check if changes older than the medium consistency
// point have been added to the ReplicaDBs
@@ -601,10 +630,10 @@
private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
final DN mcBaseDN) throws ChangelogException
{
- boolean callNextOnCursor = true;
// update, so it becomes the previous cookie for the next change
mediumConsistencyRUV.update(mcBaseDN, mcCSN);
- mediumConsistency = Pair.of(mcBaseDN, mcCSN);
+
+ boolean callNextOnCursor = true;
final int mcServerId = mcCSN.getServerId();
final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
index 5f9d77e..09b6860 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
@@ -21,14 +21,17 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.common;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.types.DN;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.forgerock.opendj.util.Pair;
+
import static org.assertj.core.api.Assertions.*;
import static org.testng.Assert.*;
@@ -40,6 +43,18 @@
private static final CSN csn2 = new CSN(4, 5, 6);
private static final CSN csn3 = new CSN(7, 8, 3);
+ private static DN dn1;
+ private static DN dn2;
+ private static DN dn3;
+
+ @BeforeClass
+ public void setBaseDNs() throws Exception
+ { // decodes the base DNs shared by the tests, instead of decoding them in each test method
+ dn1 = DN.decode("o=test1");
+ dn2 = DN.decode("o=test2");
+ dn3 = DN.decode("o=test3"); // never added to the fixture state (see testRemoveCSN)
+ }
+
@Test
public void testDecodeAndEncode1() throws Exception
{
@@ -60,9 +75,6 @@
@Test
public void testUpdateCSN() throws Exception
{
- final DN dn1 = DN.decode("o=test1");
- final DN dn2 = DN.decode("o=test2");
-
final MultiDomainServerState state = new MultiDomainServerState();
assertTrue(state.update(dn1, csn1));
assertTrue(state.update(dn2, csn2));
@@ -77,9 +89,6 @@
@Test
public void testUpdateServerState() throws Exception
{
- final DN dn1 = DN.decode("o=test1");
- final DN dn2 = DN.decode("o=test2");
-
final MultiDomainServerState state = new MultiDomainServerState();
final ServerState ss1 = new ServerState();
assertTrue(ss1.update(csn3));
@@ -95,9 +104,6 @@
@Test
public void testUpdateMultiDomainServerState() throws Exception
{
- final DN dn1 = DN.decode("o=test1");
- final DN dn2 = DN.decode("o=test2");
-
final MultiDomainServerState state1 = new MultiDomainServerState();
state1.update(dn1, csn3);
state1.update(dn2, csn2);
@@ -112,9 +118,6 @@
@Test(dependsOnMethods = { "testUpdateCSN" })
public void testEqualsTo() throws Exception
{
- final DN dn1 = DN.decode("o=test1");
- final DN dn2 = DN.decode("o=test2");
-
final MultiDomainServerState state1 = new MultiDomainServerState();
assertTrue(state1.update(dn1, csn3));
@@ -134,9 +137,6 @@
@Test(dependsOnMethods = { "testUpdateCSN" })
public void testIsEmpty() throws Exception
{
- final DN dn1 = DN.decode("o=test1");
- final DN dn2 = DN.decode("o=test2");
-
final MultiDomainServerState state = new MultiDomainServerState();
assertTrue(state.isEmpty());
@@ -155,15 +155,7 @@
@Test(dependsOnMethods = { "testUpdateCSN" })
public void testRemoveCSN() throws Exception
{
- final DN dn1 = DN.decode("o=test1");
- final DN dn2 = DN.decode("o=test2");
- final DN dn3 = DN.decode("o=test3");
-
- final MultiDomainServerState state = new MultiDomainServerState();
-
- assertTrue(state.update(dn1, csn1));
- assertTrue(state.update(dn2, csn1));
- assertTrue(state.update(dn2, csn2));
+ final MultiDomainServerState state = getLastAliveCSNs();
assertNull(state.getCSN(dn3, 42));
assertFalse(state.removeCSN(dn3, csn1));
@@ -181,4 +173,71 @@
assertNull(state.getCSN(dn2, csn1.getServerId()));
assertSame(csn2, state.getCSN(dn2, csn2.getServerId()));
}
+
+ private MultiDomainServerState getLastAliveCSNs()
+ {
+   final MultiDomainServerState fixture = new MultiDomainServerState();
+   assertTrue(fixture.update(dn1, csn1)); // dn1 holds csn1 only
+   assertTrue(fixture.update(dn2, csn1)); // dn2 holds csn1 ...
+   assertTrue(fixture.update(dn2, csn2)); // ... and csn2
+   return fixture;
+ }
+
+ @Test(dependsOnMethods = { "testUpdateCSN" })
+ public void testGetOldestCSNExcluding_null() throws Exception
+ {
+   final Pair<DN, CSN> oldest = getLastAliveCSNs().getOldestCSNExcluding(null);
+   assertEquals(oldest, Pair.of(dn1, csn1)); // null excludes nothing
+ }
+
+ @Test(dependsOnMethods = { "testUpdateCSN" })
+ public void testGetOldestCSNExcluding_empty() throws Exception
+ {
+   // an empty exclusion state excludes nothing
+   final MultiDomainServerState nothingExcluded = new MultiDomainServerState();
+   final Pair<DN, CSN> oldest = getLastAliveCSNs().getOldestCSNExcluding(nothingExcluded);
+   assertEquals(oldest, Pair.of(dn1, csn1));
+ }
+
+ @Test(dependsOnMethods = { "testUpdateCSN" })
+ public void testGetOldestCSNExcluding_currentOldestCSN_givesNewOldestCSN() throws Exception
+ {
+   // exclude (dn1, csn1), which would otherwise be returned as the oldest pair
+   final MultiDomainServerState toExclude = new MultiDomainServerState();
+   toExclude.update(dn1, csn1);
+
+   final Pair<DN, CSN> oldest = getLastAliveCSNs().getOldestCSNExcluding(toExclude);
+   assertEquals(oldest, Pair.of(dn2, csn1));
+ }
+
+ @Test(dependsOnMethods = { "testUpdateCSN" })
+ public void testGetOldestCSNExcluding_CSNOlderThanCurrentOldestCSN_givesNewOldestCSN() throws Exception
+ {
+   final CSN olderThanCSN1 = new CSN(0, 2, 3);
+   assertEquals(olderThanCSN1.getServerId(), csn1.getServerId());
+   assertTrue(olderThanCSN1.isOlderThan(csn1));
+
+   final MultiDomainServerState toExclude = new MultiDomainServerState();
+   toExclude.update(dn1, csn1);
+   toExclude.update(dn2, olderThanCSN1);
+   // under dn2 the excluded CSN is older than the one held: not a match
+   final Pair<DN, CSN> oldest = getLastAliveCSNs().getOldestCSNExcluding(toExclude);
+   assertEquals(oldest, Pair.of(dn2, csn1));
+ }
+
+ @Test(dependsOnMethods = { "testUpdateCSN" })
+ public void testGetOldestCSNExcluding_CSNNewerThanCurrentOldestCSN_givesNewOldestCSN() throws Exception
+ {
+   final CSN newerThanCSN1 = new CSN(42, 2, 3);
+   assertEquals(newerThanCSN1.getServerId(), csn1.getServerId());
+   assertTrue(newerThanCSN1.isNewerThan(csn1));
+
+   final MultiDomainServerState toExclude = new MultiDomainServerState();
+   toExclude.update(dn1, csn1);
+   toExclude.update(dn2, newerThanCSN1);
+   // under dn2 the excluded CSN is newer than the one held: not a match
+   final Pair<DN, CSN> oldest = getLastAliveCSNs().getOldestCSNExcluding(toExclude);
+   assertEquals(oldest, Pair.of(dn2, csn1));
+ }
+
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 05e9770..b91bdc1 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -355,6 +355,7 @@
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
publishUpdateMsg(msg1);
+ assertExternalChangelogContent(msg1);
addReplica(BASE_DN1, serverId2);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -486,19 +487,19 @@
assertExternalChangelogContent(msg1);
// do not wait for temporarily offline serverId1
- final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
- publishUpdateMsg(msg3);
- assertExternalChangelogContent(msg1, msg3);
+ final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
+ publishUpdateMsg(msg4);
+ assertExternalChangelogContent(msg1, msg4);
// serverId1 is back online, wait for changes from serverId2
- final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
- publishUpdateMsg(msg4);
- assertExternalChangelogContent(msg1, msg3);
-
- final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
+ final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5);
publishUpdateMsg(msg5);
+ assertExternalChangelogContent(msg1, msg4);
+
+ final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6);
+ publishUpdateMsg(msg6);
// MCP moves forward
- assertExternalChangelogContent(msg1, msg3, msg4);
+ assertExternalChangelogContent(msg1, msg4, msg5);
}
/**
@@ -525,16 +526,39 @@
assertExternalChangelogContent();
// MCP moves forward because serverId1 is not really offline
- // since because we received a message from it after the offline replica msg
+ // since we received a message from it newer than the offline replica msg
final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4);
publishUpdateMsg(msg4);
assertExternalChangelogContent(msg2, msg3);
// back to normal operations
- sendHeartbeat(BASE_DN1, serverId1, 4);
+ sendHeartbeat(BASE_DN1, serverId1, 5);
assertExternalChangelogContent(msg2, msg3, msg4);
}
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoDSsOneKilled() throws Exception
+ {
+   addReplica(BASE_DN1, serverId1);
+   addReplica(BASE_DN1, serverId2);
+   startCNIndexer(BASE_DN1);
+   assertExternalChangelogContent();
+
+   // no news yet from serverId2: the MCP cannot move forward
+   final ReplicatedUpdateMsg firstChange = msg(BASE_DN1, serverId1, 1);
+   publishUpdateMsg(firstChange);
+   assertExternalChangelogContent();
+
+   // the heartbeat tells us what serverId2 is at: the MCP moves forward
+   sendHeartbeat(BASE_DN1, serverId2, 2);
+   assertExternalChangelogContent(firstChange);
+
+   // serverId2 now holds the oldest CSN: the MCP cannot move forward
+   final ReplicatedUpdateMsg laterChange = msg(BASE_DN1, serverId1, 3);
+   publishUpdateMsg(laterChange);
+   assertExternalChangelogContent(firstChange);
+ }
+
private void addReplica(DN baseDN, int serverId) throws Exception
{
final SequentialDBCursor cursor = new SequentialDBCursor();
--
Gitblit v1.10.0