Actual merge of the complete changelog.file test package,
which contains the test cases for the file-based changelog.
6 files added
1 file modified
| | |
| | | { |
| | | final ChangeNumberIndexRecord cnIndexRecord = record.getValue(); |
| | | return new ByteStringBuilder() |
| | | .append(record.getKey()) |
| | | .append((long) record.getKey()) |
| | | .append(cnIndexRecord.getBaseDN().toString()) |
| | | .append(STRING_SEPARATOR) |
| | | .append(cnIndexRecord.getCSN().toByteString()).toByteString(); |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.file.BlockLogReader.*; |
| | | |
| | | import java.io.File; |
| | | import java.io.FileNotFoundException; |
| | | import java.io.RandomAccessFile; |
| | | import java.util.ArrayList; |
| | | import java.util.Collections; |
| | | import java.util.List; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteSequenceReader; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | public class BlockLogReaderWriterTest extends DirectoryServerTestCase |
| | | { |
| | | /** Directory under the unit test root path that holds the file used by this test class. */ |
| | | private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); |
| | | |
| | | /** File written by the block log writer and read back by the block log reader. */ |
| | | private static final File TEST_FILE = new File(TEST_DIRECTORY, "file"); |
| | | |
| | | /** Parser for records having an integer key and an integer value. */ |
| | | private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser(); |
| | | |
| | | /** Raw size in bytes of one integer record: 4 (record size) + 4 (key) + 4 (value). */ |
| | | private static final int INT_RECORD_SIZE = 12; |
| | | |
| | | /** Creates the test directory before any test of this class runs; the result of mkdirs() is not checked. */ |
| | | @BeforeClass |
| | | void createTestDirectory() |
| | | { |
| | | TEST_DIRECTORY.mkdirs(); |
| | | } |
| | | |
| | | /** Deletes the test file before each test method, so that no test sees records left by a previous one. */ |
| | | @BeforeMethod |
| | | void ensureTestFileIsEmpty() throws Exception |
| | | { |
| | | StaticUtils.recursiveDelete(TEST_FILE); |
| | | } |
| | | |
| | | /** Deletes the test directory once all tests of this class have run. */ |
| | | @AfterClass |
| | | void cleanTestDirectory() |
| | | { |
| | | StaticUtils.recursiveDelete(TEST_DIRECTORY); |
| | | } |
| | | |
| | | /** |
| | | * Provides the test cases for {@code testWriteThenRead}: each row contains the block size, the |
| | | * expected file position once all records are read, and the records to write. |
| | | */ |
| | | @DataProvider(name = "recordsData") |
| | | Object[][] recordsData() |
| | | { |
| | | return new Object[][] |
| | | { |
| | | // raw size taken by each record is: 4 (record size) + 4 (key) + 4 (value) = 12 bytes |
| | | |
| | | // size of block, expected size of file after all records are written, records |
| | | { 12, 12, records(1) }, // zero block marker |
| | | { 10, 16, records(1) }, // one block marker |
| | | { 8, 16, records(1) }, // one block marker |
| | | { 7, 20, records(1) }, // two block markers |
| | | { 6, 24, records(1) }, // three block markers |
| | | { 5, 40, records(1) }, // seven block markers |
| | | |
| | | { 16, 28, records(1,2) }, // one block marker |
| | | { 12, 32, records(1,2) }, // two block markers |
| | | { 10, 36, records(1,2) }, // three block markers |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Writes the provided records with the provided block size, then checks that a reader returns |
| | | * them in the same order, returns null afterwards, and ends at the expected file position. |
| | | */ |
| | | @Test(dataProvider="recordsData") |
| | | public void testWriteThenRead(int blockSize, int expectedSizeOfFile, List<Record<Integer, Integer>> records) |
| | | throws Exception |
| | | { |
| | | writeRecords(blockSize, records); |
| | | |
| | | BlockLogReader<Integer, Integer> logReader = null; |
| | | try |
| | | { |
| | | logReader = newReader(blockSize); |
| | | // each read must return the next record that was written |
| | | for (Record<Integer, Integer> expectedRecord : records) |
| | | { |
| | | Record<Integer, Integer> actualRecord = logReader.readRecord(); |
| | | assertThat(actualRecord).isEqualTo(expectedRecord); |
| | | } |
| | | // nothing is left to read once all written records have been returned |
| | | assertThat(logReader.readRecord()).isNull(); |
| | | assertThat(logReader.getFilePosition()).isEqualTo(expectedSizeOfFile); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(logReader); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Provides the test cases for {@code testSeekToRecord}. |
| | | * <p> |
| | | * Each base case holds: the records to write, the key to seek, the key matching strategy, the |
| | | * position strategy, the expected record and whether the key should be found. Each base case is |
| | | * then repeated once per block size, with the block size inserted as first element. |
| | | */ |
| | | @DataProvider(name = "recordsForSeek") |
| | | Object[][] recordsForSeek() |
| | | { |
| | | Object[][] data = new Object[][] { |
| | | // records, key, key matching strategy, position strategy, expectedRecord, should be found ? |
| | | |
| | | // no record |
| | | { records(), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | |
| | | // 1 record |
| | | { records(1), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true }, |
| | | { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 3 records equal matching |
| | | { records(1,2,3), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3), 2, EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 3, EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 4, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | |
| | | { records(1,2,3), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3), 2, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 3, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 3 records less than or equal matching |
| | | { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true }, |
| | | |
| | | { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 3 records greater or equal matching |
| | | { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true }, |
| | | |
| | | { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3), 4, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | |
| | | // 10 records equal matching |
| | | // (cases seeking key 6 use a record list that does not contain key 6) |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | |
| | | // 10 records less than or equal matching |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true }, |
| | | |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 10 records greater or equal matching |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(7), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | |
| | | |
| | | }; |
| | | |
| | | // For each test case, do a test with various block sizes to ensure algorithm is not broken |
| | | // on a given size |
| | | int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 }; |
| | | Object[][] finalData = new Object[sizes.length * data.length][7]; |
| | | for (int i = 0; i < data.length; i++) |
| | | { |
| | | for (int j = 0; j < sizes.length; j++) |
| | | { |
| | | Object[] a = data[i]; |
| | | // add the block size at beginning of each test case |
| | | // (row index: the cases derived from base case i occupy sizes.length consecutive rows) |
| | | finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4], a[5]}; |
| | | } |
| | | } |
| | | return finalData; |
| | | } |
| | | |
| | | @Test(dataProvider = "recordsForSeek") |
| | | public void testSeekToRecord(int blockSize, List<Record<Integer, Integer>> records, int key, |
| | | KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, Record<Integer, Integer> expectedRecord, |
| | | boolean shouldBeFound) throws Exception |
| | | { |
| | | writeRecords(blockSize, records); |
| | | |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | { |
| | | reader = newReader(blockSize); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchingStrategy, positionStrategy); |
| | | |
| | | assertThat(result.getFirst()).isEqualTo(shouldBeFound); |
| | | assertThat(result.getSecond()).isEqualTo(expectedRecord); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | /** Checks the block start found before or at a position, for a block size of 10. */ |
| | | @Test |
| | | public void testGetClosestMarkerBeforeOrAtPosition() throws Exception |
| | | { |
| | | final int blockSize = 10; |
| | | BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize); |
| | | |
| | | // each pair is { position, expected block start } |
| | | final int[][] cases = { { 0, 0 }, { 5, 0 }, { 9, 0 }, { 10, 10 }, { 15, 10 }, { 20, 20 } }; |
| | | for (int[] positionAndExpected : cases) |
| | | { |
| | | assertThat(reader.getClosestBlockStartBeforeOrAtPosition(positionAndExpected[0])) |
| | | .isEqualTo(positionAndExpected[1]); |
| | | } |
| | | } |
| | | |
| | | /** Checks the block start found strictly after a position, for a block size of 10. */ |
| | | @Test |
| | | public void testGetClosestMarkerStrictlyAfterPosition() throws Exception |
| | | { |
| | | final int blockSize = 10; |
| | | BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize); |
| | | |
| | | // each pair is { position, expected block start } |
| | | final int[][] cases = { { 0, 10 }, { 5, 10 }, { 10, 20 }, { 11, 20 }, { 15, 20 }, { 20, 30 } }; |
| | | for (int[] positionAndExpected : cases) |
| | | { |
| | | assertThat(reader.getClosestBlockStartStrictlyAfterPosition(positionAndExpected[0])) |
| | | .isEqualTo(positionAndExpected[1]); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testSearchClosestMarkerToKey() throws Exception |
| | | { |
| | | int blockSize = 20; |
| | | writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20)); |
| | | |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | { |
| | | reader = newReader(blockSize); |
| | | |
| | | assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0); |
| | | assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0); |
| | | assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20); |
| | | assertThat(reader.searchClosestBlockStartToKey(3)).isEqualTo(20); |
| | | assertThat(reader.searchClosestBlockStartToKey(4)).isEqualTo(40); |
| | | assertThat(reader.searchClosestBlockStartToKey(5)).isEqualTo(60); |
| | | assertThat(reader.searchClosestBlockStartToKey(6)).isEqualTo(80); |
| | | assertThat(reader.searchClosestBlockStartToKey(7)).isEqualTo(80); |
| | | assertThat(reader.searchClosestBlockStartToKey(8)).isEqualTo(100); |
| | | assertThat(reader.searchClosestBlockStartToKey(9)).isEqualTo(120); |
| | | assertThat(reader.searchClosestBlockStartToKey(10)).isEqualTo(140); |
| | | assertThat(reader.searchClosestBlockStartToKey(19)).isEqualTo(260); |
| | | assertThat(reader.searchClosestBlockStartToKey(20)).isEqualTo(280); |
| | | // out of reach keys |
| | | assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280); |
| | | assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks the length reported for a stored record, for a block size of 100 and record lengths of |
| | | * 10, 150 and 200. The expected length is the record length plus zero or more times |
| | | * SIZE_OF_BLOCK_OFFSET, depending on the second argument. |
| | | * NOTE(review): the second argument looks like the room left in the current block - confirm |
| | | * against BlockLogReader. |
| | | */ |
| | | @Test |
| | | public void testLengthOfStoredRecord() throws Exception |
| | | { |
| | | final int blockSize = 100; |
| | | BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize); |
| | | |
| | | // record shorter than a block: at most one block offset is added |
| | | int recordLength = 10; |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 20)).isEqualTo(recordLength); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 10)).isEqualTo(recordLength); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 9)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | |
| | | // record longer than one block: one or two block offsets are added |
| | | recordLength = 150; |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 60)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 54)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 53)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET); |
| | | |
| | | // record twice as long as a block: two or three block offsets are added |
| | | recordLength = 200; |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 8)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 7)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET); |
| | | assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET); |
| | | } |
| | | |
| | | /** |
| | | * This test is intended to be run only manually to check the performance between binary search |
| | | * and sequential access. |
| | | * Note that sequential run may be extremely long when using high values. |
| | | * <p> |
| | | * In both runs the time of a seek covers only the seek call itself, so that the figures printed |
| | | * for the two strategies are comparable. |
| | | */ |
| | | @Test(enabled=false) |
| | | public void seekPerformanceComparison() throws Exception |
| | | { |
| | | // You may change these values |
| | | // (the size is computed with long arithmetic so that higher values do not overflow int) |
| | | long fileSizeInBytes = 100L*1024*1024; |
| | | int numberOfValuesToSeek = 50000; |
| | | int blockSize = 256; |
| | | |
| | | writeRecordsToReachFileSize(blockSize, fileSizeInBytes); |
| | | BlockLogReader<Integer, Integer> reader = null; |
| | | try |
| | | { |
| | | reader = newReader(blockSize); |
| | | List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek); |
| | | System.out.println("File size: " + TEST_FILE.length() + " bytes"); |
| | | |
| | | System.out.println("\n---- BINARY SEARCH"); |
| | | long minTime = Long.MAX_VALUE; |
| | | long maxTime = Long.MIN_VALUE; |
| | | final long t0 = System.nanoTime(); |
| | | for (Integer key : keysToSeek) |
| | | { |
| | | final long ts = System.nanoTime(); |
| | | Pair<Boolean, Record<Integer, Integer>> result = |
| | | reader.seekToRecord(key, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | final long te = System.nanoTime() - ts; |
| | | // check the result before dereferencing it below, so that a failed seek is reported |
| | | // as an assertion failure rather than as a NullPointerException |
| | | assertThat(result.getSecond()).isEqualTo(record(key)); |
| | | if (te < minTime) minTime = te; |
| | | if (te > maxTime) maxTime = te; |
| | | // show time for seeks that last more than N microseconds (tune as needed) |
| | | if (te/1000 > 1000) |
| | | { |
| | | System.out.println("TIME! key:" + result.getSecond().getKey() + ", time=" + te/1000 + " microseconds"); |
| | | } |
| | | } |
| | | System.out.println("Time taken: " + ((System.nanoTime() - t0)/1000000) + " milliseconds"); |
| | | System.out.println("Min time for a search: " + minTime/1000 + " microseconds"); |
| | | System.out.println("Max time for a search: " + maxTime/1000 + " microseconds"); |
| | | System.out.println("Max difference for a search: " + (maxTime - minTime)/1000 + " microseconds"); |
| | | |
| | | System.out.println("\n---- SEQUENTIAL SEARCH"); |
| | | minTime = Long.MAX_VALUE; |
| | | maxTime = Long.MIN_VALUE; |
| | | final long t1 = System.nanoTime(); |
| | | for (Integer val : keysToSeek) |
| | | { |
| | | final long ts = System.nanoTime(); |
| | | Pair<Boolean, Record<Integer, Integer>> result = |
| | | reader.positionToKeySequentially(0, val, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | // stop the clock before asserting, as done for the binary search |
| | | final long te = System.nanoTime() - ts; |
| | | assertThat(result.getSecond()).isEqualTo(record(val)); |
| | | if (te < minTime) minTime = te; |
| | | if (te > maxTime) maxTime = te; |
| | | } |
| | | System.out.println("Time taken: " + ((System.nanoTime() - t1)/1000000) + " milliseconds"); |
| | | System.out.println("Min time for a search: " + minTime/1000 + " microseconds"); |
| | | System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds"); |
| | | System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds"); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(reader); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Writes the provided records, in order, to the test file using a writer created with the |
| | | * provided block size. The writer is closed whether or not the writes succeed. |
| | | */ |
| | | private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException |
| | | { |
| | | BlockLogWriter<Integer, Integer> logWriter = null; |
| | | try |
| | | { |
| | | logWriter = newWriter(blockSize); |
| | | for (Record<Integer, Integer> recordToWrite : records) |
| | | { |
| | | logWriter.write(recordToWrite); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(logWriter); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Writes as many records as needed to reach the provided file size. Records go from 1 up to N, |
| | | * where N is the provided size divided by the raw size of an integer record. |
| | | */ |
| | | private void writeRecordsToReachFileSize(int blockSize, long sizeInBytes) throws Exception |
| | | { |
| | | // divide first, then narrow: a cast binds tighter than the division and would otherwise |
| | | // truncate sizes greater than Integer.MAX_VALUE before dividing |
| | | final int numberOfValues = (int) (sizeInBytes / INT_RECORD_SIZE); |
| | | final int[] values = new int[numberOfValues]; |
| | | for (int i = 0; i < numberOfValues; i++) |
| | | { |
| | | values[i] = i+1; |
| | | } |
| | | writeRecords(blockSize, records(values)); |
| | | } |
| | | |
| | | /** |
| | | * Returns the provided number of keys to seek in random order, for a file of the provided size. |
| | | * Keys are taken from 1 up to N, where N is the provided size divided by the raw size of an |
| | | * integer record; the provided number of keys must not exceed N. |
| | | */ |
| | | private List<Integer> getShuffledKeys(long fileSizeInBytes, int numberOfKeys) |
| | | { |
| | | // divide first, then narrow: a cast binds tighter than the division and would otherwise |
| | | // truncate sizes greater than Integer.MAX_VALUE before dividing |
| | | final int numberOfValues = (int) (fileSizeInBytes / INT_RECORD_SIZE); |
| | | final List<Integer> values = new ArrayList<Integer>(numberOfValues); |
| | | for (int i = 0; i < numberOfValues; i++) |
| | | { |
| | | values.add(i+1); |
| | | } |
| | | Collections.shuffle(values); |
| | | return values.subList(0, numberOfKeys); |
| | | } |
| | | |
| | | private BlockLogWriter<Integer, Integer> newWriter(int sizeOfBlock) throws ChangelogException |
| | | { |
| | | return BlockLogWriter.newWriterForTests(new LogWriter(TEST_FILE), RECORD_PARSER, sizeOfBlock); |
| | | } |
| | | |
| | | private BlockLogReader<Integer, Integer> newReader(int blockSize) throws FileNotFoundException |
| | | { |
| | | return BlockLogReader.newReaderForTests(TEST_FILE, new RandomAccessFile(TEST_FILE, "r"), |
| | | RECORD_PARSER, blockSize); |
| | | } |
| | | |
| | | /** |
| | | * Returns a block log reader created without any file, using the provided block size. |
| | | * It is used by the tests of this class that only exercise block position computations. |
| | | */ |
| | | private BlockLogReader<Integer, Integer> newReaderWithNullFile(int blockSize) throws FileNotFoundException |
| | | { |
| | | return BlockLogReader.newReaderForTests(null, null, RECORD_PARSER, blockSize); |
| | | } |
| | | |
| | | /** Helper to build a list of records. */ |
| | | private List<Record<Integer, Integer>> records(int...keys) |
| | | { |
| | | List<Record<Integer, Integer>> records = new ArrayList<Record<Integer, Integer>>(); |
| | | for (int key : keys) |
| | | { |
| | | records.add(Record.from(key, key)); |
| | | } |
| | | return records; |
| | | } |
| | | |
| | | /** Helper to build a record whose key and value are both the provided integer. */ |
| | | private Record<Integer, Integer> record(int key) |
| | | { |
| | | return Record.from(key, key); |
| | | } |
| | | |
| | | /** |
| | | * Record parser implementation for records with keys and values as integers to be used in tests. |
| | | * Using integer allow to know precisely the size of the records (4 bytes for key + 4 bytes for value), |
| | | * which is useful for some tests. |
| | | */ |
| | | private static class IntRecordParser implements RecordParser<Integer, Integer> |
| | | { |
| | | /** Reads the key, then the value, as two consecutive integers. */ |
| | | @Override |
| | | public Record<Integer, Integer> decodeRecord(final ByteString data) throws DecodingException |
| | | { |
| | | ByteSequenceReader reader = data.asReader(); |
| | | int key = reader.getInt(); |
| | | int value = reader.getInt(); |
| | | return Record.from(key, value); |
| | | } |
| | | |
| | | /** Writes the key, then the value, as two consecutive integers. */ |
| | | @Override |
| | | public ByteString encodeRecord(Record<Integer, Integer> record) |
| | | { |
| | | // the casts unbox to int, so that the int overload of append() is the one called |
| | | return new ByteStringBuilder().append((int) record.getKey()).append((int) record.getValue()).toByteString(); |
| | | } |
| | | |
| | | /** Parses the provided string as an integer key. */ |
| | | @Override |
| | | public Integer decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | return Integer.valueOf(key); |
| | | } |
| | | |
| | | /** Returns the string representation of the provided key. */ |
| | | @Override |
| | | public String encodeKeyToString(Integer key) |
| | | { |
| | | return String.valueOf(key); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Integer getMaxKey() |
| | | { |
| | | return Integer.MAX_VALUE; |
| | | } |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | public class FileChangeNumberIndexDBTest extends ReplicationTestCase |
| | | { |
| | | |
| | | @DataProvider(name = "messages") |
| | | Object[][] createMessages() throws Exception |
| | | { |
| | | CSN[] csns = generateCSNs(1, 0, 3); |
| | | DN dn1 = DN.valueOf("o=test1"); |
| | | return new Object[][] { |
| | | { new ChangeNumberIndexRecord(0L, dn1, csns[1]) }, |
| | | { new ChangeNumberIndexRecord(999L, dn1, csns[2]) }, |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Encodes then decodes the provided record with the parser of the change number index DB, and |
| | | * checks that the key, the base DN and the CSN are preserved. |
| | | */ |
| | | @Test(dataProvider="messages") |
| | | public void testRecordParser(ChangeNumberIndexRecord msg) throws Exception |
| | | { |
| | | final RecordParser<Long, ChangeNumberIndexRecord> cnIndexParser = FileChangeNumberIndexDB.RECORD_PARSER; |
| | | |
| | | final ByteString encoded = cnIndexParser.encodeRecord(Record.from(msg.getChangeNumber(), msg)); |
| | | final Record<Long, ChangeNumberIndexRecord> decoded = cnIndexParser.decodeRecord(encoded); |
| | | |
| | | assertThat(decoded).isNotNull(); |
| | | assertThat(decoded.getKey()).isEqualTo(msg.getChangeNumber()); |
| | | assertThat(decoded.getValue().getBaseDN()).isEqualTo(msg.getBaseDN()); |
| | | assertThat(decoded.getValue().getCSN()).isEqualTo(msg.getCSN()); |
| | | } |
| | | |
| | | /** |
| | | * Adds three records to the change number index DB, then checks the oldest and newest records, |
| | | * the count, and the records read by cursors opened from each of the three change numbers. |
| | | */ |
| | | @Test() |
| | | public void testAddAndReadRecords() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | try |
| | | { |
| | | replicationServer = newReplicationServer(); |
| | | final ChangelogDB changelogDB = replicationServer.getChangelogDB(); |
| | | changelogDB.setPurgeDelay(0); |
| | | final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer); |
| | | |
| | | final long[] changeNumbers = addThreeRecords(cnIndexDB); |
| | | final long oldestCN = changeNumbers[0]; |
| | | final long middleCN = changeNumbers[1]; |
| | | final long newestCN = changeNumbers[2]; |
| | | |
| | | // boundaries and size of the DB |
| | | assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), oldestCN); |
| | | assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), newestCN); |
| | | |
| | | assertEquals(cnIndexDB.count(), 3, "Db count"); |
| | | assertFalse(cnIndexDB.isEmpty()); |
| | | |
| | | // a cursor must read every record from its starting change number onwards |
| | | final DBCursor<ChangeNumberIndexRecord> cursorFromOldest = cnIndexDB.getCursorFrom(oldestCN); |
| | | assertCursorReadsInOrder(cursorFromOldest, oldestCN, middleCN, newestCN); |
| | | |
| | | final DBCursor<ChangeNumberIndexRecord> cursorFromMiddle = cnIndexDB.getCursorFrom(middleCN); |
| | | assertCursorReadsInOrder(cursorFromMiddle, middleCN, newestCN); |
| | | |
| | | final DBCursor<ChangeNumberIndexRecord> cursorFromNewest = cnIndexDB.getCursorFrom(newestCN); |
| | | assertCursorReadsInOrder(cursorFromNewest, newestCN); |
| | | } |
| | | finally |
| | | { |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks that clearing the index DB leaves it empty, with neither an |
| | | * oldest nor a newest record. |
| | | */ |
| | | @Test() |
| | | public void testClear() throws Exception |
| | | { |
| | | ReplicationServer rs = null; |
| | | try |
| | | { |
| | | rs = newReplicationServer(); |
| | | final ChangelogDB db = rs.getChangelogDB(); |
| | | db.setPurgeDelay(0); |
| | | |
| | | final FileChangeNumberIndexDB indexDB = getCNIndexDB(rs); |
| | | addThreeRecords(indexDB); |
| | | indexDB.clear(); |
| | | |
| | | // Nothing must remain after the clear |
| | | assertNull(indexDB.getOldestRecord()); |
| | | assertNull(indexDB.getNewestRecord()); |
| | | assertEquals(indexDB.count(), 0); |
| | | assertTrue(indexDB.isEmpty()); |
| | | } |
| | | finally |
| | | { |
| | | remove(rs); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Exercises the basic life cycle of a ChangeNumberIndexDB: |
| | | * <ol> |
| | | * <li>create the db</li> |
| | | * <li>add three records</li> |
| | | * <li>read them back with a cursor</li> |
| | | * <li>set a very short purge delay</li> |
| | | * <li>wait for the db to be purged, then check that only the newest |
| | | * record is left.</li> |
| | | * </ol> |
| | | */ |
| | | // TODO : this only works if the ahead log file is rotated at the right place. |
| | | // The first two records are 37 and 76 bytes long, so it requires |
| | | // 37 < max file size < 113 to have the last record alone in the ahead log file. |
| | | // Re-enable this test when the max file size of a log is customizable. |
| | | @Test(enabled=false) |
| | | public void testPurge() throws Exception |
| | | { |
| | | ReplicationServer rs = null; |
| | | try |
| | | { |
| | | rs = newReplicationServer(); |
| | | final ChangelogDB db = rs.getChangelogDB(); |
| | | db.setPurgeDelay(0); // disable purging |
| | | |
| | | // Data to store: one base DN per record, paired with one CSN each |
| | | final DN[] baseDNs = { DN.valueOf("o=test1"), DN.valueOf("o=test2"), DN.valueOf("o=test3") }; |
| | | final CSN[] csns = generateCSNs(1, 0, 3); |
| | | |
| | | final FileChangeNumberIndexDB indexDB = getCNIndexDB(rs); |
| | | final long firstCN = addRecord(indexDB, baseDNs[0], csns[0]); |
| | | addRecord(indexDB, baseDNs[1], csns[1]); |
| | | final long lastCN = addRecord(indexDB, baseDNs[2], csns[2]); |
| | | |
| | | // Nothing should have been purged so far |
| | | final long oldestCN = indexDB.getOldestRecord().getChangeNumber(); |
| | | assertEquals(oldestCN, firstCN); |
| | | assertEquals(indexDB.getNewestRecord().getChangeNumber(), lastCN); |
| | | |
| | | final DBCursor<ChangeNumberIndexRecord> cursor = indexDB.getCursorFrom(oldestCN); |
| | | try |
| | | { |
| | | for (int i = 0; i < baseDNs.length; i++) |
| | | { |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[i], baseDNs[i]); |
| | | } |
| | | assertFalse(cursor.next()); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | } |
| | | |
| | | // Turn purging on and poll (at most 100 times, 10 ms apart) until a single record is left |
| | | db.setPurgeDelay(1); |
| | | for (int attempt = 0; indexDB.count() > 1 && attempt < 100; attempt++) |
| | | { |
| | | Thread.sleep(10); |
| | | } |
| | | assertOnlyNewestRecordIsLeft(indexDB, 3); |
| | | } |
| | | finally |
| | | { |
| | | remove(rs); |
| | | } |
| | | } |
| | | |
| | | private long[] addThreeRecords(FileChangeNumberIndexDB cnIndexDB) throws Exception |
| | | { |
| | | // Prepare data to be stored in the db |
| | | DN baseDN1 = DN.valueOf("o=test1"); |
| | | DN baseDN2 = DN.valueOf("o=test2"); |
| | | DN baseDN3 = DN.valueOf("o=test3"); |
| | | |
| | | CSN[] csns = generateCSNs(1, 0, 3); |
| | | |
| | | // Add records |
| | | long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]); |
| | | long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]); |
| | | long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]); |
| | | return new long[] { cn1, cn2, cn3 }; |
| | | } |
| | | |
| | | private long addRecord(FileChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException |
| | | { |
| | | return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn)); |
| | | } |
| | | |
| | | private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN) |
| | | { |
| | | assertEquals(record.getCSN(), csn); |
| | | assertEquals(record.getBaseDN(), baseDN); |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the change number index DB of the replication server and |
| | | * asserts that it is empty before handing it to the caller. |
| | | */ |
| | | private FileChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException |
| | | { |
| | | final FileChangeNumberIndexDB indexDB = |
| | | (FileChangeNumberIndexDB) ((FileChangelogDB) rs.getChangelogDB()).getChangeNumberIndexDB(); |
| | | assertTrue(indexDB.isEmpty()); |
| | | return indexDB; |
| | | } |
| | | |
| | | /** |
| | | * Asserts that the DB holds exactly one record, which is therefore both |
| | | * the oldest and the newest one, and that it has the expected change number. |
| | | * <p> |
| | | * The newest record is no longer cleared to ensure persistence to the last |
| | | * generated change number across server restarts. |
| | | */ |
| | | private void assertOnlyNewestRecordIsLeft(FileChangeNumberIndexDB cnIndexDB, |
| | | int newestChangeNumber) throws ChangelogException |
| | | { |
| | | assertEquals(cnIndexDB.count(), 1); |
| | | assertFalse(cnIndexDB.isEmpty()); |
| | | |
| | | final ChangeNumberIndexRecord first = cnIndexDB.getOldestRecord(); |
| | | final ChangeNumberIndexRecord last = cnIndexDB.getNewestRecord(); |
| | | assertEquals(first.getChangeNumber(), newestChangeNumber); |
| | | |
| | | // Oldest and newest must describe the very same record |
| | | assertEquals(first.getChangeNumber(), last.getChangeNumber()); |
| | | assertEquals(first.getBaseDN(), last.getBaseDN()); |
| | | assertEquals(first.getCSN(), last.getCSN()); |
| | | } |
| | | |
| | | /** |
| | | * Starts the test server and creates a replication server listening on a |
| | | * free port, using the LOG changelog implementation with change number |
| | | * computation enabled. |
| | | */ |
| | | private ReplicationServer newReplicationServer() throws Exception |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | final ReplServerFakeConfiguration fakeCfg = new ReplServerFakeConfiguration( |
| | | TestCaseUtils.findFreePort(), null, ReplicationDBImplementation.LOG, 0, 2, 0, 100, null); |
| | | fakeCfg.setComputeChangeNumber(true); |
| | | return new ReplicationServer(fakeCfg); |
| | | } |
| | | |
| | | /** |
| | | * Asserts that the cursor returns exactly the provided change numbers, in |
| | | * order, and nothing more. The cursor is always closed on return. |
| | | */ |
| | | private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor, |
| | | long... cns) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | for (int i = 0; i < cns.length; i++) |
| | | { |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getChangeNumber(), cns[i]); |
| | | } |
| | | // The cursor must be exhausted once all expected change numbers are read |
| | | assertFalse(cursor.next()); |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | |
| | | import org.assertj.core.api.SoftAssertions; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | | * Test the FileReplicaDB class |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | public class FileReplicaDBTest extends ReplicationTestCase |
| | | { |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | private DN TEST_ROOT_DN; |
| | | |
| | | /** |
| | | * Utility - log a trace message, marked as coming from the test and not |
| | | * from the server code. Makes it easier to observe the test steps. |
| | | * |
| | | * @param tn name of the test emitting the message |
| | | * @param s text of the message |
| | | */ |
| | | private void debugInfo(String tn, String s) |
| | | { |
| | | logger.trace("** TEST %s ** %s", tn, s); |
| | | } |
| | | |
| | | /** Parses TEST_ROOT_DN_STRING once into the DN used by the tests of this class. */ |
| | | @BeforeClass |
| | | public void setup() throws Exception |
| | | { |
| | | TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING); |
| | | } |
| | | |
| | | @DataProvider(name = "messages") |
| | | Object[][] createMessages() |
| | | { |
| | | CSN[] csns = generateCSNs(1, 0, 2); |
| | | return new Object[][] { |
| | | { new DeleteMsg(TEST_ROOT_DN, csns[0], "uid") }, |
| | | { new DeleteMsg(TEST_ROOT_DN, csns[1], "uid") }, |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Verifies that an update message can be encoded then decoded by the |
| | | * replica DB record parser, yielding an equal key and an equal message. |
| | | */ |
| | | @Test(dataProvider="messages") |
| | | public void testRecordParser(UpdateMsg msg) throws Exception |
| | | { |
| | | final RecordParser<CSN, UpdateMsg> recordParser = FileReplicaDB.RECORD_PARSER; |
| | | |
| | | // Round trip: record -> bytes -> record |
| | | final ByteString encoded = recordParser.encodeRecord(Record.from(msg.getCSN(), msg)); |
| | | final Record<CSN, UpdateMsg> decoded = recordParser.decodeRecord(encoded); |
| | | |
| | | assertThat(decoded).isNotNull(); |
| | | assertThat(decoded.getKey()).isEqualTo(msg.getCSN()); |
| | | assertThat(decoded.getValue()).isEqualTo(msg); |
| | | } |
| | | |
| | | /** |
| | | * Adds a single change on the test root DN and checks that it can be read |
| | | * back from the replica DB. |
| | | * NOTE(review): the test name refers to forward slashes in the domain DN; |
| | | * whether TEST_ROOT_DN actually contains any is not visible here - confirm. |
| | | */ |
| | | @Test |
| | | public void testDomainDNWithForwardSlashes() throws Exception |
| | | { |
| | | ReplicationServer rs = null; |
| | | FileReplicaDB db = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | rs = configureReplicationServer(100, 5000); |
| | | db = newReplicaDB(rs); |
| | | |
| | | final CSN[] generated = generateCSNs(1, 0, 1); |
| | | final CSN csn = generated[0]; |
| | | db.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | |
| | | waitChangesArePersisted(db, 1); |
| | | assertFoundInOrder(db, csn); |
| | | } |
| | | finally |
| | | { |
| | | shutdown(db); |
| | | remove(rs); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Adds three changes, checks cursors and limits, then adds a fourth change |
| | | * and checks that cursors opened from several start CSNs see it as well. |
| | | * The fifth generated CSN is never stored and must never be found. |
| | | */ |
| | | @Test |
| | | public void testAddAndReadRecords() throws Exception |
| | | { |
| | | ReplicationServer rs = null; |
| | | FileReplicaDB db = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | rs = configureReplicationServer(100, 5000); |
| | | db = newReplicaDB(rs); |
| | | |
| | | final CSN[] csns = generateCSNs(1, 0, 5); |
| | | for (int i = 0; i < 3; i++) |
| | | { |
| | | db.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | } |
| | | waitChangesArePersisted(db, 3); |
| | | |
| | | assertFoundInOrder(db, csns[0], csns[1], csns[2]); |
| | | assertNotFound(db, csns[4], AFTER_MATCHING_KEY); |
| | | assertLimits(db, csns[0], csns[2]); |
| | | |
| | | // Fourth change |
| | | db.add(new DeleteMsg(TEST_ROOT_DN, csns[3], "uid")); |
| | | waitChangesArePersisted(db, 4); |
| | | |
| | | assertFoundInOrder(db, csns[0], csns[1], csns[2], csns[3]); |
| | | assertFoundInOrder(db, csns[2], csns[3]); |
| | | assertFoundInOrder(db, csns[3]); |
| | | assertNotFound(db, csns[4], AFTER_MATCHING_KEY); |
| | | } |
| | | finally |
| | | { |
| | | shutdown(db); |
| | | remove(rs); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stores four out of five generated CSNs (the one at index 3 is left out) |
| | | * and checks which CSN a cursor returns first for each start CSN, with both |
| | | * the ON_MATCHING_KEY and AFTER_MATCHING_KEY position strategies. |
| | | */ |
| | | @Test |
| | | public void testGenerateCursorFrom() throws Exception |
| | | { |
| | | ReplicationServer rs = null; |
| | | FileReplicaDB db = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | rs = configureReplicationServer(100000, 10); |
| | | db = newReplicaDB(rs); |
| | | |
| | | final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5); |
| | | final CSN missingCSN = csns[3]; |
| | | final ArrayList<CSN> storedCSNs = new ArrayList<CSN>(Arrays.asList(csns)); |
| | | storedCSNs.remove(missingCSN); |
| | | |
| | | for (int i = 0; i < storedCSNs.size(); i++) |
| | | { |
| | | db.add(new DeleteMsg(TEST_ROOT_DN, storedCSNs.get(i), "uid")); |
| | | } |
| | | waitChangesArePersisted(db, 4); |
| | | |
| | | // ON_MATCHING_KEY: a stored CSN is returned itself ... |
| | | for (int i = 0; i < storedCSNs.size(); i++) |
| | | { |
| | | final CSN stored = storedCSNs.get(i); |
| | | assertNextCSN(db, stored, ON_MATCHING_KEY, stored); |
| | | } |
| | | // ... while the missing CSN leads to the next stored one |
| | | assertNextCSN(db, missingCSN, ON_MATCHING_KEY, csns[4]); |
| | | |
| | | // AFTER_MATCHING_KEY: each stored CSN leads to its successor |
| | | CSN previous = null; |
| | | for (CSN current : storedCSNs) |
| | | { |
| | | if (previous != null) |
| | | { |
| | | assertNextCSN(db, previous, AFTER_MATCHING_KEY, current); |
| | | } |
| | | previous = current; |
| | | } |
| | | assertNotFound(db, csns[4], AFTER_MATCHING_KEY); |
| | | } |
| | | finally |
| | | { |
| | | shutdown(db); |
| | | remove(rs); |
| | | } |
| | | } |
| | | |
| | | @DataProvider |
| | | Object[][] dataForTestsWithCursorReinitialization() |
| | | { |
| | | return new Object[][] { |
| | | // the index to use in CSN array for the start key of the cursor |
| | | { 0 }, |
| | | { 1 }, |
| | | { 4 }, |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Opens a cursor on an empty replica DB, then adds changes and checks that |
| | | * the very same cursor returns the changes following its start key. |
| | | * |
| | | * @param csnIndexForStartKey index, in the generated CSN array, of the cursor start key |
| | | */ |
| | | @Test(dataProvider="dataForTestsWithCursorReinitialization") |
| | | public void testGenerateCursorFromWithCursorReinitialization(int csnIndexForStartKey) throws Exception |
| | | { |
| | | ReplicationServer rs = null; |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | FileReplicaDB db = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | rs = configureReplicationServer(100000, 10); |
| | | db = newReplicaDB(rs); |
| | | |
| | | final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5); |
| | | |
| | | // Nothing is stored yet, so the cursor has nothing to return |
| | | cursor = db.generateCursorFrom(csns[csnIndexForStartKey], |
| | | GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | |
| | | final int[] storedIndexes = { 0, 1, 2, 4 }; |
| | | for (int k = 0; k < storedIndexes.length; k++) |
| | | { |
| | | db.add(new DeleteMsg(TEST_ROOT_DN, csns[storedIndexes[k]], "uid")); |
| | | } |
| | | waitChangesArePersisted(db, 4); |
| | | |
| | | for (int i = csnIndexForStartKey+1; i < 5; i++) |
| | | { |
| | | if (i == 3) |
| | | { |
| | | // csns[3] was never stored |
| | | continue; |
| | | } |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[i], "index i=" + i); |
| | | } |
| | | assertFalse(cursor.next()); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | shutdown(db); |
| | | remove(rs); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Adds four changes, requests a purge up to the highest possible CSN, then |
| | | * polls until the newest added CSN is both the oldest and the newest CSN |
| | | * of the replica DB. |
| | | */ |
| | | // TODO : this works only if we ensure that there is a rotation of ahead log file |
| | | // at right place. Each record takes 54 bytes, so it means : 108 < max file size < 162 to have |
| | | // the last record alone in the ahead log file |
| | | // Re-enable this test when max file size is customizable for log |
| | | @Test(enabled=false) |
| | | public void testPurge() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | FileReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100, 5000); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = generateCSNs(1, 0, 5); |
| | | |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[3], "uid")); |
| | | |
| | | waitChangesArePersisted(replicaDB, 4); |
| | | |
| | | replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); |
| | | |
| | | // Poll at most 100 times, 10 ms apart, for the purge to complete |
| | | int count = 0; |
| | | boolean purgeSucceeded = false; |
| | | final CSN expectedNewestCSN = csns[3]; |
| | | do |
| | | { |
| | | Thread.sleep(10); |
| | | |
| | | final CSN oldestCSN = replicaDB.getOldestCSN(); |
| | | final CSN newestCSN = replicaDB.getNewestCSN(); |
| | | // Compare from the expected (non-null) side: the DB limits may be null, |
| | | // which must count as "not purged yet" rather than fail with a NullPointerException |
| | | purgeSucceeded = |
| | | expectedNewestCSN.equals(oldestCSN) |
| | | && expectedNewestCSN.equals(newestCSN); |
| | | count++; |
| | | } |
| | | while (!purgeSucceeded && count < 100); |
| | | assertTrue(purgeSucceeded); |
| | | } |
| | | finally |
| | | { |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks that a FileReplicaDB used by a replication server can be cleared. |
| | | * Clearing is used when a replication server receives a request to reset |
| | | * the generationId of a given domain. |
| | | */ |
| | | @Test |
| | | public void testClear() throws Exception |
| | | { |
| | | ReplicationServer rs = null; |
| | | FileReplicaDB db = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | rs = configureReplicationServer(100, 5000); |
| | | db = newReplicaDB(rs); |
| | | |
| | | final CSN[] csns = generateCSNs(1, 0, 3); |
| | | |
| | | // Store the changes and make sure the limits reflect them |
| | | for (int i = 0; i < csns.length; i++) |
| | | { |
| | | db.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | } |
| | | assertLimits(db, csns[0], csns[2]); |
| | | |
| | | // After a clear, no limit is left |
| | | db.clear(); |
| | | assertLimits(db, null, null); |
| | | } |
| | | finally |
| | | { |
| | | shutdown(db); |
| | | remove(rs); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Asserts that a cursor opened from the start CSN returns a first record |
| | | * holding the expected CSN. The cursor is always closed on return. |
| | | * |
| | | * @param replicaDB the replica DB to read from |
| | | * @param startCSN the key the cursor is opened from |
| | | * @param positionStrategy the position strategy used to open the cursor |
| | | * @param expectedCSN the CSN expected for the first record returned |
| | | */ |
| | | private void assertNextCSN(FileReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy, final CSN expectedCSN) |
| | | throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | { |
| | | // Hard assertion: with a soft one, a failed next() would only show up |
| | | // as a NullPointerException when the record is dereferenced below |
| | | assertTrue(cursor.next(), |
| | | "Expected a record from start CSN " + startCSN + " with strategy " + positionStrategy); |
| | | assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test the logic that manages counter records in the FileReplicaDB in order to |
| | | * optimize the oldest and newest records in the replication changelog db. |
| | | */ |
| | | @Test(groups = { "opendj-256" }) |
| | | public void testGetOldestNewestCSNs() throws Exception |
| | | { |
| | | // Two settings of the counter record window are worth testing: |
| | | // - one counter record every 10 update msgs: just a unit setting, |
| | | // - one counter record every 1000 update msgs: closer to a real setting. |
| | | // In both cases, the counting algorithm should be tested |
| | | // - when start and stop are before the first counter record, |
| | | // - when start and stop are before and after the first counter record, |
| | | // - when start and stop are after the first counter record, |
| | | // - when start and stop are before and after more than one counter record, |
| | | // after a purge, and after shutting down/closing and reopening the db. |
| | | |
| | | // TODO : do we need the management of counter records ? |
| | | // Use unreachable limits for now because it is not implemented |
| | | final int[][] maxAndCounterWindows = { { 40, 100 }, { 4000, 10000 } }; |
| | | for (int[] setting : maxAndCounterWindows) |
| | | { |
| | | testGetOldestNewestCSNs(setting[0], setting[1]); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Populates a replica DB with {@code max} changes, shuts it down and |
| | | * reopens it, adds {@code max} more changes, then purges and clears it, |
| | | * checking the oldest and newest CSNs along the way. |
| | | * |
| | | * @param max number of changes added in each of the two population phases |
| | | * @param counterWindow counter record window, used to compute the number |
| | | * of records expected to be persisted |
| | | */ |
| | | private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception |
| | | { |
| | | String tn = "testDBCount("+max+","+counterWindow+")"; |
| | | debugInfo(tn, "Starting test"); |
| | | |
| | | File testRoot = null; |
| | | ReplicationServer replicationServer = null; |
| | | ReplicationEnvironment dbEnv = null; |
| | | FileReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | |
| | | testRoot = createCleanDir(); |
| | | dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer); |
| | | replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv); |
| | | |
| | | // Populate the db with 'max' msg |
| | | int mySeqnum = 1; |
| | | CSN[] csns = new CSN[2 * (max + 1)]; |
| | | long now = System.currentTimeMillis(); |
| | | for (int i=1; i<=max; i++) |
| | | { |
| | | csns[i] = new CSN(now + i, mySeqnum, 1); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | mySeqnum+=2; |
| | | } |
| | | waitChangesArePersisted(replicaDB, max, counterWindow); |
| | | assertLimits(replicaDB, csns[1], csns[max]); |
| | | |
| | | // Now we want to test that after closing and reopening the db, the |
| | | // counting algo is well reinitialized and when new messages are added |
| | | // the new counter are correctly generated. |
| | | debugInfo(tn, "SHUTDOWN replicaDB and recreate"); |
| | | replicaDB.shutdown(); |
| | | // Forget the closed instance, so that the finally block does not shut it |
| | | // down a second time if the re-creation below fails |
| | | replicaDB = null; |
| | | |
| | | replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv); |
| | | assertLimits(replicaDB, csns[1], csns[max]); |
| | | |
| | | // Populate the db with 'max' msg |
| | | for (int i=max+1; i<=2 * max; i++) |
| | | { |
| | | csns[i] = new CSN(now + i, mySeqnum, 1); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | mySeqnum+=2; |
| | | } |
| | | waitChangesArePersisted(replicaDB, 2 * max, counterWindow); |
| | | assertLimits(replicaDB, csns[1], csns[2 * max]); |
| | | |
| | | replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); |
| | | |
| | | String testcase = "AFTER PURGE (oldest, newest)="; |
| | | debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN()); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest="); |
| | | |
| | | // Clear ... |
| | | debugInfo(tn,"clear:"); |
| | | replicaDB.clear(); |
| | | |
| | | // Check the db is cleared. |
| | | assertNull(replicaDB.getOldestCSN()); |
| | | assertNull(replicaDB.getNewestCSN()); |
| | | debugInfo(tn,"Success"); |
| | | } |
| | | finally |
| | | { |
| | | shutdown(replicaDB); |
| | | if (dbEnv != null) |
| | | { |
| | | dbEnv.shutdown(); |
| | | } |
| | | remove(replicationServer); |
| | | // testRoot is still null when the failure happened before the directory was created |
| | | if (testRoot != null) |
| | | { |
| | | TestCaseUtils.deleteDirectory(testRoot); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Asserts that the replica DB reports the expected oldest and newest CSNs. |
| | | * Both checks are evaluated before any failure is reported. |
| | | */ |
| | | private void assertLimits(FileReplicaDB replicaDB, CSN oldestCSN, CSN newestCSN) |
| | | { |
| | | final SoftAssertions limits = new SoftAssertions(); |
| | | limits.assertThat(replicaDB.getOldestCSN()) |
| | | .as("Wrong oldest CSN") |
| | | .isEqualTo(oldestCSN); |
| | | limits.assertThat(replicaDB.getNewestCSN()) |
| | | .as("Wrong newest CSN") |
| | | .isEqualTo(newestCSN); |
| | | limits.assertAll(); |
| | | } |
| | | |
| | | /** Shuts down the replica DB, doing nothing when it is null. */ |
| | | private void shutdown(FileReplicaDB replicaDB) |
| | | { |
| | | if (replicaDB == null) |
| | | { |
| | | return; |
| | | } |
| | | replicaDB.shutdown(); |
| | | } |
| | | |
| | | static CSN[] generateCSNs(int serverId, long timestamp, int number) |
| | | { |
| | | CSNGenerator gen = new CSNGenerator(serverId, timestamp); |
| | | CSN[] csns = new CSN[number]; |
| | | for (int i = 0; i < csns.length; i++) |
| | | { |
| | | csns[i] = gen.newCSN(); |
| | | } |
| | | return csns; |
| | | } |
| | | |
| | | private void waitChangesArePersisted(FileReplicaDB replicaDB, |
| | | int nbRecordsInserted) throws Exception |
| | | { |
| | | waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000); |
| | | } |
| | | |
| | | private void waitChangesArePersisted(FileReplicaDB replicaDB, |
| | | int nbRecordsInserted, int counterWindow) throws Exception |
| | | { |
| | | // one counter record is inserted every time "counterWindow" |
| | | // records have been inserted |
| | | int expectedNbRecords = nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow; |
| | | |
| | | int count = 0; |
| | | while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100) |
| | | { |
| | | Thread.sleep(10); |
| | | count++; |
| | | } |
| | | assertEquals(replicaDB.getNumberRecords(), expectedNbRecords); |
| | | } |
| | | |
| | | /** |
| | | * Creates a replication server listening on a free port and using the LOG |
| | | * changelog implementation, with the provided window and queue sizes. |
| | | */ |
| | | private ReplicationServer configureReplicationServer(int windowSize, int queueSize) |
| | | throws IOException, ConfigException |
| | | { |
| | | final ReplicationServerCfg fakeCfg = new ReplServerFakeConfiguration( |
| | | findFreePort(), null, ReplicationDBImplementation.LOG, 0, 2, queueSize, windowSize, null); |
| | | return new ReplicationServer(fakeCfg); |
| | | } |
| | | |
| | | /** |
| | | * Gets or creates, through the changelog DB of the replication server, the |
| | | * replica DB of server id 1 for the test root DN. |
| | | */ |
| | | private FileReplicaDB newReplicaDB(ReplicationServer rs) throws Exception |
| | | { |
| | | return ((FileChangelogDB) rs.getChangelogDB()) |
| | | .getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs) |
| | | .getFirst(); |
| | | } |
| | | |
| | | /** |
| | | * Creates an empty "unit-tests/FileReplicaDB" directory under the build |
| | | * directory, deleting any previous content first. |
| | | * |
| | | * @return the freshly created directory |
| | | * @throws IOException if the directory cannot be created |
| | | */ |
| | | private File createCleanDir() throws IOException |
| | | { |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | | // The build directory defaults to "<build root>/build" when not set explicitly |
| | | String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot |
| | | + File.separator + "build"); |
| | | path = path + File.separator + "unit-tests" + File.separator + "FileReplicaDB"; |
| | | final File testRoot = new File(path); |
| | | TestCaseUtils.deleteDirectory(testRoot); |
| | | // mkdirs() returns false both on failure and when the directory already exists: |
| | | // only fail when there is really no directory to work with |
| | | if (!testRoot.mkdirs() && !testRoot.isDirectory()) |
| | | { |
| | | throw new IOException("Unable to create test directory " + testRoot.getAbsolutePath()); |
| | | } |
| | | return testRoot; |
| | | } |
| | | |
| | | /** |
| | | * Asserts that the provided CSNs are read in order from the replica DB, |
| | | * with both the AFTER_MATCHING_KEY and the ON_MATCHING_KEY position |
| | | * strategies. Does nothing when no CSN is provided. |
| | | */ |
| | | private void assertFoundInOrder(FileReplicaDB replicaDB, CSN... csns) throws Exception |
| | | { |
| | | if (csns.length > 0) |
| | | { |
| | | assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns); |
| | | assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Asserts that a cursor opened from the first provided CSN returns the |
| | | * provided CSNs in order, and nothing more. With ON_MATCHING_KEY the first |
| | | * CSN is expected to be returned as well; with any other strategy the |
| | | * expected sequence starts at the second CSN. The cursor is always closed |
| | | * on return. |
| | | * |
| | | * @param replicaDB the replica DB to read from |
| | | * @param positionStrategy the position strategy used to open the cursor |
| | | * @param csns the CSNs, the first one being the start key of the cursor |
| | | */ |
| | | private void assertFoundInOrder(FileReplicaDB replicaDB, |
| | | final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | { |
| | | assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); |
| | | |
| | | for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++) |
| | | { |
| | | final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI(); |
| | | // Hard assertion: with a soft one, a failed next() would only show up |
| | | // as a NullPointerException when the record is dereferenced below |
| | | assertTrue(cursor.next(), msg); |
| | | assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]); |
| | | } |
| | | // Nothing is dereferenced here, so both checks can be reported together |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.io.File; |
| | | import java.io.RandomAccessFile; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.opendj.ldap.ByteSequenceReader; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | @Test(sequential=true) |
| | | public class LogFileTest extends DirectoryServerTestCase |
| | | { |
| | | /** Directory "changelog-unit", under the unit test root path, in which the test log file is created. */ |
| | | private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); |
| | | |
| | | /** Log file exercised by the tests of this class; its name is Log.HEAD_LOG_FILE_NAME. */ |
| | | private static final File TEST_LOG_FILE = new File(TEST_DIRECTORY, Log.HEAD_LOG_FILE_NAME); |
| | | |
| | | /** Parser for (String, String) records; not private because LogTest also uses it. */ |
| | | static final StringRecordParser RECORD_PARSER = new StringRecordParser(); |
| | | |
| | | /** Creates the directory that holds the test log file, including any missing parent directory. */ |
| | | @BeforeClass |
| | | public void createTestDirectory() |
| | | { |
| | | // the result is not checked: the directory may already exist |
| | | TEST_DIRECTORY.mkdirs(); |
| | | } |
| | | |
| | | /** |
| | | * Recreates the test log file before each test method, filling it with ten records |
| | | * from (key01, value1) up to (key10, value10). |
| | | * So the log contains the keys "key01", "key02", ..., "key10". |
| | | */ |
| | | @BeforeMethod |
| | | public void initialize() throws Exception |
| | | { |
| | | // start from a fresh file |
| | | if (TEST_LOG_FILE.exists()) |
| | | { |
| | | TEST_LOG_FILE.delete(); |
| | | } |
| | | LogFile<String, String> newLogFile = null; |
| | | try |
| | | { |
| | | newLogFile = getLogFile(RECORD_PARSER); |
| | | |
| | | int index = 1; |
| | | while (index <= 10) |
| | | { |
| | | final String key = String.format("key%02d", index); |
| | | newLogFile.append(Record.from(key, "value" + index)); |
| | | index++; |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(newLogFile); |
| | | } |
| | | } |
| | | |
| | | /** Removes the test directory and its content once all the tests of this class have run. */ |
| | | @AfterClass |
| | | public void cleanTestChangelogDirectory() |
| | | { |
| | | StaticUtils.recursiveDelete(TEST_DIRECTORY); |
| | | } |
| | | |
| | | private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException |
| | | { |
| | | return LogFile.newAppendableLogFile(TEST_LOG_FILE, parser); |
| | | } |
| | | |
| | | /** Checks that a cursor opened without positioning reads the ten initial records in order. */ |
| | | @Test |
| | | public void testCursor() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> allRecords = null; |
| | | final LogFile<String, String> logFile = getLogFile(RECORD_PARSER); |
| | | try |
| | | { |
| | | allRecords = logFile.getCursor(); |
| | | assertThatCursorCanBeFullyRead(allRecords, 1, 10); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(allRecords, logFile); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Provides the data for testCursorPositionTo(): one row per combination of key, key matching |
| | | * strategy and position strategy, together with the expected outcome of the positioning. |
| | | */ |
| | | @DataProvider |
| | | Object[][] cursorPositionTo() |
| | | { |
| | | return new Object[][] { |
| | | // Columns: key to position to, key matching strategy, position strategy, is the position found ?, |
| | | // expected start index of cursor (use -1 if cursor should be exhausted), expected end index of cursor |
| | | |
| | | // equal |
| | | { "key00", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1}, |
| | | { "key02", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10}, |
| | | { "key05", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10}, |
| | | { "key050", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1}, |
| | | { "key07", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10}, |
| | | { "key10", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10}, |
| | | { "key11", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1}, |
| | | |
| | | { "key00", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1}, |
| | | { "key02", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10}, |
| | | { "key05", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1}, |
| | | { "key07", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10}, |
| | | { "key10", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1}, |
| | | { "key11", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1}, |
| | | |
| | | // less than or equal |
| | | |
| | | // key00 is a special case: the position is not found but the cursor is positioned at the beginning, |
| | | // so it is possible to iterate on it from start to end |
| | | { "key00", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, 1, 10}, |
| | | { "key02", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10}, |
| | | { "key05", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10}, |
| | | { "key050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10}, |
| | | { "key07", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10}, |
| | | { "key10", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10}, |
| | | { "key11", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10}, |
| | | |
| | | // key00 is a special case: the position is not found but the cursor is positioned at the beginning, |
| | | // so it is possible to iterate on it from 2 to end |
| | | { "key00", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, 2, 10}, |
| | | { "key02", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10}, |
| | | { "key05", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key07", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10}, |
| | | { "key10", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1}, |
| | | { "key11", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1}, |
| | | |
| | | // greater than or equal |
| | | { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 1, 10}, |
| | | { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10}, |
| | | { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10}, |
| | | { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 6, 10}, |
| | | { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10}, |
| | | { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10}, |
| | | { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1}, |
| | | |
| | | { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 1, 10}, |
| | | { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10}, |
| | | { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10}, |
| | | { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1}, |
| | | { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1}, |
| | | |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Positions a cursor with the given key, matching strategy and position strategy, and checks |
| | | * the value returned by the positioning. |
| | | * The cursor is then read from the expected start index up to the expected end index, or is |
| | | * checked to be exhausted right away when the expected start index is negative. |
| | | */ |
| | | @Test(dataProvider="cursorPositionTo") |
| | | public void testCursorPositionTo(String key, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, |
| | | boolean positionShouldBeFound, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception |
| | | { |
| | | final LogFile<String, String> logFile = getLogFile(RECORD_PARSER); |
| | | LogFileCursor<String, String> positionedCursor = null; |
| | | try |
| | | { |
| | | positionedCursor = logFile.getCursor(); |
| | | final boolean isFound = positionedCursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | assertThat(isFound).isEqualTo(positionShouldBeFound); |
| | | |
| | | final boolean expectExhausted = cursorShouldStartAt < 0; |
| | | if (expectExhausted) |
| | | { |
| | | assertThatCursorIsExhausted(positionedCursor); |
| | | } |
| | | else |
| | | { |
| | | assertThatCursorCanBeFullyRead(positionedCursor, cursorShouldStartAt, cursorShouldEndAt); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(positionedCursor, logFile); |
| | | } |
| | | } |
| | | |
| | | /** Checks that the oldest record of the log file is the first record appended, (key01, value1). */ |
| | | @Test |
| | | public void testGetOldestRecord() throws Exception |
| | | { |
| | | final LogFile<String, String> logFile = getLogFile(RECORD_PARSER); |
| | | try |
| | | { |
| | | final Record<String, String> expected = Record.from("key01", "value1"); |
| | | assertThat(logFile.getOldestRecord()).isEqualTo(expected); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(logFile); |
| | | } |
| | | } |
| | | |
| | | /** Checks that the newest record of the log file is the last record appended, (key10, value10). */ |
| | | @Test |
| | | public void testGetNewestRecord() throws Exception |
| | | { |
| | | final LogFile<String, String> logFile = getLogFile(RECORD_PARSER); |
| | | try |
| | | { |
| | | final Record<String, String> expected = Record.from("key10", "value10"); |
| | | assertThat(logFile.getNewestRecord()).isEqualTo(expected); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(logFile); |
| | | } |
| | | } |
| | | |
| | | @DataProvider(name = "corruptedRecordData") |
| | | Object[][] corruptedRecordData() |
| | | { |
| | | return new Object[][] |
| | | { |
| | | // write partial record size (should be 4 bytes) |
| | | { 1, new ByteStringBuilder().append((byte) 0) }, |
| | | // write partial record size (should be 4 bytes) |
| | | { 2, new ByteStringBuilder().append((byte) 0).append((byte) 0).append((byte) 0) }, |
| | | // write size only |
| | | { 3, new ByteStringBuilder().append(10) }, |
| | | // write size + key |
| | | { 4, new ByteStringBuilder().append(100).append("key") }, |
| | | // write size + key + separator |
| | | { 5, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR) }, |
| | | // write size + key + separator + partial value |
| | | { 6, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR).append("v") }, |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Appends an incomplete record to the test log file, re-opens the log file, appends a valid |
| | | * record and checks that the ten initial records and the new record can all be read. |
| | | * |
| | | * @param unusedId identifier of the data row, not used by the test |
| | | * @param corruptedRecordData raw bytes of the incomplete record appended to the log file |
| | | */ |
| | | @Test(dataProvider="corruptedRecordData") |
| | | public void testRecoveryOnCorruptedLogFile( |
| | | @SuppressWarnings("unused") int unusedId, |
| | | ByteStringBuilder corruptedRecordData) throws Exception |
| | | { |
| | | LogFile<String, String> logFile = null; |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try |
| | | { |
| | | corruptTestLogFile(corruptedRecordData); |
| | | |
| | | // open the log file: the file should be repaired at this point |
| | | logFile = getLogFile(RECORD_PARSER); |
| | | |
| | | // write a new valid record |
| | | logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11)); |
| | | |
| | | // ensure log can be fully read including the new record |
| | | cursor = logFile.getCursor(); |
| | | assertThatCursorCanBeFullyRead(cursor, 1, 11); |
| | | } |
| | | finally |
| | | { |
| | | // the cursor must be released too, as done in the other tests of this class |
| | | StaticUtils.close(cursor, logFile); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Append some raw data to the TEST_LOG_FILE. Intended to corrupt the log |
| | | * file. |
| | | */ |
| | | private void corruptTestLogFile(ByteStringBuilder corruptedRecordData) throws Exception |
| | | { |
| | | RandomAccessFile output = null; |
| | | try { |
| | | output = new RandomAccessFile(TEST_LOG_FILE, "rwd"); |
| | | output.seek(output.length()); |
| | | output.write(corruptedRecordData.toByteArray()); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(output); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks that each record appended through one LogFile object is immediately returned as the |
| | | * newest record, both by that object and by a second LogFile object opened on the same file. |
| | | */ |
| | | @Test |
| | | public void testWriteAndReadOnSameLogFile() throws Exception |
| | | { |
| | | LogFile<String, String> writer = null; |
| | | LogFile<String, String> reader = null; |
| | | try |
| | | { |
| | | writer = getLogFile(RECORD_PARSER); |
| | | reader = getLogFile(RECORD_PARSER); |
| | | final Record<String, String> oldest = Record.from("key01", "value1"); |
| | | |
| | | int i = 1; |
| | | while (i <= 100) |
| | | { |
| | | final Record<String, String> appended = Record.from("newkey" + i, "newvalue" + i); |
| | | writer.append(appended); |
| | | assertThat(writer.getNewestRecord()).as("write changelog " + i).isEqualTo(appended); |
| | | assertThat(writer.getOldestRecord()).as("write changelog " + i).isEqualTo(oldest); |
| | | assertThat(reader.getNewestRecord()).as("read changelog " + i).isEqualTo(appended); |
| | | assertThat(reader.getOldestRecord()).as("read changelog " + i).isEqualTo(oldest); |
| | | i++; |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writer, reader); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value |
| | | * endIndex, using (keyN, valueN) where N is the index. |
| | | */ |
| | | private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex) |
| | | throws Exception |
| | | { |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | for (int i = fromIndex; i <= endIndex; i++) |
| | | { |
| | | assertThat(cursor.next()).as("next() value when i=" + i).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%02d", i), "value" + i)); |
| | | } |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | |
| | | private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception |
| | | { |
| | | assertThat(cursor.next()).isFalse(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | } |
| | | |
| | | /** |
| | | * Record parser implementation for records with keys as String and values as |
| | | * String, to be used in tests. |
| | | * <p> |
| | | * A record is encoded as the key followed by a zero byte, then the value followed by a zero byte. |
| | | */ |
| | | private static class StringRecordParser implements RecordParser<String, String> |
| | | { |
| | | /** Byte written after the key and after the value of an encoded record. */ |
| | | private static final byte STRING_SEPARATOR = 0; |
| | | |
| | | /** |
| | | * Decodes a record from the key, the separator and the value read in that order. |
| | | * |
| | | * @return the decoded record, or null if the key or the value is empty |
| | | */ |
| | | @Override |
| | | public Record<String, String> decodeRecord(final ByteString data) throws DecodingException |
| | | { |
| | | ByteSequenceReader reader = data.asReader(); |
| | | String key = reader.getString(getNextStringLength(reader)); |
| | | // skip the separator that follows the key |
| | | reader.skip(1); |
| | | String value = reader.getString(getNextStringLength(reader)); |
| | | return key.isEmpty() || value.isEmpty() ? null : Record.from(key, value); |
| | | } |
| | | |
| | | /** Returns the length of next string by looking for the zero byte used as separator. */ |
| | | private int getNextStringLength(ByteSequenceReader reader) |
| | | { |
| | | int length = 0; |
| | | while (reader.peek(length) != STRING_SEPARATOR) |
| | | { |
| | | length++; |
| | | } |
| | | return length; |
| | | } |
| | | |
| | | /** Encodes the record as key, separator, value, separator. */ |
| | | @Override |
| | | public ByteString encodeRecord(Record<String, String> record) |
| | | { |
| | | return new ByteStringBuilder() |
| | | .append(record.getKey()).append(STRING_SEPARATOR) |
| | | .append(record.getValue()).append(STRING_SEPARATOR).toByteString(); |
| | | } |
| | | |
| | | /** Keys are already strings: the key is returned unchanged. */ |
| | | @Override |
| | | public String decodeKeyFromString(String key) throws ChangelogException |
| | | { |
| | | return key; |
| | | } |
| | | |
| | | /** Keys are already strings: the key is returned unchanged. */ |
| | | @Override |
| | | public String encodeKeyToString(String key) |
| | | { |
| | | return key; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getMaxKey() |
| | | { |
| | | // '~' is the highest printable ASCII character, so this value sorts after the keys used in the tests |
| | | return "~~~~"; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * A string record parser whose decoding can be switched to fail. |
| | | * Decoding works as in the parent class until setFailToRead(true) is called. |
| | | */ |
| | | static class FailingStringRecordParser extends StringRecordParser |
| | | { |
| | | /** When true, every call to decodeRecord() throws a DecodingException. */ |
| | | private boolean failToRead; |
| | | |
| | | @Override |
| | | public Record<String, String> decodeRecord(ByteString data) throws DecodingException |
| | | { |
| | | if (!failToRead) |
| | | { |
| | | return super.decodeRecord(data); |
| | | } |
| | | throw new DecodingException(LocalizableMessage.raw("Error when parsing record")); |
| | | } |
| | | |
| | | /** Enables or disables the failure of decodeRecord(). */ |
| | | void setFailToRead(boolean shouldFail) |
| | | { |
| | | this.failToRead = shouldFail; |
| | | } |
| | | } |
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.file.LogFileTest.*; |
| | | |
| | | import java.io.File; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | @Test(sequential=true) |
| | | public class LogTest extends DirectoryServerTestCase |
| | | { |
| | | // Directory holding the log under test; it is deleted and rebuilt before each test method. |
| | | // NOTE(review): this is the same "changelog-unit" path as LogFileTest.TEST_DIRECTORY, so it is not |
| | | // dedicated to this test class - confirm that both classes never run at the same time |
| | | private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit"); |
| | | |
| | | /** |
| | | * Rebuilds the log before each test method: any previous log directory is deleted, then a log |
| | | * with ten records, (key001, value1) to (key010, value10), is written and closed. |
| | | */ |
| | | @BeforeMethod |
| | | public void initialize() throws Exception |
| | | { |
| | | // Delete any previous log |
| | | if (LOG_DIRECTORY.exists()) |
| | | { |
| | | StaticUtils.recursiveDelete(LOG_DIRECTORY); |
| | | } |
| | | |
| | | // Build a log with 10 records with String keys and String values |
| | | // Keys are using the format keyNNN where N is a digit |
| | | // You should always ensure keys are correctly ordered otherwise tests may break unexpectedly |
| | | final Log<String, String> log = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | for (int i = 1; i <= 10; i++) |
| | | { |
| | | log.append(Record.from(String.format("key%03d", i), "value" + i)); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | // close the log even when an append fails, so that it does not stay open for the next test |
| | | log.close(); |
| | | } |
| | | } |
| | | |
| | | private Log<String, String> openLog(RecordParser<String, String> parser) throws ChangelogException |
| | | { |
| | | // Each string record has a length of approximately 18 bytes |
| | | // This size is set in order to have 2 records per log file before the rotation happens |
| | | // This allow to ensure rotation mechanism is thoroughly tested |
| | | // Some tests rely on having 2 records per log file (especially the purge tests), so take care |
| | | // if this value has to be changed |
| | | int sizeLimitPerFileInBytes = 30; |
| | | |
| | | return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes); |
| | | } |
| | | |
| | | /** Checks that a cursor opened without any key reads the ten initial records. */ |
| | | @Test |
| | | public void testCursor() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> allRecords = null; |
| | | final Log<String, String> openedLog = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | allRecords = openedLog.getCursor(); |
| | | assertThatCursorCanBeFullyReadFromStart(allRecords, 1, 10); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(allRecords, openedLog); |
| | | } |
| | | } |
| | | |
| | | /** Checks that a cursor opened on the existing key "key005" reads the records 5 to 10. */ |
| | | @Test |
| | | public void testCursorWhenGivenAnExistingKey() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> fromKey5 = null; |
| | | final Log<String, String> openedLog = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | fromKey5 = openedLog.getCursor("key005"); |
| | | assertThatCursorCanBeFullyReadFromStart(fromKey5, 5, 10); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(fromKey5, openedLog); |
| | | } |
| | | } |
| | | |
| | | /** Checks that a cursor opened on a key absent from the log is not null but returns no record. */ |
| | | @Test |
| | | public void testCursorWhenGivenAnUnexistingKey() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> emptyCursor = null; |
| | | final Log<String, String> openedLog = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | // this key sorts between key005 and key006 |
| | | final String missingKey = "key005000"; |
| | | emptyCursor = openedLog.getCursor(missingKey); |
| | | |
| | | assertThat(emptyCursor).isNotNull(); |
| | | assertThat(emptyCursor.getRecord()).isNull(); |
| | | assertThat(emptyCursor.next()).isFalse(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(emptyCursor, openedLog); |
| | | } |
| | | } |
| | | |
| | | /** Checks that a cursor opened with a null key reads the ten initial records. */ |
| | | @Test |
| | | public void testCursorWhenGivenANullKey() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> allRecords = null; |
| | | final Log<String, String> openedLog = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | allRecords = openedLog.getCursor(null); |
| | | assertThatCursorCanBeFullyReadFromStart(allRecords, 1, 10); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(allRecords, openedLog); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Provides the data for testCursorWithStrategies(): one row per combination of key, key matching |
| | | * strategy and position strategy, together with the expected range of records read by the cursor. |
| | | */ |
| | | @DataProvider |
| | | Object[][] cursorData() |
| | | { |
| | | return new Object[][] { |
| | | // The first 3 values are the input data: key to position to, key matching strategy, position strategy. |
| | | // The last 2 values are the expected output: |
| | | // first index of cursor (-1 if cursor should be exhausted), last index of cursor |
| | | { "key000", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { "key001", EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 }, |
| | | { "key004", EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 }, |
| | | { "key0050", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { "key009", EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 }, |
| | | { "key010", EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 }, |
| | | { "key011", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | |
| | | { "key000", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key001", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 }, |
| | | { "key004", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 }, |
| | | { "key0050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key009", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 }, |
| | | { "key010", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key011", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | |
| | | { "key000", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { "key001", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 }, |
| | | { "key004", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 }, |
| | | { "key005", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 }, |
| | | { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 }, |
| | | { "key006", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 }, |
| | | { "key009", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 }, |
| | | { "key010", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 }, |
| | | { "key011", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 }, |
| | | |
| | | { "key000", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key001", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 }, |
| | | { "key004", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 }, |
| | | { "key005", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 }, |
| | | { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 }, |
| | | { "key006", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 7, 10 }, |
| | | { "key009", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 }, |
| | | { "key010", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key011", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | |
| | | { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 }, |
| | | { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 }, |
| | | { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 }, |
| | | { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 }, |
| | | { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 }, |
| | | { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 }, |
| | | { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | |
| | | { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 }, |
| | | { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 }, |
| | | { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 }, |
| | | { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 }, |
| | | { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 }, |
| | | { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Opens a cursor with the given key, matching strategy and position strategy, then checks that |
| | | * it reads the expected range of records, or that it is exhausted when the expected start is -1. |
| | | */ |
| | | @Test(dataProvider="cursorData") |
| | | public void testCursorWithStrategies(String key, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> positionedCursor = null; |
| | | final Log<String, String> openedLog = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | positionedCursor = openedLog.getCursor(key, matchingStrategy, positionStrategy); |
| | | |
| | | final boolean expectExhausted = cursorShouldStartAt == -1; |
| | | if (expectExhausted) |
| | | { |
| | | assertThatCursorIsExhausted(positionedCursor); |
| | | } |
| | | else |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(positionedCursor, cursorShouldStartAt, cursorShouldEndAt); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(positionedCursor, openedLog); |
| | | } |
| | | } |
| | | |
| | | /** Checks that a cursor opened with a null key and null strategies reads the ten initial records. */ |
| | | @Test |
| | | public void testCursorMatchingAnyPositioningAnyWhenGivenANullKey() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> allRecords = null; |
| | | final Log<String, String> openedLog = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | allRecords = openedLog.getCursor(null, null, null); |
| | | assertThatCursorCanBeFullyReadFromStart(allRecords, 1, 10); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(allRecords, openedLog); |
| | | } |
| | | } |
| | | |
| | | /** Checks that opening a cursor on a key raises a ChangelogException when the parser fails to decode records. */ |
| | | @Test(expectedExceptions=ChangelogException.class) |
| | | public void testCursorWhenParserFailsToRead() throws Exception |
| | | { |
| | | final FailingStringRecordParser failingParser = new FailingStringRecordParser(); |
| | | final Log<String, String> openedLog = openLog(failingParser); |
| | | // the failure is enabled only once the log is opened |
| | | failingParser.setFailToRead(true); |
| | | try |
| | | { |
| | | openedLog.getCursor("key"); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(openedLog); |
| | | } |
| | | } |
| | | |
| | | /** Checks that the oldest record of the log is the first record appended, (key001, value1). */ |
| | | @Test |
| | | public void testGetOldestRecord() throws Exception |
| | | { |
| | | final Log<String, String> openedLog = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | final Record<String, String> expected = Record.from("key001", "value1"); |
| | | assertThat(openedLog.getOldestRecord()).isEqualTo(expected); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(openedLog); |
| | | } |
| | | } |
| | | |
| | | /** Checks that the newest record of the log is the last record appended, (key010, value10). */ |
| | | @Test |
| | | public void testGetNewestRecord() throws Exception |
| | | { |
| | | final Log<String, String> openedLog = openLog(RECORD_PARSER); |
| | | try |
| | | { |
| | | final Record<String, String> expected = Record.from("key010", "value10"); |
| | | assertThat(openedLog.getNewestRecord()).isEqualTo(expected); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(openedLog); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks that each record appended through one Log reference is immediately returned as the |
| | | * newest record, both by that reference and by a second reference opened on the same directory. |
| | | */ |
| | | @Test |
| | | public void testWriteAndReadOnSameLog() throws Exception |
| | | { |
| | | Log<String, String> writer = null; |
| | | Log<String, String> reader = null; |
| | | try |
| | | { |
| | | writer = openLog(RECORD_PARSER); |
| | | reader = openLog(RECORD_PARSER); |
| | | final Record<String, String> oldest = Record.from("key001", "value1"); |
| | | |
| | | int i = 1; |
| | | while (i <= 10) |
| | | { |
| | | final Record<String, String> appended = Record.from(String.format("nkey%03d", i), "nvalue" + i); |
| | | writer.append(appended); |
| | | assertThat(writer.getNewestRecord()).as("write changelog " + i).isEqualTo(appended); |
| | | assertThat(writer.getOldestRecord()).as("write changelog " + i).isEqualTo(oldest); |
| | | assertThat(reader.getNewestRecord()).as("read changelog " + i).isEqualTo(appended); |
| | | assertThat(reader.getOldestRecord()).as("read changelog " + i).isEqualTo(oldest); |
| | | i++; |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(writer, reader); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Writes records through two Log references opened on the same directory, then checks that a |
| | | * cursor opened on "key020" can advance 61 times and ends on the last record of one of the writers. |
| | | */ |
| | | @Test |
| | | public void testTwoConcurrentWrite() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> fromKey20 = null; |
| | | Log<String, String> firstLog = null; |
| | | Log<String, String> secondLog = null; |
| | | try |
| | | { |
| | | firstLog = openLog(RECORD_PARSER); |
| | | secondLog = openLog(RECORD_PARSER); |
| | | firstLog.append(Record.from("key020", "starting record")); |
| | | |
| | | final AtomicReference<ChangelogException> failure = new AtomicReference<ChangelogException>(); |
| | | final Thread writerA = getWriteLogThread(firstLog, "a", failure); |
| | | final Thread writerB = getWriteLogThread(secondLog, "b", failure); |
| | | // NOTE(review): Thread.run() executes each writer on the calling thread, one after the other, |
| | | // so the join() calls below return at once; presumably start() was intended given the test |
| | | // name - confirm that the log accepts interleaved keys before changing it |
| | | writerA.run(); |
| | | writerB.run(); |
| | | |
| | | writerA.join(); |
| | | writerB.join(); |
| | | final ChangelogException writeError = failure.get(); |
| | | if (writeError != null) |
| | | { |
| | | throw writeError; |
| | | } |
| | | firstLog.syncToFileSystem(); |
| | | fromKey20 = firstLog.getCursor("key020"); |
| | | for (int read = 0; read < 61; read++) |
| | | { |
| | | assertThat(fromKey20.next()).isTrue(); |
| | | } |
| | | assertThat(fromKey20.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30")); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(fromKey20, firstLog, secondLog); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Appends a large number of records to a log with a size limit of 1 MB per log file. |
| | | * This test should stay disabled: enable it locally when you need a rough idea of the write performance. |
| | | */ |
| | | @Test(enabled=false) |
| | | public void logWriteSpeed() throws Exception |
| | | { |
| | | Log<String, String> speedLog = null; |
| | | try |
| | | { |
| | | final long oneMegabyte = 1024L * 1024L; |
| | | speedLog = Log.openLog(LOG_DIRECTORY, RECORD_PARSER, oneMegabyte); |
| | | |
| | | int i = 1; |
| | | while (i < 1000000) |
| | | { |
| | | final String key = String.format("key%010d", i); |
| | | speedLog.append(Record.from(key, "value" + i)); |
| | | i++; |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(speedLog); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks that a cursor advanced to the last record can still read the records that are |
| | | * appended afterwards, when these appends cause the ahead log file to be rotated. |
| | | */ |
| | | @Test |
| | | public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception |
| | | { |
| | | Log<String, String> openedLog = null; |
| | | DBCursor<Record<String, String>> openedCursor = null; |
| | | try |
| | | { |
| | | openedLog = openLog(RECORD_PARSER); |
| | | openedCursor = openedLog.getCursor(); |
| | | // move the cursor to the last record, so that it points to the ahead log file |
| | | advanceCursorUpTo(openedCursor, 1, 10); |
| | | |
| | | // append new records to make sure the ahead log file is rotated |
| | | int i = 11; |
| | | while (i <= 20) |
| | | { |
| | | final String key = String.format("key%03d", i); |
| | | openedLog.append(Record.from(key, "value" + i)); |
| | | i++; |
| | | } |
| | | |
| | | // the cursor must be able to read all the new records |
| | | assertThatCursorCanBeFullyRead(openedCursor, 11, 20); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(openedCursor, openedLog); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Checks that four cursors advanced to different records can each read up to the last record, |
| | | * after new records are appended and cause the ahead log file to be rotated. |
| | | */ |
| | | @Test |
| | | public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception |
| | | { |
| | | DBCursor<Record<String, String>> atRecord1 = null; |
| | | DBCursor<Record<String, String>> atRecord4 = null; |
| | | DBCursor<Record<String, String>> atRecord9 = null; |
| | | DBCursor<Record<String, String>> atRecord10 = null; |
| | | Log<String, String> openedLog = null; |
| | | try |
| | | { |
| | | openedLog = openLog(RECORD_PARSER); |
| | | atRecord1 = openedLog.getCursor(); |
| | | advanceCursorUpTo(atRecord1, 1, 1); |
| | | atRecord4 = openedLog.getCursor(); |
| | | advanceCursorUpTo(atRecord4, 1, 4); |
| | | atRecord9 = openedLog.getCursor(); |
| | | advanceCursorUpTo(atRecord9, 1, 9); |
| | | atRecord10 = openedLog.getCursor(); |
| | | advanceCursorUpTo(atRecord10, 1, 10); |
| | | |
| | | // append new records to make sure the ahead log file is rotated |
| | | int i = 11; |
| | | while (i <= 20) |
| | | { |
| | | final String key = String.format("key%03d", i); |
| | | openedLog.append(Record.from(key, "value" + i)); |
| | | i++; |
| | | } |
| | | |
| | | // every cursor must be able to read from its own position up to the last new record |
| | | assertThatCursorCanBeFullyRead(atRecord1, 2, 20); |
| | | assertThatCursorCanBeFullyRead(atRecord4, 5, 20); |
| | | assertThatCursorCanBeFullyRead(atRecord9, 10, 20); |
| | | assertThatCursorCanBeFullyRead(atRecord10, 11, 20); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(atRecord1, atRecord4, atRecord9, atRecord10, openedLog); |
| | | } |
| | | } |
| | | |
| | | /** Checks that a cursor opened after the log has been cleared returns no record. */
| | | @Test
| | | public void testClear() throws Exception
| | | {
| | | Log<String, String> log = null;
| | | DBCursor<Record<String, String>> cursor = null;
| | | try
| | | {
| | | log = openLog(LogFileTest.RECORD_PARSER);
| | | log.clear();
| | | 
| | | // a cursor on the cleared log has nothing to return
| | | cursor = log.getCursor();
| | | assertThat(cursor.next()).isFalse();
| | | assertThat(cursor.getRecord()).isNull();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cursor, log);
| | | }
| | | }
| | | |
| | | /**
| | | * Expects a {@link ChangelogException} when the log is cleared while a cursor is opened on it.
| | | * <p>
| | | * TODO : Should be re-enabled once the issue with robot functional test
| | | * replication/totalupdate.txt is solved
| | | */
| | | @Test(enabled=false, expectedExceptions=ChangelogException.class)
| | | public void testClearWhenCursorIsOpened() throws Exception
| | | {
| | | Log<String, String> log = null;
| | | DBCursor<Record<String, String>> cursor = null;
| | | try
| | | {
| | | log = openLog(LogFileTest.RECORD_PARSER);
| | | // keep a cursor opened, then attempt to clear the log
| | | cursor = log.getCursor();
| | | log.clear();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cursor, log);
| | | }
| | | }
| | | |
| | | @DataProvider(name = "purgeKeys") |
| | | Object[][] purgeKeys() |
| | | { |
| | | // purge key, first record expected in the cursor, startIndex + endIndex to fully read the cursor |
| | | return new Object[][] |
| | | { |
| | | // lowest key of the read-only log file "key005_key006.log" |
| | | { "key005", Record.from("key005", "value5"), 6, 10 }, |
| | | // key that is not the lowest of the read-only log file "key005_key006.log" |
| | | { "key006", Record.from("key005", "value5"), 6, 10 }, |
| | | // lowest key of the ahead log file "ahead.log" |
| | | { "key009", Record.from("key009", "value9"), 10, 10 }, |
| | | // key that is not the lowest of the ahead log file "ahead.log" |
| | | { "key010", Record.from("key009", "value9"), 10, 10 }, |
| | | |
| | | // key not present in log, which is between key005 and key006 |
| | | { "key005a", Record.from("key005", "value5"), 6, 10 }, |
| | | // key not present in log, which is between key006 and key007 |
| | | { "key006a", Record.from("key007", "value7"), 8, 10 }, |
| | | // key not present in log, which is lower than oldest key key001 |
| | | { "key000", Record.from("key001", "value1"), 2, 10 }, |
| | | // key not present in log, which is higher than newest key key010 |
| | | // should return the lowest key present in ahead log |
| | | { "key011", Record.from("key009", "value9"), 10, 10 }, |
| | | }; |
| | | } |
| | | |
| | | /**
| | | * Purges the log up to the provided key, then checks that a new cursor first returns the
| | | * provided record and can then be read until exhaustion from the provided start index
| | | * to the provided end index.
| | | */
| | | @Test(dataProvider="purgeKeys")
| | | public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
| | | int cursorStartIndex, int cursorEndIndex) throws Exception
| | | {
| | | DBCursor<Record<String, String>> cursor = null;
| | | Log<String, String> log = null;
| | | try
| | | {
| | | log = openLog(LogFileTest.RECORD_PARSER);
| | | log.purgeUpTo(purgeKey);
| | | 
| | | // the first record of a fresh cursor is the oldest one that survived the purge
| | | cursor = log.getCursor();
| | | final boolean hasFirstRecord = cursor.next();
| | | assertThat(hasFirstRecord).isTrue();
| | | final Record<String, String> firstRecord = cursor.getRecord();
| | | assertThat(firstRecord).isEqualTo(firstRecordExpectedAfterPurge);
| | | 
| | | // the remaining records must follow, up to exhaustion of the cursor
| | | assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cursor, log);
| | | }
| | | }
| | | |
| | | private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex) |
| | | throws Exception |
| | | { |
| | | for (int i = fromIndex; i <= endIndex; i++) |
| | | { |
| | | assertThat(cursor.next()).as("next() value when i=" + i).isTrue(); |
| | | assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%03d", i), "value" + i)); |
| | | } |
| | | } |
| | | |
| | | /**
| | | * Reads the cursor until it is exhausted, checking that the records returned are exactly
| | | * (keyN, valueN) for N going from fromIndex to endIndex.
| | | */
| | | private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
| | | throws Exception
| | | {
| | | // expected records first ...
| | | advanceCursorUpTo(cursor, fromIndex, endIndex);
| | | 
| | | // ... then nothing more must be available
| | | assertThat(cursor.next()).isFalse();
| | | assertThat(cursor.getRecord()).isNull();
| | | }
| | | |
| | | /** |
| | | * Read the cursor until exhaustion, beginning at start of cursor. |
| | | */ |
| | | private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex, |
| | | int endIndex) throws Exception |
| | | { |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | assertThatCursorCanBeFullyRead(cursor, fromIndex, endIndex); |
| | | } |
| | | |
| | | private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception |
| | | { |
| | | assertThat(cursor.next()).isFalse(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | } |
| | | |
| | | /**
| | | * Returns a thread (not started) that appends 30 records to the provided log.
| | | * <p>
| | | * Record i has the key "nk" + recordPrefix + i (i zero-padded to 3 digits) and
| | | * the value "v" + recordPrefix + i.
| | | *
| | | * @param writeLog
| | | *          the log to append the records to
| | | * @param recordPrefix
| | | *          the prefix inserted in the key and the value of each record
| | | * @param exceptionRef
| | | *          receives the first exception raised by an append, if any; the thread
| | | *          keeps appending the remaining records after a failure
| | | * @return the thread performing the appends when run
| | | */
| | | private Thread getWriteLogThread(final Log<String, String> writeLog, final String recordPrefix,
| | | final AtomicReference<ChangelogException> exceptionRef)
| | | {
| | | return new Thread() {
| | | @Override
| | | public void run()
| | | {
| | | for (int i = 1; i <= 30; i++)
| | | {
| | | Record<String, String> record = Record.from(
| | | String.format("nk%s%03d", recordPrefix, i), "v" + recordPrefix + i);
| | | try
| | | {
| | | writeLog.append(record);
| | | }
| | | catch (ChangelogException e)
| | | {
| | | // keep the first exception only
| | | exceptionRef.compareAndSet(null, e);
| | | }
| | | }
| | | }
| | | };
| | | }
| | | |
| | | } |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.io.File; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | |
| | | import org.assertj.core.data.MapEntry; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*; |
| | | |
| | | /**
| | | * Tests for {@link ReplicationEnvironment}: each test creates DBs through an environment
| | | * rooted at the test changelog directory, then checks the changelog state read back from
| | | * disk. The test changelog directory is deleted after each test method.
| | | */
| | | @SuppressWarnings("javadoc")
| | | public class ReplicationEnvironmentTest extends DirectoryServerTestCase
| | | {
| | | private static final int SERVER_ID_1 = 1;
| | | private static final int SERVER_ID_2 = 2;
| | | 
| | | private static final String DN1_AS_STRING = "cn=test1,dc=company.com";
| | | private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com";
| | | private static final String DN3_AS_STRING = "cn=test3,dc=company.com";
| | | 
| | | private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
| | | 
| | | @BeforeClass
| | | public void setUp() throws Exception
| | | {
| | | // This test suite depends on having the schema available for DN decoding.
| | | TestCaseUtils.startFakeServer();
| | | }
| | | 
| | | @AfterClass
| | | public void tearDown() throws Exception
| | | {
| | | TestCaseUtils.shutdownFakeServer();
| | | }
| | | 
| | | /** Deletes the test changelog directory, if present, so that each test starts from scratch. */
| | | @AfterMethod
| | | public void cleanTestChangelogDirectory()
| | | {
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
| | | if (rootPath.exists())
| | | {
| | | StaticUtils.recursiveDelete(rootPath);
| | | }
| | | }
| | | 
| | | /** Creates a replication environment rooted at the absolute path of the test changelog directory. */
| | | private ReplicationEnvironment createEnvironment() throws Exception
| | | {
| | | final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
| | | return new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
| | | }
| | | 
| | | /** One domain with two server ids and generation id 1: the state read from disk must reflect it. */
| | | @Test
| | | public void testReadChangelogStateWithSingleDN() throws Exception
| | | {
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null;
| | | Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
| | | try
| | | {
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING);
| | | final ReplicationEnvironment environment = createEnvironment();
| | | cnDB = environment.getOrCreateCNIndexDB();
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
| | | replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
| | | 
| | | final ChangelogState state = environment.readOnDiskChangelogState();
| | | 
| | | assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
| | | assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
| | | assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
| | | 
| | | assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cnDB, replicaDB, replicaDB2);
| | | }
| | | }
| | | 
| | | /** Three domains with ten server ids each and a distinct generation id per domain. */
| | | @Test
| | | public void testReadChangelogStateWithMultipleDN() throws Exception
| | | {
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null;
| | | List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
| | | try
| | | {
| | | List<DN> domainDNs = Arrays.asList(
| | | DN.valueOf(DN1_AS_STRING), DN.valueOf(DN2_AS_STRING), DN.valueOf(DN3_AS_STRING));
| | | final ReplicationEnvironment environment = createEnvironment();
| | | cnDB = environment.getOrCreateCNIndexDB();
| | | for (int i = 0; i <= 2 ; i++)
| | | {
| | | for (int j = 1; j <= 10; j++)
| | | {
| | | // 3 domains, 10 server id each, generation id is different for each domain
| | | replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
| | | }
| | | }
| | | 
| | | final ChangelogState state = environment.readOnDiskChangelogState();
| | | 
| | | assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
| | | for (int i = 0; i <= 2 ; i++)
| | | {
| | | assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
| | | }
| | | assertThat(state.getDomainToGenerationId()).containsOnly(
| | | MapEntry.entry(domainDNs.get(0), 1L),
| | | MapEntry.entry(domainDNs.get(1), 2L),
| | | MapEntry.entry(domainDNs.get(2), 3L));
| | | 
| | | assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cnDB);
| | | StaticUtils.close(replicaDBs);
| | | }
| | | }
| | | 
| | | /** A replica notified as offline must appear in the offline replicas of the state read from disk. */
| | | @Test
| | | public void testReadChangelogStateWithReplicaOffline() throws Exception
| | | {
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null;
| | | Log<CSN,UpdateMsg> replicaDB = null;
| | | try
| | | {
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING);
| | | final ReplicationEnvironment environment = createEnvironment();
| | | cnDB = environment.getOrCreateCNIndexDB();
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
| | | 
| | | // put server id 1 offline
| | | CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
| | | environment.notifyReplicaOffline(domainDN, offlineCSN);
| | | 
| | | final ChangelogState state = environment.readOnDiskChangelogState();
| | | 
| | | assertThat(state.getOfflineReplicas().getSnapshot())
| | | .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
| | | 
| | | assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cnDB, replicaDB);
| | | }
| | | }
| | | 
| | | /** An empty replica offline state file must make the read of the on-disk state fail. */
| | | @Test(expectedExceptions=ChangelogException.class)
| | | public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception
| | | {
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null;
| | | Log<CSN,UpdateMsg> replicaDB = null;
| | | try
| | | {
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING);
| | | final ReplicationEnvironment environment = createEnvironment();
| | | cnDB = environment.getOrCreateCNIndexDB();
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
| | | 
| | | // create an empty offline state file; the replica was never put offline, so the file
| | | // must not exist yet: fail fast if the precondition of the test cannot be set up
| | | File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
| | | assertThat(offlineStateFile.createNewFile()).as("creation of empty offline state file").isTrue();
| | | 
| | | environment.readOnDiskChangelogState();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cnDB, replicaDB);
| | | }
| | | }
| | | 
| | | /** When a replica is notified offline twice, only the last offline CSN must be read from disk. */
| | | @Test
| | | public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception
| | | {
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null;
| | | Log<CSN,UpdateMsg> replicaDB = null;
| | | try
| | | {
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING);
| | | 
| | | final ReplicationEnvironment environment = createEnvironment();
| | | cnDB = environment.getOrCreateCNIndexDB();
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
| | | 
| | | // put server id 1 offline twice
| | | CSNGenerator csnGenerator = new CSNGenerator(SERVER_ID_1, 100);
| | | environment.notifyReplicaOffline(domainDN, csnGenerator.newCSN());
| | | CSN lastOfflineCSN = csnGenerator.newCSN();
| | | environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
| | | 
| | | final ChangelogState state = environment.readOnDiskChangelogState();
| | | assertThat(state.getOfflineReplicas().getSnapshot())
| | | .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
| | | assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cnDB, replicaDB);
| | | }
| | | }
| | | 
| | | /** A replica notified offline then online again must not appear in the offline replicas. */
| | | @Test
| | | public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception
| | | {
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null;
| | | Log<CSN,UpdateMsg> replicaDB = null;
| | | try
| | | {
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING);
| | | 
| | | final ReplicationEnvironment environment = createEnvironment();
| | | cnDB = environment.getOrCreateCNIndexDB();
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
| | | 
| | | // put server id 1 offline
| | | environment.notifyReplicaOffline(domainDN, new CSN(TimeThread.getTime(), 0, SERVER_ID_1));
| | | // put server id 1 online again
| | | environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
| | | 
| | | final ChangelogState state = environment.readOnDiskChangelogState();
| | | assertThat(state.getOfflineReplicas()).isEmpty();
| | | assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cnDB, replicaDB);
| | | }
| | | }
| | | 
| | | /** Checks the domain, server id, generation id and offline CSN read from disk for one offline replica. */
| | | @Test
| | | public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception
| | | {
| | | Log<Long,ChangeNumberIndexRecord> cnDB = null;
| | | Log<CSN,UpdateMsg> replicaDB = null;
| | | try
| | | {
| | | final DN domainDN = DN.valueOf(DN1_AS_STRING);
| | | 
| | | final ReplicationEnvironment environment = createEnvironment();
| | | cnDB = environment.getOrCreateCNIndexDB();
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
| | | CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
| | | environment.notifyReplicaOffline(domainDN, offlineCSN);
| | | 
| | | final ChangelogState state = environment.readOnDiskChangelogState();
| | | 
| | | assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
| | | assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
| | | assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
| | | assertThat(state.getOfflineReplicas().getSnapshot())
| | | .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
| | | 
| | | assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(cnDB, replicaDB);
| | | }
| | | }
| | | 
| | | /** Deleting the domain directory behind the environment's back must make the read of the state fail. */
| | | @Test(expectedExceptions=ChangelogException.class)
| | | public void testMissingDomainDirectory() throws Exception
| | | {
| | | Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
| | | try
| | | {
| | | File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
| | | DN domainDN = DN.valueOf(DN1_AS_STRING);
| | | final ReplicationEnvironment environment = createEnvironment();
| | | replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
| | | replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
| | | 
| | | // delete the domain directory created for the 2 replica DBs to break the
| | | // consistency with domain state file
| | | StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
| | | 
| | | environment.readOnDiskChangelogState();
| | | }
| | | finally
| | | {
| | | StaticUtils.close(replicaDB, replicaDB2);
| | | }
| | | }
| | | }