From 355365ea3f95fdfec5cac95588044b91354096c8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 11:11:46 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4303) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                           |   35 +++++------------
 opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java                                |    9 ----
 opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java                           |   13 ------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java |    9 ----
 opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java                             |   45 +++++++++++-----------
 5 files changed, 34 insertions(+), 77 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index eee6fe4..d70e7a1 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -25,11 +25,6 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
-import static org.opends.server.util.StaticUtils.*;
-
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListSet;
@@ -50,6 +45,11 @@
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
+
 /**
  * Thread responsible for inserting replicated changes into the ChangeNumber
  * Index DB (CNIndexDB for short). Only changes older than the medium
@@ -430,26 +430,13 @@
       {
         try
         {
-          if (!domainsToClear.isEmpty())
+          while (!domainsToClear.isEmpty())
           {
-            final DN cursorData = nextChangeForInsertDBCursor.getData();
-            final boolean callNextOnCursor =
-                cursorData != null && domainsToClear.contains(cursorData);
-            while (!domainsToClear.isEmpty())
-            {
-              final DN baseDNToClear = domainsToClear.first();
-              nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
-              // Only release the waiting thread
-              // once this domain's state has been cleared.
-              domainsToClear.remove(baseDNToClear);
-            }
-
-            if (callNextOnCursor)
-            {
-              // The next change to consume comes from a domain to be removed.
-              // Call DBCursor.next() to ensure this domain is removed
-              nextChangeForInsertDBCursor.next();
-            }
+            final DN baseDNToClear = domainsToClear.first();
+            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
+            // Only release the waiting thread
+            // once this domain's state has been cleared.
+            domainsToClear.remove(baseDNToClear);
           }
 
           // Do not call DBCursor.next() here
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index ae54930..748619c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -107,7 +107,6 @@
       addCursor(cursorToAdvance.getKey(), cursorToAdvance.getValue());
     }
 
-    removeNoLongerNeededCursors();
     incorporateNewCursors();
     return !cursors.isEmpty();
   }
@@ -127,31 +126,31 @@
     }
   }
 
-  private void removeNoLongerNeededCursors()
+  /**
+   * Removes the cursor matching the provided data.
+   *
+   * @param dataToFind
+   *          the data for which the cursor must be found and removed
+   */
+  protected void removeCursor(final Data dataToFind)
   {
-    for (final Iterator<Data> iter = removedCursorsIterator(); iter.hasNext();)
-    {
-      final Data dataToFind = iter.next();
-      for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
-          cursors.entrySet().iterator(); cursorIter.hasNext();)
-      {
-        final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
-        if (dataToFind.equals(entry.getValue()))
-        {
-          entry.getKey().close();
-          cursorIter.remove();
-        }
-      }
-      iter.remove();
-    }
+    removeCursor(this.cursors, dataToFind);
+    removeCursor(this.exhaustedCursors, dataToFind);
   }
 
-  /**
-   * Returns an Iterator over the data associated to cursors that must be removed.
-   *
-   * @return an Iterator over the data associated to cursors that must be removed.
-   */
-  protected abstract Iterator<Data> removedCursorsIterator();
+  private void removeCursor(Map<DBCursor<UpdateMsg>, Data> cursors, Data dataToFind)
+  {
+    for (Iterator<Entry<DBCursor<UpdateMsg>, Data>> cursorIter =
+        cursors.entrySet().iterator(); cursorIter.hasNext();)
+    {
+      final Entry<DBCursor<UpdateMsg>, Data> entry = cursorIter.next();
+      if (dataToFind.equals(entry.getValue()))
+      {
+        entry.getKey().close();
+        cursorIter.remove();
+      }
+    }
+  }
 
   /**
    * Adds a cursor to this composite cursor. It first calls
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
index 26c40f7..780b19b 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -24,7 +24,6 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -118,14 +117,6 @@
 
   /** {@inheritDoc} */
   @Override
-  @SuppressWarnings("unchecked")
-  protected Iterator<Void> removedCursorsIterator()
-  {
-    return Collections.EMPTY_LIST.iterator(); // nothing to remove
-  }
-
-  /** {@inheritDoc} */
-  @Override
   public void close()
   {
     super.close();
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
index a7f067f..690e551 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -27,7 +27,6 @@
 import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
@@ -47,8 +46,6 @@
 
   private final ConcurrentSkipListMap<DN, ServerState> newDomains =
       new ConcurrentSkipListMap<DN, ServerState>();
-  private final ConcurrentSkipListSet<DN> removeDomains =
-      new ConcurrentSkipListSet<DN>();
 
   private final PositionStrategy positionStrategy;
 
@@ -108,14 +105,7 @@
    */
   public void removeDomain(DN baseDN)
   {
-    removeDomains.add(baseDN);
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  protected Iterator<DN> removedCursorsIterator()
-  {
-    return removeDomains.iterator();
+    removeCursor(baseDN);
   }
 
   /** {@inheritDoc} */
@@ -125,7 +115,6 @@
     super.close();
     domainDB.unregisterCursor(this);
     newDomains.clear();
-    removeDomains.clear();
   }
 
 }
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index e5b022a..05eb5eb 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -25,9 +25,6 @@
  */
 package org.opends.server.replication.server.changelog.je;
 
-import java.util.Collections;
-import java.util.Iterator;
-
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -51,12 +48,6 @@
     protected void incorporateNewCursors() throws ChangelogException
     {
     }
-
-    @Override
-    protected Iterator<String> removedCursorsIterator()
-    {
-      return Collections.EMPTY_LIST.iterator();
-    }
   }
 
   private UpdateMsg msg1;

--
Gitblit v1.10.0