opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -371,7 +371,10 @@ cursors.put(entry2.getValue(), entry.getKey()); } } final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors); // CNIndexer manages the cursor itself, // so do not try to recycle exhausted cursors CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false); result.next(); nextChangeForInsertDBCursor = result; } @@ -454,7 +457,12 @@ } else { createNewCursors(); final boolean createdCursors = createNewCursors(); final boolean recycledCursors = recycleExhaustedCursors(); if (createdCursors || recycledCursors) { resetNextChangeForInsertDBCursor(); } } final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord(); @@ -468,9 +476,6 @@ } wait(); } // try to recycle the exhausted cursors, // success/failure will be checked later nextChangeForInsertDBCursor.next(); // loop to check whether new changes have been added to the // ReplicaDBs continue; @@ -599,7 +604,24 @@ } } private void createNewCursors() throws ChangelogException private boolean recycleExhaustedCursors() throws ChangelogException { boolean succesfullyRecycled = false; for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values()) { for (DBCursor<UpdateMsg> cursor : map.values()) { // try to recycle it by calling next() if (cursor.getRecord() == null && cursor.next()) { succesfullyRecycled = true; } } } return succesfullyRecycled; } private boolean createNewCursors() throws ChangelogException { if (!newCursors.isEmpty()) { @@ -619,11 +641,9 @@ } iter.remove(); } if (newCursorAdded) { resetNextChangeForInsertDBCursor(); } return newCursorAdded; } return false; } /** opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -21,7 +21,7 @@ * CDDL HEADER END * * * Copyright 2013 ForgeRock AS * Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; @@ -45,8 +45,23 @@ final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg> { private UpdateMsg currentRecord; private Data currentData; private static final byte UNINITIALIZED = 0; private static final byte READY = 1; private static final byte CLOSED = 2; /** * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or * {@link #CLOSED} */ private byte state = UNINITIALIZED; /** Whether this composite should try to recycle exhausted cursors. */ private final boolean recycleExhaustedCursors; /** * These cursors are considered exhausted because they had no new changes the * last time {@link DBCursor#next()} was called on them. Exhausted cursors * might be recycled at some point when they start returning changes again. */ private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors = new HashMap<DBCursor<UpdateMsg>, Data>(); /** @@ -71,9 +86,14 @@ * * @param cursors * the cursors that will be iterated upon. * @param recycleExhaustedCursors * whether a call to {@link #next()} tries to recycle exhausted * cursors */ public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors) public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors, boolean recycleExhaustedCursors) { this.recycleExhaustedCursors = recycleExhaustedCursors; for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) { put(entry); @@ -84,10 +104,16 @@ @Override public boolean next() throws ChangelogException { if (!exhaustedCursors.isEmpty()) if (state == CLOSED) { return false; } final boolean advanceNonExhaustedCursors = state != UNINITIALIZED; state = READY; if (recycleExhaustedCursors && !exhaustedCursors.isEmpty()) { // try to recycle empty cursors in case the underlying ReplicaDBs received // new changes. Copy the List to avoid ConcurrentModificationExceptions. // new changes. final Map<DBCursor<UpdateMsg>, Data> copy = new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors); exhaustedCursors.clear(); @@ -96,25 +122,30 @@ entry.getKey().next(); put(entry); } } if (cursors.isEmpty()) { // no cursors are left with changes. currentRecord = null; currentData = null; return false; final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry(); if (firstEntry != null && copy.containsKey(firstEntry.getKey())) { // if the first cursor was previously an exhausted cursor, // then we have already called next() on it. // Avoid calling it again because we know new changes have been found. return true; } } // To keep consistent the cursors' order in the SortedSet, it is necessary // to remove and eventually add again a cursor (after moving it forward). final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry(); final DBCursor<UpdateMsg> cursor = entry.getKey(); currentRecord = cursor.getRecord(); currentData = entry.getValue(); cursor.next(); put(entry); return true; // to remove and add again the cursor after moving it forward. if (advanceNonExhaustedCursors) { Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry(); if (firstEntry != null) { final DBCursor<UpdateMsg> cursor = firstEntry.getKey(); cursor.next(); put(firstEntry); } } // no cursors are left with changes. return !cursors.isEmpty(); } private void put(Entry<DBCursor<UpdateMsg>, Data> entry) @@ -135,7 +166,12 @@ @Override public UpdateMsg getRecord() { return currentRecord; final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); if (entry != null) { return entry.getKey().getRecord(); } return null; } /** @@ -145,7 +181,12 @@ */ public Data getData() { return currentData; final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry(); if (entry != null) { return entry.getValue(); } return null; } /** {@inheritDoc} */ @@ -160,8 +201,7 @@ @Override public String toString() { return getClass().getSimpleName() + " currentRecord=" + currentRecord + " currentData=" + currentData + " openCursors=" + cursors return getClass().getSimpleName() + " openCursors=" + cursors + " exhaustedCursors=" + exhaustedCursors; } opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -717,7 +717,9 @@ startAfterServerState.getCSN(serverId) : null; cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); } return new CompositeDBCursor<Void>(cursors); // recycle exhausted cursors, // because client code will not manage the cursors itself return new CompositeDBCursor<Void>(cursors, true); } /** {@inheritDoc} */ opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -156,7 +156,7 @@ } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBOneInitialDS() throws Exception public void emptyDBOneDS() throws Exception { addReplica(BASE_DN1, serverId1); startCNIndexer(BASE_DN1); @@ -167,7 +167,7 @@ } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void nonEmptyDBOneInitialDS() throws Exception public void nonEmptyDBOneDS() throws Exception { final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); addReplica(BASE_DN1, serverId1); @@ -180,7 +180,7 @@ } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoInitialDSs() throws Exception public void emptyDBTwoDSs() throws Exception { addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); @@ -197,7 +197,7 @@ } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoInitialDSsDifferentDomains() throws Exception public void emptyDBTwoDSsDifferentDomains() throws Exception { addReplica(BASE_DN1, serverId1); addReplica(BASE_DN2, serverId2); @@ -212,8 +212,53 @@ assertExternalChangelogContent(msg1, msg2); } /** * This test tries to reproduce a very subtle implementation bug where: * <ol> * <li>the change number indexer has no more records to proceed, because all * cursors are exhausted, so it calls wait()<li> * <li>a new change Upd1 comes in for an exhausted cursor, * medium consistency cannot move<li> * <li>a new change Upd2 comes in for a cursor that is not already opened, * medium consistency can move, so wake up the change number indexer<li> * <li>on wake up, the change number indexer calls next(), * advancing the CompositeDBCursor, which recycles the exhausted cursor, * then calls next() on it, making it lose its change. * CompositeDBCursor currentRecord == Upd1.<li> * <li>on the next iteration of the loop in run(), a new cursor is created, * triggering the creation of a new CompositeDBCursor => Upd1 is lost. * CompositeDBCursor currentRecord == Upd2.<li> * </ol> */ @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void nonEmptyDBTwoInitialDSs() throws Exception public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception { addReplica(BASE_DN1, serverId1); startCNIndexer(BASE_DN1); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); assertExternalChangelogContent(msg1); addReplica(BASE_DN1, serverId2); sendHeartbeat(BASE_DN1, serverId2, 2); assertExternalChangelogContent(msg1); // publish change that will not trigger a wake up of change number indexer, // but will make it open a cursor on next wake up final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg2); assertExternalChangelogContent(msg1); // wake up change number indexer final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3); publishUpdateMsg(msg3); assertExternalChangelogContent(msg1, msg2); sendHeartbeat(BASE_DN1, serverId2, 4); // assert no changes have been lost assertExternalChangelogContent(msg1, msg2, msg3); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void nonEmptyDBTwoDSs() throws Exception { final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); @@ -253,7 +298,7 @@ } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception { addReplica(ADMIN_DATA_DN, serverId1); addReplica(BASE_DN1, serverId2); @@ -292,7 +337,25 @@ } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception { addReplica(BASE_DN1, serverId1); startCNIndexer(BASE_DN1); final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); publishUpdateMsg(msg1); addReplica(BASE_DN1, serverId2); final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); publishUpdateMsg(msg2); assertExternalChangelogContent(msg1); sendHeartbeat(BASE_DN1, serverId1, 3); assertExternalChangelogContent(msg1, msg2); } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception { addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); @@ -308,7 +371,7 @@ } @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception public void emptyDBTwoDSsOneGoingOffline() throws Exception { addReplica(BASE_DN1, serverId1); addReplica(BASE_DN1, serverId2); @@ -478,7 +541,7 @@ } @DataProvider public Object[][] precedingCSNData() public Object[][] precedingCSNDataProvider() { final int serverId = 42; final int t = 1000; @@ -491,7 +554,7 @@ }; } @Test(dataProvider = "precedingCSNData") @Test(dataProvider = "precedingCSNDataProvider") public void getPrecedingCSN(CSN start, CSN expected) { CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start); opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -21,7 +21,7 @@ * CDDL HEADER END * * * Copyright 2013 ForgeRock AS * Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; @@ -125,34 +125,26 @@ public void recycleTwoElementCursors() throws Exception { final CompositeDBCursor<String> compCursor = newCompositeDBCursor( of(new SequentialDBCursor(msg2, null, msg3), baseDN1), of(new SequentialDBCursor(null, msg1, msg4), baseDN2)); of(new SequentialDBCursor(msg2, null, msg4), baseDN1), of(new SequentialDBCursor(null, msg1, msg3), baseDN2)); assertInOrder(compCursor, of(msg1, baseDN2), of(msg2, baseDN1), of(msg3, baseDN1), of(msg4, baseDN2)); of(msg3, baseDN2), of(msg4, baseDN1)); } @Test public void recycleTwoElementCursorsTODOJNR() throws Exception private UpdateMsg newUpdateMsg(final int t) { SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3); SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4); cursor1.next(); cursor2.next(); final CompositeDBCursor<String> compCursor = newCompositeDBCursor( of(cursor1, baseDN1), of(cursor2, baseDN2)); assertInOrder(compCursor, of(msg1, baseDN2), of(msg3, baseDN1), of(msg4, baseDN2)); } private UpdateMsg newUpdateMsg(int t) { return new UpdateMsg(new CSN(t, t, t), new byte[t]); return new UpdateMsg(new CSN(t, t, t), new byte[t]) { /** {@inheritDoc} */ @Override public String toString() { return "UpdateMsg(" + t + ")"; } }; } private CompositeDBCursor<String> newCompositeDBCursor( @@ -164,7 +156,7 @@ { cursorsMap.put(pair.getFirst(), pair.getSecond()); } return new CompositeDBCursor<String>(cursorsMap); return new CompositeDBCursor<String>(cursorsMap, true); } private void assertInOrder(final CompositeDBCursor<String> compCursor,