OPENDJ-1430 Some changes are missing from the external changelog changeNumber
The bug was due to a very complex interaction between various components. Here is a scenario and explanation:
1- the change number indexer has no more records to process, because all cursors are exhausted, so it calls wait().
2- a new change Upd1 comes in for an exhausted cursor; medium consistency cannot move.
3- a new change Upd2 comes in for a cursor that is not opened yet; medium consistency can move, so the change number indexer is woken up.
4- on wake up, the change number indexer calls next(), advancing the CompositeDBCursor, which recycles the exhausted cursor and then calls next() on it, making it lose its change. CompositeDBCursor currentRecord == Upd1.
5- on the next iteration of the loop in run(), a new cursor is created, which triggers the creation of a new CompositeDBCursor, so Upd1 is lost. CompositeDBCursor currentRecord == Upd2.
The problem comes from two parts:
- CompositeDBCursor consumes the next change from a cursor (which then completely forgets about this change) and stores it itself.
- ChangeNumberIndexer manages recycling/creating cursors on its own and recreates CompositeDBCursor when a new cursor is created.
The fix required:
- Preventing CompositeDBCursor from consuming changes from underlying cursors until it can forget about this same change itself.
- Ensuring only ChangeNumberIndexer handles recycling the cursors it owns, instead of having both CompositeDBCursor and ChangeNumberIndexer trying to do it. It is also more efficient to let ChangeNumberIndexer manage its own cursors.
CompositeDBCursor.java:
Added the recycleExhaustedCursors field to tell the composite whether it may recycle the cursors itself (recycling the cursors is currently needed for persistent searches on the changelog; maybe we will be able to remove it in the future, which would simplify the code a lot).
Modified the ctor to take the value of recycleExhaustedCursors.
Removed the currentRecord and currentData fields; they are replaced by reading the record and data from the first entry in the cursors SortedMap.
Added the state field to ensure the first call to next() does not consume the first change in the cursors SortedMap.
ChangeNumberIndexer.java:
ChangeNumberIndexer now solely manages cursor recycling and creation, and recreates the CompositeDBCursor when needed.
In run(), removed the now unneeded call to next() after the wait.
Added recycleExhaustedCursors().
JEChangelogDB.java:
Adapted to the new CompositeDBCursor constructor; kept the old recycling behaviour.
ChangeNumberIndexerTest.java:
Added emptyDBTwoDSsDoesNotLoseChanges() to cover the case fixed by the current commit.
Renamed test methods dropping the "Initial" when it was not adding much to the test comprehension.
CompositeDBCursorTest.java:
In newUpdateMsg(), added a toString() implementation to help debugging.
Removed recycleTwoElementCursorsTODOJNR().
In recycleTwoElementCursors(), changed the tests a bit to match the changes to CompositeDBCursor.
| | |
| | | cursors.put(entry2.getValue(), entry.getKey()); |
| | | } |
| | | } |
| | | final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors); |
| | | |
| | | // CNIndexer manages the cursor itself, |
| | | // so do not try to recycle exhausted cursors |
| | | CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false); |
| | | result.next(); |
| | | nextChangeForInsertDBCursor = result; |
| | | } |
| | |
| | | } |
| | | else |
| | | { |
| | | createNewCursors(); |
| | | final boolean createdCursors = createNewCursors(); |
| | | final boolean recycledCursors = recycleExhaustedCursors(); |
| | | if (createdCursors || recycledCursors) |
| | | { |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | } |
| | | |
| | | final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); |
| | |
| | | } |
| | | wait(); |
| | | } |
| | | // try to recycle the exhausted cursors, |
| | | // success/failure will be checked later |
| | | nextChangeForInsertDBCursor.next(); |
| | | // loop to check whether new changes have been added to the |
| | | // ReplicaDBs |
| | | continue; |
| | |
| | | } |
| | | } |
| | | |
| | | private void createNewCursors() throws ChangelogException |
| | | private boolean recycleExhaustedCursors() throws ChangelogException |
| | | { |
| | | boolean succesfullyRecycled = false; |
| | | for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) |
| | | { |
| | | for (DBCursor<UpdateMsg> cursor : map.values()) |
| | | { |
| | | // try to recycle it by calling next() |
| | | if (cursor.getRecord() == null && cursor.next()) |
| | | { |
| | | succesfullyRecycled = true; |
| | | } |
| | | } |
| | | } |
| | | return succesfullyRecycled; |
| | | } |
| | | |
| | | private boolean createNewCursors() throws ChangelogException |
| | | { |
| | | if (!newCursors.isEmpty()) |
| | | { |
| | |
| | | } |
| | | iter.remove(); |
| | | } |
| | | if (newCursorAdded) |
| | | { |
| | | resetNextChangeForInsertDBCursor(); |
| | | } |
| | | return newCursorAdded; |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private UpdateMsg currentRecord; |
| | | private Data currentData; |
| | | private static final byte UNINITIALIZED = 0; |
| | | private static final byte READY = 1; |
| | | private static final byte CLOSED = 2; |
| | | |
| | | /** |
| | | * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or |
| | | * {@link #CLOSED} |
| | | */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** Whether this composite should try to recycle exhausted cursors. */ |
| | | private final boolean recycleExhaustedCursors; |
| | | /** |
| | | * These cursors are considered exhausted because they had no new changes the |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | | * might be recycled at some point when they start returning changes again. |
| | | */ |
| | | private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(); |
| | | /** |
| | |
| | | * |
| | | * @param cursors |
| | | * the cursors that will be iterated upon. |
| | | * @param recycleExhaustedCursors |
| | | * whether a call to {@link #next()} tries to recycle exhausted |
| | | * cursors |
| | | */ |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors) |
| | | public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors, |
| | | boolean recycleExhaustedCursors) |
| | | { |
| | | this.recycleExhaustedCursors = recycleExhaustedCursors; |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | put(entry); |
| | |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (!exhaustedCursors.isEmpty()) |
| | | if (state == CLOSED) |
| | | { |
| | | return false; |
| | | } |
| | | final boolean advanceNonExhaustedCursors = state != UNINITIALIZED; |
| | | state = READY; |
| | | if (recycleExhaustedCursors && !exhaustedCursors.isEmpty()) |
| | | { |
| | | // try to recycle empty cursors in case the underlying ReplicaDBs received |
| | | // new changes. Copy the List to avoid ConcurrentModificationExceptions. |
| | | // new changes. |
| | | final Map<DBCursor<UpdateMsg>, Data> copy = |
| | | new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); |
| | | exhaustedCursors.clear(); |
| | |
| | | entry.getKey().next(); |
| | | put(entry); |
| | | } |
| | | } |
| | | |
| | | if (cursors.isEmpty()) |
| | | { |
| | | // no cursors are left with changes. |
| | | currentRecord = null; |
| | | currentData = null; |
| | | return false; |
| | | final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry(); |
| | | if (firstEntry != null && copy.containsKey(firstEntry.getKey())) |
| | | { |
| | | // if the first cursor was previously an exhausted cursor, |
| | | // then we have already called next() on it. |
| | | // Avoid calling it again because we know new changes have been found. |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove and eventually add again a cursor (after moving it forward). |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry(); |
| | | final DBCursor<UpdateMsg> cursor = entry.getKey(); |
| | | currentRecord = cursor.getRecord(); |
| | | currentData = entry.getValue(); |
| | | cursor.next(); |
| | | put(entry); |
| | | return true; |
| | | // to remove and add again the cursor after moving it forward. |
| | | if (advanceNonExhaustedCursors) |
| | | { |
| | | Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry(); |
| | | if (firstEntry != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = firstEntry.getKey(); |
| | | cursor.next(); |
| | | put(firstEntry); |
| | | } |
| | | } |
| | | // no cursors are left with changes. |
| | | return !cursors.isEmpty(); |
| | | } |
| | | |
| | | private void put(Entry<DBCursor<UpdateMsg>, Data> entry) |
| | |
| | | @Override |
| | | public UpdateMsg getRecord() |
| | | { |
| | | return currentRecord; |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | | return entry.getKey().getRecord(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public Data getData() |
| | | { |
| | | return currentData; |
| | | final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); |
| | | if (entry != null) |
| | | { |
| | | return entry.getValue(); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " currentRecord=" + currentRecord |
| | | + " currentData=" + currentData + " openCursors=" + cursors |
| | | return getClass().getSimpleName() + " openCursors=" + cursors |
| | | + " exhaustedCursors=" + exhaustedCursors; |
| | | } |
| | | |
| | |
| | | startAfterServerState.getCSN(serverId) : null; |
| | | cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); |
| | | } |
| | | return new CompositeDBCursor<Void>(cursors); |
| | | // recycle exhausted cursors, |
| | | // because client code will not manage the cursors itself |
| | | return new CompositeDBCursor<Void>(cursors, true); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDS() throws Exception |
| | | public void emptyDBOneDS() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBOneInitialDS() throws Exception |
| | | public void nonEmptyDBOneDS() throws Exception |
| | | { |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoInitialDSs() throws Exception |
| | | public void emptyDBTwoDSs() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoInitialDSsDifferentDomains() throws Exception |
| | | public void emptyDBTwoDSsDifferentDomains() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN2, serverId2); |
| | |
| | | assertExternalChangelogContent(msg1, msg2); |
| | | } |
| | | |
| | | /** |
| | | * This test tries to reproduce a very subtle implementation bug where: |
| | | * <ol> |
| | | * <li>the change number indexer has no more records to process, because all |
| | | * cursors are exhausted, so it calls wait()</li> |
| | | * <li>a new change Upd1 comes in for an exhausted cursor, |
| | | * medium consistency cannot move</li> |
| | | * <li>a new change Upd2 comes in for a cursor that is not opened yet, |
| | | * medium consistency can move, so the change number indexer is woken up</li> |
| | | * <li>on wake up, the change number indexer calls next(), |
| | | * advancing the CompositeDBCursor, which recycles the exhausted cursor, |
| | | * then calls next() on it, making it lose its change. |
| | | * CompositeDBCursor currentRecord == Upd1.</li> |
| | | * <li>on the next iteration of the loop in run(), a new cursor is created, |
| | | * triggering the creation of a new CompositeDBCursor, so Upd1 is lost. |
| | | * CompositeDBCursor currentRecord == Upd2.</li> |
| | | * </ol> |
| | | */ |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBTwoInitialDSs() throws Exception |
| | | public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | assertExternalChangelogContent(msg1); |
| | | |
| | | addReplica(BASE_DN1, serverId2); |
| | | sendHeartbeat(BASE_DN1, serverId2, 2); |
| | | assertExternalChangelogContent(msg1); |
| | | // publish change that will not trigger a wake up of change number indexer, |
| | | // but will make it open a cursor on next wake up |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | | publishUpdateMsg(msg2); |
| | | assertExternalChangelogContent(msg1); |
| | | // wake up change number indexer |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); |
| | | publishUpdateMsg(msg3); |
| | | assertExternalChangelogContent(msg1, msg2); |
| | | sendHeartbeat(BASE_DN1, serverId2, 4); |
| | | // assert no changes have been lost |
| | | assertExternalChangelogContent(msg1, msg2, msg3); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBTwoDSs() throws Exception |
| | | { |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception |
| | | public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception |
| | | { |
| | | addReplica(ADMIN_DATA_DN, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception |
| | | public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | startCNIndexer(BASE_DN1); |
| | | |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | publishUpdateMsg(msg1); |
| | | |
| | | addReplica(BASE_DN1, serverId2); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | | publishUpdateMsg(msg2); |
| | | assertExternalChangelogContent(msg1); |
| | | |
| | | sendHeartbeat(BASE_DN1, serverId1, 3); |
| | | assertExternalChangelogContent(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception |
| | | public void emptyDBTwoDSsOneGoingOffline() throws Exception |
| | | { |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | |
| | | } |
| | | |
| | | @DataProvider |
| | | public Object[][] precedingCSNData() |
| | | public Object[][] precedingCSNDataProvider() |
| | | { |
| | | final int serverId = 42; |
| | | final int t = 1000; |
| | |
| | | }; |
| | | } |
| | | |
| | | @Test(dataProvider = "precedingCSNData") |
| | | @Test(dataProvider = "precedingCSNDataProvider") |
| | | public void getPrecedingCSN(CSN start, CSN expected) |
| | | { |
| | | CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start); |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | public void recycleTwoElementCursors() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(new SequentialDBCursor(msg2, null, msg3), baseDN1), |
| | | of(new SequentialDBCursor(null, msg1, msg4), baseDN2)); |
| | | of(new SequentialDBCursor(msg2, null, msg4), baseDN1), |
| | | of(new SequentialDBCursor(null, msg1, msg3), baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg2, baseDN1), |
| | | of(msg3, baseDN1), |
| | | of(msg4, baseDN2)); |
| | | of(msg3, baseDN2), |
| | | of(msg4, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleTwoElementCursorsTODOJNR() throws Exception |
| | | private UpdateMsg newUpdateMsg(final int t) |
| | | { |
| | | SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3); |
| | | SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4); |
| | | cursor1.next(); |
| | | cursor2.next(); |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |
| | | of(cursor1, baseDN1), |
| | | of(cursor2, baseDN2)); |
| | | assertInOrder(compCursor, |
| | | of(msg1, baseDN2), |
| | | of(msg3, baseDN1), |
| | | of(msg4, baseDN2)); |
| | | } |
| | | |
| | | private UpdateMsg newUpdateMsg(int t) |
| | | { |
| | | return new UpdateMsg(new CSN(t, t, t), new byte[t]); |
| | | return new UpdateMsg(new CSN(t, t, t), new byte[t]) |
| | | { |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "UpdateMsg(" + t + ")"; |
| | | } |
| | | }; |
| | | } |
| | | |
| | | private CompositeDBCursor<String> newCompositeDBCursor( |
| | |
| | | { |
| | | cursorsMap.put(pair.getFirst(), pair.getSecond()); |
| | | } |
| | | return new CompositeDBCursor<String>(cursorsMap); |
| | | return new CompositeDBCursor<String>(cursorsMap, true); |
| | | } |
| | | |
| | | private void assertInOrder(final CompositeDBCursor<String> compCursor, |