From b6ccb560e9056cc9c028812f5f63ff2e80c95c87 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java |  154 ++++++++++++++++++++++++++++++++-------------------
 1 files changed, 97 insertions(+), 57 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index ccd27c0..7f271c1 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -42,7 +42,7 @@
  * @param <Data>
  *          The type of data associated with each cursor
  */
-public final class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
+abstract class CompositeDBCursor<Data> implements DBCursor<UpdateMsg>
 {
 
   private static final byte UNINITIALIZED = 0;
@@ -55,8 +55,6 @@
    */
   private byte state = UNINITIALIZED;
 
-  /** Whether this composite should try to recycle exhausted cursors. */
-  private final boolean recycleExhaustedCursors;
   /**
    * These cursors are considered exhausted because they had no new changes the
    * last time {@link DBCursor#next()} was called on them. Exhausted cursors
@@ -67,8 +65,13 @@
   /**
    * The cursors are sorted based on the current change of each cursor to
    * consider the next change across all available cursors.
+   * <p>
+   * New cursors for this Map must be created from the same thread that will
+   * make use of them. When this rule is not obeyed, a JE exception will be
+   * thrown about
+   * "Non-transactional Cursors may not be used in multiple threads;".
    */
-  private final NavigableMap<DBCursor<UpdateMsg>, Data> cursors =
+  private final TreeMap<DBCursor<UpdateMsg>, Data> cursors =
       new TreeMap<DBCursor<UpdateMsg>, Data>(
           new Comparator<DBCursor<UpdateMsg>>()
           {
@@ -81,25 +84,6 @@
             }
           });
 
-  /**
-   * Builds a CompositeDBCursor using the provided collection of cursors.
-   *
-   * @param cursors
-   *          the cursors that will be iterated upon.
-   * @param recycleExhaustedCursors
-   *          whether a call to {@link #next()} tries to recycle exhausted
-   *          cursors
-   */
-  public CompositeDBCursor(Map<DBCursor<UpdateMsg>, Data> cursors,
-      boolean recycleExhaustedCursors)
-  {
-    this.recycleExhaustedCursors = recycleExhaustedCursors;
-    for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
-    {
-      put(entry);
-    }
-  }
-
   /** {@inheritDoc} */
   @Override
   public boolean next() throws ChangelogException
@@ -108,51 +92,75 @@
     {
       return false;
     }
-    final boolean advanceNonExhaustedCursors = state != UNINITIALIZED;
-    state = READY;
-    if (recycleExhaustedCursors && !exhaustedCursors.isEmpty())
+
+    if (state == UNINITIALIZED)
     {
-      // try to recycle empty cursors in case the underlying ReplicaDBs received
-      // new changes.
+      state = READY;
+    }
+    else
+    {
+      // Previous state was READY => we must advance the first cursor
+      // because the UpdateMsg it is pointing has already been consumed.
+      // To keep consistent the cursors' order in the SortedSet, it is necessary
+      // to remove the first cursor, then add it again after moving it forward.
+      final Entry<DBCursor<UpdateMsg>, Data> cursorToAdvance = cursors.pollFirstEntry();
+      if (cursorToAdvance != null)
+      {
+        addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
+      }
+    }
+
+    recycleExhaustedCursors();
+    removeNoLongerNeededCursors();
+    incorporateNewCursors();
+    return !cursors.isEmpty();
+  }
+
+  private void recycleExhaustedCursors() throws ChangelogException
+  {
+    if (!exhaustedCursors.isEmpty())
+    {
+      // try to recycle exhausted cursors in case the underlying replica DBs received new changes.
       final Map<DBCursor<UpdateMsg>, Data> copy =
           new HashMap<DBCursor<UpdateMsg>, Data>(exhaustedCursors);
       exhaustedCursors.clear();
       for (Entry<DBCursor<UpdateMsg>, Data> entry : copy.entrySet())
       {
-        entry.getKey().next();
-        put(entry);
-      }
-      final Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.firstEntry();
-      if (firstEntry != null && copy.containsKey(firstEntry.getKey()))
-      {
-        // if the first cursor was previously an exhausted cursor,
-        // then we have already called next() on it.
-        // Avoid calling it again because we know new changes have been found.
-        return true;
+        addCursor(entry.getKey(), entry.getValue());
       }
     }
-
-    // To keep consistent the cursors' order in the SortedSet, it is necessary
-    // to remove and add again the cursor after moving it forward.
-    if (advanceNonExhaustedCursors)
-    {
-      Entry<DBCursor<UpdateMsg>, Data> firstEntry = cursors.pollFirstEntry();
-      if (firstEntry != null)
-      {
-        final DBCursor<UpdateMsg> cursor = firstEntry.getKey();
-        cursor.next();
-        put(firstEntry);
-      }
-    }
-    // no cursors are left with changes.
-    return !cursors.isEmpty();
   }
 
-  private void put(Entry<DBCursor<UpdateMsg>, Data> entry)
+  private void removeNoLongerNeededCursors()
   {
-    final DBCursor<UpdateMsg> cursor = entry.getKey();
-    final Data data = entry.getValue();
-    if (cursor.getRecord() != null)
+    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> iterator =
+        cursors.entrySet().iterator(); iterator.hasNext();)
+    {
+      final Entry<DBCursor<UpdateMsg>, Data> entry = iterator.next();
+      final Data data = entry.getValue();
+      if (isCursorNoLongerNeededFor(data))
+      {
+        entry.getKey().close();
+        iterator.remove();
+        cursorRemoved(data);
+      }
+    }
+  }
+
+  /**
+   * Adds a cursor to this composite cursor. It first calls
+   * {@link DBCursor#next()} to verify whether it is exhausted or not.
+   *
+   * @param cursor
+   *          the cursor to add to this composite
+   * @param data
+   *          the data associated to the provided cursor
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  protected void addCursor(final DBCursor<UpdateMsg> cursor, final Data data) throws ChangelogException
+  {
+    if (cursor.next())
     {
       this.cursors.put(cursor, data);
     }
@@ -166,6 +174,8 @@
   @Override
   public UpdateMsg getRecord()
   {
+    // Cannot call incorporateNewCursors() here because
+    // somebody might have already called DBCursor.getRecord() and read the record
     final Entry<DBCursor<UpdateMsg>, Data> entry = cursors.firstEntry();
     if (entry != null)
     {
@@ -175,6 +185,33 @@
   }
 
   /**
+   * Called when implementors should incorporate new cursors into the current
+   * composite DBCursor. Implementors should call
+   * {@link #addCursor(DBCursor, Object)} to do so.
+   *
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  protected abstract void incorporateNewCursors() throws ChangelogException;
+
+  /**
+   * Returns whether the cursor associated to the provided data should be removed.
+   *
+   * @param data the data associated to the cursor to be tested
+   * @return true if the cursor associated to the provided data should be removed,
+   *         false otherwise
+   */
+  protected abstract boolean isCursorNoLongerNeededFor(Data data);
+
+  /**
+   * Notifies that the cursor associated to the provided data has been removed.
+   *
+   * @param data
+   *          the data associated to the removed cursor
+   */
+  protected abstract void cursorRemoved(Data data);
+
+  /**
    * Returns the data associated to the cursor that returned the current record.
    *
    * @return the data associated to the cursor that returned the current record.
@@ -193,8 +230,11 @@
   @Override
   public void close()
   {
+    state = CLOSED;
     StaticUtils.close(cursors.keySet());
     StaticUtils.close(exhaustedCursors.keySet());
+    cursors.clear();
+    exhaustedCursors.clear();
   }
 
   /** {@inheritDoc} */

--
Gitblit v1.10.0