mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.56.2014 60f8d8d4575206697f47c040d4272dee27251bab
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -45,8 +45,23 @@
final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
  private UpdateMsg currentRecord;
  private Data currentData;
  private static final byte UNINITIALIZED = 0;
  private static final byte READY = 1;
  private static final byte CLOSED = 2;
  /**
   * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
   * {@link #CLOSED}
   */
  private byte state = UNINITIALIZED;
  /** Whether this composite should try to recycle exhausted cursors. */
  private final boolean recycleExhaustedCursors;
  /**
   * These cursors are considered exhausted because they had no new changes the
   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
   * might be recycled at some point when they start returning changes again.
   */
  private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
      new HashMap<DBCursor<UpdateMsg>, Data>();
  /**
@@ -71,9 +86,14 @@
   *
   * @param cursors
   *          the cursors that will be iterated upon.
   * @param recycleExhaustedCursors
   *          whether a call to {@link #next()} tries to recycle exhausted
   *          cursors
   */
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
      boolean recycleExhaustedCursors)
  {
    this.recycleExhaustedCursors = recycleExhaustedCursors;
    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
    {
      put(entry);
@@ -84,10 +104,16 @@
  @Override
  public boolean next() throws ChangelogException
  {
    if (!exhaustedCursors.isEmpty())
    if (state == CLOSED)
    {
      return false;
    }
    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
    state = READY;
    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
    {
      // try to recycle empty cursors in case the underlying ReplicaDBs received
      // new changes. Copy the List to avoid ConcurrentModificationExceptions.
      // new changes.
      final Map<DBCursor<UpdateMsg>, Data> copy =
          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
      exhaustedCursors.clear();
@@ -96,25 +122,30 @@
        entry.getKey().next();
        put(entry);
      }
    }
    if (cursors.isEmpty())
    {
      // no cursors are left with changes.
      currentRecord = null;
      currentData = null;
      return false;
      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
      {
        // if the first cursor was previously an exhausted cursor,
        // then we have already called next() on it.
        // Avoid calling it again because we know new changes have been found.
        return true;
      }
    }
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove and eventually add again a cursor (after moving it forward).
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
    final DBCursor<UpdateMsg> cursor = entry.getKey();
    currentRecord = cursor.getRecord();
    currentData = entry.getValue();
    cursor.next();
    put(entry);
    return true;
    // to remove and add again the cursor after moving it forward.
    if (advanceNonExhaustedCursors)
    {
      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
      if (firstEntry != null)
      {
        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
        cursor.next();
        put(firstEntry);
      }
    }
    // no cursors are left with changes.
    return !cursors.isEmpty();
  }
  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
@@ -135,7 +166,12 @@
  @Override
  public UpdateMsg getRecord()
  {
    return currentRecord;
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    if (entry != null)
    {
      return entry.getKey().getRecord();
    }
    return null;
  }
  /**
@@ -145,7 +181,12 @@
   */
  public Data getData()
  {
    return currentData;
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    if (entry != null)
    {
      return entry.getValue();
    }
    return null;
  }
  /** {@inheritDoc} */
@@ -160,8 +201,7 @@
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " currentRecord=" + currentRecord
        + " currentData=" + currentData + " openCursors=" + cursors
    return getClass().getSimpleName() + " openCursors=" + cursors
        + " exhaustedCursors=" + exhaustedCursors;
  }