mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
21.17.2013 c5135432faf9bbbcd496ea160d59755fba31012c
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -27,6 +27,7 @@
package org.opends.server.replication.server.changelog.je;
import java.util.*;
import java.util.Map.Entry;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -38,28 +39,33 @@
 * {@link DBCursor} implementation that iterates across a Collection of
 * {@link DBCursor}s, advancing from the oldest to the newest change cross all
 * cursors.
 *
 * @param <Data>
 *          The type of data associated with each cursor
 */
final class CompositeDBCursor implements DBCursor<UpdateMsg>
final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
{
  private UpdateMsg currentChange;
  private final List<DBCursor<UpdateMsg>> exhaustedCursors =
      new ArrayList<DBCursor<UpdateMsg>>();
  private UpdateMsg currentRecord;
  private Data currentData;
  private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
      new HashMap<DBCursor<UpdateMsg>, Data>();
  /**
   * The cursors are sorted based on the current change of each cursor to
   * consider the next change across all available cursors.
   */
  private final NavigableSet<DBCursor<UpdateMsg>> cursors =
      new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
      {
        @Override
        public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
        {
          final CSN csn1 = o1.getRecord().getCSN();
          final CSN csn2 = o2.getRecord().getCSN();
          return CSN.compare(csn1, csn2);
        }
      });
  private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
      new TreeMap<DBCursor<UpdateMsg>, Data>(
          new Comparator<DBCursor<UpdateMsg>>()
          {
            @Override
            public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
            {
              final CSN csn1 = o1.getRecord().getCSN();
              final CSN csn2 = o2.getRecord().getCSN();
              return CSN.compare(csn1, csn2);
            }
          });
  /**
   * Builds a CompositeDBCursor using the provided collection of cursors.
@@ -67,11 +73,11 @@
   * @param cursors
   *          the cursors that will be iterated upon.
   */
  public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors)
  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
  {
    for (DBCursor<UpdateMsg> cursor : cursors)
    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
    {
      add(cursor);
      put(entry);
    }
  }
@@ -83,41 +89,46 @@
    {
      // try to recycle empty cursors in case the underlying ReplicaDBs received
      // new changes. Copy the List to avoid ConcurrentModificationExceptions.
      final DBCursor<UpdateMsg>[] copy =
          exhaustedCursors.toArray(new DBCursor[exhaustedCursors.size()]);
      final Map<DBCursor<UpdateMsg>, Data> copy =
          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
      exhaustedCursors.clear();
      for (DBCursor<UpdateMsg> cursor : copy)
      for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
      {
        cursor.next();
        add(cursor);
        entry.getKey().next();
        put(entry);
      }
    }
    if (cursors.isEmpty())
    {
      // no cursors are left with changes.
      currentChange = null;
      currentRecord = null;
      currentData = null;
      return false;
    }
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove and eventually add again a cursor (after moving it forward).
    final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
    currentChange = cursor.getRecord();
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
    final DBCursor<UpdateMsg> cursor = entry.getKey();
    currentRecord = cursor.getRecord();
    currentData = entry.getValue();
    cursor.next();
    add(cursor);
    put(entry);
    return true;
  }
  private void add(DBCursor<UpdateMsg> cursor)
  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
  {
    final DBCursor<UpdateMsg> cursor = entry.getKey();
    final Data data = entry.getValue();
    if (cursor.getRecord() != null)
    {
      this.cursors.add(cursor);
      this.cursors.put(cursor, data);
    }
    else
    {
      this.exhaustedCursors.add(cursor);
      this.exhaustedCursors.put(cursor, data);
    }
  }
@@ -125,23 +136,34 @@
  @Override
  public UpdateMsg getRecord()
  {
    return currentChange;
    return currentRecord;
  }
  /**
   * Returns the data associated to the cursor that returned the current record.
   *
   * @return the data associated to the cursor that returned the current record.
   */
  public Data getData()
  {
    return currentData;
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    StaticUtils.close(cursors);
    StaticUtils.close(exhaustedCursors);
    StaticUtils.close(cursors.keySet());
    StaticUtils.close(exhaustedCursors.keySet());
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " currentChange=" + currentChange
        + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors;
    return getClass().getSimpleName() + " currentRecord=" + currentRecord
        + " currentData=" + currentData + " openCursors=" + cursors
        + " exhaustedCursors=" + exhaustedCursors;
  }
}