| | |
| | | @Test |
| | | public void testCursor() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(); |
| | | |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenAnExistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor("key005"); |
| | | |
| | | try (Log<String, String> log = openLog(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor("key005")) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 5, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenAnUnexistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | // key is between key005 and key006 |
| | | cursor = log.getCursor("key005000"); |
| | | |
| | | try (Log<String, String> log = openLog(RECORD_PARSER); |
| | | // key is between key005 and key006 |
| | | DBCursor<Record<String, String>> cursor = log.getCursor("key005000")) |
| | | { |
| | | assertThat(cursor).isNotNull(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | assertThat(cursor.next()).isFalse(); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenANullKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(null); |
| | | |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor(null)) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @DataProvider |
| | |
| | | public void testCursorWithStrategies(String key, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(key, matchingStrategy, positionStrategy); |
| | | |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor(key, matchingStrategy, positionStrategy)) |
| | | { |
| | | if (cursorShouldStartAt != -1) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, cursorShouldStartAt, cursorShouldEndAt); |
| | |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorMatchingAnyPositioningAnyWhenGivenANullKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getCursor(null, null, null); |
| | | |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor(null, null, null)) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test(expectedExceptions=ChangelogException.class) |
| | | public void testCursorWhenParserFailsToRead() throws Exception |
| | | { |
| | | FailingStringRecordParser parser = new FailingStringRecordParser(); |
| | | Log<String, String> log = openLog(parser); |
| | | parser.setFailToRead(true); |
| | | try { |
| | | try (Log<String, String> log = openLog(parser)) |
| | | { |
| | | parser.setFailToRead(true); |
| | | log.getCursor("key"); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetOldestRecord() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | Record<String, String> record = log.getOldestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key001", "value1")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetNewestRecord() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | Record<String, String> record = log.getNewestRecord(); |
| | | |
| | | assertThat(record).isEqualTo(Record.from("key010", "value10")); |
| | | } |
| | | finally { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test that changes are visible immediately to a reader after a write. |
| | | */ |
| | | /** Test that changes are visible immediately to a reader after a write. */ |
| | | @Test |
| | | public void testWriteAndReadOnSameLog() throws Exception |
| | | { |
| | | Log<String, String> writeLog = null; |
| | | Log<String, String> readLog = null; |
| | | try |
| | | try (Log<String, String> writeLog = openLog(LogFileTest.RECORD_PARSER); |
| | | Log<String, String> readLog = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | writeLog = openLog(LogFileTest.RECORD_PARSER); |
| | | readLog = openLog(LogFileTest.RECORD_PARSER); |
| | | |
| | | for (int i = 1; i <= 10; i++) |
| | | { |
| | | Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i); |
| | |
| | | assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1")); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writeLog, readLog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testTwoConcurrentWrite() throws Exception |
| | | { |
| | | Log<String, String> writeLog1 = null; |
| | | Log<String, String> writeLog2 = null; |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try |
| | | try (Log<String, String> writeLog1 = openLog(LogFileTest.RECORD_PARSER); |
| | | Log<String, String> writeLog2 = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | writeLog1 = openLog(LogFileTest.RECORD_PARSER); |
| | | writeLog2 = openLog(LogFileTest.RECORD_PARSER); |
| | | writeLog1.append(Record.from("key020", "starting record")); |
| | | AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>(); |
| | | Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef); |
| | |
| | | throw exceptionRef.get(); |
| | | } |
| | | writeLog1.syncToFileSystem(); |
| | | cursor = writeLog1.getCursor("key020"); |
| | | for (int i = 1; i <= 61; i++) |
| | | |
| | | try (DBCursor<Record<String, String>> cursor = writeLog1.getCursor("key020")) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | for (int i = 1; i <= 61; i++) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | } |
| | | assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30")); |
| | | } |
| | | assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30")); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, writeLog1, writeLog2); |
| | | } |
| | | } |
| | | |
| | |
| | | @Test(enabled=false) |
| | | public void logWriteSpeed() throws Exception |
| | | { |
| | | Log<String, String> writeLog = null; |
| | | try |
| | | { |
| | | long sizeOf10MB = 10*1024*1024; |
| | | final LogRotationParameters rotationParams = new LogRotationParameters(sizeOf10MB, NO_TIME_BASED_LOG_ROTATION, |
| | | NO_TIME_BASED_LOG_ROTATION); |
| | | final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class); |
| | | writeLog = Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams); |
| | | long sizeOf10MB = 10 * 1024 * 1024; |
| | | final LogRotationParameters rotationParams = new LogRotationParameters( |
| | | sizeOf10MB, NO_TIME_BASED_LOG_ROTATION, NO_TIME_BASED_LOG_ROTATION); |
| | | final ReplicationEnvironment replicationEnv = mock(ReplicationEnvironment.class); |
| | | |
| | | try (Log<String, String> writeLog = |
| | | Log.openLog(replicationEnv, LOG_DIRECTORY, LogFileTest.RECORD_PARSER, rotationParams)) |
| | | { |
| | | for (int i = 1; i < 1000000; i++) |
| | | { |
| | | writeLog.append(Record.from(String.format("key%010d", i), "value" + i)); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writeLog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | cursor = log.getCursor(); |
| | | // advance cursor to last record to ensure it is pointing to ahead log file |
| | | advanceCursorUpTo(cursor, 1, 10); |
| | | |
| | |
| | | // check that cursor can fully read the new records |
| | | assertThatCursorCanBeFullyRead(cursor, 11, 20); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor1 = log.getCursor(); |
| | | DBCursor<Record<String, String>> cursor2 = log.getCursor(); |
| | | DBCursor<Record<String, String>> cursor3 = log.getCursor(); |
| | | DBCursor<Record<String, String>> cursor4 = log.getCursor()) |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | cursor1 = log.getCursor(); |
| | | advanceCursorUpTo(cursor1, 1, 1); |
| | | cursor2 = log.getCursor(); |
| | | advanceCursorUpTo(cursor2, 1, 4); |
| | | cursor3 = log.getCursor(); |
| | | advanceCursorUpTo(cursor3, 1, 9); |
| | | cursor4 = log.getCursor(); |
| | | advanceCursorUpTo(cursor4, 1, 10); |
| | | |
| | | // add new records to ensure the ahead log file is rotated |
| | |
| | | assertThatCursorCanBeFullyRead(cursor3, 10, 20); |
| | | assertThatCursorCanBeFullyRead(cursor4, 11, 20); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testClear() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | log.clear(); |
| | | |
| | | cursor = log.getCursor(); |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | try (DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | @Test(enabled=false, expectedExceptions=ChangelogException.class) |
| | | public void testClearWhenCursorIsOpened() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | Log<String, String> log = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | log = openLog(LogFileTest.RECORD_PARSER); |
| | | cursor = log.getCursor(); |
| | | log.clear(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @DataProvider(name = "purgeKeys") |
| | |
| | | public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge, |
| | | int cursorStartIndex, int cursorEndIndex) throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | log.purgeUpTo(purgeKey); |
| | | |
| | | cursor = log.getCursor(); |
| | | assertThat(cursor.next()).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge); |
| | | assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor, log); |
| | | try (DBCursor<Record<String, String>> cursor = log.getCursor()) |
| | | { |
| | | assertThat(cursor.next()).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge); |
| | | assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | @Test(dataProvider = "findBoundaryKeyData") |
| | | public void testFindBoundaryKeyFromRecord(int limitValue, String expectedKey) throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | try |
| | | try (Log<String, String> log = openLog(LogFileTest.RECORD_PARSER)) |
| | | { |
| | | assertThat(log.findBoundaryKeyFromRecord(MAPPER, limitValue)).isEqualTo(expectedKey); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(log); |
| | | } |
| | | } |
| | | |
| | | private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex) |
| | |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | |
| | | /** |
| | | * Read the cursor until exhaustion, beginning at start of cursor. |
| | | */ |
| | | /** Read the cursor until exhaustion, beginning at start of cursor. */ |
| | | private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex, |
| | | int endIndex) throws Exception |
| | | { |
| | |
| | | } |
| | | }; |
| | | } |
| | | |
| | | } |