From b6ccb560e9056cc9c028812f5f63ff2e80c95c87 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 18 Jul 2014 13:25:32 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4037) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |  359 +++++++++++------------------------------------------------
 1 files changed, 69 insertions(+), 290 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index deacf2a..68b9688 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -29,15 +29,8 @@
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.util.StaticUtils.*;
 
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 
 import org.opends.messages.Message;
@@ -47,18 +40,15 @@
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
-import org.opends.server.util.StaticUtils;
-
-import com.forgerock.opendj.util.Pair;
 
 /**
  * Thread responsible for inserting replicated changes into the ChangeNumber
@@ -84,7 +74,7 @@
   private ChangelogState changelogState;
 
   /*
-   * mediumConsistencyRUV and lastSeenUpdates must be thread safe, because
+   * The following MultiDomainServerState fields must be thread safe, because
    * 1) initialization can happen while the replication server starts receiving
    * updates 2) many updates can happen concurrently.
    */
@@ -130,39 +120,7 @@
    *
    * @NonNull
    */
-  @SuppressWarnings("unchecked")
-  private CompositeDBCursor<DN> nextChangeForInsertDBCursor =
-      new CompositeDBCursor<DN>(Collections.EMPTY_MAP, false);
-
-  /**
-   * New cursors for this Map must be created from the {@link #run()} method,
-   * i.e. from the same thread that will make use of them. If this rule is not
-   * obeyed, then a JE exception will be thrown about
-   * "Non-transactional Cursors may not be used in multiple threads;".
-   */
-  private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
-      new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
-  /**
-   * Holds the newCursors that will have to be created in the next iteration
-   * inside the {@link #run()} method.
-   * <p>
-   * This map can be updated by multiple threads.
-   */
-  private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
-      new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
-          new Comparator<Pair<DN, Integer>>()
-          {
-            @Override
-            public int compare(Pair<DN, Integer> o1, Pair<DN, Integer> o2)
-            {
-              final int compareBaseDN = o1.getFirst().compareTo(o2.getFirst());
-              if (compareBaseDN == 0)
-              {
-                return o1.getSecond().compareTo(o2.getSecond());
-              }
-              return compareBaseDN;
-            }
-          });
+  private MultiDomainDBCursor nextChangeForInsertDBCursor;
 
   /**
    * Builds a ChangeNumberIndexer object.
@@ -232,11 +190,8 @@
       return;
     }
 
-    final CSN csn = updateMsg.getCSN();
-    // only keep the oldest CSN that will be the new cursor's starting point
-    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
     final CSN oldestCSNBefore = getOldestLastAliveCSN();
-    lastAliveCSNs.update(baseDN, csn);
+    lastAliveCSNs.update(baseDN, updateMsg.getCSN());
     tryNotify(oldestCSNBefore);
   }
 
@@ -381,28 +336,25 @@
     for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
     {
       final DN baseDN = entry.getKey();
-      if (!isECLEnabledDomain(baseDN))
+      if (isECLEnabledDomain(baseDN))
       {
-        continue;
-      }
+        for (Integer serverId : entry.getValue())
+        {
+          /*
+           * initialize with the oldest possible CSN in order for medium
+           * consistency to wait for all replicas to be alive before moving
+           * forward
+           */
+          lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
+        }
 
-      for (Integer serverId : entry.getValue())
-      {
-        /*
-         * initialize with the oldest possible CSN in order for medium
-         * consistency to wait for all replicas to be alive before moving
-         * forward
-         */
-        lastAliveCSNs.update(baseDN, oldestPossibleCSN(serverId));
-        // start after the actual CSN when initializing from the previous cookie
-        final CSN csn = mediumConsistencyRUV.getCSN(baseDN, serverId);
-        ensureCursorExists(baseDN, serverId, csn);
+        ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
+        lastAliveCSNs.update(baseDN, latestKnownState);
       }
-
-      ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
-      lastAliveCSNs.update(baseDN, latestKnownState);
     }
-    resetNextChangeForInsertDBCursor();
+
+    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+    nextChangeForInsertDBCursor.next();
 
     if (newestRecord != null)
     {
@@ -445,68 +397,6 @@
     return new CSN(0, 0, serverId);
   }
 
-  private void resetNextChangeForInsertDBCursor() throws ChangelogException
-  {
-    final Map<DBCursor<UpdateMsg>, DN> cursors =
-        new HashMap<DBCursor<UpdateMsg>, DN>();
-    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry
-        : this.allCursors.entrySet())
-    {
-      for (Entry<Integer, DBCursor<UpdateMsg>> entry2
-          : entry.getValue().entrySet())
-      {
-        cursors.put(entry2.getValue(), entry.getKey());
-      }
-    }
-
-    // CNIndexer manages the cursor itself,
-    // so do not try to recycle exhausted cursors
-    CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors, false);
-    result.next();
-    nextChangeForInsertDBCursor = result;
-  }
-
-  private boolean ensureCursorExists(DN baseDN, Integer serverId,
-      CSN startAfterCSN) throws ChangelogException
-  {
-    Map<Integer, DBCursor<UpdateMsg>> map = allCursors.get(baseDN);
-    if (map == null)
-    {
-      map = new ConcurrentSkipListMap<Integer, DBCursor<UpdateMsg>>();
-      allCursors.put(baseDN, map);
-    }
-    DBCursor<UpdateMsg> cursor = map.get(serverId);
-    if (cursor == null)
-    {
-      final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
-      cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
-      cursor.next();
-      map.put(serverId, cursor);
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Returns the immediately preceding CSN.
-   *
-   * @param csn
-   *          the CSN to use
-   * @return the immediately preceding CSN or null if the provided CSN is null.
-   */
-  CSN getPrecedingCSN(CSN csn)
-  {
-    if (csn == null)
-    {
-      return null;
-    }
-    if (csn.getSeqnum() > 0)
-    {
-      return new CSN(csn.getTime(), csn.getSeqnum() - 1, csn.getServerId());
-    }
-    return new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
-  }
-
   /** {@inheritDoc} */
   @Override
   public void initiateShutdown()
@@ -535,28 +425,18 @@
       {
         try
         {
-          if (!domainsToClear.isEmpty())
+          while (!domainsToClear.isEmpty())
           {
-            while (!domainsToClear.isEmpty())
-            {
-              final DN baseDNToClear = domainsToClear.first();
-              removeCursors(baseDNToClear);
-              // Only release the waiting thread
-              // once this domain's state has been cleared.
-              domainsToClear.remove(baseDNToClear);
-            }
-            resetNextChangeForInsertDBCursor();
-          }
-          else
-          {
-            final boolean createdCursors = createNewCursors();
-            final boolean recycledCursors = recycleExhaustedCursors();
-            if (createdCursors || recycledCursors)
-            {
-              resetNextChangeForInsertDBCursor();
-            }
+            final DN baseDNToClear = domainsToClear.first();
+            nextChangeForInsertDBCursor.removeDomain(baseDNToClear);
+            // Only release the waiting thread
+            // once this domain's state has been cleared.
+            domainsToClear.remove(baseDNToClear);
           }
 
+          // Do not call DBCursor.next() here
+          // because we might not have consumed the last record,
+          // for example if we could not move the MCP forward
           final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
           if (msg == null)
           {
@@ -568,8 +448,13 @@
               }
               wait();
             }
-            // loop to check whether new changes have been added to the
-            // ReplicaDBs
+            // check whether new changes have been added to the ReplicaDBs
+            nextChangeForInsertDBCursor.next();
+            continue;
+          }
+          else if (msg instanceof ReplicaOfflineMsg)
+          {
+            nextChangeForInsertDBCursor.next();
             continue;
           }
 
@@ -615,39 +500,44 @@
     }
     catch (RuntimeException e)
     {
-      // Nothing can be done about it.
-      // Rely on the DirectoryThread uncaught exceptions handler
-      // for logging error + alert.
-      // Message logged here gives corrective information to the administrator.
-      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
-          getClass().getSimpleName(), stackTraceToSingleLineString(e));
-      TRACER.debugError(msg.toString());
+      logUnexpectedException(e);
+      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
       throw e;
     }
     catch (Exception e)
     {
-      // Nothing can be done about it.
-      // Rely on the DirectoryThread uncaught exceptions handler
-      // for logging error + alert.
-      // Message logged here gives corrective information to the administrator.
-      Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
-          getClass().getSimpleName(), stackTraceToSingleLineString(e));
-      TRACER.debugError(msg.toString());
+      logUnexpectedException(e);
+      // Rely on the DirectoryThread uncaught exceptions handler for logging error + alert.
       throw new RuntimeException(e);
     }
     finally
     {
-      removeCursors(DN.NULL_DN);
+      nextChangeForInsertDBCursor.close();
+      nextChangeForInsertDBCursor = null;
     }
   }
 
+  /**
+   * Logs the unexpected exception through the debug tracer. Nothing else can
+   * be done about it here: rely on the DirectoryThread uncaught exceptions
+   * handler for logging error + alert.
+   * <p>
+   * Message logged here gives corrective information to the administrator.
+   * @param e the unexpected exception that was caught
+   */
+  private void logUnexpectedException(Exception e)
+  {
+    Message msg = ERR_CHANGE_NUMBER_INDEXER_UNEXPECTED_EXCEPTION.get(
+        getClass().getSimpleName(), stackTraceToSingleLineString(e));
+    TRACER.debugError(msg.toString());
+  }
+
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
       final DN mcBaseDN) throws ChangelogException
   {
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
 
-    boolean callNextOnCursor = true;
     final int mcServerId = mcCSN.getServerId();
     final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
     final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -660,133 +550,22 @@
       }
       else if (offlineCSN.isOlderThan(mcCSN))
       {
-        Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
-            pair = getCursor(mcBaseDN, mcCSN.getServerId());
-        Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter = pair.getSecond();
-        if (iter != null && !iter.hasNext())
-        {
-          /*
-           * replica is not back online, Medium consistency point has gone past
-           * its last offline time, and there are no more changes after the
-           * offline CSN in the cursor: remove everything known about it:
-           * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
-           * this replica from the medium consistency RUV.
-           */
-          iter.remove();
-          StaticUtils.close(pair.getFirst());
-          resetNextChangeForInsertDBCursor();
-          callNextOnCursor = false;
-          lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
-          mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
-        }
+        /*
+         * The replica is not back online, the medium consistency point has
+         * gone past its last offline time, and there are no more changes after
+         * the offline CSN in the cursor. Remove everything known about this
+         * replica: its cursor, its offline CSN from lastAliveCSNs, and all
+         * knowledge of it from the medium consistency RUV.
+         */
+        // TODO JNR how to close cursor for offline replica?
+        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
       }
     }
 
-    if (callNextOnCursor)
-    {
-      // advance the cursor we just read from,
-      // success/failure will be checked later
-      nextChangeForInsertDBCursor.next();
-    }
-  }
-
-  private void removeCursors(DN baseDN)
-  {
-    if (nextChangeForInsertDBCursor != null)
-    {
-      nextChangeForInsertDBCursor.close();
-      nextChangeForInsertDBCursor = null;
-    }
-    if (DN.NULL_DN.equals(baseDN))
-    {
-      // close all cursors
-      for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
-      {
-        StaticUtils.close(map.values());
-      }
-      allCursors.clear();
-      newCursors.clear();
-    }
-    else
-    {
-      // close cursors for this DN
-      final Map<Integer, DBCursor<UpdateMsg>> map = allCursors.remove(baseDN);
-      if (map != null)
-      {
-        StaticUtils.close(map.values());
-      }
-      for (Iterator<Pair<DN, Integer>> it = newCursors.keySet().iterator(); it.hasNext();)
-      {
-        if (it.next().getFirst().equals(baseDN))
-        {
-          it.remove();
-        }
-      }
-    }
-  }
-
-  private Pair<DBCursor<UpdateMsg>, Iterator<Entry<Integer, DBCursor<UpdateMsg>>>>
-      getCursor(final DN baseDN, final int serverId) throws ChangelogException
-  {
-    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
-        : allCursors.entrySet())
-    {
-      if (baseDN.equals(entry1.getKey()))
-      {
-        for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
-            entry1.getValue().entrySet().iterator(); iter.hasNext();)
-        {
-          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
-          if (serverId == entry2.getKey())
-          {
-            return Pair.of(entry2.getValue(), iter);
-          }
-        }
-      }
-    }
-    return Pair.empty();
-  }
-
-  private boolean recycleExhaustedCursors() throws ChangelogException
-  {
-    boolean succesfullyRecycled = false;
-    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
-    {
-      for (DBCursor<UpdateMsg> cursor : map.values())
-      {
-        // try to recycle it by calling next()
-        if (cursor.getRecord() == null && cursor.next())
-        {
-          succesfullyRecycled = true;
-        }
-      }
-    }
-    return succesfullyRecycled;
-  }
-
-  private boolean createNewCursors() throws ChangelogException
-  {
-    if (!newCursors.isEmpty())
-    {
-      boolean newCursorAdded = false;
-      for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
-          newCursors.entrySet().iterator(); iter.hasNext();)
-      {
-        final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
-        final DN baseDN = entry.getKey().getFirst();
-        final CSN csn = entry.getValue();
-        // start after preceding CSN so the first CSN read will exactly be the
-        // current one
-        final CSN startFromCSN = getPrecedingCSN(csn);
-        if (!ensureCursorExists(baseDN, csn.getServerId(), startFromCSN))
-        {
-          newCursorAdded = true;
-        }
-        iter.remove();
-      }
-      return newCursorAdded;
-    }
-    return false;
+    // advance the cursor we just read from,
+    // success/failure will be checked later
+    nextChangeForInsertDBCursor.next();
   }
 
   /**

--
Gitblit v1.10.0