mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
18.25.2014 b6ccb560e9056cc9c028812f5f63ff2e80c95c87
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
 * @param <Data>
 *          The type of data associated with each cursor
 */
public final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
  private static final byte UNINITIALIZED = 0;
@@ -55,8 +55,6 @@
   */
  private byte state = UNINITIALIZED;
  /** Whether this composite should try to recycle exhausted cursors. */
  private final boolean recycleExhaustedCursors;
  /**
   * These cursors are considered exhausted because they had no new changes the
   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
@@ -67,8 +65,13 @@
  /**
   * The cursors are sorted based on the current change of each cursor to
   * consider the next change across all available cursors.
   * <p>
   * New cursors for this Map must be created from the same thread that will
   * make use of them. When this rule is not obeyed, a JE exception will be
   * thrown about
   * "Non-transactional Cursors may not be used in multiple threads;".
   */
  private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
  private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
      new TreeMap<DBCursor<UpdateMsg>, Data>(
          new Comparator<DBCursor<UpdateMsg>>()
          {
@@ -81,25 +84,6 @@
            }
          });
  /**
   * Builds a CompositeDBCursor using the provided collection of cursors.
   *
   * @param cursors
   *          the cursors that will be iterated upon.
   * @param recycleExhaustedCursors
   *          whether a call to {@link #next()} tries to recycle exhausted
   *          cursors
   */
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
      boolean recycleExhaustedCursors)
  {
    this.recycleExhaustedCursors = recycleExhaustedCursors;
    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
    {
      put(entry);
    }
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
@@ -108,51 +92,75 @@
    {
      return false;
    }
    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
    state = READY;
    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
    if (state == UNINITIALIZED)
    {
      // try to recycle empty cursors in case the underlying ReplicaDBs received
      // new changes.
      state = READY;
    }
    else
    {
      // Previous state was READY => we must advance the first cursor
      // because the UpdateMsg it is pointing has already been consumed.
      // To keep consistent the cursors' order in the SortedSet, it is necessary
      // to remove the first cursor, then add it again after moving it forward.
      final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = cursors.pollFirstEntry();
      if (cursorToAdvance != null)
      {
        addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
      }
    }
    recycleExhaustedCursors();
    removeNoLongerNeededCursors();
    incorporateNewCursors();
    return !cursors.isEmpty();
  }
  private void recycleExhaustedCursors() throws ChangelogException
  {
    if (!exhaustedCursors.isEmpty())
    {
      // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
      final Map<DBCursor<UpdateMsg>, Data> copy =
          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
      exhaustedCursors.clear();
      for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
      {
        entry.getKey().next();
        put(entry);
      }
      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
      {
        // if the first cursor was previously an exhausted cursor,
        // then we have already called next() on it.
        // Avoid calling it again because we know new changes have been found.
        return true;
        addCursor(entry.getKey(), entry.getValue());
      }
    }
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove and add again the cursor after moving it forward.
    if (advanceNonExhaustedCursors)
    {
      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
      if (firstEntry != null)
      {
        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
        cursor.next();
        put(firstEntry);
      }
    }
    // no cursors are left with changes.
    return !cursors.isEmpty();
  }
  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
  private void removeNoLongerNeededCursors()
  {
    final DBCursor<UpdateMsg> cursor = entry.getKey();
    final Data data = entry.getValue();
    if (cursor.getRecord() != null)
    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
        cursors.entrySet().iterator(); iterator.hasNext();)
    {
      final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
      final Data data = entry.getValue();
      if (isCursorNoLongerNeededFor(data))
      {
        entry.getKey().close();
        iterator.remove();
        cursorRemoved(data);
      }
    }
  }
  /**
   * Adds a cursor to this composite cursor. It first calls
   * {@link DBCursor#next()} to verify whether it is exhausted or not.
   *
   * @param cursor
   *          the cursor to add to this composite
   * @param data
   *          the data associated to the provided cursor
   * @throws ChangelogException
   *           if a database problem occurred
   */
  protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
  {
    if (cursor.next())
    {
      this.cursors.put(cursor, data);
    }
@@ -166,6 +174,8 @@
  @Override
  public UpdateMsg getRecord()
  {
    // Cannot call incorporateNewCursors() here because
    // somebody might have already called DBCursor.getRecord() and read the record
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    if (entry != null)
    {
@@ -175,6 +185,33 @@
  }
  /**
   * Called when implementors should incorporate new cursors into the current
   * composite DBCursor. Implementors should call
   * {@link #addCursor(DBCursor, Object)} to do so.
   *
   * @throws ChangelogException
   *           if a database problem occurred
   */
  protected abstract void incorporateNewCursors() throws ChangelogException;
  /**
   * Returns whether the cursor associated to the provided data should be removed.
   *
   * @param data the data associated to the cursor to be tested
   * @return true if the cursor associated to the provided data should be removed,
   *         false otherwise
   */
  protected abstract boolean isCursorNoLongerNeededFor(Data data);
  /**
   * Notifies that the cursor associated to the provided data has been removed.
   *
   * @param data
   *          the data associated to the removed cursor
   */
  protected abstract void cursorRemoved(Data data);
  /**
   * Returns the data associated to the cursor that returned the current record.
   *
   * @return the data associated to the cursor that returned the current record.
@@ -193,8 +230,11 @@
  @Override
  public void close()
  {
    state = CLOSED;
    StaticUtils.close(cursors.keySet());
    StaticUtils.close(exhaustedCursors.keySet());
    cursors.clear();
    exhaustedCursors.clear();
  }
  /** {@inheritDoc} */