From b877a7554a1fa1c47a2982541972efe780dfad9a Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 11 Jun 2015 13:53:40 +0000
Subject: [PATCH] OPENDJ-1705 File based changelog: handle concurrency between purge and cursors
---
opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java | 52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 52 insertions(+), 0 deletions(-)
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
index 530f1e5..7b5f4a6 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -36,6 +36,7 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -542,6 +543,56 @@
}
}
+ /**
+ * Similar to testPurge() test but with a concurrent cursor opened before starting the purge.
+ * <p>
+ * For all keys but "key000" the concurrent cursor should be aborted because the corresponding log file
+ * has been purged.
+ */
+ @Test(dataProvider="purgeKeys")
+ public void testPurgeWithConcurrentCursorOpened(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
+ int cursorStartIndex, int cursorEndIndex) throws Exception
+ {
+ DBCursor<Record<String, String>> cursor = null;
+ try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+ DBCursor<Record<String, String>> concurrentCursor = log.getCursor())
+ {
+ concurrentCursor.next();
+ assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+
+ log.purgeUpTo(purgeKey);
+
+ cursor = log.getCursor();
+ assertThat(cursor.next()).isTrue();
+ assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
+ assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
+
+ // concurrent cursor is expected to be aborted on the next() call for all cases but one
+ assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+ if (purgeKey.equals("key000"))
+ {
+ // in that case no purge has been done, so cursor should not be aborted
+ assertThatCursorCanBeFullyRead(concurrentCursor, cursorStartIndex, cursorEndIndex);
+ }
+ else
+ {
+ // in other cases cursor should be aborted
+ try
+ {
+ concurrentCursor.next();
+ fail("Expected an AbortedChangelogCursorException");
+ }
+ catch (AbortedChangelogCursorException e) {
+ // nothing to do
+ }
+ }
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+ }
+
static final Mapper<String, Integer> MAPPER = new Record.Mapper<String, Integer>()
{
@Override
@@ -629,6 +680,7 @@
final AtomicReference<ChangelogException> exceptionRef)
{
return new Thread() {
+ @Override
public void run()
{
for (int i = 1; i <= 30; i++)
--
Gitblit v1.10.0