From fa0dd51b38d4d1c2ac738232608e9a8538c92192 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 19 Aug 2014 10:06:35 +0000
Subject: [PATCH] Forward port of checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend   to support cn=changelog CR-4053

---
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java                             |   10 ++
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |   13 ++--
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java                                  |   13 +++-
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                                      |    5 +
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                   |   32 ++++++----
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java   |    1 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java                                       |   23 +++++++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java                            |   32 +++++++---
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |    3 
 9 files changed, 94 insertions(+), 38 deletions(-)

diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 1b00dce..644b994 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -53,6 +53,7 @@
 import static org.opends.server.replication.common.ServerStatus.*;
 import static org.opends.server.replication.common.StatusMachineEvent.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
 import static org.opends.server.util.StaticUtils.*;
 
 /**
@@ -1347,12 +1348,12 @@
    * @return a non null {@link DBCursor} going from oldest to newest CSN
    * @throws ChangelogException
    *           If a database problem happened
-   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
+   * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy)
    */
   public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
       throws ChangelogException
   {
-    return domainDB.getCursorFrom(baseDN, startAfterServerState);
+    return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY);
   }
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
index ad658e4..1fe0714 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -60,6 +60,29 @@
 {
 
   /**
+   * Represents a cursor key matching strategy, which allow to choose if only
+   * the exact key must be found or if any key equals or higher should match.
+   */
+  public enum KeyMatchingStrategy {
+    /** matches only if the exact key is found. */
+    EQUAL_TO_KEY,
+    /** matches if the key or a greater key is found. */
+    GREATER_THAN_OR_EQUAL_TO_KEY
+  }
+
+  /**
+   * Represents a cursor positioning strategy, which allow to choose if the start point
+   * corresponds to the record at the provided key or the record just after the provided
+   * key.
+   */
+  public enum PositionStrategy {
+    /** start point is on the matching key. */
+    ON_MATCHING_KEY,
+    /** start point is after the matching key. */
+    AFTER_MATCHING_KEY
+  }
+
+  /**
    * Getter for the current record.
    *
    * @return The current record.
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 5728e6a..011a7c4 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -29,6 +29,7 @@
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
 import org.opends.server.types.DN;
 
@@ -92,30 +93,33 @@
   void removeDomain(DN baseDN) throws ChangelogException;
 
   /**
-   * Generates a {@link DBCursor} across all the domains starting after the
+   * Generates a {@link DBCursor} across all the domains starting at or after the
    * provided {@link MultiDomainServerState} for each domain.
    * <p>
    * When the cursor is not used anymore, client code MUST call the
    * {@link DBCursor#close()} method to free the resources and locks used by the
    * cursor.
    *
-   * @param startAfterState
+   * @param startState
    *          Starting point for each domain cursor. If any {@link ServerState}
    *          for a domain is null, then start from the oldest CSN for each
    *          replicaDBs
+   * @param positionStrategy
+   *          Cursor position strategy, which allow to indicates at which
+   *          exact position the cursor must start
    * @return a non null {@link DBCursor}
    * @throws ChangelogException
    *           If a database problem happened
-   * @see #getCursorFrom(DN, ServerState)
+   * @see #getCursorFrom(DN, ServerState, PositionStrategy)
    */
-  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
+  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
       throws ChangelogException;
 
   // serverId methods
 
   /**
    * Generates a {@link DBCursor} across all the replicaDBs for the specified
-   * replication domain starting after the provided {@link ServerState} for each
+   * replication domain starting at or after the provided {@link ServerState} for each
    * replicaDBs.
    * <p>
    * When the cursor is not used anymore, client code MUST call the
@@ -124,21 +128,24 @@
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @param startAfterState
+   * @param startState
    *          Starting point for each ReplicaDB cursor. If any CSN for a
    *          replicaDB is null, then start from the oldest CSN for this
    *          replicaDB
+   * @param positionStrategy
+   *          Cursor position strategy, which allow to indicates at which
+   *          exact position the cursor must start
    * @return a non null {@link DBCursor}
    * @throws ChangelogException
    *           If a database problem happened
-   * @see #getCursorFrom(DN, int, CSN)
+   * @see #getCursorFrom(DN, int, CSN, PositionStrategy)
    */
-  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
+  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy)
       throws ChangelogException;
 
   /**
    * Generates a {@link DBCursor} for one replicaDB for the specified
-   * replication domain and serverId starting after the provided {@link CSN}.
+   * replication domain and serverId starting at or after the provided {@link CSN}.
    * <p>
    * When the cursor is not used anymore, client code MUST call the
    * {@link DBCursor#close()} method to free the resources and locks used by the
@@ -148,14 +155,17 @@
    *          the replication domain baseDN of the replicaDB
    * @param serverId
    *          the serverId of the replicaDB
-   * @param startAfterCSN
+   * @param startCSN
    *          Starting point for the ReplicaDB cursor. If the CSN is null, then
    *          start from the oldest CSN for this replicaDB
+   * @param positionStrategy
+   *          Cursor position strategy, which allow to indicates at which
+   *          exact position the cursor must start
    * @return a non null {@link DBCursor}
    * @throws ChangelogException
    *           If a database problem happened
    */
-  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN)
+  DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy)
       throws ChangelogException;
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 9e9b6df..04cfbb3 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -42,6 +42,7 @@
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 
@@ -335,7 +336,7 @@
       }
     }
 
-    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+    nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY);
     nextChangeForInsertDBCursor.next();
 
     if (newestRecord != null)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
index c249a78..26c40f7 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -54,6 +54,8 @@
    */
   private static final CSN NULL_CSN = new CSN(0, 0, 0);
 
+  private final PositionStrategy positionStrategy;
+
   /**
    * Builds a DomainDBCursor instance.
    *
@@ -61,11 +63,15 @@
    *          the replication domain baseDN of this cursor
    * @param domainDB
    *          the DB for the provided replication domain
+   * @param positionStrategy
+   *          Cursor position strategy, which allow to indicates at which
+   *          exact position the cursor must start
    */
-  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
+  public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
   {
     this.baseDN = baseDN;
     this.domainDB = domainDB;
+    this.positionStrategy = positionStrategy;
   }
 
   /**
@@ -102,8 +108,9 @@
       final Entry<Integer, CSN> pair = iter.next();
       final int serverId = pair.getKey();
       final CSN csn = pair.getValue();
-      final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
-      final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+      final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null;
+      final DBCursor<UpdateMsg> cursor =
+          domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy);
       addCursor(cursor, null);
       iter.remove();
     }
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index dc23a7e..ab55875 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -36,6 +36,7 @@
 import org.forgerock.i18n.LocalizableMessageBuilder;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
 import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.util.Reject;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.api.DirectoryThread;
 import org.opends.server.replication.common.CSN;
@@ -46,6 +47,7 @@
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.types.DN;
 import org.opends.server.util.StaticUtils;
 import org.opends.server.util.TimeThread;
@@ -696,37 +698,38 @@
 
   /** {@inheritDoc} */
   @Override
-  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+      final PositionStrategy positionStrategy) throws ChangelogException
   {
-    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
     registeredMultiDomainCursors.add(cursor);
     for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
+      cursor.addDomain(baseDN, startState.getServerState(baseDN));
     }
     return cursor;
   }
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
-      throws ChangelogException
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
+      final PositionStrategy positionStrategy) throws ChangelogException
   {
-    final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
     for (int serverId : getDomainMap(baseDN).keySet())
     {
       // get the last already sent CSN from that server to get a cursor
-      final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
       cursor.addReplicaDB(serverId, lastCSN);
     }
     return cursor;
   }
 
-  private DomainDBCursor newDomainDBCursor(final DN baseDN)
+  private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
   {
     synchronized (registeredDomainCursors)
     {
-      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
       List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
       if (cursors == null)
       {
@@ -753,15 +756,18 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
-      throws ChangelogException
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
+      PositionStrategy positionStrategy) throws ChangelogException
+
   {
+    Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
+        + " is not supported for the JE implementation fo changelog");
     final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
       final DBCursor<UpdateMsg> cursor =
-          replicaDB.generateCursorFrom(startAfterCSN);
-      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+          replicaDB.generateCursorFrom(startCSN);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
       // TODO JNR if (offlineCSN != null) ??
       // What about replicas that suddenly become offline?
       return new ReplicaOfflineCursor(cursor, offlineCSN);
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
index cf88679..a7f067f 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -50,15 +50,21 @@
   private final ConcurrentSkipListSet<DN> removeDomains =
       new ConcurrentSkipListSet<DN>();
 
+  private final PositionStrategy positionStrategy;
+
   /**
    * Builds a MultiDomainDBCursor instance.
    *
    * @param domainDB
    *          the replication domain management DB
+   * @param positionStrategy
+   *          Cursor position strategy, which allow to indicates at which
+   *          exact position the cursor must start
    */
-  public MultiDomainDBCursor(ReplicationDomainDB domainDB)
+  public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
   {
     this.domainDB = domainDB;
+    this.positionStrategy = positionStrategy;
   }
 
   /**
@@ -86,7 +92,7 @@
       final Entry<DN, ServerState> entry = iter.next();
       final DN baseDN = entry.getKey();
       final ServerState serverState = entry.getValue();
-      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
+      final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy);
       addCursor(domainDBCursor, baseDN);
       iter.remove();
     }
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 23c9318..ae9b0f0 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -54,6 +54,7 @@
 import static org.assertj.core.api.Assertions.*;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
 
 /**
  * Test for ChangeNumberIndexer class. All dependencies to the changelog DB
@@ -158,7 +159,7 @@
   {
     MockitoAnnotations.initMocks(this);
 
-    multiDomainCursor = new MultiDomainDBCursor(domainDB);
+    multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY);
     initialState = new ChangelogState();
     initialCookie = new MultiDomainServerState();
     replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
@@ -167,8 +168,8 @@
 
     when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
     when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
-    when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
-        multiDomainCursor);
+    when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY)))
+      .thenReturn(multiDomainCursor);
   }
 
   @AfterMethod
@@ -596,15 +597,15 @@
       DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
       if (domainDBCursor == null)
       {
-        domainDBCursor = new DomainDBCursor(baseDN, domainDB);
+        domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY);
         domainDBCursors.put(baseDN, domainDBCursor);
 
         multiDomainCursor.addDomain(baseDN, null);
-        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
+        when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY)))
             .thenReturn(domainDBCursor);
       }
       domainDBCursor.addReplicaDB(serverId, null);
-      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+      when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY)))
           .thenReturn(replicaDBCursor);
     }
 
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index da8bcd0..e5b022a 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -187,6 +187,7 @@
         of(msg4, baseDN1));
   }
 
+  @Test
   public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
   {
     final CompositeDBCursor<String> compCursor = newCompositeDBCursor(

--
Gitblit v1.10.0