mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.56.2014 60f8d8d4575206697f47c040d4272dee27251bab
OPENDJ-1430 Some changes are missing from the external changelog changeNumber

The bug was due to a very complex interaction between various components. Here is a scenario and explanation:
1- The change number indexer has no more records to process because all cursors are exhausted, so it calls wait().
2- A new change Upd1 comes in for an exhausted cursor; medium consistency cannot move.
3- A new change Upd2 comes in for a cursor that is not opened yet; medium consistency can move, so the change number indexer is woken up.
4- On wake up, the change number indexer calls next(), advancing the CompositeDBCursor, which recycles the exhausted cursor and then calls next() on it, making it lose its change. CompositeDBCursor currentRecord == Upd1.
5- On the next iteration of the loop in run(), a new cursor is created, triggering the creation of a new CompositeDBCursor => Upd1 is lost. CompositeDBCursor currentRecord == Upd2.

The problem comes from two parts:
- CompositeDBCursor consumes the next change from a cursor (which then completely forgets about this change) and stores it itself.
- ChangeNumberIndexer manages recycling/creating cursors on its own and recreates the CompositeDBCursor when a new cursor is created.

The fix required:
- Preventing CompositeDBCursor from consuming changes from the underlying cursors until it can forget about that same change itself.
- Ensuring only ChangeNumberIndexer handles recycling the cursors it owns, instead of having both CompositeDBCursor and ChangeNumberIndexer trying to do it. It is also more efficient to let ChangeNumberIndexer manage its own cursors.



CompositeDBCursor.java:
Added a recycleExhaustedCursors field to tell the composite whether it may recycle the cursors itself (recycling the cursors is currently needed for persistent searches on the changelog; we may be able to remove it in the future, which would simplify the code a lot).
Modified the constructor to take the value of recycleExhaustedCursors.
Removed the currentRecord and currentData fields, replaced by reading the record and data of the first entry in the cursors SortedMap.
Added a state field to ensure the first call to next() does not consume the first change in the cursors SortedMap.

ChangeNumberIndexer.java:
ChangeNumberIndexer is now solely responsible for recycling and creating its cursors, and recreates the CompositeDBCursor when needed.
In run(), removed the now-unneeded call to next() after the wait.
Added recycleExhaustedCursors().

JEChangelogDB.java:
Consequence of the change to CompositeDBCursor. Kept the old recycling behaviour.


ChangeNumberIndexerTest.java:
Added emptyDBTwoDSsDoesNotLoseChanges() to cover the case fixed by this commit.
Renamed test methods, dropping "Initial" where it did not add much to the understanding of the test.

CompositeDBCursorTest.java:
In newUpdateMsg(), added a toString() implementation to help debugging.
Removed recycleTwoElementCursorsTODOJNR().
In recycleTwoElementCursors(), changed the tests a bit to match the changes to CompositeDBCursor.
5 files modified
259 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 40 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java 92 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 4 ●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java 83 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java 40 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -373,7 +373,10 @@
        cursors.put(entry2.getValue(), entry.getKey());
      }
    }
    final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
    // CNIndexer manages the cursor itself,
    // so do not try to recycle exhausted cursors
    CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
    result.next();
    nextChangeForInsertDBCursor = result;
  }
@@ -456,7 +459,12 @@
          }
          else
          {
            createNewCursors();
            final boolean createdCursors = createNewCursors();
            final boolean recycledCursors = recycleExhaustedCursors();
            if (createdCursors || recycledCursors)
            {
              resetNextChangeForInsertDBCursor();
            }
          }
          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
@@ -470,9 +478,6 @@
              }
              wait();
            }
            // try to recycle the exhausted cursors,
            // success/failure will be checked later
            nextChangeForInsertDBCursor.next();
            // loop to check whether new changes have been added to the
            // ReplicaDBs
            continue;
@@ -603,7 +608,24 @@
    }
  }
  private void createNewCursors() throws ChangelogException
  private boolean recycleExhaustedCursors() throws ChangelogException
  {
    boolean succesfullyRecycled = false;
    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
    {
      for (DBCursor<UpdateMsg> cursor : map.values())
      {
        // try to recycle it by calling next()
        if (cursor.getRecord() == null && cursor.next())
        {
          succesfullyRecycled = true;
        }
      }
    }
    return succesfullyRecycled;
  }
  private boolean createNewCursors() throws ChangelogException
  {
    if (!newCursors.isEmpty())
    {
@@ -623,11 +645,9 @@
        }
        iter.remove();
      }
      if (newCursorAdded)
      {
        resetNextChangeForInsertDBCursor();
      }
      return newCursorAdded;
    }
    return false;
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -45,8 +45,23 @@
final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
  private UpdateMsg currentRecord;
  private Data currentData;
  private static final byte UNINITIALIZED = 0;
  private static final byte READY = 1;
  private static final byte CLOSED = 2;
  /**
   * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
   * {@link #CLOSED}
   */
  private byte state = UNINITIALIZED;
  /** Whether this composite should try to recycle exhausted cursors. */
  private final boolean recycleExhaustedCursors;
  /**
   * These cursors are considered exhausted because they had no new changes the
   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
   * might be recycled at some point when they start returning changes again.
   */
  private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
      new HashMap<DBCursor<UpdateMsg>, Data>();
  /**
@@ -71,9 +86,14 @@
   *
   * @param cursors
   *          the cursors that will be iterated upon.
   * @param recycleExhaustedCursors
   *          whether a call to {@link #next()} tries to recycle exhausted
   *          cursors
   */
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
      boolean recycleExhaustedCursors)
  {
    this.recycleExhaustedCursors = recycleExhaustedCursors;
    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
    {
      put(entry);
@@ -84,10 +104,16 @@
  @Override
  public boolean next() throws ChangelogException
  {
    if (!exhaustedCursors.isEmpty())
    if (state == CLOSED)
    {
      return false;
    }
    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
    state = READY;
    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
    {
      // try to recycle empty cursors in case the underlying ReplicaDBs received
      // new changes. Copy the List to avoid ConcurrentModificationExceptions.
      // new changes.
      final Map<DBCursor<UpdateMsg>, Data> copy =
          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
      exhaustedCursors.clear();
@@ -96,25 +122,30 @@
        entry.getKey().next();
        put(entry);
      }
    }
    if (cursors.isEmpty())
    {
      // no cursors are left with changes.
      currentRecord = null;
      currentData = null;
      return false;
      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
      {
        // if the first cursor was previously an exhausted cursor,
        // then we have already called next() on it.
        // Avoid calling it again because we know new changes have been found.
        return true;
      }
    }
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove and eventually add again a cursor (after moving it forward).
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
    final DBCursor<UpdateMsg> cursor = entry.getKey();
    currentRecord = cursor.getRecord();
    currentData = entry.getValue();
    cursor.next();
    put(entry);
    return true;
    // to remove and add again the cursor after moving it forward.
    if (advanceNonExhaustedCursors)
    {
      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
      if (firstEntry != null)
      {
        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
        cursor.next();
        put(firstEntry);
      }
    }
    // no cursors are left with changes.
    return !cursors.isEmpty();
  }
  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
@@ -135,7 +166,12 @@
  @Override
  public UpdateMsg getRecord()
  {
    return currentRecord;
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    if (entry != null)
    {
      return entry.getKey().getRecord();
    }
    return null;
  }
  /**
@@ -145,7 +181,12 @@
   */
  public Data getData()
  {
    return currentData;
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    if (entry != null)
    {
      return entry.getValue();
    }
    return null;
  }
  /** {@inheritDoc} */
@@ -160,8 +201,7 @@
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " currentRecord=" + currentRecord
        + " currentData=" + currentData + " openCursors=" + cursors
    return getClass().getSimpleName() + " openCursors=" + cursors
        + " exhaustedCursors=" + exhaustedCursors;
  }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -728,7 +728,9 @@
          startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
    }
    return new CompositeDBCursor<Void>(cursors);
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
  }
  /** {@inheritDoc} */
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -155,7 +155,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDS() throws Exception
  public void emptyDBOneDS() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
@@ -166,7 +166,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBOneInitialDS() throws Exception
  public void nonEmptyDBOneDS() throws Exception
  {
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
@@ -179,7 +179,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSs() throws Exception
  public void emptyDBTwoDSs() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
@@ -196,7 +196,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsDifferentDomains() throws Exception
  public void emptyDBTwoDSsDifferentDomains() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN2, serverId2);
@@ -211,8 +211,53 @@
    assertExternalChangelogContent(msg1, msg2);
  }
  /**
   * This test tries to reproduce a very subtle implementation bug where:
   * <ol>
   * <li>the change number indexer has no more records to process, because all
   * cursors are exhausted, so it calls wait()</li>
   * <li>a new change Upd1 comes in for an exhausted cursor,
   * medium consistency cannot move</li>
   * <li>a new change Upd2 comes in for a cursor that is not opened yet,
   * medium consistency can move, so the change number indexer is woken up</li>
   * <li>on wake up, the change number indexer calls next(),
   * advancing the CompositeDBCursor, which recycles the exhausted cursor,
   * then calls next() on it, making it lose its change.
   * CompositeDBCursor currentRecord == Upd1.</li>
   * <li>on the next iteration of the loop in run(), a new cursor is created,
   * triggering the creation of a new CompositeDBCursor => Upd1 is lost.
   * CompositeDBCursor currentRecord == Upd2.</li>
   * </ol>
   * The final assertion checks that msg1, msg2 and msg3 all end up in the
   * external changelog, i.e. that no change has been lost.
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBTwoInitialDSs() throws Exception
  public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    assertExternalChangelogContent(msg1);
    addReplica(BASE_DN1, serverId2);
    sendHeartbeat(BASE_DN1, serverId2, 2);
    assertExternalChangelogContent(msg1);
    // publish change that will not trigger a wake up of change number indexer,
    // but will make it open a cursor on next wake up
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2);
    assertExternalChangelogContent(msg1);
    // wake up change number indexer
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg1, msg2);
    // heartbeat from serverId2 with a time after msg3's
    sendHeartbeat(BASE_DN1, serverId2, 4);
    // assert no changes have been lost
    assertExternalChangelogContent(msg1, msg2, msg3);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBTwoDSs() throws Exception
  {
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -252,7 +297,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
  public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
  {
    addReplica(ADMIN_DATA_DN, serverId1);
    addReplica(BASE_DN1, serverId2);
@@ -291,7 +336,25 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
  public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    addReplica(BASE_DN1, serverId2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2);
    assertExternalChangelogContent(msg1);
    sendHeartbeat(BASE_DN1, serverId1, 3);
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
@@ -307,7 +370,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
  public void emptyDBTwoDSsOneGoingOffline() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
@@ -477,7 +540,7 @@
  }
  @DataProvider
  public Object[][] precedingCSNData()
  public Object[][] precedingCSNDataProvider()
  {
    final int serverId = 42;
    final int t = 1000;
@@ -490,7 +553,7 @@
    };
  }
  @Test(dataProvider = "precedingCSNData")
  @Test(dataProvider = "precedingCSNDataProvider")
  public void getPrecedingCSN(CSN start, CSN expected)
  {
    CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -125,34 +125,26 @@
  public void recycleTwoElementCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, null, msg3), baseDN1),
        of(new SequentialDBCursor(null, msg1, msg4), baseDN2));
        of(new SequentialDBCursor(msg2, null, msg4), baseDN1),
        of(new SequentialDBCursor(null, msg1, msg3), baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg2, baseDN1),
        of(msg3, baseDN1),
        of(msg4, baseDN2));
        of(msg3, baseDN2),
        of(msg4, baseDN1));
  }
  @Test
  public void recycleTwoElementCursorsTODOJNR() throws Exception
  private UpdateMsg newUpdateMsg(final int t)
  {
    SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3);
    SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4);
    cursor1.next();
    cursor2.next();
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(cursor1, baseDN1),
        of(cursor2, baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg3, baseDN1),
        of(msg4, baseDN2));
  }
  private UpdateMsg newUpdateMsg(int t)
  {
    return new UpdateMsg(new CSN(t, t, t), new byte[t]);
    return new UpdateMsg(new CSN(t, t, t), new byte[t])
    {
      /** {@inheritDoc} */
      @Override
      public String toString()
      {
        return "UpdateMsg(" + t + ")";
      }
    };
  }
  private CompositeDBCursor<String> newCompositeDBCursor(
@@ -164,7 +156,7 @@
    {
      cursorsMap.put(pair.getFirst(), pair.getSecond());
    }
    return new CompositeDBCursor<String>(cursorsMap);
    return new CompositeDBCursor<String>(cursorsMap, true);
  }
  private void assertInOrder(final CompositeDBCursor<String> compCursor,