opends/src/server/org/opends/server/replication/common/MultiDomainServerState.java
@@ -22,7 +22,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.common; @@ -40,6 +40,8 @@ import org.opends.server.types.DirectoryException; import org.opends.server.types.ResultCode; import com.forgerock.opendj.util.Pair; import static org.opends.messages.ReplicationMessages.*; /** @@ -98,7 +100,9 @@ public boolean update(DN baseDN, CSN csn) { if (csn == null) { return false; } ServerState serverState = list.get(baseDN); if (serverState == null) @@ -242,6 +246,45 @@ } /** * Returns the oldest {@code Pair<DN, CSN>} held in the current object, excluding * the provided CSNs. In other words, the value returned is the oldest * {@code Pair<DN, CSN>} included in the current object, that is not part of the * excludedCSNs. * * @param excludedCSNs * the CSNs that cannot be returned * @return the oldest {@code Pair<DN, CSN>} included in the current object that * is not part of the excludedCSNs, or {@link Pair#EMPTY} if no such * older CSN exists. */ public Pair<DN, CSN> getOldestCSNExcluding(MultiDomainServerState excludedCSNs) { Pair<DN, CSN> oldest = Pair.empty(); for (Entry<DN, ServerState> entry : list.entrySet()) { final DN baseDN = entry.getKey(); final ServerState value = entry.getValue(); for (Entry<Integer, CSN> entry2 : value.getServerIdToCSNMap().entrySet()) { final CSN csn = entry2.getValue(); if (!isReplicaExcluded(excludedCSNs, baseDN, csn) && (oldest == Pair.EMPTY || csn.isOlderThan(oldest.getSecond()))) { oldest = Pair.of(baseDN, csn); } } } return oldest; } private boolean isReplicaExcluded(MultiDomainServerState excluded, DN baseDN, CSN csn) { return excluded != null && csn.equals(excluded.getCSN(baseDN, csn.getServerId())); } /** * Removes the mapping to the provided CSN if it is present in this * MultiDomainServerState. 
* @@ -253,12 +296,8 @@ */ public boolean removeCSN(DN baseDN, CSN expectedCSN) { ServerState ss = list.get(baseDN); if (ss != null) { return ss.removeCSN(expectedCSN); } return false; final ServerState ss = list.get(baseDN); return ss != null && ss.removeCSN(expectedCSN); } /** opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -84,6 +84,8 @@ * inserted in the DB. After insert, it is updated with the CSN of the change * currently processed (thus becoming the "current" cookie just before the * change is returned. * <p> * Note: This object is only updated by changes/updates. * * @see <a href= * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" @@ -91,15 +93,6 @@ */ private final MultiDomainServerState mediumConsistencyRUV = new MultiDomainServerState(); /** * Holds the cross domain medium consistency baseDN and CSN for the current * replication server. * * @see <a href= * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" * >OpenDJ Domain Names - medium consistency CSN</a> */ private volatile Pair<DN, CSN> mediumConsistency; /** * Holds the last time each replica was seen alive, whether via updates or @@ -108,9 +101,12 @@ * <p> * Updates are persistent and stored in the replicaDBs, heartbeats are * transient and are easily constructed on normal operations. * <p> * Note: This object is updated by both heartbeats and changes/updates. */ private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState(); /** Note: This object is updated by replica offline messages. */ private final MultiDomainServerState replicasOffline = new MultiDomainServerState(); @@ -119,8 +115,12 @@ * positioned on the next change that needs to be inserted in the CNIndexDB. * <p> * Note: it is only accessed from the {@link #run()} method. 
* * @NonNull */ private CompositeDBCursor<DN> nextChangeForInsertDBCursor; @SuppressWarnings("unchecked") private CompositeDBCursor<DN> nextChangeForInsertDBCursor = new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false); /** * New cursors for this Map must be created from the {@link #run()} method, @@ -182,8 +182,9 @@ return; } final CSN oldestCSNBefore = getOldestLastAliveCSN(); lastAliveCSNs.update(baseDN, heartbeatCSN); tryNotify(); tryNotify(oldestCSNBefore); } /** @@ -222,8 +223,9 @@ final CSN csn = updateMsg.getCSN(); // only keep the oldest CSN that will be the new cursor's starting point newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn); final CSN oldestCSNBefore = getOldestLastAliveCSN(); lastAliveCSNs.update(baseDN, csn); tryNotify(); tryNotify(oldestCSNBefore); } /** @@ -254,18 +256,29 @@ */ public void replicaOffline(DN baseDN, CSN offlineCSN) { if (!isECLEnabledDomain(baseDN)) { return; } replicasOffline.update(baseDN, offlineCSN); final CSN oldestCSNBefore = getOldestLastAliveCSN(); lastAliveCSNs.update(baseDN, offlineCSN); tryNotify(); tryNotify(oldestCSNBefore); } private CSN getOldestLastAliveCSN() { return lastAliveCSNs.getOldestCSNExcluding(replicasOffline).getSecond(); } /** * Notifies the Change number indexer thread if it will be able to do some * work. */ private void tryNotify() private void tryNotify(final CSN oldestCSNBefore) { if (canMoveForwardMediumConsistencyPoint()) if (mightMoveForwardMediumConsistencyPoint(oldestCSNBefore)) { synchronized (this) { @@ -274,19 +287,32 @@ } } private boolean canMoveForwardMediumConsistencyPoint() /** * Used for waking up the {@link ChangeNumberIndexer} thread because it might * have some work to do. 
*/ private boolean mightMoveForwardMediumConsistencyPoint(CSN oldestCSNBefore) { final Pair<DN, CSN> mc = mediumConsistency; if (mc != null) { final CSN mcCSN = mc.getSecond(); final CSN lastTimeSameReplicaSeenAlive = lastAliveCSNs.getCSN(mc.getFirst(), mcCSN.getServerId()); return mcCSN.isOlderThan(lastTimeSameReplicaSeenAlive); } final CSN oldestCSNAfter = getOldestLastAliveCSN(); // ensure that all initial replicas alive information have been updated // with CSNs that are acceptable for moving the medium consistency forward return allInitialReplicasAreOfflineOrAlive(); return allInitialReplicasAreOfflineOrAlive() && oldestCSNBefore != null // then oldestCSNAfter cannot be null // has the oldest CSN changed? && oldestCSNBefore.isOlderThan(oldestCSNAfter); } /** * Used by the {@link ChangeNumberIndexer} thread to determine whether the CSN * must be persisted to the change number index DB. */ private boolean canMoveForwardMediumConsistencyPoint(CSN nextCSNToPersist) { // ensure that all initial replicas alive information have been updated // with CSNs that are acceptable for moving the medium consistency forward return allInitialReplicasAreOfflineOrAlive() // can we persist the next CSN? && nextCSNToPersist.isOlderThanOrEqualTo(getOldestLastAliveCSN()); } /** @@ -299,8 +325,8 @@ * CSN has been updated to something past the oldest possible CSN), we have * enough info to compute medium consistency</li> * </ul> * In this case, we have enough information to compute medium consistency * without waiting any more. * In both cases, we have enough information to compute medium consistency * without waiting any further. */ private boolean allInitialReplicasAreOfflineOrAlive() { @@ -308,11 +334,11 @@ { for (CSN csn : lastAliveCSNs.getServerState(baseDN)) { if (// oldest possible CSN? 
csn.getTime() == 0 // replica is not offline if (csn.getTime() == 0 && replicasOffline.getCSN(baseDN, csn.getServerId()) == null) { // this is the oldest possible CSN, but the replica is not offline // we must wait for more up to date information from this replica return false; } } @@ -537,16 +563,19 @@ final DN baseDN = nextChangeForInsertDBCursor.getData(); // FIXME problem: what if the serverId is not part of the ServerState? // right now, change number will be blocked if (!canMoveForwardMediumConsistencyPoint()) if (!canMoveForwardMediumConsistencyPoint(csn)) { // the oldest record to insert is newer than the medium consistency // point. Let's wait for a change that can be published. synchronized (this) { // double check to protect against a missed call to notify() if (!isShutdownInitiated() && !canMoveForwardMediumConsistencyPoint()) if (!canMoveForwardMediumConsistencyPoint(csn)) { if (isShutdownInitiated()) { return; } wait(); // loop to check if changes older than the medium consistency // point have been added to the ReplicaDBs @@ -601,10 +630,10 @@ private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException { boolean callNextOnCursor = true; // update, so it becomes the previous cookie for the next change mediumConsistencyRUV.update(mcBaseDN, mcCSN); mediumConsistency = Pair.of(mcBaseDN, mcCSN); boolean callNextOnCursor = true; final int mcServerId = mcCSN.getServerId(); final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/MultiDomainServerStateTest.java
@@ -21,14 +21,17 @@ * CDDL HEADER END * * * Copyright 2013 ForgeRock AS * Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.common; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.types.DN; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.forgerock.opendj.util.Pair; import static org.assertj.core.api.Assertions.*; import static org.testng.Assert.*; @@ -40,6 +43,18 @@ private static final CSN csn2 = new CSN(4, 5, 6); private static final CSN csn3 = new CSN(7, 8, 3); private static DN dn1; private static DN dn2; private static DN dn3; @BeforeClass public void setBaseDNs() throws Exception { dn1 = DN.decode("o=test1"); dn2 = DN.decode("o=test2"); dn3 = DN.decode("o=test3"); } @Test public void testDecodeAndEncode1() throws Exception { @@ -60,9 +75,6 @@ @Test public void testUpdateCSN() throws Exception { final DN dn1 = DN.decode("o=test1"); final DN dn2 = DN.decode("o=test2"); final MultiDomainServerState state = new MultiDomainServerState(); assertTrue(state.update(dn1, csn1)); assertTrue(state.update(dn2, csn2)); @@ -77,9 +89,6 @@ @Test public void testUpdateServerState() throws Exception { final DN dn1 = DN.decode("o=test1"); final DN dn2 = DN.decode("o=test2"); final MultiDomainServerState state = new MultiDomainServerState(); final ServerState ss1 = new ServerState(); assertTrue(ss1.update(csn3)); @@ -95,9 +104,6 @@ @Test public void testUpdateMultiDomainServerState() throws Exception { final DN dn1 = DN.decode("o=test1"); final DN dn2 = DN.decode("o=test2"); final MultiDomainServerState state1 = new MultiDomainServerState(); state1.update(dn1, csn3); state1.update(dn2, csn2); @@ -112,9 +118,6 @@ @Test(dependsOnMethods = { "testUpdateCSN" }) public void testEqualsTo() throws Exception { final DN dn1 = DN.decode("o=test1"); final DN dn2 = DN.decode("o=test2"); final MultiDomainServerState state1 = new MultiDomainServerState(); assertTrue(state1.update(dn1, csn3)); 
@@ -134,9 +137,6 @@ @Test(dependsOnMethods = { "testUpdateCSN" }) public void testIsEmpty() throws Exception { final DN dn1 = DN.decode("o=test1"); final DN dn2 = DN.decode("o=test2"); final MultiDomainServerState state = new MultiDomainServerState(); assertTrue(state.isEmpty()); @@ -155,15 +155,7 @@ @Test(dependsOnMethods = { "testUpdateCSN" }) public void testRemoveCSN() throws Exception { final DN dn1 = DN.decode("o=test1"); final DN dn2 = DN.decode("o=test2"); final DN dn3 = DN.decode("o=test3"); final MultiDomainServerState state = new MultiDomainServerState(); assertTrue(state.update(dn1, csn1)); assertTrue(state.update(dn2, csn1)); assertTrue(state.update(dn2, csn2)); final MultiDomainServerState state = getLastAliveCSNs(); assertNull(state.getCSN(dn3, 42)); assertFalse(state.removeCSN(dn3, csn1)); @@ -181,4 +173,71 @@ assertNull(state.getCSN(dn2, csn1.getServerId())); assertSame(csn2, state.getCSN(dn2, csn2.getServerId())); } private MultiDomainServerState getLastAliveCSNs() { final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState(); assertTrue(lastAliveCSNs.update(dn1, csn1)); assertTrue(lastAliveCSNs.update(dn2, csn1)); assertTrue(lastAliveCSNs.update(dn2, csn2)); return lastAliveCSNs; } @Test(dependsOnMethods = { "testUpdateCSN" }) public void testGetOldestCSNExcluding_null() throws Exception { final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs(); assertEquals(lastAliveCSNs.getOldestCSNExcluding(null), Pair.of(dn1, csn1)); } @Test(dependsOnMethods = { "testUpdateCSN" }) public void testGetOldestCSNExcluding_empty() throws Exception { final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs(); final MultiDomainServerState excluded = new MultiDomainServerState(); assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn1, csn1)); } @Test(dependsOnMethods = { "testUpdateCSN" }) public void testGetOldestCSNExcluding_currentOldestCSN_givesNewOldestCSN() throws Exception { final MultiDomainServerState lastAliveCSNs 
= getLastAliveCSNs(); final MultiDomainServerState excluded = new MultiDomainServerState(); excluded.update(dn1, csn1); assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1)); } @Test(dependsOnMethods = { "testUpdateCSN" }) public void testGetOldestCSNExcluding_CSNOlderThanCurrentOldestCSN_givesNewOldestCSN() throws Exception { final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs(); final MultiDomainServerState excluded = new MultiDomainServerState(); excluded.update(dn1, csn1); final CSN olderThanCSN1 = new CSN(0, 2, 3); assertEquals(olderThanCSN1.getServerId(), csn1.getServerId()); assertTrue(olderThanCSN1.isOlderThan(csn1)); excluded.update(dn2, olderThanCSN1); assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1)); } @Test(dependsOnMethods = { "testUpdateCSN" }) public void testGetOldestCSNExcluding_CSNNewerThanCurrentOldestCSN_givesNewOldestCSN() throws Exception { final MultiDomainServerState lastAliveCSNs = getLastAliveCSNs(); final MultiDomainServerState excluded = new MultiDomainServerState(); excluded.update(dn1, csn1); final CSN newerThanCSN1 = new CSN(42, 2, 3); assertEquals(newerThanCSN1.getServerId(), csn1.getServerId()); assertTrue(newerThanCSN1.isNewerThan(csn1)); excluded.update(dn2, newerThanCSN1); assertEquals(lastAliveCSNs.getOldestCSNExcluding(excluded), Pair.of(dn2, csn1)); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -355,6 +355,7 @@ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); assertExternalChangelogContent(msg1); addReplica(BASE_DN1, serverId2); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); @@ -486,19 +487,19 @@ assertExternalChangelogContent(msg1); // do not wait for temporarily offline serverId1 final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3); publishUpdateMsg(msg3); assertExternalChangelogContent(msg1, msg3); final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4); publishUpdateMsg(msg4); assertExternalChangelogContent(msg1, msg4); // serverId1 is back online, wait for changes from serverId2 final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4); publishUpdateMsg(msg4); assertExternalChangelogContent(msg1, msg3); final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5); final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5); publishUpdateMsg(msg5); assertExternalChangelogContent(msg1, msg4); final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6); publishUpdateMsg(msg6); // MCP moves forward assertExternalChangelogContent(msg1, msg3, msg4); assertExternalChangelogContent(msg1, msg4, msg5); } /** @@ -525,16 +526,39 @@ assertExternalChangelogContent(); // MCP moves forward because serverId1 is not really offline // since because we received a message from it after the offline replica msg // since we received a message from it newer than the offline replica msg final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId2, 4); publishUpdateMsg(msg4); assertExternalChangelogContent(msg2, msg3); // back to normal operations sendHeartbeat(BASE_DN1, serverId1, 4); sendHeartbeat(BASE_DN1, serverId1, 5); assertExternalChangelogContent(msg2, msg3, msg4); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneKilled() throws Exception { addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); startCNIndexer(BASE_DN1); assertExternalChangelogContent(); final 
ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); // MCP cannot move forward: no news yet from serverId2 assertExternalChangelogContent(); sendHeartbeat(BASE_DN1, serverId2, 2); // MCP moves forward: we know what serverId2 is at assertExternalChangelogContent(msg1); final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); publishUpdateMsg(msg3); // MCP cannot move forward: serverId2 is the oldest CSN assertExternalChangelogContent(msg1); } private void addReplica(DN baseDN, int serverId) throws Exception { final SequentialDBCursor cursor = new SequentialDBCursor();