mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
16.26.2014 b10fb8b65d3dede6239be9d1a7d569b25b58e48b
opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -41,11 +41,11 @@
 * {@link DBCursor}s, advancing from the oldest to the newest change cross all
 * cursors.
 *
 * @param <Data>
 * @param <T>
 *          The type of data associated with each cursor
 * \@NotThreadSafe
 */
abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
abstract class CompositeDBCursor<T> implements DBCursor<UpdateMsg>
{
  private static final byte UNINITIALIZED = 0;
@@ -63,8 +63,8 @@
   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
   * might be recycled at some point when they start returning changes again.
   */
  private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
      new HashMap<DBCursor<UpdateMsg>, Data>();
  private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors =
      new HashMap<DBCursor<UpdateMsg>, T>();
  /**
   * The cursors are sorted based on the current change of each cursor to
   * consider the next change across all available cursors.
@@ -74,8 +74,8 @@
   * thrown about
   * "Non-transactional Cursors may not be used in multiple threads;".
   */
  private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
      new TreeMap<DBCursor<UpdateMsg>, Data>(
  private final TreeMap<DBCursor<UpdateMsg>, T> cursors =
      new TreeMap<DBCursor<UpdateMsg>, T>(
          new Comparator<DBCursor<UpdateMsg>>()
          {
            @Override
@@ -100,7 +100,7 @@
    // (which UpdateMsg has been consumed).
    // To keep consistent the cursors' order in the SortedSet, it is necessary
    // to remove the first cursor, then add it again after moving it forward.
    final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance =
    final Entry<DBCursor<UpdateMsg>, T> cursorToAdvance =
        state != UNINITIALIZED ? cursors.pollFirstEntry() : null;
    state = READY;
    recycleExhaustedCursors();
@@ -118,10 +118,10 @@
    if (!exhaustedCursors.isEmpty())
    {
      // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
      final Map<DBCursor<UpdateMsg>, Data> copy =
          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
      final Map<DBCursor<UpdateMsg>, T> copy =
          new HashMap<DBCursor<UpdateMsg>, T>(exhaustedCursors);
      exhaustedCursors.clear();
      for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
      for (Entry<DBCursor<UpdateMsg>, T> entry : copy.entrySet())
      {
        addCursor(entry.getKey(), entry.getValue());
      }
@@ -134,18 +134,18 @@
   * @param dataToFind
   *          the data for which the cursor must be found and removed
   */
  protected void removeCursor(final Data dataToFind)
  protected void removeCursor(final T dataToFind)
  {
    removeCursor(this.cursors, dataToFind);
    removeCursor(this.exhaustedCursors, dataToFind);
  }
  private void removeCursor(Map<DBCursor<UpdateMsg>, Data> cursors, Data dataToFind)
  private void removeCursor(Map<DBCursor<UpdateMsg>, T> cursors, T dataToFind)
  {
    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
    for (Iterator<Entry<DBCursor<UpdateMsg>, T>> cursorIter =
        cursors.entrySet().iterator(); cursorIter.hasNext();)
    {
      final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
      final Entry<DBCursor<UpdateMsg>, T> entry = cursorIter.next();
      if (dataToFind.equals(entry.getValue()))
      {
        entry.getKey().close();
@@ -165,7 +165,7 @@
   * @throws ChangelogException
   *           if a database problem occurred
   */
  protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
  protected void addCursor(final DBCursor<UpdateMsg> cursor, final T data) throws ChangelogException
  {
    if (cursor.next())
    {
@@ -183,7 +183,7 @@
  {
    // Cannot call incorporateNewCursors() here because
    // somebody might have already called DBCursor.getRecord() and read the record
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    final Entry<DBCursor<UpdateMsg>, T> entry = cursors.firstEntry();
    if (entry != null)
    {
      return entry.getKey().getRecord();
@@ -206,9 +206,9 @@
   *
   * @return the data associated to the cursor that returned the current record.
   */
  public Data getData()
  public T getData()
  {
    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
    final Entry<DBCursor<UpdateMsg>, T> entry = cursors.firstEntry();
    if (entry != null)
    {
      return entry.getValue();
@@ -223,19 +223,19 @@
   *         cursor. In each pair, the data or the update message may be
   *         {@code null}, but at least one of them is non-null.
   */
  public List<Pair<Data, UpdateMsg>> getSnapshot()
  public List<Pair<T, UpdateMsg>> getSnapshot()
  {
    final List<Pair<Data, UpdateMsg>> snapshot = new ArrayList<Pair<Data, UpdateMsg>>();
    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
    final List<Pair<T, UpdateMsg>> snapshot = new ArrayList<Pair<T, UpdateMsg>>();
    for (Entry<DBCursor<UpdateMsg>, T> entry : cursors.entrySet())
    {
      final UpdateMsg updateMsg = entry.getKey().getRecord();
      final Data data = entry.getValue();
      final T data = entry.getValue();
      if (updateMsg != null || data != null)
      {
        snapshot.add(Pair.of(data, updateMsg));
      }
    }
    for (Data data : exhaustedCursors.values())
    for (T data : exhaustedCursors.values())
    {
      if (data != null)
      {