From 3ee70c76e7e5c1cfb8c5ce16bc3f50f2306bdee7 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 18 Aug 2014 11:00:35 +0000
Subject: [PATCH] Forward port of checkpoint commit for OPENDJ-1471 File based changelog : improve cursor behavior
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 5 -
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 4 -
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java | 8 ++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 1
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 110 +++++++++++++++++++++++++++++++++++-
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 4 -
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java | 26 +++++---
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java | 6 +-
8 files changed, 136 insertions(+), 28 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
index 05e93cd..6970a9e 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.api;
@@ -32,15 +32,23 @@
* anymore, a cursor must be closed to release all the resources into the
* database.
* <p>
- * Here is a typical usage pattern:
- *
+ * The cursor provides a java.sql.ResultSet like API : it is positioned before
+ * the first requested record and needs to be moved forward by calling
+ * {@link DBCursor#next()}.
+ * <p>
+ * Usage:
* <pre>
- * DBCursor<T> cursor = ...; // obtain a new cursor,
- * // already initialized
- * T record1 = cursor.getRecord(); // get the first record
- * while (cursor.next()) { // advance to the next record
- * T record = cursor.getRecord(); // get the next record
- * ... // etc.
+ * {@code
+ * DBCursor cursor = ...;
+ * try {
+ * while (cursor.next()) {
+ * Record record = cursor.getRecord();
+ * // ... can call cursor.getRecord() again: it will return the same result
+ * }
+ * }
+ * finally {
+ * close(cursor);
+ * }
* }
* </pre>
*
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index d323346..ad2ceb1 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -96,8 +96,6 @@
* replication domain starting after the provided {@link ServerState} for each
* replicaDBs.
* <p>
- * The cursor is already advanced to the records after the serverState.
- * <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
* cursor.
@@ -119,8 +117,6 @@
* Generates a {@link DBCursor} for one replicaDB for the specified
* replication domain and serverId starting after the provided {@link CSN}.
* <p>
- * The cursor is already advanced to the records after the CSN.
- * <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
* cursor.
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 3e2fd32..ea4f86a 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -463,6 +463,7 @@
{
final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+ cursor.next();
map.put(serverId, cursor);
return false;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 2ece66c..7e75bf1 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -705,6 +705,7 @@
// get the last already sent CSN from that server to get a cursor
final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
+ replicaDBCursor.next();
final CSN offlineCSN = getOfflineCSN(offlineReplicas, baseDN, serverId, startAfterServerState);
cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
}
@@ -739,9 +740,7 @@
JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startAfterCSN);
- cursor.next();
- return cursor;
+ return replicaDB.generateCursorFrom(startAfterCSN);
}
return EMPTY_CURSOR;
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 3fc8e58..bf5c2da 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -631,10 +631,8 @@
final CSN csn = newestMsg.getCSN();
when(cnIndexDB.getNewestRecord()).thenReturn(
new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
- final SequentialDBCursor cursor =
- cursors.get(Pair.of(baseDN, csn.getServerId()));
+ final SequentialDBCursor cursor = cursors.get(Pair.of(baseDN, csn.getServerId()));
cursor.add(newestMsg);
- cursor.next(); // simulate the cursor had been initialized with this change
}
initialCookie.update(msg.getBaseDN(), msg.getCSN());
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index a2a2644..5f54b10 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -49,6 +49,8 @@
private UpdateMsg msg2;
private UpdateMsg msg3;
private UpdateMsg msg4;
+ private UpdateMsg msg5;
+ private UpdateMsg msg6;
private String baseDN1 = "dc=forgerock,dc=com";
private String baseDN2 = "dc=example,dc=com";
@@ -59,6 +61,8 @@
msg2 = new FakeUpdateMsg(2);
msg3 = new FakeUpdateMsg(3);
msg4 = new FakeUpdateMsg(4);
+ msg5 = new FakeUpdateMsg(5);
+ msg6 = new FakeUpdateMsg(6);
}
@Test
@@ -88,6 +92,17 @@
}
@Test
+ public void threeElementsCursor() throws Exception
+ {
+ final CompositeDBCursor<String> compCursor =
+ newCompositeDBCursor(of(new SequentialDBCursor(msg1, msg2, msg3), baseDN1));
+ assertInOrder(compCursor,
+ of(msg1, baseDN1),
+ of(msg2, baseDN1),
+ of(msg3, baseDN1));
+ }
+
+ @Test
public void twoEmptyCursors() throws Exception
{
final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
@@ -121,7 +136,32 @@
}
@Test
- public void recycleTwoElementCursors() throws Exception
+ public void twoThreeElementCursors() throws Exception
+ {
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(msg2, msg3, msg6), baseDN1),
+ of(new SequentialDBCursor(msg1, msg4, msg5), baseDN2));
+ assertInOrder(compCursor,
+ of(msg1, baseDN2),
+ of(msg2, baseDN1),
+ of(msg3, baseDN1),
+ of(msg4, baseDN2),
+ of(msg5, baseDN2),
+ of(msg6, baseDN1));
+ }
+
+ @Test
+ public void recycleTwoElementsCursor() throws Exception
+ {
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(msg1, null, msg2), baseDN1));
+ assertNextRecord(compCursor, of(msg1, baseDN1));
+ assertFalse(compCursor.next());
+ assertNextRecord(compCursor, of(msg2, baseDN1));
+ }
+
+ @Test
+ public void recycleTwoElementsCursors() throws Exception
{
final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
of(new SequentialDBCursor(msg2, null, msg4), baseDN1),
@@ -133,6 +173,50 @@
of(msg4, baseDN1));
}
+ // TODO : this test fails because msg2 is returned twice
+ @Test(enabled=false)
+ public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
+ {
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(null, null, msg1), baseDN1),
+ of(new SequentialDBCursor(msg2, msg3, msg4), baseDN2));
+ assertInOrder(compCursor,
+ of(msg2, baseDN2),
+ of(msg1, baseDN1),
+ of(msg3, baseDN2),
+ of(msg4, baseDN2));
+ }
+
+ @Test
+ public void recycleThreeElementsCursors() throws Exception
+ {
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1),
+ of(new SequentialDBCursor(null, msg1, null, msg4, msg5), baseDN2));
+ assertInOrder(compCursor,
+ of(msg1, baseDN2),
+ of(msg2, baseDN1),
+ of(msg3, baseDN1),
+ of(msg4, baseDN2),
+ of(msg5, baseDN2),
+ of(msg6, baseDN1));
+ }
+
+ @Test
+ public void recycleThreeElementsCursorsLongerExhaustion() throws Exception
+ {
+ final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
+ of(new SequentialDBCursor(msg2, msg3, null, msg6), baseDN1),
+ of(new SequentialDBCursor(null, msg1, null, null, msg4, msg5), baseDN2));
+ assertInOrder(compCursor,
+ of(msg1, baseDN2),
+ of(msg2, baseDN1),
+ of(msg3, baseDN1),
+ of(msg4, baseDN2),
+ of(msg5, baseDN2),
+ of(msg6, baseDN1));
+ }
+
private CompositeDBCursor<String> newCompositeDBCursor(
Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
{
@@ -140,6 +224,9 @@
new HashMap<DBCursor<UpdateMsg>, String>();
for (Pair<? extends DBCursor<UpdateMsg>, String> pair : pairs)
{
+ // The cursors in the composite are expected to be pointing
+ // to first record available
+ pair.getFirst().next();
cursorsMap.put(pair.getFirst(), pair.getSecond());
}
return new CompositeDBCursor<String>(cursorsMap, true);
@@ -148,13 +235,26 @@
private void assertInOrder(final CompositeDBCursor<String> compCursor,
Pair<UpdateMsg, String>... expecteds) throws ChangelogException
{
- for (final Pair<UpdateMsg, String> expected : expecteds)
+ for (int i = 0; i < expecteds.length ; i++)
{
- assertTrue(compCursor.next());
- assertSame(compCursor.getRecord(), expected.getFirst());
- assertSame(compCursor.getData(), expected.getSecond());
+ final Pair<UpdateMsg, String> expected = expecteds[i];
+ final String index = " at element i=" + i;
+ assertTrue(compCursor.next(), index);
+ assertSame(compCursor.getRecord(), expected.getFirst(), index);
+ assertSame(compCursor.getData(), expected.getSecond(), index);
}
assertFalse(compCursor.next());
+ assertNull(compCursor.getRecord());
+ assertNull(compCursor.getData());
compCursor.close();
}
+
+ private void assertNextRecord(final CompositeDBCursor<String> compCursor,
+ Pair<UpdateMsg, String> expected) throws ChangelogException
+ {
+ assertTrue(compCursor.next());
+ assertSame(compCursor.getRecord(), expected.getFirst());
+ assertSame(compCursor.getData(), expected.getSecond());
+ }
+
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
index 1737f27..538b6de 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
@@ -65,7 +65,7 @@
public void cursorReturnsTrue() throws Exception
{
final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
- delegateCursor = new SequentialDBCursor(null, updateMsg);
+ delegateCursor = new SequentialDBCursor(updateMsg);
final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
assertThat(cursor.getRecord()).isNull();
@@ -95,7 +95,7 @@
public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
{
final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
- delegateCursor = new SequentialDBCursor(null, updateMsg);
+ delegateCursor = new SequentialDBCursor(updateMsg);
final CSN offlineCSN = new CSN(timestamp++, 1, 1);
final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
@@ -116,7 +116,7 @@
final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
- delegateCursor = new SequentialDBCursor(null, updateMsg);
+ delegateCursor = new SequentialDBCursor(updateMsg);
final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
assertThat(cursor.getRecord()).isNull();
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
index f9d5d78..e0da0d5 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/SequentialDBCursor.java
@@ -39,10 +39,16 @@
private final List<UpdateMsg> msgs;
private UpdateMsg current;
+ /**
+ * A cursor built from a list of update messages.
+ * <p>
+ * This cursor provides a java.sql.ResultSet-like API to be consistent with the
+ * {@code DBCursor} API : it is positioned before the first requested record
+ * and needs to be moved forward by calling {@link DBCursor#next()}.
+ */
public SequentialDBCursor(UpdateMsg... msgs)
{
this.msgs = new ArrayList<UpdateMsg>(Arrays.asList(msgs));
- next();
}
public void add(UpdateMsg msg)
--
Gitblit v1.10.0