From dc02a21390ac3b24e2eaa2505c823a33fd3eee07 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 23 Jun 2015 08:06:48 +0000
Subject: [PATCH] OPENDJ-2141 (CR-7337) Cannot find entry in cn=changelog when searching with changelogCookie filter

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java    |   40 +++-----
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java      |   66 ++++--------
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java            |    6 
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java        |    5 
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java |   51 ----------
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java          |    7 
 opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java                                |   60 ++++++++---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java              |    7 
 8 files changed, 92 insertions(+), 150 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java
index 1753112..48ce4c1 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/ChangelogBackend.java
@@ -1077,7 +1077,7 @@
       continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
       if (continueSearch)
       {
-        final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get());
+        final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN());
         if (updateMsg != null)
         {
           continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie);
@@ -1092,20 +1092,48 @@
   private void initializeCookieForChangeNumberMode(
       MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
   {
-    ECLMultiDomainDBCursor eclCursor = null;
-    try
+    // Initialize the multi domain cursor only from the change number index record.
+    // The cookie is always empty at this stage.
+    CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN());
+    MultiDomainServerState unused = new MultiDomainServerState();
+    MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options);
+    try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor))
     {
-      cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
-      CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
-      MultiDomainDBCursor cursor =
-          getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, options);
-      eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
-      eclCursor.next();
-      cookie.update(eclCursor.toCookie());
+      updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord);
     }
-    finally
+  }
+
+  /**
+   * Rebuilds the changelogcookie starting at the newest change number index record.
+   * <p>
+   * It updates the provided cookie with the changes from the provided ECL cursor,
+   * up to (and including) the provided change number index record.
+   * <p>
+   * Therefore, after calling this method, the cursor is positioned
+   * to the change immediately following the provided change number index record.
+   *
+   * @param cookie the cookie to update
+   * @param cursor the cursor where to read changes from
+   * @param cnIndexRecord the change number index record to go right after
+   * @throws ChangelogException if any problem occurs
+   */
+  public static void updateCookieToMediumConsistencyPoint(
+      MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord)
+          throws ChangelogException
+  {
+    if (cnIndexRecord == null)
     {
-      close(eclCursor);
+      return;
+    }
+
+    while (cursor.next())
+    {
+      UpdateMsg updateMsg = cursor.getRecord();
+      if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0)
+      {
+        break;
+      }
+      cookie.update(cursor.getData(), updateMsg.getCSN());
     }
   }
 
@@ -1135,15 +1163,13 @@
    *           If inconsistency is detected between the available update
    *           messages and the provided cnIndexRecord
    */
-  private UpdateMsg findReplicaUpdateMessage(
-      final ChangeNumberIndexRecord cnIndexRecord,
-      final MultiDomainDBCursor replicaUpdatesCursor)
-          throws DirectoryException, ChangelogException
+  private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn)
+      throws ChangelogException, DirectoryException
   {
     while (true)
     {
       final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
-      final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN());
+      final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN());
       if (compareIndexWithUpdateMsg < 0) {
         // Either update message has been purged or baseDN has been removed from changelogDB,
         // ignore current index record and go to the next one
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
index 0414235..03e3484 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ChangeNumberIndexer.java
@@ -71,8 +71,7 @@
    * for the supplied domain baseDNs. If a supplied domain is
    * {@link DN#NULL_DN}, then all domains will be cleared.
    */
-  private final ConcurrentSkipListSet<DN> domainsToClear =
-      new ConcurrentSkipListSet<DN>();
+  private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>();
   private final ChangelogDB changelogDB;
   /** Only used for initialization, and then discarded. */
   private ChangelogState changelogState;
@@ -108,6 +107,7 @@
    * @NonNull
    */
   private ECLMultiDomainDBCursor nextChangeForInsertDBCursor;
+  private MultiDomainServerState cookie = new MultiDomainServerState();
 
   /**
    * Builds a ChangeNumberIndexer object.
@@ -317,30 +317,16 @@
 
   private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
   {
-    final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN();
-
-    CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
-    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint =
-        domainDB.getCursorFrom(cookieWithNewestCSN, options);
+    // Initialize the multi domain cursor only from the change number index record.
+    // The cookie is always empty at this stage.
+    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
+    final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null;
+    final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn);
+    final MultiDomainServerState unused = new MultiDomainServerState();
+    MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options);
 
     nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
-    nextChangeForInsertDBCursor.next();
-  }
-
-  /** Returns a cookie initialised with the newest CSN for each replica. */
-  private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException
-  {
-    final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
-    final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState();
-    if (newestRecord != null)
-    {
-      final CSN newestCsn = newestRecord.getCSN();
-      for (DN baseDN : changelogState.getDomainToServerIds().keySet())
-      {
-        cookieWithNewestCSN.update(baseDN, newestCsn);
-      }
-    }
-    return cookieWithNewestCSN;
+    ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord);
   }
 
   private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
@@ -477,7 +463,11 @@
           // let's publish it to the CNIndexDB.
           final long changeNumber = changelogDB.getChangeNumberIndexDB()
               .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
-          MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie();
+          if (!cookie.update(baseDN, csn))
+          {
+            throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn
+                + ") would have updated the cookie=" + cookie + ", but it did not");
+          }
           notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
           moveForwardMediumConsistencyPoint(csn, baseDN);
         }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java
index 15b5a7d..17aab83 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/CompositeDBCursor.java
@@ -25,10 +25,13 @@
  */
 package org.opends.server.replication.server.changelog.file;
 
-import java.util.*;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Map.Entry;
+import java.util.TreeMap;
 
-import org.forgerock.util.Pair;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -46,15 +49,11 @@
  */
 abstract class CompositeDBCursor<T> implements DBCursor<UpdateMsg>
 {
-
   private static final byte UNINITIALIZED = 0;
   private static final byte READY = 1;
   private static final byte CLOSED = 2;
 
-  /**
-   * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or
-   * {@link #CLOSED}
-   */
+  /** The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED} */
   private byte state = UNINITIALIZED;
 
   /**
@@ -62,8 +61,7 @@
    * last time {@link DBCursor#next()} was called on them. Exhausted cursors
    * might be recycled at some point when they start returning changes again.
    */
-  private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors =
-      new HashMap<DBCursor<UpdateMsg>, T>();
+  private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors = new HashMap<>();
   /**
    * The cursors are sorted based on the current change of each cursor to
    * consider the next change across all available cursors.
@@ -73,8 +71,7 @@
    * thrown about
    * "Non-transactional Cursors may not be used in multiple threads;".
    */
-  private final TreeMap<DBCursor<UpdateMsg>, T> cursors =
-      new TreeMap<DBCursor<UpdateMsg>, T>(
+  private final TreeMap<DBCursor<UpdateMsg>, T> cursors = new TreeMap<>(
           new Comparator<DBCursor<UpdateMsg>>()
           {
             @Override
@@ -82,7 +79,20 @@
             {
               final CSN csn1 = o1.getRecord().getCSN();
               final CSN csn2 = o2.getRecord().getCSN();
-              return CSN.compare(csn1, csn2);
+              int cmpCsn = CSN.compare(csn1, csn2);
+              if (cmpCsn == 0
+                  && o1 instanceof CompositeDBCursor
+                  && o2 instanceof CompositeDBCursor)
+              {
+                // Ensures a consistent order when the CSNs are equal (rare in practice)
+                T data1 = ((CompositeDBCursor<T>) o1).getData();
+                T data2 = ((CompositeDBCursor<T>) o1).getData();
+                if (data1 instanceof Comparable && data2 instanceof Comparable)
+                {
+                  return ((Comparable<T>) data1).compareTo(data2);
+                }
+              }
+              return cmpCsn;
             }
           });
 
@@ -215,36 +225,6 @@
     return null;
   }
 
-  /**
-   * Returns a snapshot of this cursor.
-   *
-   * @return a list of (Data, UpdateMsg) pairs representing the state of the
-   *         cursor. In each pair, the data or the update message may be
-   *         {@code null}, but at least one of them is non-null.
-   */
-  public List<Pair<T, UpdateMsg>> getSnapshot()
-  {
-    final List<Pair<T, UpdateMsg>> snapshot = new ArrayList<Pair<T, UpdateMsg>>();
-    for (Entry<DBCursor<UpdateMsg>, T> entry : cursors.entrySet())
-    {
-      final UpdateMsg updateMsg = entry.getKey().getRecord();
-      final T data = entry.getValue();
-      if (updateMsg != null || data != null)
-      {
-        snapshot.add(Pair.of(data, updateMsg));
-      }
-    }
-    for (T data : exhaustedCursors.values())
-    {
-      if (data != null)
-      {
-        snapshot.add(Pair.of(data, (UpdateMsg) null));
-      }
-    }
-    return snapshot;
-  }
-
-  /** {@inheritDoc} */
   @Override
   public void close()
   {
@@ -255,12 +235,10 @@
     exhaustedCursors.clear();
   }
 
-  /** {@inheritDoc} */
   @Override
   public String toString()
   {
     return getClass().getSimpleName() + " openCursors=" + cursors
         + " exhaustedCursors=" + exhaustedCursors;
   }
-
 }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
index 19e1ab5..193eb45 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ECLMultiDomainDBCursor.java
@@ -24,11 +24,6 @@
  */
 package org.opends.server.replication.server.changelog.file;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.forgerock.util.Pair;
-import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -40,7 +35,6 @@
  */
 public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
 {
-
   private final ECLEnabledDomainPredicate predicate;
   private final MultiDomainDBCursor cursor;
 
@@ -88,7 +82,6 @@
     cursor.removeDomain(baseDN);
   }
 
-  /** {@inheritDoc} */
   @Override
   public boolean next() throws ChangelogException
   {
@@ -102,59 +95,15 @@
     return hasNext;
   }
 
-  /** {@inheritDoc} */
   @Override
   public void close()
   {
     cursor.close();
   }
 
-  /** {@inheritDoc} */
   @Override
   public String toString()
   {
     return getClass().getSimpleName() + " cursor=[" + cursor + ']';
   }
-
-  /**
-   * Returns a snapshot of this cursor.
-   *
-   * @return a list of (DN, UpdateMsg) pairs, containing all base DNs enabled
-   *         for the external changelog. The update message may be {@code null}.
-   */
-  List<Pair<DN, UpdateMsg>> getSnapshot()
-  {
-    final List<Pair<DN, UpdateMsg>> snapshot = cursor.getSnapshot();
-    final List<Pair<DN, UpdateMsg>> eclSnapshot = new ArrayList<Pair<DN,UpdateMsg>>();
-    for (Pair<DN, UpdateMsg> pair : snapshot)
-    {
-      DN baseDN = pair.getFirst();
-      if (predicate.isECLEnabledDomain(baseDN))
-      {
-        eclSnapshot.add(pair);
-      }
-    }
-    return eclSnapshot;
-  }
-
-  /**
-   * Returns the cookie corresponding to the state of this cursor.
-   *
-   * @return a valid cookie taking into account only the base DNs enabled for
-   *         the external changelog
-   */
-  public MultiDomainServerState toCookie()
-  {
-    List<Pair<DN, UpdateMsg>> snapshot = getSnapshot();
-    MultiDomainServerState cookie = new MultiDomainServerState();
-    for (Pair<DN, UpdateMsg> pair : snapshot)
-    {
-      // only put base DNs where a CSN is available in the cookie
-      if (pair.getSecond() != null)
-      {
-        cookie.update(pair.getFirst(), pair.getSecond().getCSN());
-      }
-    }
-    return cookie;
-  }
 }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index dadbc25..b3f70d9 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -729,9 +729,10 @@
     final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
+      final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
       final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
-          startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
-      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
+          actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
       final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
       final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
 
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 8c8a85d..3698bb0 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -221,12 +221,11 @@
       final PositionStrategy positionStrategy) throws ChangelogException
   {
     RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy);
-    return new FileReplicaDBCursor(cursor, startCSN, positionStrategy);
+    CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null;
+    return new FileReplicaDBCursor(cursor, actualStartCSN, positionStrategy);
   }
 
-  /**
-   * Shutdown this ReplicaDB.
-   */
+  /** Shutdown this ReplicaDB. */
   void shutdown()
   {
     if (shutdown.compareAndSet(false, true))
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index e0634f9..ec7f3c1 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -764,7 +764,6 @@
     return null;
   }
 
-  /** {@inheritDoc} */
   @Override
   public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
       CursorOptions options) throws ChangelogException
@@ -772,9 +771,10 @@
     final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
+      final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN();
       final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(
-          startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
-      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
+          actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy());
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN);
       final ReplicaId replicaId = ReplicaId.of(baseDN, serverId);
       final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this);
 
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index f032d77..401531f 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -195,12 +195,11 @@
   DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
       final PositionStrategy positionStrategy) throws ChangelogException
   {
-    return new JEReplicaDBCursor(db, startCSN, matchingStrategy, positionStrategy, this);
+    CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null;
+    return new JEReplicaDBCursor(db, actualStartCSN, matchingStrategy, positionStrategy, this);
   }
 
-  /**
-   * Shutdown this ReplicaDB.
-   */
+  /** Shutdown this ReplicaDB. */
   void shutdown()
   {
     if (shutdown.compareAndSet(false, true))

--
Gitblit v1.10.0