| | |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.AbortedChangelogCursorException; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Similar to testPurge() test but with a concurrent cursor opened before starting the purge. |
| | | * <p> |
| | | * For all keys but "key000" the concurrent cursor should be aborted because the corresponding log file |
| | | * has been purged. |
| | | * <p> |
| | | * All resources are managed by try-with-resources, so they are released in reverse order of acquisition: |
| | | * the cursor opened after the purge, then the concurrent cursor, and finally the log itself. |
| | | * |
| | | * @param purgeKey |
| | | * the key handed to {@code log.purgeUpTo()} |
| | | * @param firstRecordExpectedAfterPurge |
| | | * the record that a cursor opened after the purge must return first |
| | | * @param cursorStartIndex |
| | | * first index forwarded to {@code assertThatCursorCanBeFullyRead()} |
| | | * @param cursorEndIndex |
| | | * last index forwarded to {@code assertThatCursorCanBeFullyRead()} |
| | | * @throws Exception |
| | | * if an unexpected error occurs while running the test |
| | | */ |
| | | @Test(dataProvider="purgeKeys") |
| | | public void testPurgeWithConcurrentCursorOpened(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge, |
| | | int cursorStartIndex, int cursorEndIndex) throws Exception |
| | | { |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> concurrentCursor = log.getCursor()) |
| | | { |
| | | // position the concurrent cursor on the first record before anything is purged |
| | | concurrentCursor.next(); |
| | | assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1")); |
| | | |
| | | log.purgeUpTo(purgeKey); |
| | | |
| | | // the nested block guarantees that this cursor is closed before the log it was obtained from, |
| | | // while keeping it opened during the checks performed on the concurrent cursor |
| | | try (DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge); |
| | | assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex); |
| | | |
| | | // concurrent cursor is expected to be aborted on the next() call for all cases but one |
| | | assertThat(concurrentCursor.getRecord()).isEqualTo(Record.from("key001", "value1")); |
| | | if (purgeKey.equals("key000")) |
| | | { |
| | | // in that case no purge has been done, so cursor should not be aborted |
| | | assertThatCursorCanBeFullyRead(concurrentCursor, cursorStartIndex, cursorEndIndex); |
| | | } |
| | | else |
| | | { |
| | | // in other cases cursor should be aborted |
| | | try |
| | | { |
| | | concurrentCursor.next(); |
| | | fail("Expected an AbortedChangelogCursorException"); |
| | | } |
| | | catch (AbortedChangelogCursorException expected) { |
| | | // nothing to do: the abort is precisely what this branch verifies |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | static final Mapper<String, Integer> MAPPER = new Record.Mapper<String, Integer>() |
| | | { |
| | | @Override |
| | |
| | | final AtomicReference<ChangelogException> exceptionRef) |
| | | { |
| | | return new Thread() { |
| | | @Override |
| | | public void run() |
| | | { |
| | | for (int i = 1; i <= 30; i++) |