From 60f8d8d4575206697f47c040d4272dee27251bab Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Apr 2014 13:56:22 +0000
Subject: [PATCH] OPENDJ-1430 Some changes are missing from the external changelog changeNumber
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 4
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 40 ++++++--
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 83 ++++++++++++++--
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 40 +++----
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 92 +++++++++++++-----
5 files changed, 188 insertions(+), 71 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 99870ed..3b44c84 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -373,7 +373,10 @@
cursors.put(entry2.getValue(), entry.getKey());
}
}
- final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
+
+ // CNIndexer manages the cursor itself,
+ // so do not try to recycle exhausted cursors
+ CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
result.next();
nextChangeForInsertDBCursor = result;
}
@@ -456,7 +459,12 @@
}
else
{
- createNewCursors();
+ final boolean createdCursors = createNewCursors();
+ final boolean recycledCursors = recycleExhaustedCursors();
+ if (createdCursors || recycledCursors)
+ {
+ resetNextChangeForInsertDBCursor();
+ }
}
final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
@@ -470,9 +478,6 @@
}
wait();
}
- // try to recycle the exhausted cursors,
- // success/failure will be checked later
- nextChangeForInsertDBCursor.next();
// loop to check whether new changes have been added to the
// ReplicaDBs
continue;
@@ -603,7 +608,24 @@
}
}
- private void createNewCursors() throws ChangelogException
+ private boolean recycleExhaustedCursors() throws ChangelogException
+ {
+ boolean succesfullyRecycled = false;
+ for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
+ {
+ for (DBCursor<UpdateMsg> cursor : map.values())
+ {
+ // try to recycle it by calling next()
+ if (cursor.getRecord() == null && cursor.next())
+ {
+ succesfullyRecycled = true;
+ }
+ }
+ }
+ return succesfullyRecycled;
+ }
+
+ private boolean createNewCursors() throws ChangelogException
{
if (!newCursors.isEmpty())
{
@@ -623,11 +645,9 @@
}
iter.remove();
}
- if (newCursorAdded)
- {
- resetNextChangeForInsertDBCursor();
- }
+ return newCursorAdded;
}
+ return false;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 46994c5..52ef4c6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -45,8 +45,23 @@
final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
- private UpdateMsg currentRecord;
- private Data currentData;
+ private static final byte UNINITIALIZED = 0;
+ private static final byte READY = 1;
+ private static final byte CLOSED = 2;
+
+ /**
+ * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
+ * {@link #CLOSED}
+ */
+ private byte state = UNINITIALIZED;
+
+ /** Whether this composite should try to recycle exhausted cursors. */
+ private final boolean recycleExhaustedCursors;
+ /**
+ * These cursors are considered exhausted because they had no new changes the
+ * last time {@link DBCursor#next()} was called on them. Exhausted cursors
+ * might be recycled at some point when they start returning changes again.
+ */
private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
new HashMap<DBCursor<UpdateMsg>, Data>();
/**
@@ -71,9 +86,14 @@
*
* @param cursors
* the cursors that will be iterated upon.
+ * @param recycleExhaustedCursors
+ * whether a call to {@link #next()} tries to recycle exhausted
+ * cursors
*/
- public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
+ public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
+ boolean recycleExhaustedCursors)
{
+ this.recycleExhaustedCursors = recycleExhaustedCursors;
for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
{
put(entry);
@@ -84,10 +104,16 @@
@Override
public boolean next() throws ChangelogException
{
- if (!exhaustedCursors.isEmpty())
+ if (state == CLOSED)
+ {
+ return false;
+ }
+ final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
+ state = READY;
+ if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
{
// try to recycle empty cursors in case the underlying ReplicaDBs received
- // new changes. Copy the List to avoid ConcurrentModificationExceptions.
+ // new changes.
final Map<DBCursor<UpdateMsg>, Data> copy =
new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
exhaustedCursors.clear();
@@ -96,25 +122,30 @@
entry.getKey().next();
put(entry);
}
- }
-
- if (cursors.isEmpty())
- {
- // no cursors are left with changes.
- currentRecord = null;
- currentData = null;
- return false;
+ final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
+ if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
+ {
+ // if the first cursor was previously an exhausted cursor,
+ // then we have already called next() on it.
+ // Avoid calling it again because we know new changes have been found.
+ return true;
+ }
}
// To keep consistent the cursors' order in the SortedSet, it is necessary
- // to remove and eventually add again a cursor (after moving it forward).
- final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
- final DBCursor<UpdateMsg> cursor = entry.getKey();
- currentRecord = cursor.getRecord();
- currentData = entry.getValue();
- cursor.next();
- put(entry);
- return true;
+ // to remove and add again the cursor after moving it forward.
+ if (advanceNonExhaustedCursors)
+ {
+ Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
+ if (firstEntry != null)
+ {
+ final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
+ cursor.next();
+ put(firstEntry);
+ }
+ }
+ // no cursors are left with changes.
+ return !cursors.isEmpty();
}
private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
@@ -135,7 +166,12 @@
@Override
public UpdateMsg getRecord()
{
- return currentRecord;
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
+ if (entry != null)
+ {
+ return entry.getKey().getRecord();
+ }
+ return null;
}
/**
@@ -145,7 +181,12 @@
*/
public Data getData()
{
- return currentData;
+ final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
+ if (entry != null)
+ {
+ return entry.getValue();
+ }
+ return null;
}
/** {@inheritDoc} */
@@ -160,8 +201,7 @@
@Override
public String toString()
{
- return getClass().getSimpleName() + " currentRecord=" + currentRecord
- + " currentData=" + currentData + " openCursors=" + cursors
+ return getClass().getSimpleName() + " openCursors=" + cursors
+ " exhaustedCursors=" + exhaustedCursors;
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fed23b8..b1e2d63 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -728,7 +728,9 @@
startAfterServerState.getCSN(serverId) : null;
cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
}
- return new CompositeDBCursor<Void>(cursors);
+ // recycle exhausted cursors,
+ // because client code will not manage the cursors itself
+ return new CompositeDBCursor<Void>(cursors, true);
}
/** {@inheritDoc} */
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 49e1994..62fe230 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -155,7 +155,7 @@
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBOneInitialDS() throws Exception
+ public void emptyDBOneDS() throws Exception
{
addReplica(BASE_DN1, serverId1);
startCNIndexer(BASE_DN1);
@@ -166,7 +166,7 @@
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void nonEmptyDBOneInitialDS() throws Exception
+ public void nonEmptyDBOneDS() throws Exception
{
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
addReplica(BASE_DN1, serverId1);
@@ -179,7 +179,7 @@
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoInitialDSs() throws Exception
+ public void emptyDBTwoDSs() throws Exception
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
@@ -196,7 +196,7 @@
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoInitialDSsDifferentDomains() throws Exception
+ public void emptyDBTwoDSsDifferentDomains() throws Exception
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN2, serverId2);
@@ -211,8 +211,53 @@
assertExternalChangelogContent(msg1, msg2);
}
+ /**
+ * This test tries to reproduce a very subtle implementation bug where:
+ * <ol>
+ * <li>the change number indexer has no more records to proceed, because all
+ * cursors are exhausted, so it calls wait()<li>
+ * <li>a new change Upd1 comes in for an exhausted cursor,
+ * medium consistency cannot move<li>
+ * <li>a new change Upd2 comes in for a cursor that is not already opened,
+ * medium consistency can move, so wake up the change number indexer<li>
+ * <li>on wake up, the change number indexer calls next(),
+ * advancing the CompositeDBCursor, which recycles the exhausted cursor,
+ * then calls next() on it, making it lose its change.
+ * CompositeDBCursor currentRecord == Upd1.<li>
+ * <li>on the next iteration of the loop in run(), a new cursor is created,
+ * triggering the creation of a new CompositeDBCursor => Upd1 is lost.
+ * CompositeDBCursor currentRecord == Upd2.<li>
+ * </ol>
+ */
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void nonEmptyDBTwoInitialDSs() throws Exception
+ public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
+ {
+ addReplica(BASE_DN1, serverId1);
+ startCNIndexer(BASE_DN1);
+
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ publishUpdateMsg(msg1);
+ assertExternalChangelogContent(msg1);
+
+ addReplica(BASE_DN1, serverId2);
+ sendHeartbeat(BASE_DN1, serverId2, 2);
+ assertExternalChangelogContent(msg1);
+ // publish change that will not trigger a wake up of change number indexer,
+ // but will make it open a cursor on next wake up
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
+ publishUpdateMsg(msg2);
+ assertExternalChangelogContent(msg1);
+ // wake up change number indexer
+ final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
+ publishUpdateMsg(msg3);
+ assertExternalChangelogContent(msg1, msg2);
+ sendHeartbeat(BASE_DN1, serverId2, 4);
+ // assert no changes have been lost
+ assertExternalChangelogContent(msg1, msg2, msg3);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void nonEmptyDBTwoDSs() throws Exception
{
final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -252,7 +297,7 @@
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
+ public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
{
addReplica(ADMIN_DATA_DN, serverId1);
addReplica(BASE_DN1, serverId2);
@@ -291,7 +336,25 @@
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
+ public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
+ {
+ addReplica(BASE_DN1, serverId1);
+ startCNIndexer(BASE_DN1);
+
+ final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
+ publishUpdateMsg(msg1);
+
+ addReplica(BASE_DN1, serverId2);
+ final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
+ publishUpdateMsg(msg2);
+ assertExternalChangelogContent(msg1);
+
+ sendHeartbeat(BASE_DN1, serverId1, 3);
+ assertExternalChangelogContent(msg1, msg2);
+ }
+
+ @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
+ public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
@@ -307,7 +370,7 @@
}
@Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
+ public void emptyDBTwoDSsOneGoingOffline() throws Exception
{
addReplica(BASE_DN1, serverId1);
addReplica(BASE_DN1, serverId2);
@@ -477,7 +540,7 @@
}
@DataProvider
- public Object[][] precedingCSNData()
+ public Object[][] precedingCSNDataProvider()
{
final int serverId = 42;
final int t = 1000;
@@ -490,7 +553,7 @@
};
}
- @Test(dataProvider = "precedingCSNData")
+ @Test(dataProvider = "precedingCSNDataProvider")
public void getPrecedingCSN(CSN start, CSN expected)
{
CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index a708775..1a933e8 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -125,34 +125,26 @@
public void recycleTwoElementCursors() throws Exception
{
final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
- of(new SequentialDBCursor(msg2, null, msg3), baseDN1),
- of(new SequentialDBCursor(null, msg1, msg4), baseDN2));
+ of(new SequentialDBCursor(msg2, null, msg4), baseDN1),
+ of(new SequentialDBCursor(null, msg1, msg3), baseDN2));
assertInOrder(compCursor,
of(msg1, baseDN2),
of(msg2, baseDN1),
- of(msg3, baseDN1),
- of(msg4, baseDN2));
+ of(msg3, baseDN2),
+ of(msg4, baseDN1));
}
- @Test
- public void recycleTwoElementCursorsTODOJNR() throws Exception
+ private UpdateMsg newUpdateMsg(final int t)
{
- SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3);
- SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4);
- cursor1.next();
- cursor2.next();
- final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
- of(cursor1, baseDN1),
- of(cursor2, baseDN2));
- assertInOrder(compCursor,
- of(msg1, baseDN2),
- of(msg3, baseDN1),
- of(msg4, baseDN2));
- }
-
- private UpdateMsg newUpdateMsg(int t)
- {
- return new UpdateMsg(new CSN(t, t, t), new byte[t]);
+ return new UpdateMsg(new CSN(t, t, t), new byte[t])
+ {
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "UpdateMsg(" + t + ")";
+ }
+ };
}
private CompositeDBCursor<String> newCompositeDBCursor(
@@ -164,7 +156,7 @@
{
cursorsMap.put(pair.getFirst(), pair.getSecond());
}
- return new CompositeDBCursor<String>(cursorsMap);
+ return new CompositeDBCursor<String>(cursorsMap, true);
}
private void assertInOrder(final CompositeDBCursor<String> compCursor,
--
Gitblit v1.10.0