From 3ee70c76e7e5c1cfb8c5ce16bc3f50f2306bdee7 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 18 Aug 2014 11:00:35 +0000
Subject: [PATCH] Forward port of checkpoint commit for OPENDJ-1471 File based changelog : improve cursor behavior

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                    |    5 -
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java  |    4 -
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java       |    8 ++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                              |    1 
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java    |  110 +++++++++++++++++++++++++++++++++++-
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java                             |    4 -
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java                                        |   26 +++++---
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java |    6 +-
 8 files changed, 136 insertions(+), 28 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
index 05e93cd..6970a9e 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -21,7 +21,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.api;
 
@@ -32,15 +32,23 @@
  * anymore, a cursor must be closed to release all the resources into the
  * database.
  * <p>
- * Here is a typical usage pattern:
- *
+ * The cursor provides a java.sql.ResultSet like API : it is positioned before
+ * the first requested record and needs to be moved forward by calling
+ * {@link DBCursor#next()}.
+ * <p>
+ * Usage:
  * <pre>
- * DBCursor&lt;T&gt; cursor = ...;         // obtain a new cursor,
- *                                   // already initialized
- * T record1 = cursor.getRecord();   // get the first record
- * while (cursor.next()) {           // advance to the next record
- *   T record = cursor.getRecord();  // get the next record
- *   ...                             // etc.
+ * {@code
+ *  DBCursor cursor = ...;
+ *  try {
+ *    while (cursor.next()) {
+ *      Record record = cursor.getRecord();
+ *      // ... can call cursor.getRecord() again: it will return the same result
+ *    }
+ *  }
+ *  finally {
+ *    close(cursor);
+ *  }
  * }
  * </pre>
  *
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index d323346..ad2ceb1 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -96,8 +96,6 @@
    * replication domain starting after the provided {@link ServerState} for each
    * replicaDBs.
    * <p>
-   * The cursor is already advanced to the records after the serverState.
-   * <p>
    * When the cursor is not used anymore, client code MUST call the
    * {@link DBCursor#close()} method to free the resources and locks used by the
    * cursor.
@@ -119,8 +117,6 @@
    * Generates a {@link DBCursor} for one replicaDB for the specified
    * replication domain and serverId starting after the provided {@link CSN}.
    * <p>
-   * The cursor is already advanced to the records after the CSN.
-   * <p>
    * When the cursor is not used anymore, client code MUST call the
    * {@link DBCursor#close()} method to free the resources and locks used by the
    * cursor.
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 3e2fd32..ea4f86a 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -463,6 +463,7 @@
     {
       final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
       cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+      cursor.next();
       map.put(serverId, cursor);
       return false;
     }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 2ece66c..7e75bf1 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -705,6 +705,7 @@
       // get the last already sent CSN from that server to get a cursor
       final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
       final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
+      replicaDBCursor.next();
       final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
       cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
     }
@@ -739,9 +740,7 @@
     JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
-      cursor.next();
-      return cursor;
+      return replicaDB.generateCursorFrom(startAfterCSN);
     }
     return EMPTY_CURSOR;
   }
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 3fc8e58..bf5c2da 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -631,10 +631,8 @@
         final CSN csn = newestMsg.getCSN();
         when(cnIndexDB.getNewestRecord()).thenReturn(
             new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
-        final SequentialDBCursor cursor =
-            cursors.get(Pair.of(baseDN, csn.getServerId()));
+        final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
         cursor.add(newestMsg);
-        cursor.next(); // simulate the cursor had been initialized with this change
       }
       initialCookie.update(msg.getBaseDN(), msg.getCSN());
     }
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index a2a2644..5f54b10 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -49,6 +49,8 @@
   private UpdateMsg msg2;
   private UpdateMsg msg3;
   private UpdateMsg msg4;
+  private UpdateMsg msg5;
+  private UpdateMsg msg6;
   private String baseDN1 = "dc=forgerock,dc=com";
   private String baseDN2 = "dc=example,dc=com";
 
@@ -59,6 +61,8 @@
     msg2 = new FakeUpdateMsg(2);
     msg3 = new FakeUpdateMsg(3);
     msg4 = new FakeUpdateMsg(4);
+    msg5 = new FakeUpdateMsg(5);
+    msg6 = new FakeUpdateMsg(6);
   }
 
   @Test
@@ -88,6 +92,17 @@
   }
 
   @Test
+  public void threeElementsCursor() throws Exception
+  {
+    final CompositeDBCursor<String> compCursor =
+        newCompositeDBCursor(of(new SequentialDBCursor(msg1, msg2, msg3), baseDN1));
+    assertInOrder(compCursor,
+        of(msg1, baseDN1),
+        of(msg2, baseDN1),
+        of(msg3, baseDN1));
+  }
+
+  @Test
   public void twoEmptyCursors() throws Exception
   {
     final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -121,7 +136,32 @@
   }
 
   @Test
-  public void recycleTwoElementCursors() throws Exception
+  public void twoThreeElementCursors() throws Exception
+  {
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(msg2, msg3, msg6), baseDN1),
+        of(new SequentialDBCursor(msg1, msg4, msg5), baseDN2));
+    assertInOrder(compCursor,
+        of(msg1, baseDN2),
+        of(msg2, baseDN1),
+        of(msg3, baseDN1),
+        of(msg4, baseDN2),
+        of(msg5, baseDN2),
+        of(msg6, baseDN1));
+  }
+
+  @Test
+  public void recycleTwoElementsCursor() throws Exception
+  {
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(msg1, null, msg2), baseDN1));
+    assertNextRecord(compCursor, of(msg1, baseDN1));
+    assertFalse(compCursor.next());
+    assertNextRecord(compCursor, of(msg2, baseDN1));
+  }
+
+  @Test
+  public void recycleTwoElementsCursors() throws Exception
   {
     final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
         of(new SequentialDBCursor(msg2, null, msg4), baseDN1),
@@ -133,6 +173,50 @@
         of(msg4, baseDN1));
   }
 
+  // TODO : this test fails because msg2 is returned twice
+  @Test(enabled=false)
+  public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
+  {
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(null, null, msg1), baseDN1),
+        of(new SequentialDBCursor(msg2, msg3, msg4), baseDN2));
+    assertInOrder(compCursor,
+        of(msg2, baseDN2),
+        of(msg1, baseDN1),
+        of(msg3, baseDN2),
+        of(msg4, baseDN2));
+  }
+
+  @Test
+  public void recycleThreeElementsCursors() throws Exception
+  {
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1),
+        of(new SequentialDBCursor(null, msg1, null, msg4, msg5), baseDN2));
+    assertInOrder(compCursor,
+        of(msg1, baseDN2),
+        of(msg2, baseDN1),
+        of(msg3, baseDN1),
+        of(msg4, baseDN2),
+        of(msg5, baseDN2),
+        of(msg6, baseDN1));
+  }
+
+  @Test
+  public void recycleThreeElementsCursorsLongerExhaustion() throws Exception
+  {
+    final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+        of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1),
+        of(new SequentialDBCursor(null, msg1, null, null, msg4, msg5), baseDN2));
+    assertInOrder(compCursor,
+        of(msg1, baseDN2),
+        of(msg2, baseDN1),
+        of(msg3, baseDN1),
+        of(msg4, baseDN2),
+        of(msg5, baseDN2),
+        of(msg6, baseDN1));
+  }
+
   private CompositeDBCursor<String> newCompositeDBCursor(
       Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
   {
@@ -140,6 +224,9 @@
         new HashMap<DBCursor<UpdateMsg>, String>();
     for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
     {
+      // The cursors in the composite are expected to be pointing
+      // to first record available
+      pair.getFirst().next();
       cursorsMap.put(pair.getFirst(), pair.getSecond());
     }
     return new CompositeDBCursor<String>(cursorsMap, true);
@@ -148,13 +235,26 @@
   private void assertInOrder(final CompositeDBCursor<String> compCursor,
       Pair<UpdateMsg, String>... expecteds) throws ChangelogException
   {
-    for (final Pair<UpdateMsg, String> expected : expecteds)
+    for (int i = 0; i < expecteds.length ; i++)
     {
-      assertTrue(compCursor.next());
-      assertSame(compCursor.getRecord(), expected.getFirst());
-      assertSame(compCursor.getData(), expected.getSecond());
+      final Pair<UpdateMsg, String> expected = expecteds[i];
+      final String index = " at element i=" + i;
+      assertTrue(compCursor.next(), index);
+      assertSame(compCursor.getRecord(), expected.getFirst(), index);
+      assertSame(compCursor.getData(), expected.getSecond(), index);
     }
     assertFalse(compCursor.next());
+    assertNull(compCursor.getRecord());
+    assertNull(compCursor.getData());
     compCursor.close();
   }
+
+  private void assertNextRecord(final CompositeDBCursor<String> compCursor,
+      Pair<UpdateMsg, String> expected) throws ChangelogException
+  {
+    assertTrue(compCursor.next());
+    assertSame(compCursor.getRecord(), expected.getFirst());
+    assertSame(compCursor.getData(), expected.getSecond());
+  }
+
 }
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
index 1737f27..538b6de 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
@@ -65,7 +65,7 @@
   public void cursorReturnsTrue() throws Exception
   {
     final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
-    delegateCursor = new SequentialDBCursor(null, updateMsg);
+    delegateCursor = new SequentialDBCursor(updateMsg);
 
     final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
     assertThat(cursor.getRecord()).isNull();
@@ -95,7 +95,7 @@
   public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
   {
     final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
-    delegateCursor = new SequentialDBCursor(null, updateMsg);
+    delegateCursor = new SequentialDBCursor(updateMsg);
 
     final CSN offlineCSN = new CSN(timestamp++, 1, 1);
     final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
@@ -116,7 +116,7 @@
     final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
 
     final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
-    delegateCursor = new SequentialDBCursor(null, updateMsg);
+    delegateCursor = new SequentialDBCursor(updateMsg);
 
     final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
     assertThat(cursor.getRecord()).isNull();
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
index f9d5d78..e0da0d5 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
@@ -39,10 +39,16 @@
   private final List<UpdateMsg> msgs;
   private UpdateMsg current;
 
+  /**
+   * A cursor built from a list of update messages.
+   * <p>
+   * This cursor provides a java.sql.ResultSet-like API to be consistent with the
+   * {@code DBCursor} API : it is positioned before the first requested record
+   * and needs to be moved forward by calling {@link DBCursor#next()}.
+   */
   public SequentialDBCursor(UpdateMsg... msgs)
   {
     this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs));
-    next();
   }
 
   public void add(UpdateMsg msg)

--
Gitblit v1.10.0