From 0d0932dc8bc5dc690c3e0f838ecadb03ddb2517b Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 25 Jun 2014 15:43:38 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1471 File based changelog : improve cursor behavior
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java | 8 +
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 41 ++++--
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java | 60 ++++-----
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java | 35 ++---
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java | 26 +---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java | 68 +++++++----
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 63 +++++-----
7 files changed, 150 insertions(+), 151 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
index efe233b..b5e2502 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
@@ -47,9 +47,15 @@
*
* @param cursor
* The underlying cursor to read log.
+ * @throws ChangelogException
+ * If an error occurs.
*/
- FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor) {
+ FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor)
+ throws ChangelogException
+ {
this.cursor = cursor;
+ // cursor is positioned to first record at start
+ next();
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
index e067b6a..2bd7392 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -32,18 +32,28 @@
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
/**
- * A cursor on ReplicaDB.
+ * A cursor on ReplicaDB, which can re-initialize itself after exhaustion.
* <p>
- * This cursor behaves specially in two ways :
- * <ul>
- * <li>The cursor initially points to a {@code null} value: the
- * {@code getRecord()} method return {@code null} if called before any call to
- * {@code next()} method.</li>
- * <li>The cursor automatically re-initializes itself if it is exhausted: when
- * exhausted, the cursor re-position itself to the last non null CSN previously
- * read.
- * <li>
- * </ul>
+ * The cursor provides a java.sql.ResultSet like API :
+ * <pre>
+ * {@code
+ * FileReplicaDBCursor cursor = ...;
+ * try {
+ * while (cursor.next()) {
+ * Record record = cursor.getRecord();
+ * // ... can call cursor.getRecord() again: it will return the same result
+ * }
+ * }
+ * finally {
+ * close(cursor);
+ * }
+ * }
+ * </pre>
+ * <p>
+ * The cursor automatically re-initializes itself if it is exhausted: if a
+ * record is newly available, a subsequent call to the {@code next()} method will
+ * return {@code true} and the record will be available by calling {@code getRecord()}
+ * method.
*/
class FileReplicaDBCursor implements DBCursor<UpdateMsg>
{
@@ -54,7 +64,7 @@
/** The next record to return. */
private Record<CSN, UpdateMsg> nextRecord;
- /** The CSN to re-start with in case the cursor is exhausted. */
+ /** The CSN to re-start with in case the cursor is exhausted. */
private CSN lastNonNullCurrentCSN;
/**
@@ -82,25 +92,33 @@
@Override
public boolean next() throws ChangelogException
{
- nextRecord = cursor.getRecord();
- if (nextRecord != null)
+ if (cursor.next())
{
+ nextRecord = cursor.getRecord();
+ if (nextRecord.getKey().compareTo(lastNonNullCurrentCSN) > 0)
+ {
+ lastNonNullCurrentCSN = nextRecord.getKey();
+ return true;
+ }
+ }
+ return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned();
+ }
+
+ /** Re-initialize the cursor after the last non null CSN. */
+ private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException
+ {
+ final boolean found = cursor.positionTo(lastNonNullCurrentCSN, true);
+ if (found && cursor.next())
+ {
+ nextRecord = cursor.getRecord();
lastNonNullCurrentCSN = nextRecord.getKey();
+ return true;
}
else
{
- // Exhausted cursor must be able to reinitialize itself
- cursor.positionTo(lastNonNullCurrentCSN, true);
-
- nextRecord = cursor.getRecord();
- if (nextRecord != null)
- {
- lastNonNullCurrentCSN = nextRecord.getKey();
- }
+ nextRecord = null;
+ return false;
}
- // the underlying cursor is one record in advance
- cursor.next();
- return nextRecord != null;
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index a987233..c4dc1ca 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -424,10 +424,9 @@
* Returns a cursor that allows to retrieve the records from this log,
* starting at the first position.
* <p>
- * The returned cursor initially points to record corresponding to the first
- * key, that is {@code cursor.getRecord()} is equals to the record
- * corresponding to the first key before any call to {@code cursor.next()}
- * method.
+ * The returned cursor initially points to no record, that is
+ * {@code cursor.getRecord()} is equals to {@code null} before any call to
+ * {@code cursor.next()} method.
*
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
@@ -463,9 +462,9 @@
* Returns a cursor that allows to retrieve the records from this log,
* starting at the position defined by the provided key.
* <p>
- * The returned cursor initially points to record corresponding to the key,
- * that is {@code cursor.getRecord()} is equals to the record corresponding to
- * the key before any call to {@code cursor.next()} method.
+ * The returned cursor initially points to no record, that is
+ * {@code cursor.getRecord()} is equals to {@code null} before any call to
+ * {@code cursor.next()} method.
*
* @param key
* Key to use as a start position for the cursor. If key is
@@ -484,10 +483,10 @@
* starting at the position defined by the smallest key that is higher than
* the provided key.
* <p>
- * The returned cursor initially points to record corresponding to the key
- * found, that is {@code cursor.getRecord()} is equals to the record
- * corresponding to the key found before any call to {@code cursor.next()}
- * method.
+ * The returned cursor initially points to no record, that is
+ * {@code cursor.getRecord()} is equals to {@code null} before any call to
+ * {@code cursor.next()} method. After the first call to {@code cursor.next()}
+ * the cursor points to the record corresponding to the key found.
*
* @param key
* Key to use as a start position for the cursor. If key is
@@ -1027,7 +1026,7 @@
if (logFile != null)
{
switchToLogFile(logFile);
- return true;
+ return currentCursor.next();
}
return false;
}
@@ -1069,17 +1068,7 @@
{
switchToLogFile(logFile);
}
- if (key != null)
- {
- boolean isFound = currentCursor.positionTo(key, findNearest);
- if (isFound && getRecord() == null && !log.isHeadLogFile(currentLogFile))
- {
- // The key to position is probably in the next file, force the switch
- isFound = next();
- }
- return isFound;
- }
- return true;
+ return (key == null) ? true : currentCursor.positionTo(key, findNearest);
}
finally
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index 3b91bfa..c27d38d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -313,10 +313,9 @@
* Returns a cursor that allows to retrieve the records from this log,
* starting at the first position.
* <p>
- * The returned cursor initially points to record corresponding to the first
- * key, that is {@code cursor.getRecord()} is equals to the record
- * corresponding to the first key before any call to {@code cursor.next()}
- * method.
+ * The returned cursor initially points to no record, that is
+ * {@code cursor.getRecord()} is equals {@code null} before any call to
+ * {@code cursor.next()} method.
*
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
@@ -331,9 +330,10 @@
* Returns a cursor that allows to retrieve the records from this log,
* starting at the position defined by the provided key.
* <p>
- * The returned cursor initially points to record corresponding to the key,
- * that is {@code cursor.getRecord()} is equals to the record corresponding to
- * the key before any call to {@code cursor.next()} method.
+ * The returned cursor initially points to no record, that is
+ * {@code cursor.getRecord()} is equals to {@code null} before any call to
+ * {@code cursor.next()} method. After the first call to {@code cursor.next()}
+ * the cursor points to the record corresponding to the key.
*
* @param key
* Key to use as a start position for the cursor. If key is
@@ -352,10 +352,10 @@
* starting at the position defined by the smallest key that is higher than
* the provided key.
* <p>
- * The returned cursor initially points to record corresponding to the key
- * found, that is {@code cursor.getRecord()} is equals to the record
- * corresponding to the key found before any call to {@code cursor.next()}
- * method.
+ * The returned cursor initially points to no record, that is
+ * {@code cursor.getRecord()} is equals to {@code null} before any call to
+ * {@code cursor.next()} method. After the first call to {@code cursor.next()}
+ * the cursor points to the record corresponding to the key found.
*
* @param key
* Key to use as a start position for the cursor. If key is
@@ -421,7 +421,7 @@
try
{
cursor = getCursor();
- return cursor.getRecord();
+ return cursor.next() ? cursor.getRecord() : null;
}
finally
{
@@ -443,7 +443,7 @@
try
{
cursor = getCursor();
- Record<K, V> record = cursor.getRecord();
+ Record<K, V> record = null;
while (cursor.next())
{
record = cursor.getRecord();
@@ -470,15 +470,9 @@
try
{
cursor = getCursor();
- Record<K, V> record = cursor.getRecord();
- if (record == null)
- {
- return 0L;
- }
- long counter = 1L;
+ long counter = 0L;
while (cursor.next())
{
- record = cursor.getRecord();
counter++;
}
return counter;
@@ -580,8 +574,8 @@
/**
* Implements a repositionable cursor on the log file.
* <p>
- * The cursor initially points to a record, that is {@code cursor.getRecord()}
- * is equals to the first record available from the cursor before any call to
+ * The cursor initially points to no record, that is
+ * {@code cursor.getRecord()} is equals to {@code null} before any call to
* {@code cursor.next()} method.
*/
static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
@@ -596,6 +590,12 @@
private Record<K,V> currentRecord;
/**
+ * The initial record when starting from a given key. It must be
+ * stored because it is read in advance.
+ */
+ private Record<K,V> initialRecord;
+
+ /**
* Creates a cursor on the provided log.
*
* @param logFile
@@ -607,16 +607,6 @@
{
this.logFile = logFile;
this.reader = logFile.getReader();
- try
- {
- // position to the first record.
- next();
- }
- catch (ChangelogException e)
- {
- close();
- throw e;
- }
}
/**
@@ -638,6 +628,13 @@
@Override
public boolean next() throws ChangelogException
{
+ if (initialRecord != null)
+ {
+ // initial record is used only once
+ currentRecord = initialRecord;
+ initialRecord = null;
+ return true;
+ }
currentRecord = reader.readRecord();
return currentRecord != null;
}
@@ -654,7 +651,7 @@
public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
final boolean found = result.getFirst();
- currentRecord = found ? result.getSecond() : null;
+ initialRecord = found ? result.getSecond() : null;
return found;
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index e70966b..e08e6d2 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -217,8 +217,19 @@
}
}
- @Test
- public void testGenerateCursorFromWithCursorReinitialization() throws Exception
+ @DataProvider
+ Object[][] dataForTestsWithCursorReinitialization()
+ {
+ return new Object[][] {
+ // the index to use in CSN array for the start key of the cursor
+ { 0 },
+ { 1 },
+ { 4 },
+ };
+ }
+
+ @Test(dataProvider="dataForTestsWithCursorReinitialization")
+ public void testGenerateCursorFromWithCursorReinitialization(int csnIndexForStartKey) throws Exception
{
ReplicationServer replicationServer = null;
DBCursor<UpdateMsg> cursor = null;
@@ -231,27 +242,25 @@
CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
- cursor = replicaDB.generateCursorFrom(csns[0]);
+ cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey]);
assertFalse(cursor.next());
- for (int i = 0; i < 5; i++)
+ int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
+ for (int i : indicesToAdd)
{
- if (i != 3)
- {
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
- }
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
}
waitChangesArePersisted(replicaDB, 4);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[1]);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[2]);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[4]);
+ for (int i = csnIndexForStartKey+1; i < 5; i++)
+ {
+ if (i != 3)
+ {
+ assertTrue(cursor.next());
+ assertEquals(cursor.getRecord().getCSN(), csns[i], "index i=" + i);
+ }
+ }
assertFalse(cursor.next());
- StaticUtils.close(cursor);
-
}
finally
{
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
index 83292b4..47e574d 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -104,8 +104,7 @@
try {
cursor = changelog.getCursor();
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
- assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ assertThatCursorCanBeFullyRead(cursor, 1, 10);
}
finally {
StaticUtils.close(cursor, changelog);
@@ -120,8 +119,7 @@
try {
cursor = changelog.getCursor("key05");
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
- assertThatCursorCanBeFullyRead(cursor, 6, 10);
+ assertThatCursorCanBeFullyRead(cursor, 5, 10);
}
finally {
StaticUtils.close(cursor, changelog);
@@ -153,9 +151,7 @@
try {
cursor = changelog.getCursor(null);
- // should start from start
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
- assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ assertThatCursorCanBeFullyRead(cursor, 1, 10);
}
finally {
StaticUtils.close(cursor, changelog);
@@ -170,9 +166,7 @@
try {
cursor = changelog.getNearestCursor("key01");
- // lowest higher key is key2
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key02", "value2"));
- assertThatCursorCanBeFullyRead(cursor, 3, 10);
+ assertThatCursorCanBeFullyRead(cursor, 2, 10);
}
finally {
StaticUtils.close(cursor, changelog);
@@ -187,9 +181,7 @@
try {
cursor = changelog.getNearestCursor("key00");
- // lowest higher key is key1
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
- assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ assertThatCursorCanBeFullyRead(cursor, 1, 10);
}
finally {
StaticUtils.close(cursor, changelog);
@@ -204,9 +196,7 @@
try {
cursor = changelog.getNearestCursor(null);
- // should start from start
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
- assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ assertThatCursorCanBeFullyRead(cursor, 1, 10);
}
finally {
StaticUtils.close(cursor, changelog);
@@ -296,8 +286,7 @@
// ensure log can be fully read including the new record
cursor = logFile.getCursor("key05");
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
- assertThatCursorCanBeFullyRead(cursor, 6, 11);
+ assertThatCursorCanBeFullyRead(cursor, 5, 11);
}
finally
{
@@ -359,6 +348,7 @@
private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
throws Exception
{
+ assertThat(cursor.getRecord()).isNull();
for (int i = fromIndex; i <= endIndex; i++)
{
assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
index 72bf1b6..1824aaa 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -88,8 +88,7 @@
try {
cursor = log.getCursor();
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
- assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
}
finally {
StaticUtils.close(cursor, log);
@@ -104,8 +103,7 @@
try {
cursor = log.getCursor("key005");
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key005", "value5"));
- assertThatCursorCanBeFullyRead(cursor, 6, 10);
+ assertThatCursorCanBeFullyReadFromStart(cursor, 5, 10);
}
finally {
StaticUtils.close(cursor, log);
@@ -138,9 +136,7 @@
try {
cursor = log.getCursor(null);
- // should start from first record
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
- assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
}
finally {
StaticUtils.close(cursor, log);
@@ -155,20 +151,14 @@
try {
// this key is the first key of the log file "key1_key2.log"
cursor1 = log.getNearestCursor("key001");
- // lowest higher key is key2
- assertThat(cursor1.getRecord()).isEqualTo(Record.from("key002", "value2"));
- assertThatCursorCanBeFullyRead(cursor1, 3, 10);
+ assertThatCursorCanBeFullyReadFromStart(cursor1, 2, 10);
// this key is the last key of the log file "key3_key4.log"
cursor2 = log.getNearestCursor("key004");
- // lowest higher key is key5
- assertThat(cursor2.getRecord()).isEqualTo(Record.from("key005", "value5"));
- assertThatCursorCanBeFullyRead(cursor2, 6, 10);
+ assertThatCursorCanBeFullyReadFromStart(cursor2, 5, 10);
cursor3 = log.getNearestCursor("key009");
- // lowest higher key is key10
- assertThat(cursor3.getRecord()).isEqualTo(Record.from("key010", "value10"));
- assertThatCursorIsExhausted(cursor3);
+ assertThatCursorCanBeFullyReadFromStart(cursor3, 10, 10);
}
finally {
StaticUtils.close(cursor1, cursor2, cursor3, log);
@@ -200,9 +190,7 @@
// key is between key005 and key006
cursor = log.getNearestCursor("key005000");
- // lowest higher key is key006
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key006", "value6"));
- assertThatCursorCanBeFullyRead(cursor, 7, 10);
+ assertThatCursorCanBeFullyReadFromStart(cursor, 6, 10);
}
finally {
StaticUtils.close(cursor, log);
@@ -217,9 +205,7 @@
try {
cursor = log.getNearestCursor(null);
- // should start from start
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
- assertThatCursorCanBeFullyRead(cursor, 2, 10);
+ assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
}
finally {
StaticUtils.close(cursor, log);
@@ -324,7 +310,7 @@
}
writeLog1.syncToFileSystem();
cursor = writeLog1.getCursor("key020");
- for (int i = 1; i <= 60; i++)
+ for (int i = 1; i <= 61; i++)
{
assertThat(cursor.next()).isTrue();
}
@@ -370,7 +356,7 @@
log = openLog(LogFileTest.RECORD_PARSER);
cursor = log.getCursor();
// advance cursor to last record to ensure it is pointing to ahead log file
- advanceCursorFromFirstRecordTo(cursor, 10);
+ advanceCursorUpTo(cursor, 1, 10);
// add new records to ensure the ahead log file is rotated
for (int i = 11; i <= 20; i++)
@@ -396,13 +382,13 @@
{
log = openLog(LogFileTest.RECORD_PARSER);
cursor1 = log.getCursor();
- advanceCursorFromFirstRecordTo(cursor1, 1);
+ advanceCursorUpTo(cursor1, 1, 1);
cursor2 = log.getCursor();
- advanceCursorFromFirstRecordTo(cursor2, 4);
+ advanceCursorUpTo(cursor2, 1, 4);
cursor3 = log.getCursor();
- advanceCursorFromFirstRecordTo(cursor3, 9);
+ advanceCursorUpTo(cursor3, 1, 9);
cursor4 = log.getCursor();
- advanceCursorFromFirstRecordTo(cursor4, 10);
+ advanceCursorUpTo(cursor4, 1, 10);
// add new records to ensure the ahead log file is rotated
for (int i = 11; i <= 20; i++)
@@ -503,6 +489,7 @@
log.purgeUpTo(purgeKey);
cursor = log.getCursor();
+ assertThat(cursor.next()).isTrue();
assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
}
@@ -512,13 +499,6 @@
}
}
- private void advanceCursorFromFirstRecordTo(DBCursor<Record<String, String>> cursor, int endIndex)
- throws Exception
- {
- assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
- advanceCursorUpTo(cursor, 2, endIndex);
- }
-
private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
throws Exception
{
@@ -540,6 +520,16 @@
assertThatCursorIsExhausted(cursor);
}
+ /**
+ * Read the cursor until exhaustion, beginning at start of cursor.
+ */
+ private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+ throws Exception
+ {
+ assertThat(cursor.getRecord()).isNull();
+ assertThatCursorCanBeFullyRead(cursor, fromIndex, endIndex);
+ }
+
private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
{
assertThat(cursor.next()).isFalse();
--
Gitblit v1.10.0