mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.13.2014 0cc1a3d688899decfdf9a672958fe84815c76715
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -371,7 +371,10 @@
        cursors.put(entry2.getValue(), entry.getKey());
      }
    }
    final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
    // CNIndexer manages the cursor itself,
    // so do not try to recycle exhausted cursors
    CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
    result.next();
    nextChangeForInsertDBCursor = result;
  }
@@ -454,7 +457,12 @@
          }
          else
          {
            createNewCursors();
            final boolean createdCursors = createNewCursors();
            final boolean recycledCursors = recycleExhaustedCursors();
            if (createdCursors || recycledCursors)
            {
              resetNextChangeForInsertDBCursor();
            }
          }
          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
@@ -468,9 +476,6 @@
              }
              wait();
            }
            // try to recycle the exhausted cursors,
            // success/failure will be checked later
            nextChangeForInsertDBCursor.next();
            // loop to check whether new changes have been added to the
            // ReplicaDBs
            continue;
@@ -599,7 +604,24 @@
    }
  }
  private void createNewCursors() throws ChangelogException
  private boolean recycleExhaustedCursors() throws ChangelogException
  {
    boolean succesfullyRecycled = false;
    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
    {
      for (DBCursor<UpdateMsg> cursor : map.values())
      {
        // try to recycle it by calling next()
        if (cursor.getRecord() == null && cursor.next())
        {
          succesfullyRecycled = true;
        }
      }
    }
    return succesfullyRecycled;
  }
  private boolean createNewCursors() throws ChangelogException
  {
    if (!newCursors.isEmpty())
    {
@@ -619,11 +641,9 @@
        }
        iter.remove();
      }
      if (newCursorAdded)
      {
        resetNextChangeForInsertDBCursor();
      }
      return newCursorAdded;
    }
    return false;
  }
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -45,8 +45,23 @@
final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
  private UpdateMsg currentRecord;
  private Data currentData;
  private static final byte UNINITIALIZED = 0;
  private static final byte READY = 1;
  private static final byte CLOSED = 2;
  /**
   * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
   * {@link #CLOSED}
   */
  private byte state = UNINITIALIZED;
  /** Whether this composite should try to recycle exhausted cursors. */
  private final boolean recycleExhaustedCursors;
  /**
   * These cursors are considered exhausted because they had no new changes the
   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
   * might be recycled at some point when they start returning changes again.
   */
  private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
      new HashMap<DBCursor<UpdateMsg>, Data>();
  /**
@@ -71,9 +86,14 @@
   *
   * @param cursors
   *          the cursors that will be iterated upon.
   * @param recycleExhaustedCursors
   *          whether a call to {@link #next()} tries to recycle exhausted
   *          cursors
   */
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
      boolean recycleExhaustedCursors)
  {
    this.recycleExhaustedCursors = recycleExhaustedCursors;
    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
    {
      put(entry);
@@ -84,10 +104,16 @@
  @Override
  public boolean next() throws ChangelogException
  {
    if (!exhaustedCursors.isEmpty())
    if (state == CLOSED)
    {
      return false;
    }
    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
    state = READY;
    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
    {
      // try to recycle empty cursors in case the underlying ReplicaDBs received
      // new changes. Copy the List to avoid ConcurrentModificationExceptions.
      // new changes.
      final Map<DBCursor<UpdateMsg>, Data> copy =
          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
      exhaustedCursors.clear();
@@ -96,25 +122,30 @@
        entry.getKey().next();
        put(entry);
      }
    }
    if (cursors.isEmpty())
    {
      // no cursors are left with changes.
      currentRecord = null;
      currentData = null;
      return false;
      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
      {
        // if the first cursor was previously an exhausted cursor,
        // then we have already called next() on it.
        // Avoid calling it again because we know new changes have been found.
        return true;
      }
    }
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove and eventually add again a cursor (after moving it forward).
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
    final DBCursor<UpdateMsg> cursor = entry.getKey();
    currentRecord = cursor.getRecord();
    currentData = entry.getValue();
    cursor.next();
    put(entry);
    return true;
    // to remove and add again the cursor after moving it forward.
    if (advanceNonExhaustedCursors)
    {
      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
      if (firstEntry != null)
      {
        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
        cursor.next();
        put(firstEntry);
      }
    }
    // no cursors are left with changes.
    return !cursors.isEmpty();
  }
  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
@@ -135,7 +166,12 @@
  @Override
  public UpdateMsg getRecord()
  {
    return currentRecord;
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    if (entry != null)
    {
      return entry.getKey().getRecord();
    }
    return null;
  }
  /**
@@ -145,7 +181,12 @@
   */
  public Data getData()
  {
    return currentData;
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    if (entry != null)
    {
      return entry.getValue();
    }
    return null;
  }
  /** {@inheritDoc} */
@@ -160,8 +201,7 @@
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " currentRecord=" + currentRecord
        + " currentData=" + currentData + " openCursors=" + cursors
    return getClass().getSimpleName() + " openCursors=" + cursors
        + " exhaustedCursors=" + exhaustedCursors;
  }
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -717,7 +717,9 @@
          startAfterServerState.getCSN(serverId) : null;
      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
    }
    return new CompositeDBCursor<Void>(cursors);
    // recycle exhausted cursors,
    // because client code will not manage the cursors itself
    return new CompositeDBCursor<Void>(cursors, true);
  }
  /** {@inheritDoc} */
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -156,7 +156,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBOneInitialDS() throws Exception
  public void emptyDBOneDS() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
@@ -167,7 +167,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBOneInitialDS() throws Exception
  public void nonEmptyDBOneDS() throws Exception
  {
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    addReplica(BASE_DN1, serverId1);
@@ -180,7 +180,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSs() throws Exception
  public void emptyDBTwoDSs() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
@@ -197,7 +197,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsDifferentDomains() throws Exception
  public void emptyDBTwoDSsDifferentDomains() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN2, serverId2);
@@ -212,8 +212,53 @@
    assertExternalChangelogContent(msg1, msg2);
  }
  /**
   * This test tries to reproduce a very subtle implementation bug where:
   * <ol>
   * <li>the change number indexer has no more records to proceed, because all
   * cursors are exhausted, so it calls wait()<li>
   * <li>a new change Upd1 comes in for an exhausted cursor,
   * medium consistency cannot move<li>
   * <li>a new change Upd2 comes in for a cursor that is not already opened,
   * medium consistency can move, so wake up the change number indexer<li>
   * <li>on wake up, the change number indexer calls next(),
   * advancing the CompositeDBCursor, which recycles the exhausted cursor,
   * then calls next() on it, making it lose its change.
   * CompositeDBCursor currentRecord == Upd1.<li>
   * <li>on the next iteration of the loop in run(), a new cursor is created,
   * triggering the creation of a new CompositeDBCursor => Upd1 is lost.
   * CompositeDBCursor currentRecord == Upd2.<li>
   * </ol>
   */
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBTwoInitialDSs() throws Exception
  public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    assertExternalChangelogContent(msg1);
    addReplica(BASE_DN1, serverId2);
    sendHeartbeat(BASE_DN1, serverId2, 2);
    assertExternalChangelogContent(msg1);
    // publish change that will not trigger a wake up of change number indexer,
    // but will make it open a cursor on next wake up
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2);
    assertExternalChangelogContent(msg1);
    // wake up change number indexer
    final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId1, 3);
    publishUpdateMsg(msg3);
    assertExternalChangelogContent(msg1, msg2);
    sendHeartbeat(BASE_DN1, serverId2, 4);
    // assert no changes have been lost
    assertExternalChangelogContent(msg1, msg2, msg3);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void nonEmptyDBTwoDSs() throws Exception
  {
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
@@ -253,7 +298,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBThreeInitialDSsOneIsNotECLEnabledDomain() throws Exception
  public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
  {
    addReplica(ADMIN_DATA_DN, serverId1);
    addReplica(BASE_DN1, serverId2);
@@ -292,7 +337,25 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsOneSendingHeartbeats() throws Exception
  public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    startCNIndexer(BASE_DN1);
    final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
    publishUpdateMsg(msg1);
    addReplica(BASE_DN1, serverId2);
    final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
    publishUpdateMsg(msg2);
    assertExternalChangelogContent(msg1);
    sendHeartbeat(BASE_DN1, serverId1, 3);
    assertExternalChangelogContent(msg1, msg2);
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
@@ -308,7 +371,7 @@
  }
  @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
  public void emptyDBTwoInitialDSsOneGoingOffline() throws Exception
  public void emptyDBTwoDSsOneGoingOffline() throws Exception
  {
    addReplica(BASE_DN1, serverId1);
    addReplica(BASE_DN1, serverId2);
@@ -478,7 +541,7 @@
  }
  @DataProvider
  public Object[][] precedingCSNData()
  public Object[][] precedingCSNDataProvider()
  {
    final int serverId = 42;
    final int t = 1000;
@@ -491,7 +554,7 @@
    };
  }
  @Test(dataProvider = "precedingCSNData")
  @Test(dataProvider = "precedingCSNDataProvider")
  public void getPrecedingCSN(CSN start, CSN expected)
  {
    CSN precedingCSN = this.cnIndexer.getPrecedingCSN(start);
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -125,34 +125,26 @@
  public void recycleTwoElementCursors() throws Exception
  {
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(new SequentialDBCursor(msg2, null, msg3), baseDN1),
        of(new SequentialDBCursor(null, msg1, msg4), baseDN2));
        of(new SequentialDBCursor(msg2, null, msg4), baseDN1),
        of(new SequentialDBCursor(null, msg1, msg3), baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg2, baseDN1),
        of(msg3, baseDN1),
        of(msg4, baseDN2));
        of(msg3, baseDN2),
        of(msg4, baseDN1));
  }
  @Test
  public void recycleTwoElementCursorsTODOJNR() throws Exception
  private UpdateMsg newUpdateMsg(final int t)
  {
    SequentialDBCursor cursor1 = new SequentialDBCursor(msg2, null, msg3);
    SequentialDBCursor cursor2 = new SequentialDBCursor(null, msg1, msg4);
    cursor1.next();
    cursor2.next();
    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
        of(cursor1, baseDN1),
        of(cursor2, baseDN2));
    assertInOrder(compCursor,
        of(msg1, baseDN2),
        of(msg3, baseDN1),
        of(msg4, baseDN2));
  }
  private UpdateMsg newUpdateMsg(int t)
  {
    return new UpdateMsg(new CSN(t, t, t), new byte[t]);
    return new UpdateMsg(new CSN(t, t, t), new byte[t])
    {
      /** {@inheritDoc} */
      @Override
      public String toString()
      {
        return "UpdateMsg(" + t + ")";
      }
    };
  }
  private CompositeDBCursor<String> newCompositeDBCursor(
@@ -164,7 +156,7 @@
    {
      cursorsMap.put(pair.getFirst(), pair.getSecond());
    }
    return new CompositeDBCursor<String>(cursorsMap);
    return new CompositeDBCursor<String>(cursorsMap, true);
  }
  private void assertInOrder(final CompositeDBCursor<String> compCursor,