From c5135432faf9bbbcd496ea160d59755fba31012c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 21 Nov 2013 16:17:00 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1174 Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB 

---
 opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java |   92 ++++++++++++++++++++++++++++-----------------
 1 files changed, 57 insertions(+), 35 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 7f8e3c3..2e0e7c7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -27,6 +27,7 @@
 package org.opends.server.replication.server.changelog.je;
 
 import java.util.*;
+import java.util.Map.Entry;
 
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.protocol.UpdateMsg;
@@ -38,28 +39,33 @@
  * {@link DBCursor} implementation that iterates across a Collection of
  * {@link DBCursor}s, advancing from the oldest to the newest change cross all
  * cursors.
+ *
+ * @param <Data>
+ *          The type of data associated with each cursor
  */
-final class CompositeDBCursor implements DBCursor<UpdateMsg>
+final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
 {
 
-  private UpdateMsg currentChange;
-  private final List<DBCursor<UpdateMsg>> exhaustedCursors =
-      new ArrayList<DBCursor<UpdateMsg>>();
+  private UpdateMsg currentRecord;
+  private Data currentData;
+  private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
+      new HashMap<DBCursor<UpdateMsg>, Data>();
   /**
    * The cursors are sorted based on the current change of each cursor to
    * consider the next change across all available cursors.
    */
-  private final NavigableSet<DBCursor<UpdateMsg>> cursors =
-      new TreeSet<DBCursor<UpdateMsg>>(new Comparator<DBCursor<UpdateMsg>>()
-      {
-        @Override
-        public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
-        {
-          final CSN csn1 = o1.getRecord().getCSN();
-          final CSN csn2 = o2.getRecord().getCSN();
-          return CSN.compare(csn1, csn2);
-        }
-      });
+  private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+      new TreeMap<DBCursor<UpdateMsg>, Data>(
+          new Comparator<DBCursor<UpdateMsg>>()
+          {
+            @Override
+            public int compare(DBCursor<UpdateMsg> o1, DBCursor<UpdateMsg> o2)
+            {
+              final CSN csn1 = o1.getRecord().getCSN();
+              final CSN csn2 = o2.getRecord().getCSN();
+              return CSN.compare(csn1, csn2);
+            }
+          });
 
   /**
    * Builds a CompositeDBCursor using the provided collection of cursors.
@@ -67,11 +73,11 @@
    * @param cursors
    *          the cursors that will be iterated upon.
    */
-  public CompositeDBCursor(Collection<DBCursor<UpdateMsg>> cursors)
+  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
   {
-    for (DBCursor<UpdateMsg> cursor : cursors)
+    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
     {
-      add(cursor);
+      put(entry);
     }
   }
 
@@ -83,41 +89,46 @@
     {
       // try to recycle empty cursors in case the underlying ReplicaDBs received
       // new changes. Copy the List to avoid ConcurrentModificationExceptions.
-      final DBCursor<UpdateMsg>[] copy =
-          exhaustedCursors.toArray(new DBCursor[exhaustedCursors.size()]);
+      final Map<DBCursor<UpdateMsg>, Data> copy =
+          new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
       exhaustedCursors.clear();
-      for (DBCursor<UpdateMsg> cursor : copy)
+      for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
       {
-        cursor.next();
-        add(cursor);
+        entry.getKey().next();
+        put(entry);
       }
     }
 
     if (cursors.isEmpty())
     {
       // no cursors are left with changes.
-      currentChange = null;
+      currentRecord = null;
+      currentData = null;
       return false;
     }
 
     // To keep consistent the cursors' order in the SortedSet, it is necessary
     // to remove and eventually add again a cursor (after moving it forward).
-    final DBCursor<UpdateMsg> cursor = cursors.pollFirst();
-    currentChange = cursor.getRecord();
+    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
+    final DBCursor<UpdateMsg> cursor = entry.getKey();
+    currentRecord = cursor.getRecord();
+    currentData = entry.getValue();
     cursor.next();
-    add(cursor);
+    put(entry);
     return true;
   }
 
-  private void add(DBCursor<UpdateMsg> cursor)
+  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
   {
+    final DBCursor<UpdateMsg> cursor = entry.getKey();
+    final Data data = entry.getValue();
     if (cursor.getRecord() != null)
     {
-      this.cursors.add(cursor);
+      this.cursors.put(cursor, data);
     }
     else
     {
-      this.exhaustedCursors.add(cursor);
+      this.exhaustedCursors.put(cursor, data);
     }
   }
 
@@ -125,23 +136,34 @@
   @Override
   public UpdateMsg getRecord()
   {
-    return currentChange;
+    return currentRecord;
+  }
+
+  /**
+   * Returns the data associated to the cursor that returned the current record.
+   *
+   * @return the data associated to the cursor that returned the current record.
+   */
+  public Data getData()
+  {
+    return currentData;
   }
 
   /** {@inheritDoc} */
   @Override
   public void close()
   {
-    StaticUtils.close(cursors);
-    StaticUtils.close(exhaustedCursors);
+    StaticUtils.close(cursors.keySet());
+    StaticUtils.close(exhaustedCursors.keySet());
   }
 
   /** {@inheritDoc} */
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + " currentChange=" + currentChange
-        + " open cursors=" + cursors + " exhausted cursors=" + exhaustedCursors;
+    return getClass().getSimpleName() + " currentRecord=" + currentRecord
+        + " currentData=" + currentData + " openCursors=" + cursors
+        + " exhaustedCursors=" + exhaustedCursors;
   }
 
 }

--
Gitblit v1.10.0