From bf7ffad243778cef715606b14a0f9651910095ee Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 23 Sep 2014 12:01:00 +0000
Subject: [PATCH] Actual merge of complete changelog.file test package, which contains test cases for file-based changelog
---
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java | 441 +++++++++
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 602 ++++++++++++
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java | 296 ++++++
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java | 598 ++++++++++++
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java | 552 +++++++++++
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java | 2
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java | 327 ++++++
7 files changed, 2817 insertions(+), 1 deletion(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index e4ebf0e..84b1d98 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -325,7 +325,7 @@
{
final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
return new ByteStringBuilder()
- .append(record.getKey())
+ .append((long) record.getKey())
.append(cnIndexRecord.getBaseDN().toString())
.append(STRING_SEPARATOR)
.append(cnIndexRecord.getCSN().toByteString()).toByteString();
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
new file mode 100644
index 0000000..ef8c19c
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
@@ -0,0 +1,552 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.forgerock.opendj.util.Pair;
+
+@SuppressWarnings("javadoc")
+public class BlockLogReaderWriterTest extends DirectoryServerTestCase
+{
+ private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+ private static final File TEST_FILE = new File(TEST_DIRECTORY, "file");
+
+ private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser();
+
+ private static final int INT_RECORD_SIZE = 12;
+
+ @BeforeClass
+ void createTestDirectory()
+ {
+ TEST_DIRECTORY.mkdirs();
+ }
+
+ @BeforeMethod
+ void ensureTestFileIsEmpty() throws Exception
+ {
+ StaticUtils.recursiveDelete(TEST_FILE);
+ }
+
+ @AfterClass
+ void cleanTestDirectory()
+ {
+ StaticUtils.recursiveDelete(TEST_DIRECTORY);
+ }
+
+ @DataProvider(name = "recordsData")
+ Object[][] recordsData()
+ {
+ return new Object[][]
+ {
+ // raw size taken by each record is: 4 (record size) + 4 (key) + 4 (value) = 12 bytes
+
+ // size of block, expected size of file after all records are written, records
+ { 12, 12, records(1) }, // zero block marker
+ { 10, 16, records(1) }, // one block marker
+ { 8, 16, records(1) }, // one block marker
+ { 7, 20, records(1) }, // two block markers
+ { 6, 24, records(1) }, // three block markers
+ { 5, 40, records(1) }, // seven block markers
+
+ { 16, 28, records(1,2) }, // one block marker
+ { 12, 32, records(1,2) }, // two block markers
+ { 10, 36, records(1,2) }, // three block markers
+ };
+ }
+
+ /**
+ * Tests that records can be written then read correctly for different block sizes.
+ */
+ @Test(dataProvider="recordsData")
+ public void testWriteThenRead(int blockSize, int expectedSizeOfFile, List<Record<Integer, Integer>> records)
+ throws Exception
+ {
+ writeRecords(blockSize, records);
+
+ BlockLogReader<Integer, Integer> reader = null;
+ try
+ {
+ reader = newReader(blockSize);
+ for (int i = 0; i < records.size(); i++)
+ {
+ Record<Integer, Integer> record = reader.readRecord();
+ assertThat(record).isEqualTo(records.get(i));
+ }
+ assertThat(reader.readRecord()).isNull();
+ assertThat(reader.getFilePosition()).isEqualTo(expectedSizeOfFile);
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ @DataProvider(name = "recordsForSeek")
+ Object[][] recordsForSeek()
+ {
+ Object[][] data = new Object[][] {
+ // records, key, key matching strategy, position strategy, expectedRecord, should be found ?
+
+ // no record
+ { records(), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+ { records(), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+ { records(), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+
+ // 1 record
+ { records(1), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+ { records(1), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+ { records(1), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+ { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
+ { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+ // 3 records equal matching
+ { records(1,2,3), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+ { records(1,2,3), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1,2,3), 2, EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
+ { records(1,2,3), 3, EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
+ { records(1,2,3), 4, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+
+ { records(1,2,3), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+ { records(1,2,3), 2, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
+ { records(1,2,3), 3, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+ // 3 records less than or equal matching
+ { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+ { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
+ { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
+ { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
+
+ { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+ { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+ { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
+ { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+ { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+ // 3 records greater or equal matching
+ { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
+ { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
+
+ { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
+ { records(1,2,3), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+ { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
+ { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+ { records(1,2,3), 4, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+
+ // 10 records equal matching
+ { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+ { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
+ { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+ { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+
+ { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+ { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
+ { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+ { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+ { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+
+ // 10 records less than or equal matching
+ { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+ { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
+ { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
+
+ { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+ { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
+ { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+ { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+ // 10 records greater or equal matching
+ { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
+ { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(7), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+
+ { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
+ { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true },
+ { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+ { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+
+
+ };
+
+ // For each test case, do a test with various block sizes to ensure algorithm is not broken
+ // on a given size
+ int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 };
+ Object[][] finalData = new Object[sizes.length * data.length][7];
+ for (int i = 0; i < data.length; i++)
+ {
+ for (int j = 0; j < sizes.length; j++)
+ {
+ Object[] a = data[i];
+ // add the block size at beginning of each test case
+ finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4], a[5]};
+ }
+ }
+ return finalData;
+ }
+
+ @Test(dataProvider = "recordsForSeek")
+ public void testSeekToRecord(int blockSize, List<Record<Integer, Integer>> records, int key,
+ KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, Record<Integer, Integer> expectedRecord,
+ boolean shouldBeFound) throws Exception
+ {
+ writeRecords(blockSize, records);
+
+ BlockLogReader<Integer, Integer> reader = null;
+ try
+ {
+ reader = newReader(blockSize);
+ Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchingStrategy, positionStrategy);
+
+ assertThat(result.getFirst()).isEqualTo(shouldBeFound);
+ assertThat(result.getSecond()).isEqualTo(expectedRecord);
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ @Test
+ public void testGetClosestMarkerBeforeOrAtPosition() throws Exception
+ {
+ final int blockSize = 10;
+ BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(0)).isEqualTo(0);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(5)).isEqualTo(0);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(9)).isEqualTo(0);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(10)).isEqualTo(10);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(15)).isEqualTo(10);
+ assertThat(reader.getClosestBlockStartBeforeOrAtPosition(20)).isEqualTo(20);
+ }
+
+ @Test
+ public void testGetClosestMarkerStrictlyAfterPosition() throws Exception
+ {
+ final int blockSize = 10;
+ BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(0)).isEqualTo(10);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(5)).isEqualTo(10);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(10)).isEqualTo(20);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(11)).isEqualTo(20);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(15)).isEqualTo(20);
+ assertThat(reader.getClosestBlockStartStrictlyAfterPosition(20)).isEqualTo(30);
+ }
+
+ @Test
+ public void testSearchClosestMarkerToKey() throws Exception
+ {
+ int blockSize = 20;
+ writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20));
+
+ BlockLogReader<Integer, Integer> reader = null;
+ try
+ {
+ reader = newReader(blockSize);
+
+ assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0);
+ assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0);
+ assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20);
+ assertThat(reader.searchClosestBlockStartToKey(3)).isEqualTo(20);
+ assertThat(reader.searchClosestBlockStartToKey(4)).isEqualTo(40);
+ assertThat(reader.searchClosestBlockStartToKey(5)).isEqualTo(60);
+ assertThat(reader.searchClosestBlockStartToKey(6)).isEqualTo(80);
+ assertThat(reader.searchClosestBlockStartToKey(7)).isEqualTo(80);
+ assertThat(reader.searchClosestBlockStartToKey(8)).isEqualTo(100);
+ assertThat(reader.searchClosestBlockStartToKey(9)).isEqualTo(120);
+ assertThat(reader.searchClosestBlockStartToKey(10)).isEqualTo(140);
+ assertThat(reader.searchClosestBlockStartToKey(19)).isEqualTo(260);
+ assertThat(reader.searchClosestBlockStartToKey(20)).isEqualTo(280);
+ // out of reach keys
+ assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280);
+ assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280);
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ @Test
+ public void testLengthOfStoredRecord() throws Exception
+ {
+ final int blockSize = 100;
+ BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+ int recordLength = 10;
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 20)).isEqualTo(recordLength);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 10)).isEqualTo(recordLength);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 9)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+
+ recordLength = 150;
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 60)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 54)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 53)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+
+ recordLength = 200;
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 8)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 7)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
+ assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
+ }
+
+ /**
+ * This test is intended to be run only manually to check the performance between binary search
+ * and sequential access.
+ * Note that sequential run may be extremely long when using high values.
+ */
+ @Test(enabled=false)
+ public void seekPerformanceComparison() throws Exception
+ {
+ // You may change these values
+ long fileSizeInBytes = 100*1024*1024;
+ int numberOfValuesToSeek = 50000;
+ int blockSize = 256;
+
+ writeRecordsToReachFileSize(blockSize, fileSizeInBytes);
+ BlockLogReader<Integer, Integer> reader = null;
+ try
+ {
+ reader = newReader(blockSize);
+ List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek);
+ System.out.println("File size: " + TEST_FILE.length() + " bytes");
+
+ System.out.println("\n---- BINARY SEARCH");
+ long minTime = Long.MAX_VALUE;
+ long maxTime = Long.MIN_VALUE;
+ final long t0 = System.nanoTime();
+ for (Integer key : keysToSeek)
+ {
+ final long ts = System.nanoTime();
+ Pair<Boolean, Record<Integer, Integer>> result =
+ reader.seekToRecord(key, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+ final long te = System.nanoTime() - ts;
+ if (te < minTime) minTime = te;
+ if (te > maxTime) maxTime = te;
+ // show time for seeks that last more than N microseconds (tune as needed)
+ if (te/1000 > 1000)
+ {
+ System.out.println("TIME! key:" + result.getSecond().getKey() + ", time=" + te/1000 + " microseconds");
+ }
+ assertThat(result.getSecond()).isEqualTo(record(key));
+ }
+ System.out.println("Time taken: " + ((System.nanoTime() - t0)/1000000) + " milliseconds");
+ System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
+ System.out.println("Max time for a search: " + maxTime/1000 + " microseconds");
+ System.out.println("Max difference for a search: " + (maxTime - minTime)/1000 + " microseconds");
+
+ System.out.println("\n---- SEQUENTIAL SEARCH");
+ minTime = Long.MAX_VALUE;
+ maxTime = Long.MIN_VALUE;
+ final long t1 = System.nanoTime();
+ for (Integer val : keysToSeek)
+ {
+ long ts = System.nanoTime();
+ Pair<Boolean, Record<Integer, Integer>> result =
+ reader.positionToKeySequentially(0, val, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+ assertThat(result.getSecond()).isEqualTo(Record.from(val, val));
+ long te = System.nanoTime() - ts;
+ if (te < minTime) minTime = te;
+ if (te > maxTime) maxTime = te;
+ }
+ System.out.println("Time taken: " + ((System.nanoTime() - t1)/1000000) + " milliseconds");
+ System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
+ System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds");
+ System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds");
+ }
+ finally
+ {
+ StaticUtils.close(reader);
+ }
+ }
+
+ /** Write provided records with the provided block size. */
+ private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException
+ {
+ BlockLogWriter<Integer, Integer> writer = null;
+ try
+ {
+ writer = newWriter(blockSize);
+ for (Record<Integer, Integer> record : records)
+ {
+ writer.write(record);
+ }
+ }
+ finally
+ {
+ StaticUtils.close(writer);
+ }
+ }
+
+ /** Write as many records as needed to reach the provided file size. Records go from 1 up to N. */
+ private void writeRecordsToReachFileSize(int blockSize, long sizeInBytes) throws Exception
+ {
+ final int numberOfValues = (int) sizeInBytes / INT_RECORD_SIZE;
+ final int[] values = new int[numberOfValues];
+ for (int i = 0; i < numberOfValues; i++)
+ {
+ values[i] = i+1;
+ }
+ writeRecords(blockSize, records(values));
+ }
+
+ /** Returns provided number of keys to seek in random order, for a file of provided size. */
+ private List<Integer> getShuffledKeys(long fileSizeInBytes, int numberOfKeys)
+ {
+ final int numberOfValues = (int) fileSizeInBytes / INT_RECORD_SIZE;
+ final List<Integer> values = new ArrayList<Integer>(numberOfValues);
+ for (int i = 0; i < numberOfValues; i++)
+ {
+ values.add(i+1);
+ }
+ Collections.shuffle(values);
+ return values.subList(0, numberOfKeys);
+ }
+
+ private BlockLogWriter<Integer, Integer> newWriter(int sizeOfBlock) throws ChangelogException
+ {
+ return BlockLogWriter.newWriterForTests(new LogWriter(TEST_FILE), RECORD_PARSER, sizeOfBlock);
+ }
+
+ private BlockLogReader<Integer, Integer> newReader(int blockSize) throws FileNotFoundException
+ {
+ return BlockLogReader.newReaderForTests(TEST_FILE, new RandomAccessFile(TEST_FILE, "r"),
+ RECORD_PARSER, blockSize);
+ }
+
+ private BlockLogReader<Integer, Integer> newReaderWithNullFile(int blockSize) throws FileNotFoundException
+ {
+ return BlockLogReader.newReaderForTests(null, null, RECORD_PARSER, blockSize);
+ }
+
+ /** Helper to build a list of records. */
+ private List<Record<Integer, Integer>> records(int...keys)
+ {
+ List<Record<Integer, Integer>> records = new ArrayList<Record<Integer, Integer>>();
+ for (int key : keys)
+ {
+ records.add(Record.from(key, key));
+ }
+ return records;
+ }
+
+ /** Helper to build a record. */
+ private Record<Integer, Integer> record(int key)
+ {
+ return Record.from(key, key);
+ }
+
+ /**
+ * Record parser implementation for records with keys and values as integers to be used in tests.
+ * Using integers allows knowing precisely the size of the records (4 bytes for key + 4 bytes for value),
+ * which is useful for some tests.
+ */
+ private static class IntRecordParser implements RecordParser<Integer, Integer>
+ {
+ public Record<Integer, Integer> decodeRecord(final ByteString data) throws DecodingException
+ {
+ ByteSequenceReader reader = data.asReader();
+ int key = reader.getInt();
+ int value = reader.getInt();
+ return Record.from(key, value);
+ }
+
+ public ByteString encodeRecord(Record<Integer, Integer> record)
+ {
+ return new ByteStringBuilder().append((int) record.getKey()).append((int) record.getValue()).toByteString();
+ }
+
+ @Override
+ public Integer decodeKeyFromString(String key) throws ChangelogException
+ {
+ return Integer.valueOf(key);
+ }
+
+ @Override
+ public String encodeKeyToString(Integer key)
+ {
+ return String.valueOf(key);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public Integer getMaxKey()
+ {
+ return Integer.MAX_VALUE;
+ }
+ }
+
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
new file mode 100644
index 0000000..948a221
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -0,0 +1,296 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*;
+import static org.testng.Assert.*;
+
+@SuppressWarnings("javadoc")
+public class FileChangeNumberIndexDBTest extends ReplicationTestCase
+{
+
+ @DataProvider(name = "messages")
+ Object[][] createMessages() throws Exception
+ {
+ CSN[] csns = generateCSNs(1, 0, 3);
+ DN dn1 = DN.valueOf("o=test1");
+ return new Object[][] {
+ { new ChangeNumberIndexRecord(0L, dn1, csns[1]) },
+ { new ChangeNumberIndexRecord(999L, dn1, csns[2]) },
+ };
+ }
+
+ @Test(dataProvider="messages")
+ public void testRecordParser(ChangeNumberIndexRecord msg) throws Exception
+ {
+ RecordParser<Long, ChangeNumberIndexRecord> parser = FileChangeNumberIndexDB.RECORD_PARSER;
+
+ ByteString data = parser.encodeRecord(Record.from(msg.getChangeNumber(), msg));
+ Record<Long, ChangeNumberIndexRecord> record = parser.decodeRecord(data);
+
+ assertThat(record).isNotNull();
+ assertThat(record.getKey()).isEqualTo(msg.getChangeNumber());
+ assertThat(record.getValue().getBaseDN()).isEqualTo(msg.getBaseDN());
+ assertThat(record.getValue().getCSN()).isEqualTo(msg.getCSN());
+ }
+
+ @Test()
+ public void testAddAndReadRecords() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ try
+ {
+ replicationServer = newReplicationServer();
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ changelogDB.setPurgeDelay(0);
+ final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+
+ long[] changeNumbers = addThreeRecords(cnIndexDB);
+ long cn1 = changeNumbers[0];
+ long cn2 = changeNumbers[1];
+ long cn3 = changeNumbers[2];
+
+ // Checks
+ assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
+ assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
+
+ assertEquals(cnIndexDB.count(), 3, "Db count");
+ assertFalse(cnIndexDB.isEmpty());
+
+ DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
+ assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
+
+ cursor = cnIndexDB.getCursorFrom(cn2);
+ assertCursorReadsInOrder(cursor, cn2, cn3);
+
+ cursor = cnIndexDB.getCursorFrom(cn3);
+ assertCursorReadsInOrder(cursor, cn3);
+ }
+ finally
+ {
+ remove(replicationServer);
+ }
+ }
+
+ @Test()
+ public void testClear() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ try
+ {
+ replicationServer = newReplicationServer();
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ changelogDB.setPurgeDelay(0);
+ final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+ addThreeRecords(cnIndexDB);
+
+ cnIndexDB.clear();
+
+ assertNull(cnIndexDB.getOldestRecord());
+ assertNull(cnIndexDB.getNewestRecord());
+ assertEquals(cnIndexDB.count(), 0);
+ assertTrue(cnIndexDB.isEmpty());
+ }
+ finally
+ {
+ remove(replicationServer);
+ }
+ }
+
+
+ /**
+ * This test exercises the basic operations of a ChangeNumberIndexDB:
+ * <ol>
+ * <li>create the db</li>
+ * <li>add records</li>
+ * <li>read them with a cursor</li>
+ * <li>set a very short trim period</li>
+ * <li>wait for the db to be trimmed / here since the changes are not stored
+ * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
+ * </ol>
+ */
+ // TODO : this works only if we ensure that there is a rotation of the ahead log file
+ // at the right place. First two records are 37 and 76 bytes long,
+ // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
+ // Re-enable this test when max file size is customizable for log
+ @Test(enabled=false)
+ public void testPurge() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ try
+ {
+ replicationServer = newReplicationServer();
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ changelogDB.setPurgeDelay(0); // disable purging
+
+ // Prepare data to be stored in the db
+ DN baseDN1 = DN.valueOf("o=test1");
+ DN baseDN2 = DN.valueOf("o=test2");
+ DN baseDN3 = DN.valueOf("o=test3");
+
+ CSN[] csns = generateCSNs(1, 0, 3);
+
+ // Add records
+ final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+ long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+ addRecord(cnIndexDB, baseDN2, csns[1]);
+ long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
+
+ // The ChangeNumber should not get purged
+ final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
+ assertEquals(oldestCN, cn1);
+ assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
+
+ DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
+ try
+ {
+ assertTrue(cursor.next());
+ assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
+ assertTrue(cursor.next());
+ assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
+ assertTrue(cursor.next());
+ assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
+ assertFalse(cursor.next());
+ }
+ finally
+ {
+ StaticUtils.close(cursor);
+ }
+
+ // Now test that purging removes all changes but the last one
+ changelogDB.setPurgeDelay(1);
+ int count = 0;
+ while (cnIndexDB.count() > 1 && count < 100)
+ {
+ Thread.sleep(10);
+ count++;
+ }
+ assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
+ }
+ finally
+ {
+ remove(replicationServer);
+ }
+ }
+
+ private long[] addThreeRecords(FileChangeNumberIndexDB cnIndexDB) throws Exception
+ {
+ // Prepare data to be stored in the db
+ DN baseDN1 = DN.valueOf("o=test1");
+ DN baseDN2 = DN.valueOf("o=test2");
+ DN baseDN3 = DN.valueOf("o=test3");
+
+ CSN[] csns = generateCSNs(1, 0, 3);
+
+ // Add records
+ long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+ long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
+ long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
+ return new long[] { cn1, cn2, cn3 };
+ }
+
+ private long addRecord(FileChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
+ {
+ return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn));
+ }
+
+ private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN)
+ {
+ assertEquals(record.getCSN(), csn);
+ assertEquals(record.getBaseDN(), baseDN);
+ }
+
+ private FileChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
+ {
+ final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
+ final FileChangeNumberIndexDB cnIndexDB =
+ (FileChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
+ assertTrue(cnIndexDB.isEmpty());
+ return cnIndexDB;
+ }
+
+ /**
+ * The newest record is no longer cleared to ensure persistence to the last
+ * generated change number across server restarts.
+ */
+ private void assertOnlyNewestRecordIsLeft(FileChangeNumberIndexDB cnIndexDB,
+ int newestChangeNumber) throws ChangelogException
+ {
+ assertEquals(cnIndexDB.count(), 1);
+ assertFalse(cnIndexDB.isEmpty());
+ final ChangeNumberIndexRecord oldest = cnIndexDB.getOldestRecord();
+ final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord();
+ assertEquals(oldest.getChangeNumber(), newestChangeNumber);
+ assertEquals(oldest.getChangeNumber(), newest.getChangeNumber());
+ assertEquals(oldest.getBaseDN(), newest.getBaseDN());
+ assertEquals(oldest.getCSN(), newest.getCSN());
+ }
+
+  /**
+   * Starts the test server and returns a replication server configured with a
+   * file-based (LOG) changelog and change-number computation enabled.
+   */
+  private ReplicationServer newReplicationServer() throws Exception
+  {
+    TestCaseUtils.startServer();
+    final ReplServerFakeConfiguration cfg = new ReplServerFakeConfiguration(
+        TestCaseUtils.findFreePort(), null, ReplicationDBImplementation.LOG, 0, 2, 0, 100, null);
+    cfg.setComputeChangeNumber(true);
+    return new ReplicationServer(cfg);
+  }
+
+  /**
+   * Checks the cursor returns exactly the given change numbers, in order, and
+   * is then exhausted. The cursor is always closed on exit.
+   */
+  private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor,
+      long... cns) throws ChangelogException
+  {
+    try
+    {
+      for (int i = 0; i < cns.length; i++)
+      {
+        assertTrue(cursor.next());
+        assertEquals(cursor.getRecord().getChangeNumber(), cns[i]);
+      }
+      assertFalse(cursor.next());
+    }
+    finally
+    {
+      cursor.close();
+    }
+  }
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
new file mode 100644
index 0000000..96640ca
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -0,0 +1,602 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.assertj.core.api.SoftAssertions;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.types.DN;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.testng.Assert.*;
+
+/**
+ * Test the FileReplicaDB class
+ */
+@SuppressWarnings("javadoc")
+public class FileReplicaDBTest extends ReplicationTestCase
+{
+ private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ private DN TEST_ROOT_DN;
+
+  /**
+   * Logs a debug message tagged as coming from the test rather than from the
+   * server code, which makes the test steps easier to follow in traces.
+   */
+  private void debugInfo(String tn, String s)
+  {
+    logger.trace("** TEST %s ** %s", tn, s);
+  }
+
+  /** Resolves the shared test baseDN once for all tests. */
+  @BeforeClass
+  public void setup() throws Exception
+  {
+    TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING);
+  }
+
+  /** Provides two delete messages with distinct CSNs from the same replica. */
+  @DataProvider(name = "messages")
+  Object[][] createMessages()
+  {
+    final CSN[] csns = generateCSNs(1, 0, 2);
+    final Object[][] messages = new Object[csns.length][];
+    for (int i = 0; i < csns.length; i++)
+    {
+      messages[i] = new Object[] { new DeleteMsg(TEST_ROOT_DN, csns[i], "uid") };
+    }
+    return messages;
+  }
+
+  /** A record must survive an encode/decode round trip unchanged. */
+  @Test(dataProvider="messages")
+  public void testRecordParser(UpdateMsg msg) throws Exception
+  {
+    final RecordParser<CSN, UpdateMsg> parser = FileReplicaDB.RECORD_PARSER;
+
+    final ByteString encoded = parser.encodeRecord(Record.from(msg.getCSN(), msg));
+    final Record<CSN, UpdateMsg> decoded = parser.decodeRecord(encoded);
+
+    assertThat(decoded).isNotNull();
+    assertThat(decoded.getKey()).isEqualTo(msg.getCSN());
+    assertThat(decoded.getValue()).isEqualTo(msg);
+  }
+
+  /**
+   * Checks that a single record can be added and read back.
+   * <p>
+   * NOTE(review): the test name refers to forward slashes in the domain DN,
+   * but the DN actually used is TEST_ROOT_DN -- confirm TEST_ROOT_DN_STRING
+   * contains '/' characters, otherwise the name overstates the coverage.
+   */
+  @Test
+  public void testDomainDNWithForwardSlashes() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100, 5000);
+
+      replicaDB = newReplicaDB(replicationServer);
+      CSN[] csns = generateCSNs(1, 0, 1);
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+
+      // add() is asynchronous: wait for the record to hit the log
+      waitChangesArePersisted(replicaDB, 1);
+      assertFoundInOrder(replicaDB, csns[0]);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  /**
+   * Adds records, checks they can be read back in order and that the DB
+   * limits are correct, then verifies a later addition is also visible.
+   */
+  @Test
+  public void testAddAndReadRecords() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100, 5000);
+
+      replicaDB = newReplicaDB(replicationServer);
+      final CSN[] csns = generateCSNs(1, 0, 5);
+      for (int i = 0; i < 3; i++)
+      {
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+      }
+
+      waitChangesArePersisted(replicaDB, 3);
+
+      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
+      // csns[4] was never added: a cursor after it must find nothing
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
+      assertLimits(replicaDB, csns[0], csns[2]);
+
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[3], "uid"));
+      waitChangesArePersisted(replicaDB, 4);
+
+      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
+      assertFoundInOrder(replicaDB, csns[2], csns[3]);
+      assertFoundInOrder(replicaDB, csns[3]);
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  /**
+   * Checks cursor generation with both position strategies when one CSN
+   * (csns[3]) was deliberately never stored in the DB.
+   */
+  @Test
+  public void testGenerateCursorFrom() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100000, 10);
+      replicaDB = newReplicaDB(replicationServer);
+
+      final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
+      // csns2 = all generated CSNs except csns[3], leaving a hole in the log
+      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
+      csns2.remove(csns[3]);
+
+      for (CSN csn : csns2)
+      {
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
+      }
+      waitChangesArePersisted(replicaDB, 4);
+
+      // ON_MATCHING_KEY: a cursor from a stored CSN starts on that CSN...
+      for (CSN csn : csns2)
+      {
+        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
+      }
+      // ...and from the missing CSN it starts on the next stored one
+      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
+
+      // AFTER_MATCHING_KEY: the cursor starts on the following stored CSN
+      for (int i = 0; i < csns2.size() - 1; i++)
+      {
+        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
+      }
+      // nothing is stored after the newest CSN
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  /** Start-key indices for the cursor re-initialization test. */
+  @DataProvider
+  Object[][] dataForTestsWithCursorReinitialization()
+  {
+    return new Object[][] {
+      // the index to use in CSN array for the start key of the cursor
+      { 0 },
+      { 1 },
+      { 4 },
+    };
+  }
+
+  /**
+   * Checks that a cursor created while the DB is still empty transparently
+   * re-initializes itself once records are added: the same cursor instance
+   * must then return the records following the start key.
+   */
+  @Test(dataProvider="dataForTestsWithCursorReinitialization")
+  public void testGenerateCursorFromWithCursorReinitialization(int csnIndexForStartKey) throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    DBCursor<UpdateMsg> cursor = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100000, 10);
+      replicaDB = newReplicaDB(replicationServer);
+
+      CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
+
+      // The DB is empty at this point: the cursor has nothing to return yet
+      cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey],
+          GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+      assertFalse(cursor.next());
+
+      // csns[3] is deliberately never added
+      int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
+      for (int i : indicesToAdd)
+      {
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+      }
+      waitChangesArePersisted(replicaDB, 4);
+
+      // The pre-existing cursor must now see every stored CSN strictly after
+      // the start key (AFTER_MATCHING_KEY), skipping the missing csns[3]
+      for (int i = csnIndexForStartKey+1; i < 5; i++)
+      {
+        if (i != 3)
+        {
+          assertTrue(cursor.next());
+          assertEquals(cursor.getRecord().getCSN(), csns[i], "index i=" + i);
+        }
+      }
+      assertFalse(cursor.next());
+    }
+    finally
+    {
+      close(cursor);
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  // TODO : this works only if we ensure that there is a rotation of the ahead log file
+  // at the right place. Each record takes 54 bytes, so it means : 108 < max file size < 162 to have
+  // the last record alone in the ahead log file
+  // Re-enable this test when max file size is customizable for log
+  @Test(enabled=false)
+  public void testPurge() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100, 5000);
+      replicaDB = newReplicaDB(replicationServer);
+
+      CSN[] csns = generateCSNs(1, 0, 5);
+
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[3], "uid"));
+
+      waitChangesArePersisted(replicaDB, 4);
+
+      // Request a purge of everything up to the highest possible CSN
+      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
+
+      // The purge is asynchronous: poll up to ~1 second until only the newest
+      // record (csns[3], which is expected to survive the purge) is left
+      int count = 0;
+      boolean purgeSucceeded = false;
+      final CSN expectedNewestCSN = csns[3];
+      do
+      {
+        Thread.sleep(10);
+
+        final CSN oldestCSN = replicaDB.getOldestCSN();
+        final CSN newestCSN = replicaDB.getNewestCSN();
+        purgeSucceeded =
+            oldestCSN.equals(expectedNewestCSN)
+                && newestCSN.equals(expectedNewestCSN);
+        count++;
+      }
+      while (!purgeSucceeded && count < 100);
+      assertTrue(purgeSucceeded);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  /**
+   * Test the feature of clearing a FileReplicaDB used by a replication server.
+   * The clear feature is used when a replication server receives a request to
+   * reset the generationId of a given domain.
+   */
+  @Test
+  public void testClear() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100, 5000);
+      replicaDB = newReplicaDB(replicationServer);
+
+      final CSN[] csns = generateCSNs(1, 0, 3);
+
+      // Add the changes and check they are here
+      for (final CSN csn : csns)
+      {
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
+      }
+      assertLimits(replicaDB, csns[0], csns[2]);
+
+      // Clear DB and check it is cleared.
+      replicaDB.clear();
+      assertLimits(replicaDB, null, null);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  /**
+   * Checks that a cursor generated from {@code startCSN} with the given
+   * position strategy first returns {@code expectedCSN}.
+   */
+  private void assertNextCSN(FileReplicaDB replicaDB, final CSN startCSN,
+      final PositionStrategy positionStrategy, final CSN expectedCSN)
+      throws ChangelogException
+  {
+    final DBCursor<UpdateMsg> updateCursor =
+        replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
+    try
+    {
+      final SoftAssertions checks = new SoftAssertions();
+      checks.assertThat(updateCursor.next()).isTrue();
+      checks.assertThat(updateCursor.getRecord().getCSN()).isEqualTo(expectedCSN);
+      checks.assertAll();
+    }
+    finally
+    {
+      close(updateCursor);
+    }
+  }
+
+  /**
+   * Checks that a cursor generated from {@code startCSN} with the given
+   * position strategy finds nothing at all.
+   */
+  private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN,
+      final PositionStrategy positionStrategy) throws ChangelogException
+  {
+    final DBCursor<UpdateMsg> updateCursor =
+        replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
+    try
+    {
+      final SoftAssertions checks = new SoftAssertions();
+      checks.assertThat(updateCursor.next()).isFalse();
+      checks.assertThat(updateCursor.getRecord()).isNull();
+      checks.assertAll();
+    }
+    finally
+    {
+      close(updateCursor);
+    }
+  }
+
+  /**
+   * Test the logic that manages counter records in the FileReplicaDB in order to
+   * optimize the oldest and newest records in the replication changelog db.
+   */
+  @Test(groups = { "opendj-256" })
+  public void testGetOldestNewestCSNs() throws Exception
+  {
+    // It's worth testing with 2 different settings for counterRecord
+    // - a counter record is put every 10 Update msg in the db - just a unit
+    // setting.
+    // - a counter record is put every 1000 Update msg in the db - something
+    // closer to real setting.
+    // In both cases, we want to test the counting algorithm,
+    // - when start and stop are before the first counter record,
+    // - when start and stop are before and after the first counter record,
+    // - when start and stop are after the first counter record,
+    // - when start and stop are before and after more than one counter record,
+    // After a purge.
+    // After shutting down/closing and reopening the db.
+
+    // TODO : do we need the management of counter records ?
+    // Use unreachable limits for now because it is not implemented
+    // (second argument is the counter window, i.e. updates per counter record)
+    testGetOldestNewestCSNs(40, 100);
+    testGetOldestNewestCSNs(4000, 10000);
+  }
+
+  /**
+   * Adds {@code max} messages, closes and reopens the DB, adds {@code max}
+   * more, then purges and clears, checking oldest/newest CSNs at each step.
+   *
+   * @param max           number of messages added before and after the reopen
+   * @param counterWindow number of updates between two counter records
+   */
+  private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
+  {
+    String tn = "testDBCount("+max+","+counterWindow+")";
+    debugInfo(tn, "Starting test");
+
+    File testRoot = null;
+    ReplicationServer replicationServer = null;
+    ReplicationEnvironment dbEnv = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100000, 10);
+
+      testRoot = createCleanDir();
+      dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
+      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
+
+      // Populate the db with 'max' msg
+      int mySeqnum = 1;
+      CSN[] csns = new CSN[2 * (max + 1)];
+      long now = System.currentTimeMillis();
+      for (int i=1; i<=max; i++)
+      {
+        csns[i] = new CSN(now + i, mySeqnum, 1);
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+        mySeqnum+=2;
+      }
+      waitChangesArePersisted(replicaDB, max, counterWindow);
+      assertLimits(replicaDB, csns[1], csns[max]);
+
+      // Now we want to test that after closing and reopening the db, the
+      // counting algo is well reinitialized and when new messages are added
+      // the new counter are correctly generated.
+      debugInfo(tn, "SHUTDOWN replicaDB and recreate");
+      replicaDB.shutdown();
+
+      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
+      assertLimits(replicaDB, csns[1], csns[max]);
+
+      // Populate the db with 'max' more msg
+      for (int i=max+1; i<=2 * max; i++)
+      {
+        csns[i] = new CSN(now + i, mySeqnum, 1);
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+        mySeqnum+=2;
+      }
+      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
+      assertLimits(replicaDB, csns[1], csns[2 * max]);
+
+      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
+
+      String testcase = "AFTER PURGE (oldest, newest)=";
+      debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());
+      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest=");
+
+      // Clear ...
+      debugInfo(tn,"clear:");
+      replicaDB.clear();
+
+      // Check the db is cleared. Use assertNull rather than the previous
+      // assertEquals(null, actual), which reversed TestNG's (actual, expected)
+      // parameter order.
+      assertNull(replicaDB.getOldestCSN());
+      assertNull(replicaDB.getNewestCSN());
+      debugInfo(tn,"Success");
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      if (dbEnv != null)
+      {
+        dbEnv.shutdown();
+      }
+      remove(replicationServer);
+      TestCaseUtils.deleteDirectory(testRoot);
+    }
+  }
+
+  /** Checks the DB's oldest and newest CSNs against the expected values. */
+  private void assertLimits(FileReplicaDB replicaDB, CSN oldestCSN, CSN newestCSN)
+  {
+    final SoftAssertions checks = new SoftAssertions();
+    checks.assertThat(replicaDB.getOldestCSN()).as("Wrong oldest CSN").isEqualTo(oldestCSN);
+    checks.assertThat(replicaDB.getNewestCSN()).as("Wrong newest CSN").isEqualTo(newestCSN);
+    checks.assertAll();
+  }
+
+  /** Shuts the DB down, tolerating {@code null} for simpler finally blocks. */
+  private void shutdown(FileReplicaDB replicaDB)
+  {
+    if (replicaDB == null)
+    {
+      return;
+    }
+    replicaDB.shutdown();
+  }
+
+  /** Generates {@code number} consecutive CSNs for the given replica. */
+  static CSN[] generateCSNs(int serverId, long timestamp, int number)
+  {
+    final CSNGenerator gen = new CSNGenerator(serverId, timestamp);
+    final CSN[] csns = new CSN[number];
+    for (int i = 0; i < number; i++)
+    {
+      csns[i] = gen.newCSN();
+    }
+    return csns;
+  }
+
+  private void waitChangesArePersisted(FileReplicaDB replicaDB,
+      int nbRecordsInserted) throws Exception
+  {
+    // Default counter window: one counter record every 1000 inserted records
+    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
+  }
+
+  /**
+   * Polls until the DB holds the expected number of records, i.e. the inserted
+   * records plus one counter record per {@code counterWindow} insertions.
+   */
+  private void waitChangesArePersisted(FileReplicaDB replicaDB,
+      int nbRecordsInserted, int counterWindow) throws Exception
+  {
+    final int expectedNbRecords = nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
+
+    // Poll for up to ~1 second before failing
+    for (int count = 0; count < 100 && replicaDB.getNumberRecords() != expectedNbRecords; count++)
+    {
+      Thread.sleep(10);
+    }
+    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
+  }
+
+  /** Creates and starts a replication server using the file-based (LOG) changelog. */
+  private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
+      throws IOException, ConfigException
+  {
+    final int changelogPort = findFreePort();
+    final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
+        changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize, windowSize, null);
+    return new ReplicationServer(conf);
+  }
+
+  /** Returns the replica DB for TEST_ROOT_DN / serverId 1, creating it if needed. */
+  private FileReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
+  {
+    final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
+    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
+  }
+
+  /**
+   * Returns an empty directory under the build tree for this test's data.
+   *
+   * @throws IOException if the directory cannot be (re-)created
+   */
+  private File createCleanDir() throws IOException
+  {
+    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
+        + File.separator + "build");
+    path = path + File.separator + "unit-tests" + File.separator + "FileReplicaDB";
+    final File testRoot = new File(path);
+    TestCaseUtils.deleteDirectory(testRoot);
+    // mkdirs() reports failure via its return value, which was previously
+    // ignored: fail fast here rather than with an obscure error later on.
+    if (!testRoot.mkdirs())
+    {
+      throw new IOException("Unable to create directory " + testRoot);
+    }
+    return testRoot;
+  }
+
+  /**
+   * Checks that a cursor started from {@code csns[0]} reads back the given
+   * CSNs in order, with both position strategies.
+   */
+  private void assertFoundInOrder(FileReplicaDB replicaDB, CSN... csns) throws Exception
+  {
+    if (csns.length == 0)
+    {
+      return;
+    }
+
+    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
+    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
+  }
+
+  /**
+   * Checks that a cursor positioned at {@code csns[0]} returns the expected
+   * CSNs in order and is then exhausted.
+   * <p>
+   * With AFTER_MATCHING_KEY the record matching the start key itself is
+   * skipped, hence the expected iteration starts at index 1.
+   */
+  private void assertFoundInOrder(FileReplicaDB replicaDB,
+      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
+  {
+    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
+    try
+    {
+      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
+
+      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
+      {
+        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+        final SoftAssertions softly = new SoftAssertions();
+        softly.assertThat(cursor.next()).as(msg).isTrue();
+        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
+        softly.assertAll();
+      }
+      // Once every expected CSN has been read, the cursor must be exhausted
+      final SoftAssertions softly = new SoftAssertions();
+      softly.assertThat(cursor.next()).isFalse();
+      softly.assertThat(cursor.getRecord()).isNull();
+      softly.assertAll();
+    }
+    finally
+    {
+      close(cursor);
+    }
+  }
+
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
new file mode 100644
index 0000000..a1750d8
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -0,0 +1,441 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
+@SuppressWarnings("javadoc")
+@Test(sequential=true)
+public class LogFileTest extends DirectoryServerTestCase
+{
+ private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+ private static final File TEST_LOG_FILE = new File(TEST_DIRECTORY, Log.HEAD_LOG_FILE_NAME);
+
+ static final StringRecordParser RECORD_PARSER = new StringRecordParser();
+
+  @BeforeClass
+  public void createTestDirectory()
+  {
+    // mkdirs() reports failure via its return value, which was previously
+    // ignored: fail fast, as every test depends on this directory.
+    assertThat(TEST_DIRECTORY.mkdirs() || TEST_DIRECTORY.isDirectory()).isTrue();
+  }
+
+  /**
+   * Create a new log file with ten records starting from (key01, value1) until (key10, value10).
+   * So log contains keys "key01", "key02", ..., "key10"
+   */
+  // NOTE: the doc comment must precede the annotation; it was previously
+  // placed between @BeforeMethod and the method, where the javadoc tool
+  // does not pick it up.
+  @BeforeMethod
+  public void initialize() throws Exception
+  {
+    if (TEST_LOG_FILE.exists())
+    {
+      TEST_LOG_FILE.delete();
+    }
+    LogFile<String, String> logFile = null;
+    try
+    {
+      logFile = getLogFile(RECORD_PARSER);
+
+      for (int i = 1; i <= 10; i++)
+      {
+        logFile.append(Record.from(String.format("key%02d", i), "value"+i));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(logFile);
+    }
+  }
+
+  /** Removes the whole test changelog directory once all tests have run. */
+  @AfterClass
+  public void cleanTestChangelogDirectory()
+  {
+    StaticUtils.recursiveDelete(TEST_DIRECTORY);
+  }
+
+  /** Opens the head log file in appendable mode with the given parser. */
+  private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
+  {
+    return LogFile.newAppendableLogFile(TEST_LOG_FILE, parser);
+  }
+
+  /** A fresh cursor must read the ten records appended by initialize() in key order. */
+  @Test
+  public void testCursor() throws Exception
+  {
+    final LogFile<String, String> logFile = getLogFile(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      cursor = logFile.getCursor();
+      assertThatCursorCanBeFullyRead(cursor, 1, 10);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, logFile);
+    }
+  }
+
+  /**
+   * Cursor positioning cases over the ten records key01..key10 created by
+   * initialize(); "key00", "key050" and "key11" are keys absent from the log.
+   */
+  @DataProvider
+  Object[][] cursorPositionTo()
+  {
+    return new Object[][] {
+      // key to position to, key matching strategy, position strategy, position is found ?,
+      // expected start index of cursor (use -1 if cursor should be exhausted), expected end index of cursor
+
+      // equal
+      { "key00", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
+      { "key02", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
+      { "key05", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
+      { "key050", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
+      { "key07", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
+      { "key10", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
+      { "key11", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
+
+      { "key00", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
+      { "key02", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10},
+      { "key05", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
+      { "key07", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
+      { "key10", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
+      { "key11", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
+
+      // less than or equal
+
+      // key00 is a special case : position is not found but cursor is positioned on beginning
+      // so it is possible to iterate on it from start to end
+      { "key00", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, 1, 10},
+      { "key02", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
+      { "key05", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
+      { "key050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
+      { "key07", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
+      { "key10", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
+      { "key11", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
+
+      // key00 is a special case : position is not found but cursor is positioned on beginning
+      // so it is possible to iterate on it from 2 to end
+      { "key00", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, 2, 10},
+      { "key02", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10},
+      { "key05", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key07", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
+      { "key10", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
+      { "key11", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
+
+      // greater than or equal
+      { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 1, 10},
+      { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
+      { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
+      { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 6, 10},
+      { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
+      { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
+      { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
+
+      { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 1, 10},
+      { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10},
+      { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
+      { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
+      { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
+
+    };
+  }
+
+  /**
+   * Test cursor positioning for a given key, matching strategy and position strategy.
+   * Cursor is fully read from the expected starting index to the expected end index, unless it is expected
+   * to be directly exhausted.
+   */
+  @Test(dataProvider="cursorPositionTo")
+  public void testCursorPositionTo(String key, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy,
+      boolean positionShouldBeFound, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception
+  {
+    final LogFile<String, String> logFile = getLogFile(RECORD_PARSER);
+    LogFileCursor<String, String> cursor = null;
+    try
+    {
+      cursor = logFile.getCursor();
+      final boolean found = cursor.positionTo(key, matchingStrategy, positionStrategy);
+      assertThat(found).isEqualTo(positionShouldBeFound);
+
+      if (cursorShouldStartAt < 0)
+      {
+        // The cursor is expected to be exhausted straight away
+        assertThatCursorIsExhausted(cursor);
+      }
+      else
+      {
+        assertThatCursorCanBeFullyRead(cursor, cursorShouldStartAt, cursorShouldEndAt);
+      }
+    }
+    finally
+    {
+      StaticUtils.close(cursor, logFile);
+    }
+  }
+
+  /** The oldest record of the reference log must be (key01, value1). */
+  @Test
+  public void testGetOldestRecord() throws Exception
+  {
+    LogFile<String, String> logFile = getLogFile(RECORD_PARSER);
+    try
+    {
+      assertThat(logFile.getOldestRecord()).isEqualTo(Record.from("key01", "value1"));
+    }
+    finally
+    {
+      StaticUtils.close(logFile);
+    }
+  }
+
+  /** The newest record of the reference log must be (key10, value10). */
+  @Test
+  public void testGetNewestRecord() throws Exception
+  {
+    LogFile<String, String> logFile = getLogFile(RECORD_PARSER);
+    try
+    {
+      assertThat(logFile.getNewestRecord()).isEqualTo(Record.from("key10", "value10"));
+    }
+    finally
+    {
+      StaticUtils.close(logFile);
+    }
+  }
+
+ /** Byte sequences simulating differently-truncated records appended to the log file. */
+ @DataProvider(name = "corruptedRecordData")
+ Object[][] corruptedRecordData()
+ {
+ return new Object[][]
+ {
+ // write 1 byte of the record size (should be 4 bytes)
+ { 1, new ByteStringBuilder().append((byte) 0) },
+ // write 3 bytes of the record size (should be 4 bytes)
+ { 2, new ByteStringBuilder().append((byte) 0).append((byte) 0).append((byte) 0) },
+ // write size only
+ { 3, new ByteStringBuilder().append(10) },
+ // write size + key
+ { 4, new ByteStringBuilder().append(100).append("key") },
+ // write size + key + separator
+ { 5, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR) },
+ // write size + key + separator + partial value
+ { 6, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR).append("v") },
+ };
+ }
+
+  /**
+   * Appends corrupt bytes to the log file, then checks that opening the log
+   * repairs it: a new record can be appended and the whole log can be read.
+   */
+  @Test(dataProvider="corruptedRecordData")
+  public void testRecoveryOnCorruptedLogFile(
+      @SuppressWarnings("unused") int unusedId,
+      ByteStringBuilder corruptedRecordData) throws Exception
+  {
+    LogFile<String, String> logFile = null;
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      corruptTestLogFile(corruptedRecordData);
+
+      // open the log file: the file should be repaired at this point
+      logFile = getLogFile(RECORD_PARSER);
+
+      // write a new valid record
+      logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11));
+
+      // ensure log can be fully read including the new record
+      cursor = logFile.getCursor();
+      assertThatCursorCanBeFullyRead(cursor, 1, 11);
+    }
+    finally
+    {
+      // also close the cursor: it was previously leaked on every run
+      StaticUtils.close(cursor, logFile);
+    }
+  }
+
+  /**
+   * Corrupts TEST_LOG_FILE by appending the provided raw bytes at the end of the file.
+   */
+  private void corruptTestLogFile(ByteStringBuilder corruptedRecordData) throws Exception
+  {
+    RandomAccessFile file = null;
+    try
+    {
+      file = new RandomAccessFile(TEST_LOG_FILE, "rwd");
+      // position at the end of the file, then append the corrupt bytes
+      file.seek(file.length());
+      file.write(corruptedRecordData.toByteArray());
+    }
+    finally
+    {
+      StaticUtils.close(file);
+    }
+  }
+
+  /**
+   * Test that changes are visible immediately to a reader after a write.
+   * (Javadoc moved before the annotation so the javadoc tool picks it up.)
+   */
+  @Test
+  public void testWriteAndReadOnSameLogFile() throws Exception
+  {
+    LogFile<String, String> writeLog = null;
+    LogFile<String, String> readLog = null;
+    try
+    {
+      writeLog = getLogFile(RECORD_PARSER);
+      readLog = getLogFile(RECORD_PARSER);
+
+      for (int i = 1; i <= 100; i++)
+      {
+        Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
+        writeLog.append(record);
+        // both the writer's and the reader's view must expose the new record at once
+        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
+        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));
+        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
+        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key01", "value1"));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writeLog, readLog);
+    }
+  }
+
+  /**
+   * Asserts that the cursor, starting before fromIndex, yields exactly the records
+   * (keyNN, valueN) for each N in [fromIndex, endIndex] and is then exhausted.
+   */
+  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+      throws Exception
+  {
+    assertThat(cursor.getRecord()).isNull();
+    int index = fromIndex;
+    while (index <= endIndex)
+    {
+      assertThat(cursor.next()).as("next() value when i=" + index).isTrue();
+      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%02d", index), "value" + index));
+      index++;
+    }
+    assertThatCursorIsExhausted(cursor);
+  }
+
+ private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
+ {
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+  /**
+   * Record parser for test records where both keys and values are plain strings.
+   * A record is encoded as: key bytes, zero separator, value bytes, zero separator.
+   */
+  private static class StringRecordParser implements RecordParser<String, String>
+  {
+    /** Zero byte marking the end of the key and of the value. */
+    private static final byte STRING_SEPARATOR = 0;
+
+    /** {@inheritDoc} */
+    @Override
+    public Record<String, String> decodeRecord(final ByteString data) throws DecodingException
+    {
+      ByteSequenceReader reader = data.asReader();
+      String key = reader.getString(getNextStringLength(reader));
+      // skip the separator between key and value
+      reader.skip(1);
+      String value = reader.getString(getNextStringLength(reader));
+      // an empty key or value denotes an unparseable record
+      return key.isEmpty() || value.isEmpty() ? null : Record.from(key, value);
+    }
+
+    /** Returns the length of next string by looking for the zero byte used as separator. */
+    private int getNextStringLength(ByteSequenceReader reader)
+    {
+      int length = 0;
+      while (reader.peek(length) != STRING_SEPARATOR)
+      {
+        length++;
+      }
+      return length;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public ByteString encodeRecord(Record<String, String> record)
+    {
+      return new ByteStringBuilder()
+          .append(record.getKey()).append(STRING_SEPARATOR)
+          .append(record.getValue()).append(STRING_SEPARATOR).toByteString();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String decodeKeyFromString(String key) throws ChangelogException
+    {
+      return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String encodeKeyToString(String key)
+    {
+      return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String getMaxKey()
+    {
+      // '~' character has the highest ASCII value
+      return "~~~~";
+    }
+  }
+
+  /** A StringRecordParser that can be switched to fail on any read. */
+  static class FailingStringRecordParser extends StringRecordParser
+  {
+    // when true, decodeRecord() always throws a DecodingException
+    private boolean failToRead;
+
+    @Override
+    public Record<String, String> decodeRecord(ByteString data) throws DecodingException
+    {
+      if (!failToRead)
+      {
+        return super.decodeRecord(data);
+      }
+      throw new DecodingException(LocalizableMessage.raw("Error when parsing record"));
+    }
+
+    /** Controls whether subsequent reads must fail. */
+    void setFailToRead(boolean shouldFail)
+    {
+      failToRead = shouldFail;
+    }
+  }
+
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
new file mode 100644
index 0000000..b810314
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -0,0 +1,598 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+@Test(sequential=true)
+public class LogTest extends DirectoryServerTestCase
+{
+ // Use a directory dedicated to this test class; it is deleted and repopulated before each test method
+ private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+  /**
+   * Rebuilds, before each test, a reference log containing the 10 records
+   * (key001..key010, value1..value10). Keys must stay in ascending order,
+   * otherwise several tests would break unexpectedly.
+   */
+  @BeforeMethod
+  public void initialize() throws Exception
+  {
+    // start from a clean directory
+    if (LOG_DIRECTORY.exists())
+    {
+      StaticUtils.recursiveDelete(LOG_DIRECTORY);
+    }
+
+    Log<String, String> referenceLog = openLog(RECORD_PARSER);
+    for (int i = 1; i <= 10; i++)
+    {
+      referenceLog.append(Record.from(String.format("key%03d", i), "value" + i));
+    }
+    referenceLog.close();
+  }
+
+ private Log<String, String> openLog(RecordParser<String, String> parser) throws ChangelogException
+ {
+ // Each string record has a length of approximately 18 bytes
+ // This size is set in order to have 2 records per log file before the rotation happens
+ // This allow to ensure rotation mechanism is thoroughly tested
+ // Some tests rely on having 2 records per log file (especially the purge tests), so take care
+ // if this value has to be changed
+ int sizeLimitPerFileInBytes = 30;
+
+ return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
+ }
+
+  /** A plain cursor must read the whole reference log from the first record. */
+  @Test
+  public void testCursor() throws Exception
+  {
+    // use the statically imported RECORD_PARSER for consistency with the other tests
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      cursor = log.getCursor();
+
+      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  /** A cursor built from an existing key starts on that key's record. */
+  @Test
+  public void testCursorWhenGivenAnExistingKey() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> recordCursor = null;
+    try
+    {
+      recordCursor = log.getCursor("key005");
+      assertThatCursorCanBeFullyReadFromStart(recordCursor, 5, 10);
+    }
+    finally
+    {
+      StaticUtils.close(recordCursor, log);
+    }
+  }
+
+  /** A cursor built from a key absent from the log must be empty. */
+  @Test
+  public void testCursorWhenGivenAnUnexistingKey() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> emptyCursor = null;
+    try
+    {
+      // this key sorts between key005 and key006, so it matches no record
+      emptyCursor = log.getCursor("key005000");
+
+      assertThat(emptyCursor).isNotNull();
+      assertThat(emptyCursor.getRecord()).isNull();
+      assertThat(emptyCursor.next()).isFalse();
+    }
+    finally
+    {
+      StaticUtils.close(emptyCursor, log);
+    }
+  }
+
+  /** A null key is treated as "start from the oldest record". */
+  @Test
+  public void testCursorWhenGivenANullKey() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> fullCursor = null;
+    try
+    {
+      fullCursor = log.getCursor(null);
+      assertThatCursorCanBeFullyReadFromStart(fullCursor, 1, 10);
+    }
+    finally
+    {
+      StaticUtils.close(fullCursor, log);
+    }
+  }
+
+ /** Cursor read expectations for every matching/position strategy combination. */
+ @DataProvider
+ Object[][] cursorData()
+ {
+ return new Object[][] {
+ // Input (3 first values): key to position to, key matching strategy, position strategy.
+ // Expected output (2 last values): first index read by the cursor
+ // (-1 if the cursor should be exhausted), then last index read by the cursor.
+ { "key000", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+ { "key001", EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
+ { "key004", EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
+ { "key0050", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+ { "key009", EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
+ { "key010", EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
+ { "key011", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+
+ { "key000", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { "key001", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
+ { "key004", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
+ { "key0050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { "key009", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
+ { "key010", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { "key011", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+
+ { "key000", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+ { "key001", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
+ { "key004", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
+ { "key005", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 },
+ { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 },
+ { "key006", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 },
+ { "key009", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
+ { "key010", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
+ { "key011", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
+
+ { "key000", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { "key001", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
+ { "key004", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
+ { "key005", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
+ { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
+ { "key006", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 7, 10 },
+ { "key009", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
+ { "key010", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { "key011", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+
+ { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
+ { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
+ { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
+ { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 },
+ { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
+ { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
+ { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+
+ { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 },
+ { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
+ { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
+ { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
+ { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
+ { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ };
+ }
+
+  /**
+   * Checks cursor reads for each (key, matching strategy, position strategy)
+   * combination supplied by the "cursorData" provider.
+   */
+  @Test(dataProvider="cursorData")
+  public void testCursorWithStrategies(String key, KeyMatchingStrategy matchingStrategy,
+      PositionStrategy positionStrategy, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      cursor = log.getCursor(key, matchingStrategy, positionStrategy);
+
+      if (cursorShouldStartAt == -1)
+      {
+        // no matching position: nothing must be readable
+        assertThatCursorIsExhausted(cursor);
+      }
+      else
+      {
+        assertThatCursorCanBeFullyReadFromStart(cursor, cursorShouldStartAt, cursorShouldEndAt);
+      }
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  /** Null key, matching and position strategies default to a full read from the start. */
+  @Test
+  public void testCursorMatchingAnyPositioningAnyWhenGivenANullKey() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      cursor = log.getCursor(null, null, null);
+      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  /** Opening a cursor while the parser fails must surface a ChangelogException. */
+  @Test(expectedExceptions=ChangelogException.class)
+  public void testCursorWhenParserFailsToRead() throws Exception
+  {
+    FailingStringRecordParser failingParser = new FailingStringRecordParser();
+    Log<String, String> log = openLog(failingParser);
+    failingParser.setFailToRead(true);
+    try
+    {
+      log.getCursor("key");
+    }
+    finally
+    {
+      StaticUtils.close(log);
+    }
+  }
+
+  /** The oldest record of the reference log must be (key001, value1). */
+  @Test
+  public void testGetOldestRecord() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    try
+    {
+      assertThat(log.getOldestRecord()).isEqualTo(Record.from("key001", "value1"));
+    }
+    finally
+    {
+      StaticUtils.close(log);
+    }
+  }
+
+  /** The newest record of the reference log must be (key010, value10). */
+  @Test
+  public void testGetNewestRecord() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    try
+    {
+      assertThat(log.getNewestRecord()).isEqualTo(Record.from("key010", "value10"));
+    }
+    finally
+    {
+      StaticUtils.close(log);
+    }
+  }
+
+  /**
+   * Test that changes are visible immediately to a reader after a write.
+   */
+  @Test
+  public void testWriteAndReadOnSameLog() throws Exception
+  {
+    Log<String, String> writer = null;
+    Log<String, String> reader = null;
+    try
+    {
+      writer = openLog(RECORD_PARSER);
+      reader = openLog(RECORD_PARSER);
+
+      for (int i = 1; i <= 10; i++)
+      {
+        Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i);
+        writer.append(record);
+        // writer's and reader's views must both expose the new record right away
+        assertThat(writer.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
+        assertThat(writer.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key001", "value1"));
+        assertThat(reader.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
+        assertThat(reader.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1"));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writer, reader);
+    }
+  }
+
+  /**
+   * Two writers appending concurrently to the same logical log: all 61 records
+   * (1 starting record + 2 x 30 written records) must be readable afterwards.
+   */
+  @Test
+  public void testTwoConcurrentWrite() throws Exception
+  {
+    Log<String, String> writeLog1 = null;
+    Log<String, String> writeLog2 = null;
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      writeLog1 = openLog(RECORD_PARSER);
+      writeLog2 = openLog(RECORD_PARSER);
+      writeLog1.append(Record.from("key020", "starting record"));
+      AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>();
+      Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
+      Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
+      // start() the threads so the two writers really run concurrently;
+      // calling run() would execute them sequentially on this thread
+      write1.start();
+      write2.start();
+
+      write1.join();
+      write2.join();
+      if (exceptionRef.get() != null)
+      {
+        throw exceptionRef.get();
+      }
+      writeLog1.syncToFileSystem();
+      cursor = writeLog1.getCursor("key020");
+      for (int i = 1; i <= 61; i++)
+      {
+        assertThat(cursor.next()).isTrue();
+      }
+      // the last record is the final write of whichever writer finished last
+      assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
+    }
+    finally
+    {
+      StaticUtils.close(cursor, writeLog1, writeLog2);
+    }
+  }
+
+  /**
+   * Rough write-throughput measurement: appends ~1 million records using 1 MB log files.
+   * Keep disabled; enable locally only when a performance figure is needed.
+   */
+  @Test(enabled=false)
+  public void logWriteSpeed() throws Exception
+  {
+    Log<String, String> writeLog = null;
+    try
+    {
+      final long sizeOf1MB = 1024 * 1024;
+      writeLog = Log.openLog(LOG_DIRECTORY, RECORD_PARSER, sizeOf1MB);
+
+      for (int i = 1; i < 1000000; i++)
+      {
+        writeLog.append(Record.from(String.format("key%010d", i), "value" + i));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writeLog);
+    }
+  }
+
+  /** A cursor on the ahead log file must keep reading across a rotation. */
+  @Test
+  public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(RECORD_PARSER);
+      cursor = log.getCursor();
+      // advance cursor to the last record so it points inside the ahead log file
+      advanceCursorUpTo(cursor, 1, 10);
+
+      // append enough records to force a rotation of the ahead log file
+      for (int i = 11; i <= 20; i++)
+      {
+        log.append(Record.from(String.format("key%03d", i), "value" + i));
+      }
+
+      // the cursor must see every record written after the rotation
+      assertThatCursorCanBeFullyRead(cursor, 11, 20);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  /** Several cursors at different positions must all survive a rotation of the ahead log file. */
+  @Test
+  public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(RECORD_PARSER);
+      // leave each cursor at a different depth in the log
+      cursor1 = log.getCursor();
+      advanceCursorUpTo(cursor1, 1, 1);
+      cursor2 = log.getCursor();
+      advanceCursorUpTo(cursor2, 1, 4);
+      cursor3 = log.getCursor();
+      advanceCursorUpTo(cursor3, 1, 9);
+      cursor4 = log.getCursor();
+      advanceCursorUpTo(cursor4, 1, 10);
+
+      // append enough records to force a rotation of the ahead log file
+      for (int i = 11; i <= 20; i++)
+      {
+        log.append(Record.from(String.format("key%03d", i), "value" + i));
+      }
+
+      // each cursor resumes from its own position and reads through the new records
+      assertThatCursorCanBeFullyRead(cursor1, 2, 20);
+      assertThatCursorCanBeFullyRead(cursor2, 5, 20);
+      assertThatCursorCanBeFullyRead(cursor3, 10, 20);
+      assertThatCursorCanBeFullyRead(cursor4, 11, 20);
+    }
+    finally
+    {
+      StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log);
+    }
+  }
+
+  /** After clear(), the log must contain no record at all. */
+  @Test
+  public void testClear() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(RECORD_PARSER);
+      log.clear();
+
+      cursor = log.getCursor();
+      assertThatCursorIsExhausted(cursor);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+ // TODO : Should be re-enabled once the issue with robot functional test replication/totalupdate.txt is solved
+ // Expects clear() to be rejected with a ChangelogException while a cursor is still open on the log.
+ @Test(enabled=false, expectedExceptions=ChangelogException.class)
+ public void testClearWhenCursorIsOpened() throws Exception
+ {
+ DBCursor<Record<String, String>> cursor = null;
+ Log<String, String> log = null;
+ try
+ {
+ log = openLog(LogFileTest.RECORD_PARSER);
+ cursor = log.getCursor();
+ log.clear();
+ }
+ finally
+ {
+ StaticUtils.close(cursor, log);
+ }
+ }
+
+ /** Purge scenarios exercising read-only log files, the ahead log file and absent keys. */
+ @DataProvider(name = "purgeKeys")
+ Object[][] purgeKeys()
+ {
+ // purge key, first record expected in the cursor, then startIndex + endIndex to fully read the cursor
+ return new Object[][]
+ {
+ // lowest key of the read-only log file "key005_key006.log"
+ { "key005", Record.from("key005", "value5"), 6, 10 },
+ // key that is not the lowest of the read-only log file "key005_key006.log"
+ { "key006", Record.from("key005", "value5"), 6, 10 },
+ // lowest key of the ahead log file "ahead.log"
+ { "key009", Record.from("key009", "value9"), 10, 10 },
+ // key that is not the lowest of the ahead log file "ahead.log"
+ { "key010", Record.from("key009", "value9"), 10, 10 },
+
+ // key not present in log, which is between key005 and key006
+ { "key005a", Record.from("key005", "value5"), 6, 10 },
+ // key not present in log, which is between key006 and key007
+ { "key006a", Record.from("key007", "value7"), 8, 10 },
+ // key not present in log, which is lower than oldest key key001
+ { "key000", Record.from("key001", "value1"), 2, 10 },
+ // key not present in log, which is higher than newest key key010
+ // should return the lowest key present in ahead log
+ { "key011", Record.from("key009", "value9"), 10, 10 },
+ };
+ }
+
+  /**
+   * Purges the log up to the provided key, then checks that a fresh cursor starts
+   * on the expected first record and can be fully read over the given index range.
+   */
+  @Test(dataProvider="purgeKeys")
+  public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
+      int cursorStartIndex, int cursorEndIndex) throws Exception
+  {
+    Log<String, String> log = null;
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      log = openLog(RECORD_PARSER);
+      log.purgeUpTo(purgeKey);
+
+      cursor = log.getCursor();
+      assertThat(cursor.next()).isTrue();
+      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
+      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  /** Advances the cursor, asserting it yields (keyNNN, valueN) for each N in [fromIndex, endIndex]. */
+  private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+      throws Exception
+  {
+    int index = fromIndex;
+    while (index <= endIndex)
+    {
+      assertThat(cursor.next()).as("next() value when i=" + index).isTrue();
+      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%03d", index), "value" + index));
+      index++;
+    }
+  }
+
+  /**
+   * Reads the cursor until exhaustion, checking it yields exactly the records
+   * from fromIndex to endIndex and nothing more.
+   */
+  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+      throws Exception
+  {
+    advanceCursorUpTo(cursor, fromIndex, endIndex);
+    assertThatCursorIsExhausted(cursor);
+  }
+
+  /** Like assertThatCursorCanBeFullyRead(), but also checks the cursor has not yet been advanced. */
+  private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex,
+      int endIndex) throws Exception
+  {
+    // a freshly positioned cursor has no current record until next() is called
+    assertThat(cursor.getRecord()).isNull();
+    assertThatCursorCanBeFullyRead(cursor, fromIndex, endIndex);
+  }
+
+ private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
+ {
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+  /**
+   * Returns a thread that appends 30 records to the provided log, using keys
+   * "nk&lt;prefix&gt;001".."nk&lt;prefix&gt;030". A ChangelogException raised during a write
+   * is recorded in exceptionRef (first one wins) rather than being lost with the thread.
+   */
+  private Thread getWriteLogThread(final Log<String, String> writeLog, final String recordPrefix,
+      final AtomicReference<ChangelogException> exceptionRef)
+  {
+    return new Thread() {
+      @Override
+      public void run()
+      {
+        for (int i = 1; i <= 30; i++)
+        {
+          Record<String, String> record = Record.from(
+              String.format("nk%s%03d", recordPrefix, i), "v" + recordPrefix + i);
+          try
+          {
+            writeLog.append(record);
+          }
+          catch (ChangelogException e)
+          {
+            // keep the first exception only
+            exceptionRef.compareAndSet(null, e);
+          }
+        }
+      }
+    };
+  }
+
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
new file mode 100644
index 0000000..3eeca60
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -0,0 +1,327 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.data.MapEntry;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
+
+@SuppressWarnings("javadoc")
+public class ReplicationEnvironmentTest extends DirectoryServerTestCase
+{
+ // Arbitrary replica server ids used across the tests.
+ private static final int SERVER_ID_1 = 1;
+ private static final int SERVER_ID_2 = 2;
+
+ // Test domain DNs; DN2 contains "::" — presumably to exercise non-trivial
+ // characters in directory naming — TODO confirm intent.
+ private static final String DN1_AS_STRING = "cn=test1,dc=company.com";
+ private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com";
+ private static final String DN3_AS_STRING = "cn=test3,dc=company.com";
+
+ // Root directory where each test creates its changelog; wiped after every method.
+ private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
+
+ /** Starts a fake server once for the whole class, making the schema available for DN decoding. */
+ @BeforeClass
+ public void setUp() throws Exception
+ {
+ // This test suite depends on having the schema available for DN decoding.
+ TestCaseUtils.startFakeServer();
+ }
+
+ /** Shuts down the fake server started in {@code setUp}. */
+ @AfterClass
+ public void tearDown() throws Exception
+ {
+ TestCaseUtils.shutdownFakeServer();
+ }
+
+ /** Removes the changelog directory after each test so tests do not see each other's state. */
+ @AfterMethod
+ public void cleanTestChangelogDirectory()
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ if (rootPath.exists())
+ {
+ StaticUtils.recursiveDelete(rootPath);
+ }
+ }
+
+ /** On-disk changelog state must report one domain with two replica DBs sharing generation id 1. */
+ @Test
+ public void testReadChangelogStateWithSingleDN() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.valueOf(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ // Two replicas of the same domain, both created with generation id 1.
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+ replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
+
+ final ChangelogState state = environment.readOnDiskChangelogState();
+
+ assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+ assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
+ assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+
+ // State re-read from disk must match the in-memory state held by the environment.
+ assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB, replicaDB2);
+ }
+ }
+
+ /**
+ * On-disk changelog state must report 3 domains, each with server ids 1..10,
+ * and a distinct generation id per domain (domain index + 1).
+ */
+ @Test
+ public void testReadChangelogStateWithMultipleDN() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
+ try
+ {
+ File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ List<DN> domainDNs = Arrays.asList(DN.valueOf(DN1_AS_STRING), DN.valueOf(DN2_AS_STRING), DN.valueOf(DN3_AS_STRING));
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ for (int i = 0; i <= 2 ; i++)
+ {
+ for (int j = 1; j <= 10; j++)
+ {
+ // 3 domains, 10 server id each, generation id is different for each domain
+ replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
+ }
+ }
+
+ final ChangelogState state = environment.readOnDiskChangelogState();
+
+ assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
+ for (int i = 0; i <= 2 ; i++)
+ {
+ assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+ }
+ assertThat(state.getDomainToGenerationId()).containsOnly(
+ MapEntry.entry(domainDNs.get(0), 1L),
+ MapEntry.entry(domainDNs.get(1), 2L),
+ MapEntry.entry(domainDNs.get(2), 3L));
+
+ // State re-read from disk must match the in-memory state held by the environment.
+ assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB);
+ StaticUtils.close(replicaDBs);
+ }
+ }
+
+ /** A replica-offline notification must be persisted and surface in the on-disk state. */
+ @Test
+ public void testReadChangelogStateWithReplicaOffline() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.valueOf(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+ // put server id 1 offline
+ CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
+ environment.notifyReplicaOffline(domainDN, offlineCSN);
+
+ final ChangelogState state = environment.readOnDiskChangelogState();
+
+ // The exact offline CSN must be recorded against the domain.
+ assertThat(state.getOfflineReplicas().getSnapshot())
+ .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+
+ assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ /** A corrupted (empty) replica-offline state file must make the state read fail. */
+ @Test(expectedExceptions=ChangelogException.class)
+ public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.valueOf(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+ // Simulate corruption with an empty state file.
+ // NOTE(review): presumably "1" is the domain id and 1 the server id — confirm against getServerIdPath.
+ File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
+ offlineStateFile.createNewFile();
+
+ // Expected to throw ChangelogException.
+ environment.readOnDiskChangelogState();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ /** When a replica goes offline twice, only the most recent offline CSN must be retained. */
+ @Test
+ public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.valueOf(DN1_AS_STRING);
+
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+ // put server id 1 offline twice
+ CSNGenerator csnGenerator = new CSNGenerator(SERVER_ID_1, 100);
+ environment.notifyReplicaOffline(domainDN, csnGenerator.newCSN());
+ CSN lastOfflineCSN = csnGenerator.newCSN();
+ environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
+
+ final ChangelogState state = environment.readOnDiskChangelogState();
+ // Only the second (latest) offline CSN must remain.
+ assertThat(state.getOfflineReplicas().getSnapshot())
+ .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
+ assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ /** Going offline then back online must leave no offline replica recorded in the state. */
+ @Test
+ public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.valueOf(DN1_AS_STRING);
+
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+ // put server id 1 offline
+ environment.notifyReplicaOffline(domainDN, new CSN(TimeThread.getTime(), 0, SERVER_ID_1));
+ // put server id 1 online again
+ environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
+
+ final ChangelogState state = environment.readOnDiskChangelogState();
+ assertThat(state.getOfflineReplicas()).isEmpty();
+ assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ /**
+ * End-to-end check: domain, server id, generation id and offline CSN written by the
+ * environment must all be recovered when reading the state back from disk.
+ */
+ @Test
+ public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception
+ {
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null;
+ try
+ {
+ final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ final DN domainDN = DN.valueOf(DN1_AS_STRING);
+
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ cnDB = environment.getOrCreateCNIndexDB();
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+ CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
+ environment.notifyReplicaOffline(domainDN, offlineCSN);
+
+ final ChangelogState state = environment.readOnDiskChangelogState();
+
+ assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+ assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
+ assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+ assertThat(state.getOfflineReplicas().getSnapshot())
+ .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+
+ assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB);
+ }
+ }
+
+ /**
+ * Deleting a domain directory that the domain state file still references
+ * must make the state read fail.
+ */
+ @Test(expectedExceptions=ChangelogException.class)
+ public void testMissingDomainDirectory() throws Exception
+ {
+ // NOTE(review): cnDB is never assigned in this test and stays null; close() presumably tolerates null.
+ Log<Long,ChangeNumberIndexRecord> cnDB = null;
+ Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
+ try
+ {
+ File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+ DN domainDN = DN.valueOf(DN1_AS_STRING);
+ ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+ replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+ replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
+
+ // delete the domain directory created for the 2 replica DBs to break the
+ // consistency with domain state file
+ StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
+
+ // Expected to throw ChangelogException.
+ environment.readOnDiskChangelogState();
+ }
+ finally
+ {
+ StaticUtils.close(cnDB, replicaDB, replicaDB2);
+ }
+ }
+}
--
Gitblit v1.10.0