From 891159050af4aa3fe47c67e3ba7d3f21299027a4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 02 Dec 2013 14:01:32 +0000
Subject: [PATCH] OPENDJ-1174 (CR-2631) Transfer responsibility for populating the ChangeNumberIndexDB to ChangelogDB

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java |  292 ++++++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 193 insertions(+), 99 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 93b8cd3..ce0f7b4 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -30,6 +30,7 @@
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Message;
 import org.opends.server.api.DirectoryThread;
@@ -43,6 +44,9 @@
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.types.DirectoryException;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
 
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
@@ -57,6 +61,11 @@
   /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
 
+  /**
+   * When true, the next iteration of the {@link #run()} loop clears its state
+   * instead of creating new cursors, then resets this flag to false.
+   */
+  private final AtomicBoolean doClear = new AtomicBoolean();
   private final ChangelogDB changelogDB;
   /** Only used for initialization, and then discarded. */
   private ChangelogState changelogState;
@@ -101,12 +110,12 @@
       new MultiDomainServerState();
 
   /**
-   * Composite cursor across all the replicaDBs for all the replication domains.
-   * It is volatile to ensure it supports concurrent update. Each time it is
-   * used more than once in a method, the method must take a local copy to
-   * ensure the cursor does not get updated in the middle of the method.
+   * Cursor across all the replicaDBs for all the replication domains. It is
+   * positioned on the next change that needs to be inserted in the CNIndexDB.
+   * <p>
+   * Note: it is only accessed from the {@link #run()} method.
    */
-  private volatile CompositeDBCursor<DN> crossDomainDBCursor;
+  private CompositeDBCursor<DN> nextChangeForInsertDBCursor;
 
   /**
    * New cursors for this Map must be created from the {@link #run()} method,
@@ -116,9 +125,27 @@
    */
   private Map<DN, Map<Integer, DBCursor<UpdateMsg>>> allCursors =
       new HashMap<DN, Map<Integer, DBCursor<UpdateMsg>>>();
-  /** This map can be updated by multiple threads. */
-  private ConcurrentMap<CSN, DN> newCursors =
-      new ConcurrentSkipListMap<CSN, DN>();
+  /**
+   * Records, per replica, the CSN from which a cursor must be created during
+   * the next iteration of the {@link #run()} method.
+   * <p>
+   * This map can be updated by multiple threads.
+   */
+  private ConcurrentMap<Pair<DN, Integer>, CSN> newCursors =
+      new ConcurrentSkipListMap<Pair<DN, Integer>, CSN>(
+          new Comparator<Pair<DN, Integer>>()
+          {
+            @Override
+            public int compare(Pair<DN, Integer> p1, Pair<DN, Integer> p2)
+            {
+              final int byBaseDN = p1.getFirst().compareTo(p2.getFirst());
+              if (byBaseDN != 0)
+              {
+                return byBaseDN;
+              }
+              return p1.getSecond().compareTo(p2.getSecond());
+            }
+          });
 
   /**
    * Builds a ChangeNumberIndexer object.
@@ -164,7 +191,8 @@
   {
     final CSN csn = updateMsg.getCSN();
     lastSeenUpdates.update(baseDN, csn);
-    newCursors.put(csn, baseDN);
+    // only keep the oldest CSN that will be the new cursor's starting point
+    newCursors.putIfAbsent(Pair.of(baseDN, csn.getServerId()), csn);
     tryNotify(baseDN);
   }
 
@@ -210,17 +238,23 @@
     return true;
   }
 
+  /**
+   * Restores in memory data needed to build the CNIndexDB, including the medium
+   * consistency point.
+   */
   private void initialize() throws ChangelogException, DirectoryException
   {
     final ChangeNumberIndexRecord newestRecord =
         changelogDB.getChangeNumberIndexDB().getNewestRecord();
     if (newestRecord != null)
     {
+      // restore the mediumConsistencyRUV from DB
       mediumConsistencyRUV.update(
           new MultiDomainServerState(newestRecord.getPreviousCookie()));
     }
 
-    // initialize the cross domain DB cursor
+    // initialize the DB cursor and the last seen updates
+    // to ensure the medium consistency CSN can move forward
     final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
     for (Entry<DN, List<Integer>> entry
         : changelogState.getDomainToServerIds().entrySet())
@@ -235,12 +269,12 @@
       ServerState latestKnownState = domainDB.getDomainNewestCSNs(baseDN);
       lastSeenUpdates.update(baseDN, latestKnownState);
     }
+    resetNextChangeForInsertDBCursor();
 
-    crossDomainDBCursor = newCompositeDBCursor();
     if (newestRecord != null)
     {
       // restore the "previousCookie" state before shutdown
-      final UpdateMsg record = crossDomainDBCursor.getRecord();
+      final UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
       if (!record.getCSN().equals(newestRecord.getCSN()))
       {
         // TODO JNR i18n safety check, should never happen
@@ -248,14 +282,14 @@
             + record.getCSN() + " newestRecordCSN=" + newestRecord.getCSN()));
       }
       mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
-      crossDomainDBCursor.next();
+      nextChangeForInsertDBCursor.next();
     }
 
     // this will not be used any more. Discard for garbage collection.
     this.changelogState = null;
   }
 
-  private CompositeDBCursor<DN> newCompositeDBCursor() throws ChangelogException
+  private void resetNextChangeForInsertDBCursor() throws ChangelogException
   {
     final Map<DBCursor<UpdateMsg>, DN> cursors =
         new HashMap<DBCursor<UpdateMsg>, DN>();
@@ -270,7 +304,7 @@
     }
     final CompositeDBCursor<DN> result = new CompositeDBCursor<DN>(cursors);
     result.next();
-    return result;
+    nextChangeForInsertDBCursor = result;
   }
 
   private boolean ensureCursorExists(DN baseDN, Integer serverId, CSN csn)
@@ -286,13 +320,27 @@
     if (cursor == null)
     {
       final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
-      cursor = domainDB.getCursorFrom(baseDN, serverId, csn);
+      // use an older CSN because getCursorFrom() starts after the given CSN
+      final CSN anOlderCSN = getPrecedingCSN(csn);
+      cursor = domainDB.getCursorFrom(baseDN, serverId, anOlderCSN);
       map.put(serverId, cursor);
       return false;
     }
     return true;
   }
 
+  /**
+   * Builds the CSN immediately preceding the supplied one.
+   */
+  private CSN getPrecedingCSN(CSN csn)
+  {
+    final int seqnum = csn.getSeqnum();
+    // when seqnum is 0, step back one time unit and use the largest seqnum
+    return seqnum > 0
+        ? new CSN(csn.getTime(), seqnum - 1, csn.getServerId())
+        : new CSN(csn.getTime() - 1, Integer.MAX_VALUE, csn.getServerId());
+  }
+
   /** {@inheritDoc} */
   @Override
   public void run()
@@ -305,83 +353,96 @@
        * used.
        */
       initialize();
-    }
-    catch (DirectoryException e)
-    {
-      // TODO JNR error message i18n
-      if (debugEnabled())
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      return;
+
+      while (!isShutdownInitiated())
+      {
+        try
+        {
+          if (doClear.get())
+          {
+            removeAllCursors();
+            // No need to use CAS here because it is only for unit tests and at
+            // this point all will have been cleaned up anyway.
+            doClear.set(false);
+          }
+          else
+          {
+            createNewCursors();
+          }
+
+          final UpdateMsg msg = nextChangeForInsertDBCursor.getRecord();
+          if (msg == null)
+          {
+            synchronized (this)
+            {
+              wait();
+            }
+            // advance cursor, success/failure will be checked later
+            nextChangeForInsertDBCursor.next();
+            // loop to check whether new changes have been added to the
+            // ReplicaDBs
+            continue;
+          }
+
+          final CSN csn = msg.getCSN();
+          final DN baseDN = nextChangeForInsertDBCursor.getData();
+          // FIXME problem: what if the serverId is not part of the ServerState?
+          // right now, thread will be blocked
+          if (!canMoveForwardMediumConsistencyPoint(baseDN))
+          {
+            // the oldest record to insert is newer than the medium consistency
+            // point. Let's wait for a change that can be published.
+            synchronized (this)
+            {
+              // double check to protect against a missed call to notify()
+              if (!canMoveForwardMediumConsistencyPoint(baseDN))
+              {
+                wait();
+                // loop to check if changes older than the medium consistency
+                // point have been added to the ReplicaDBs
+                continue;
+              }
+            }
+          }
+
+
+          // OK, the oldest change is older than the medium consistency point
+          // let's publish it to the CNIndexDB.
+
+          // Next if statement is ugly but ensures the first change will not be
+          // immediately trimmed from the CNIndexDB. Yuck!
+          if (mediumConsistencyRUV.isEmpty())
+          {
+            mediumConsistencyRUV.replace(baseDN, new ServerState());
+          }
+          final String previousCookie = mediumConsistencyRUV.toString();
+          final ChangeNumberIndexRecord record =
+              new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
+          changelogDB.getChangeNumberIndexDB().addRecord(record);
+          moveForwardMediumConsistencyPoint(csn, baseDN);
+
+          // advance cursor, success/failure will be checked later
+          nextChangeForInsertDBCursor.next();
+        }
+        catch (InterruptedException ignored)
+        {
+          // was shutdown called? loop to figure it out.
+          Thread.currentThread().interrupt();
+        }
+      }
+      removeAllCursors();
     }
     catch (ChangelogException e)
     {
-      // TODO JNR error message i18n
       if (debugEnabled())
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      return;
+      // TODO JNR error message i18n
     }
-
-    while (!isShutdownInitiated())
+    catch (DirectoryException e)
     {
-      try
-      {
-        createNewCursors();
-
-        final UpdateMsg msg = crossDomainDBCursor.getRecord();
-        if (msg == null)
-        {
-          synchronized (this)
-          {
-            wait();
-          }
-          // advance cursor, success/failure will be checked later
-          crossDomainDBCursor.next();
-          // loop to check whether new changes have been added to the ReplicaDBs
-          continue;
-        }
-
-        final CSN csn = msg.getCSN();
-        final DN baseDN = crossDomainDBCursor.getData();
-        // FIXME problem: what if the serverId is not part of the ServerState?
-        // right now, thread will be blocked
-        if (!canMoveForwardMediumConsistencyPoint(baseDN))
-        {
-          // the oldest record to insert is newer than the medium consistency
-          // point. Let's wait for a change that can be published.
-          synchronized (this)
-          {
-            // double check to protect against a missed call to notify()
-            if (!canMoveForwardMediumConsistencyPoint(baseDN))
-            {
-              wait();
-              // loop to check if changes older than the medium consistency
-              // point have been added to the ReplicaDBs
-              continue;
-            }
-          }
-        }
-
-        // OK, the oldest change is older than the medium consistency point
-        // let's publish it to the CNIndexDB
-        final String previousCookie = mediumConsistencyRUV.toString();
-        final ChangeNumberIndexRecord record =
-            new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
-        changelogDB.getChangeNumberIndexDB().addRecord(record);
-        moveForwardMediumConsistencyPoint(csn, baseDN);
-
-        // advance cursor, success/failure will be checked later
-        crossDomainDBCursor.next();
-      }
-      catch (ChangelogException e)
-      {
-        if (debugEnabled())
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        // TODO JNR error message i18n
-      }
-      catch (InterruptedException ignored)
-      {
-        // was shutdown called?
-      }
+      if (debugEnabled())
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      // TODO JNR error message i18n
     }
   }
 
@@ -402,20 +463,32 @@
     }
   }
 
+  private void removeAllCursors() throws ChangelogException
+  {
+    for (Map<Integer, DBCursor<UpdateMsg>> map : allCursors.values())
+    {
+      StaticUtils.close(map.values()); // release the cursors of this domain
+    }
+    allCursors.clear();
+    newCursors.clear(); // also drop the cursors still waiting to be created
+    resetNextChangeForInsertDBCursor(); // start over with a fresh cursor
+  }
+
   private void removeCursor(final DN baseDN, final CSN csn)
   {
-    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry : allCursors
-        .entrySet())
+    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
+        : allCursors.entrySet())
     {
-      if (baseDN.equals(entry.getKey()))
+      if (baseDN.equals(entry1.getKey()))
       {
-        final Set<Integer> serverIds = entry.getValue().keySet();
-        for (Iterator<Integer> iter = serverIds.iterator(); iter.hasNext();)
+        for (Iterator<Entry<Integer, DBCursor<UpdateMsg>>> iter =
+            entry1.getValue().entrySet().iterator(); iter.hasNext();)
         {
-          final int serverId = iter.next();
-          if (csn.getServerId() == serverId)
+          final Entry<Integer, DBCursor<UpdateMsg>> entry2 = iter.next();
+          if (csn.getServerId() == entry2.getKey())
           {
             iter.remove();
+            StaticUtils.close(entry2.getValue());
             return;
           }
         }
@@ -428,12 +501,13 @@
     if (!newCursors.isEmpty())
     {
       boolean newCursorAdded = false;
-      for (Iterator<Entry<CSN, DN>> iter = newCursors.entrySet().iterator();
-          iter.hasNext();)
+      for (Iterator<Entry<Pair<DN, Integer>, CSN>> iter =
+          newCursors.entrySet().iterator(); iter.hasNext();)
       {
-        final Entry<CSN, DN> entry = iter.next();
-        final CSN csn = entry.getKey();
-        if (!ensureCursorExists(entry.getValue(), csn.getServerId(), null))
+        final Entry<Pair<DN, Integer>, CSN> entry = iter.next();
+        final DN baseDN = entry.getKey().getFirst();
+        final CSN csn = entry.getValue();
+        if (!ensureCursorExists(baseDN, csn.getServerId(), csn))
         {
           newCursorAdded = true;
         }
@@ -441,9 +515,29 @@
       }
       if (newCursorAdded)
       {
-        crossDomainDBCursor = newCompositeDBCursor();
+        resetNextChangeForInsertDBCursor();
       }
     }
   }
 
+  /**
+   * Asks the thread executing {@link #run()} to clear its state, then
+   * busy-waits until that thread has reset the request flag.
+   * <p>
+   * This method is only useful for unit tests.
+   */
+  public void clear()
+  {
+    doClear.set(true);
+    synchronized (this)
+    {
+      notify();
+    }
+    // spin until the run() loop has reset the flag, so that unit tests only
+    // resume once the state of that thread has been cleaned up
+    while (doClear.get())
+    {
+      Thread.yield();
+    }
+  }
+
 }

--
Gitblit v1.10.0