From b877a7554a1fa1c47a2982541972efe780dfad9a Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 11 Jun 2015 13:53:40 +0000
Subject: [PATCH] OPENDJ-1705 File based changelog: handle concurrency between purge and cursors

---
 opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java |   52 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 52 insertions(+), 0 deletions(-)

diff --git a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
index 530f1e5..7b5f4a6 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -36,6 +36,7 @@
 
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
@@ -542,6 +543,56 @@
     }
   }
 
+  /**
+   * Similar to testPurge() test but with a concurrent cursor opened before starting the purge.
+   * <p>
+   * For all keys but "key000" the concurrent cursor should be aborted because the corresponding log file
+   * has been purged.
+   */
+  @Test(dataProvider="purgeKeys")
+  public void testPurgeWithConcurrentCursorOpened(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
+      int cursorStartIndex, int cursorEndIndex) throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+        DBCursor<Record<String, String>> concurrentCursor = log.getCursor())
+    {
+      concurrentCursor.next();
+      assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+
+      log.purgeUpTo(purgeKey);
+
+      cursor = log.getCursor();
+      assertThat(cursor.next()).isTrue();
+      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
+      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
+
+      // concurrent cursor is expected to be aborted on the next() call for all cases but one
+      assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+      if (purgeKey.equals("key000"))
+      {
+        // in that case no purge has been done, so cursor should not be aborted
+        assertThatCursorCanBeFullyRead(concurrentCursor, cursorStartIndex, cursorEndIndex);
+      }
+      else
+      {
+        // in other cases cursor should be aborted
+        try
+        {
+          concurrentCursor.next();
+          fail("Expected an AbortedChangelogCursorException");
+        }
+        catch (AbortedChangelogCursorException e) {
+          // nothing to do
+        }
+      }
+    }
+    finally
+    {
+      StaticUtils.close(cursor);
+    }
+  }
+
   static final Mapper<String, Integer> MAPPER = new Record.Mapper<String, Integer>()
       {
         @Override
@@ -629,6 +680,7 @@
       final AtomicReference<ChangelogException> exceptionRef)
   {
     return new Thread() {
+      @Override
       public void run()
       {
         for (int i = 1; i <= 30; i++)

--
Gitblit v1.10.0