From 60f8d8d4575206697f47c040d4272dee27251bab Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Apr 2014 13:56:22 +0000
Subject: [PATCH] OPENDJ-1430 Some changes are missing from the external changelog changeNumber

---
 opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java |   92 +++++++++++++++++++++++++++++++++-------------
 1 files changed, 66 insertions(+), 26 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 46994c5..52ef4c6 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -21,7 +21,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.je;
 
@@ -45,8 +45,23 @@
 final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
 {
 
-  private UpdateMsg currentRecord;
-  private Data currentData;
+  private static final byte UNINITIALIZED = 0;
+  private static final byte READY = 1;
+  private static final byte CLOSED = 2;
+
+  /**
+   * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
+   * {@link #CLOSED}
+   */
+  private byte state = UNINITIALIZED;
+
+  /** Whether this composite should try to recycle exhausted cursors. */
+  private final boolean recycleExhaustedCursors;
+  /**
+   * These cursors are considered exhausted because they had no new changes the
+   * last time {@link DBCursor#next()} was called on them. Exhausted cursors
+   * might be recycled at some point when they start returning changes again.
+   */
   private final Map<DBCursor<UpdateMsg>, Data> exhaustedCursors =
       new HashMap<DBCursor<UpdateMsg>, Data>();
   /**
@@ -71,9 +86,14 @@
    *
    * @param cursors
    *          the cursors that will be iterated upon.
+   * @param recycleExhaustedCursors
+   *          whether a call to {@link #next()} tries to recycle exhausted
+   *          cursors
    */
-  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors)
+  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
+      boolean recycleExhaustedCursors)
   {
+    this.recycleExhaustedCursors = recycleExhaustedCursors;
     for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
     {
       put(entry);
@@ -84,10 +104,16 @@
   @Override
   public boolean next() throws ChangelogException
   {
-    if (!exhaustedCursors.isEmpty())
+    if (state == CLOSED)
+    {
+      return false;
+    }
+    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
+    state = READY;
+    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
     {
       // try to recycle empty cursors in case the underlying ReplicaDBs received
-      // new changes. Copy the List to avoid ConcurrentModificationExceptions.
+      // new changes.
       final Map<DBCursor<UpdateMsg>, Data> copy =
           new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
       exhaustedCursors.clear();
@@ -96,25 +122,30 @@
         entry.getKey().next();
         put(entry);
       }
-    }
-
-    if (cursors.isEmpty())
-    {
-      // no cursors are left with changes.
-      currentRecord = null;
-      currentData = null;
-      return false;
+      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
+      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
+      {
+        // if the first cursor was previously an exhausted cursor,
+        // then we have already called next() on it.
+        // Avoid calling it again because we know new changes have been found.
+        return true;
+      }
     }
 
     // To keep consistent the cursors' order in the SortedSet, it is necessary
-    // to remove and eventually add again a cursor (after moving it forward).
-    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.pollFirstEntry();
-    final DBCursor<UpdateMsg> cursor = entry.getKey();
-    currentRecord = cursor.getRecord();
-    currentData = entry.getValue();
-    cursor.next();
-    put(entry);
-    return true;
+    // to remove and add again the cursor after moving it forward.
+    if (advanceNonExhaustedCursors)
+    {
+      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
+      if (firstEntry != null)
+      {
+        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
+        cursor.next();
+        put(firstEntry);
+      }
+    }
+    // no cursors are left with changes.
+    return !cursors.isEmpty();
   }
 
   private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
@@ -135,7 +166,12 @@
   @Override
   public UpdateMsg getRecord()
   {
-    return currentRecord;
+    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
+    if (entry != null)
+    {
+      return entry.getKey().getRecord();
+    }
+    return null;
   }
 
   /**
@@ -145,7 +181,12 @@
    */
   public Data getData()
   {
-    return currentData;
+    final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
+    if (entry != null)
+    {
+      return entry.getValue();
+    }
+    return null;
   }
 
   /** {@inheritDoc} */
@@ -160,8 +201,7 @@
   @Override
   public String toString()
   {
-    return getClass().getSimpleName() + " currentRecord=" + currentRecord
-        + " currentData=" + currentData + " openCursors=" + cursors
+    return getClass().getSimpleName() + " openCursors=" + cursors
         + " exhaustedCursors=" + exhaustedCursors;
   }
 

--
Gitblit v1.10.0