Used try-with-resources as much as possible
| | |
| | | public class BlockLogReaderWriterTest extends DirectoryServerTestCase |
| | | { |
| | | private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); |
| | | |
| | | private static final File TEST_FILE = new File(TEST_DIRECTORY, "file"); |
| | | |
| | | private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser(); |
| | | |
| | | private static final int INT_RECORD_SIZE = 12; |
| | | |
| | | @BeforeClass |
| | |
| | | { |
| | | writeRecords(blockSize, records); |
| | | |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | try (BlockLogReader<Integer, Integer> reader = newReader(blockSize)) |
| | | { |
| | | reader = newReader(blockSize); |
| | | for (int i = 0; i < records.size(); i++) |
| | | { |
| | | Record<Integer, Integer> record = reader.readRecord(); |
| | |
| | | assertThat(reader.readRecord()).isNull(); |
| | | assertThat(reader.getFilePosition()).isEqualTo(expectedSizeOfFile); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | @DataProvider(name = "recordsForSeek") |
| | |
| | | { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | |
| | | |
| | | }; |
| | | |
| | | // For each test case, do a test with various block sizes to ensure the algorithm is not broken |
| | |
| | | { |
| | | writeRecords(blockSize, records); |
| | | |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | try (BlockLogReader<Integer, Integer> reader = newReader(blockSize)) |
| | | { |
| | | reader = newReader(blockSize); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchingStrategy, positionStrategy); |
| | | |
| | | assertThat(result.getFirst()).isEqualTo(shouldBeFound); |
| | | assertThat(result.getSecond()).isEqualTo(expectedRecord); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | |
| | | int blockSize = 20; |
| | | writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)); |
| | | |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | try (BlockLogReader<Integer, Integer> reader = newReader(blockSize)) |
| | | { |
| | | reader = newReader(blockSize); |
| | | |
| | | assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0); |
| | | assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0); |
| | | assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20); |
| | |
| | | assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280); |
| | | assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | |
| | | int blockSize = 256; |
| | | |
| | | writeRecordsToReachFileSize(blockSize, fileSizeInBytes); |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | try (BlockLogReader<Integer, Integer> reader = newReader(blockSize)) |
| | | { |
| | | reader = newReader(blockSize); |
| | | List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek); |
| | | System.out.println("File size: " + TEST_FILE.length() + " bytes"); |
| | | |
| | |
| | | System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds"); |
| | | System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds"); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | /** Write provided records with the provided block size. */ |
| | | private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException |
| | | { |
| | | BlockLogWriter<Integer, Integer> writer = null; |
| | | try |
| | | try (BlockLogWriter<Integer, Integer> writer = newWriter(blockSize)) |
| | | { |
| | | writer = newWriter(blockSize); |
| | | for (Record<Integer, Integer> record : records) |
| | | { |
| | | writer.write(record); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writer); |
| | | } |
| | | } |
| | | |
| | | /** Write as many records as needed to reach the provided file size. Records go from 1 up to N. */ |
| | |
| | | return String.valueOf(key); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Integer getMaxKey() |
| | | { |
| | | return Integer.MAX_VALUE; |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | |
| | | public void testGenerateCursorFromWithCursorReinitialization(int csnIndexForStartKey) throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | FileReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | |
| | | |
| | | CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], |
| | | GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | |
| | | int[] indicesToAdd = new int[] { 0, 1, 2, 4 }; |
| | | for (int i : indicesToAdd) |
| | | try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | csns[csnIndexForStartKey], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY)) |
| | | { |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | } |
| | | waitChangesArePersisted(replicaDB, 4); |
| | | assertFalse(cursor.next()); |
| | | |
| | | for (int i = csnIndexForStartKey+1; i < 5; i++) |
| | | { |
| | | if (i != 3) |
| | | int[] indicesToAdd = { 0, 1, 2, 4 }; |
| | | for (int i : indicesToAdd) |
| | | { |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[i], "index i=" + i); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | } |
| | | waitChangesArePersisted(replicaDB, 4); |
| | | |
| | | for (int i = csnIndexForStartKey+1; i < 5; i++) |
| | | { |
| | | if (i != 3) |
| | | { |
| | | final String indexNbMsg = "index i=" + i; |
| | | assertTrue(cursor.next(), indexNbMsg); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[i], indexNbMsg); |
| | | } |
| | | } |
| | | assertFalse(cursor.next()); |
| | | } |
| | | assertFalse(cursor.next()); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | |
| | | final PositionStrategy positionStrategy, final CSN expectedCSN) |
| | | throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy)) |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isTrue(); |
| | | softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy)) |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | private void assertFoundInOrder(FileReplicaDB replicaDB, |
| | | final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy)) |
| | | { |
| | | assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); |
| | | |
| | |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | public class LogFileTest extends DirectoryServerTestCase |
| | | { |
| | | private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); |
| | | |
| | | private static final File TEST_LOG_FILE = new File(TEST_DIRECTORY, Log.HEAD_LOG_FILE_NAME); |
| | | |
| | | static final StringRecordParser RECORD_PARSER = new StringRecordParser(); |
| | | |
| | | @BeforeClass |
| | |
| | | TEST_DIRECTORY.mkdirs(); |
| | | } |
| | | |
| | | @BeforeMethod |
| | | /** |
| | | * Create a new log file with ten records starting from (key01, value1) until (key10, value10). |
| | | * So log contains keys "key01", "key02", ..., "key10" |
| | | */ |
| | | @BeforeMethod |
| | | public void initialize() throws Exception |
| | | { |
| | | if (TEST_LOG_FILE.exists()) |
| | | { |
| | | TEST_LOG_FILE.delete(); |
| | | } |
| | | LogFile<String, String> logFile = null; |
| | | try |
| | | try (LogFile<String, String> logFile = getLogFile(RECORD_PARSER)) |
| | | { |
| | | logFile = getLogFile(RECORD_PARSER); |
| | | |
| | | for (int i = 1; i <= 10; i++) |
| | | { |
| | | logFile.append(Record.from(String.format("key%02d", i), "value"+i)); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(logFile); |
| | | } |
| | | } |
| | | |
| | | @AfterClass |
| | |
| | | @Test |
| | | public void testCursor() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getCursor(); |
| | | |
| | | try (LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = changelog.getCursor()) |
| | | { |
| | | assertThatCursorCanBeFullyRead(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, changelog); |
| | | } |
| | | } |
| | | |
| | | @DataProvider |
| | |
| | | public void testCursorPositionTo(String key, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, |
| | | boolean positionShouldBeFound, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | LogFileCursor<String, String> cursor = null; |
| | | try { |
| | | cursor = changelog.getCursor(); |
| | | try (LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | LogFileCursor<String, String> cursor = changelog.getCursor()) |
| | | { |
| | | boolean success = cursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | |
| | | assertThat(success).isEqualTo(positionShouldBeFound); |
| | |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, changelog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetOldestRecord() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | try |
| | | try (LogFile<String, String> changelog = getLogFile(RECORD_PARSER)) |
| | | { |
| | | Record<String, String> record = changelog.getOldestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key01", "value1")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(changelog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetNewestRecord() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | try |
| | | try (LogFile<String, String> changelog = getLogFile(RECORD_PARSER)) |
| | | { |
| | | Record<String, String> record = changelog.getNewestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key10", "value10")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(changelog); |
| | | } |
| | | } |
| | | |
| | | @DataProvider(name = "corruptedRecordData") |
| | |
| | | @SuppressWarnings("unused") int unusedId, |
| | | ByteStringBuilder corruptedRecordData) throws Exception |
| | | { |
| | | LogFile<String, String> logFile = null; |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try |
| | | corruptTestLogFile(corruptedRecordData); |
| | | |
| | | // open the log file: the file should be repaired at this point |
| | | try (LogFile<String, String> logFile = getLogFile(RECORD_PARSER)) |
| | | { |
| | | corruptTestLogFile(corruptedRecordData); |
| | | |
| | | // open the log file: the file should be repaired at this point |
| | | logFile = getLogFile(RECORD_PARSER); |
| | | |
| | | // write a new valid record |
| | | logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11)); |
| | | |
| | | // ensure log can be fully read including the new record |
| | | cursor = logFile.getCursor(); |
| | | assertThatCursorCanBeFullyRead(cursor, 1, 11); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(logFile); |
| | | try (DBCursor<Record<String, String>> cursor = logFile.getCursor()) |
| | | { |
| | | assertThatCursorCanBeFullyRead(cursor, 1, 11); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Append some raw data to the TEST_LOG_FILE. Intended to corrupt the log |
| | | * file. |
| | | */ |
| | | /** Append some raw data to the TEST_LOG_FILE. Intended to corrupt the log file. */ |
| | | private void corruptTestLogFile(ByteStringBuilder corruptedRecordData) throws Exception |
| | | { |
| | | RandomAccessFile output = null; |
| | | try { |
| | | output = new RandomAccessFile(TEST_LOG_FILE, "rwd"); |
| | | try (RandomAccessFile output = new RandomAccessFile(TEST_LOG_FILE, "rwd")) |
| | | { |
| | | output.seek(output.length()); |
| | | output.write(corruptedRecordData.toByteArray()); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(output); |
| | | } |
| | | } |
| | | |
| | | /** Test that changes are visible immediately to a reader after a write. */ |
| | | @Test |
| | | /** |
| | | * Test that changes are visible immediately to a reader after a write. |
| | | */ |
| | | public void testWriteAndReadOnSameLogFile() throws Exception |
| | | { |
| | | LogFile<String, String> writeLog = null; |
| | | LogFile<String, String> readLog = null; |
| | | try |
| | | try (LogFile<String, String> writeLog = getLogFile(RECORD_PARSER); |
| | | LogFile<String, String> readLog = getLogFile(RECORD_PARSER)) |
| | | { |
| | | writeLog = getLogFile(RECORD_PARSER); |
| | | readLog = getLogFile(RECORD_PARSER); |
| | | |
| | | for (int i = 1; i <= 100; i++) |
| | | { |
| | | Record<String, String> record = Record.from("newkey" + i, "newvalue" + i); |
| | |
| | | assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key01", "value1")); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writeLog, readLog); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | private static final byte STRING_SEPARATOR = 0; |
| | | |
| | | @Override |
| | | public Record<String, String> decodeRecord(final ByteString data) throws DecodingException |
| | | { |
| | | ByteSequenceReader reader = data.asReader(); |
| | |
| | | return length; |
| | | } |
| | | |
| | | @Override |
| | | public ByteString encodeRecord(Record<String, String> record) |
| | | { |
| | | return new ByteStringBuilder() |
| | |
| | | return key; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getMaxKey() |
| | | { |
| | |
| | | failToRead = shouldFail; |
| | | } |
| | | } |
| | | |
| | | } |
| | |
| | | @Test |
| | | public void testCursor() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(); |
| | | |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenAnExistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor("key005"); |
| | | |
| | | try (Log<String, String> log = openLog(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor("key005")) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 5, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenAnUnexistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | // key is between key005 and key006 |
| | | cursor = log.getCursor("key005000"); |
| | | |
| | | try (Log<String, String> log = openLog(RECORD_PARSER); |
| | | // key is between key005 and key006 |
| | | DBCursor<Record<String, String>> cursor = log.getCursor("key005000")) |
| | | { |
| | | assertThat(cursor).isNotNull(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | assertThat(cursor.next()).isFalse(); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenANullKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(null); |
| | | |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor(null)) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @DataProvider |
| | |
| | | public void testCursorWithStrategies(String key, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(key, matchingStrategy, positionStrategy); |
| | | |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor(key, matchingStrategy, positionStrategy)) |
| | | { |
| | | if (cursorShouldStartAt != -1) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, cursorShouldStartAt, cursorShouldEndAt); |
| | |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorMatchingAnyPositioningAnyWhenGivenANullKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(null, null, null); |
| | | |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor(null, null, null)) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test(expectedExceptions=ChangelogException.class) |
| | | public void testCursorWhenParserFailsToRead() throws Exception |
| | | { |
| | | FailingStringRecordParser parser = new FailingStringRecordParser(); |
| | | Log<String, String> log = openLog(parser); |
| | | parser.setFailToRead(true); |
| | | try { |
| | | try (Log<String, String> log = openLog(parser)) |
| | | { |
| | | parser.setFailToRead(true); |
| | | log.getCursor("key"); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetOldestRecord() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | Record<String, String> record = log.getOldestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key001", "value1")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetNewestRecord() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | Record<String, String> record = log.getNewestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key010", "value10")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test that changes are visible immediately to a reader after a write. |
| | | */ |
| | | /** Test that changes are visible immediately to a reader after a write. */ |
| | | @Test |
| | | public void testWriteAndReadOnSameLog() throws Exception |
| | | { |
| | | Log<String, String> writeLog = null; |
| | | Log<String, String> readLog = null; |
| | | try |
| | | try (Log<String, String> writeLog = openLog(LogFileTest.RECORD_PARSER); |
| | | Log<String, String> readLog = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | writeLog = openLog(LogFileTest.RECORD_PARSER); |
| | | readLog = openLog(LogFileTest.RECORD_PARSER); |
| | | |
| | | for (int i = 1; i <= 10; i++) |
| | | { |
| | | Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i); |
| | |
| | | assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1")); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writeLog, readLog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testTwoConcurrentWrite() throws Exception |
| | | { |
| | | Log<String, String> writeLog1 = null; |
| | | Log<String, String> writeLog2 = null; |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try |
| | | try (Log<String, String> writeLog1 = openLog(LogFileTest.RECORD_PARSER); |
| | | Log<String, String> writeLog2 = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | writeLog1 = openLog(LogFileTest.RECORD_PARSER); |
| | | writeLog2 = openLog(LogFileTest.RECORD_PARSER); |
| | | writeLog1.append(Record.from("key020", "starting record")); |
| | | AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>(); |
| | | Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef); |
| | |
| | | throw exceptionRef.get(); |
| | | } |
| | | writeLog1.syncToFileSystem(); |
| | | cursor = writeLog1.getCursor("key020"); |
| | | for (int i = 1; i <= 61; i++) |
| | | |
| | | try (DBCursor<Record<String, String>> cursor = writeLog1.getCursor("key020")) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | for (int i = 1; i <= 61; i++) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | } |
| | | assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30")); |
| | | } |
| | | assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30")); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, writeLog1, writeLog2); |
| | | } |
| | | } |
| | | |
| | |
| | | @Test(enabled=false) |
| | | public void logWriteSpeed() throws Exception |
| | | { |
| | | Log<String, String> writeLog = null; |
| | | try |
| | | { |
| | | long sizeOf10MB = 10*1024*1024; |
| | | final LogRotationParameters rotationParams = new LogRotationParameters(sizeOf10MB, NO_TIME_BASED_LOG_ROTATION, |
| | | NO_TIME_BASED_LOG_ROTATION); |
| | | final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class); |
| | | writeLog = Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams); |
| | | long sizeOf10MB = 10 * 1024 * 1024; |
| | | final LogRotationParameters rotationParams = new LogRotationParameters( |
| | | sizeOf10MB, NO_TIME_BASED_LOG_ROTATION, NO_TIME_BASED_LOG_ROTATION); |
| | | final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class); |
| | | |
| | | try (Log<String, String> writeLog = |
| | | Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams)) |
| | | { |
| | | for (int i = 1; i < 1000000; i++) |
| | | { |
| | | writeLog.append(Record.from(String.format("key%010d", i), "value" + i)); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writeLog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | cursor = log.getCursor(); |
| | | // advance the cursor to the last record to ensure it is pointing to the ahead log file |
| | | advanceCursorUpTo(cursor, 1, 10); |
| | | |
| | |
| | | // check that cursor can fully read the new records |
| | | assertThatCursorCanBeFullyRead(cursor, 11, 20); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor1 = log.getCursor(); |
| | | DBCursor<Record<String, String>> cursor2 = log.getCursor(); |
| | | DBCursor<Record<String, String>> cursor3 = log.getCursor(); |
| | | DBCursor<Record<String, String>> cursor4 = log.getCursor()) |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | cursor1 = log.getCursor(); |
| | | advanceCursorUpTo(cursor1, 1, 1); |
| | | cursor2 = log.getCursor(); |
| | | advanceCursorUpTo(cursor2, 1, 4); |
| | | cursor3 = log.getCursor(); |
| | | advanceCursorUpTo(cursor3, 1, 9); |
| | | cursor4 = log.getCursor(); |
| | | advanceCursorUpTo(cursor4, 1, 10); |
| | | |
| | | // add new records to ensure the ahead log file is rotated |
| | |
| | | assertThatCursorCanBeFullyRead(cursor3, 10, 20); |
| | | assertThatCursorCanBeFullyRead(cursor4, 11, 20); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testClear() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | log.clear(); |
| | | |
| | | cursor = log.getCursor(); |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | try (DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | @Test(enabled=false, expectedExceptions=ChangelogException.class) |
| | | public void testClearWhenCursorIsOpened() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | cursor = log.getCursor(); |
| | | log.clear(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @DataProvider(name = "purgeKeys") |
| | |
| | | public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge, |
| | | int cursorStartIndex, int cursorEndIndex) throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | log.purgeUpTo(purgeKey); |
| | | |
| | | cursor = log.getCursor(); |
| | | assertThat(cursor.next()).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge); |
| | | assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | try (DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge); |
| | | assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | @Test(dataProvider = "findBoundaryKeyData") |
| | | public void testFindBoundaryKeyFromRecord(int limitValue, String expectedKey) throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | assertThat(log.findBoundaryKeyFromRecord(MAPPER, limitValue)).isEqualTo(expectedKey); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex) |
| | |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | |
| | | /** |
| | | * Read the cursor until exhaustion, beginning at start of cursor. |
| | | */ |
| | | /** Read the cursor until exhaustion, beginning at start of cursor. */ |
| | | private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex, |
| | | int endIndex) throws Exception |
| | | { |
| | |
| | | } |
| | | }; |
| | | } |
| | | |
| | | } |
| | |
| | | import java.util.List; |
| | | |
| | | import org.assertj.core.api.SoftAssertions; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | | * Test the JEReplicaDB class. |
| | | */ |
| | | /** Test the JEReplicaDB class. */ |
| | | @SuppressWarnings("javadoc") |
| | | public class JEReplicaDBTest extends ReplicationTestCase |
| | | { |
| | |
| | | public void testGenerateCursor(CSN[] csns, CSN startCsn, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, int startIndex, int endIndex) throws Exception |
| | | { |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | | { |
| | | if (replicationServer == null) |
| | |
| | | } |
| | | if (csns == null) |
| | | { |
| | | return; // stop line, time to clean replication artefacts |
| | | return; // stop line, time to clean replication artifacts |
| | | } |
| | | |
| | | cursor = replicaDB.generateCursorFrom(startCsn, matchingStrategy, positionStrategy); |
| | | if (startIndex != -1) |
| | | try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCsn, matchingStrategy, positionStrategy)) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, csns, startIndex, endIndex); |
| | | } |
| | | else |
| | | { |
| | | assertThatCursorIsExhausted(cursor); |
| | | if (startIndex != -1) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, csns, startIndex, endIndex); |
| | | } |
| | | else |
| | | { |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | if (csns == null) |
| | | { |
| | | // stop line, stop and remove replication |
| | |
| | | private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy)) |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | private void assertFoundInOrder(JEReplicaDB replicaDB, |
| | | final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | try (DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy)) |
| | | { |
| | | assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); |
| | | |
| | |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | } |