From bf7ffad243778cef715606b14a0f9651910095ee Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 23 Sep 2014 12:01:00 +0000
Subject: [PATCH] Actual merge of complete changelog.file test package, which contains test cases for file-based changelog

---
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java                 |  441 +++++++++
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java           |  602 ++++++++++++
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java |  296 ++++++
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java                     |  598 ++++++++++++
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java    |  552 +++++++++++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java                             |    2 
 opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java  |  327 ++++++
 7 files changed, 2,817 insertions(+), 1 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index e4ebf0e..84b1d98 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -325,7 +325,7 @@
     {
       final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
       return new ByteStringBuilder()
-        .append(record.getKey())
+        .append((long) record.getKey())
         .append(cnIndexRecord.getBaseDN().toString())
         .append(STRING_SEPARATOR)
         .append(cnIndexRecord.getCSN().toByteString()).toByteString();
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
new file mode 100644
index 0000000..ef8c19c
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
@@ -0,0 +1,552 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.forgerock.opendj.util.Pair;
+
+@SuppressWarnings("javadoc")
+public class BlockLogReaderWriterTest extends DirectoryServerTestCase
+{
+  private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+  private static final File TEST_FILE = new File(TEST_DIRECTORY, "file");
+
+  private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser();
+
+  private static final int INT_RECORD_SIZE = 12;
+
+  @BeforeClass
+  void createTestDirectory()
+  {
+    TEST_DIRECTORY.mkdirs();
+  }
+
+  @BeforeMethod
+  void ensureTestFileIsEmpty() throws Exception
+  {
+    StaticUtils.recursiveDelete(TEST_FILE);
+  }
+
+  @AfterClass
+  void cleanTestDirectory()
+  {
+    StaticUtils.recursiveDelete(TEST_DIRECTORY);
+  }
+
+  @DataProvider(name = "recordsData")
+  Object[][] recordsData()
+  {
+    return new Object[][]
+    {
+      // raw size taken by each record is: 4 (record size) + 4 (key) + 4 (value) = 12 bytes
+
+      // size of block, expected size of file after all records are written, records
+      { 12, 12, records(1) }, // zero block marker
+      { 10, 16, records(1) }, // one block marker
+      { 8, 16, records(1) },  // one block marker
+      { 7, 20, records(1) },  // two block markers
+      { 6, 24, records(1) },  // three block markers
+      { 5, 40, records(1) },  // seven block markers
+
+      { 16, 28, records(1,2) }, // one block marker
+      { 12, 32, records(1,2) }, // two block markers
+      { 10, 36, records(1,2) }, // three block markers
+    };
+  }
+
+  /**
+   * Tests that records can be written then read correctly for different block sizes.
+   */
+  @Test(dataProvider="recordsData")
+  public void testWriteThenRead(int blockSize, int expectedSizeOfFile, List<Record<Integer, Integer>> records)
+      throws Exception
+  {
+    writeRecords(blockSize, records);
+
+    BlockLogReader<Integer, Integer> reader = null;
+    try
+    {
+      reader = newReader(blockSize);
+      for (int i = 0; i < records.size(); i++)
+      {
+         Record<Integer, Integer> record = reader.readRecord();
+         assertThat(record).isEqualTo(records.get(i));
+      }
+      assertThat(reader.readRecord()).isNull();
+      assertThat(reader.getFilePosition()).isEqualTo(expectedSizeOfFile);
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  @DataProvider(name = "recordsForSeek")
+  Object[][] recordsForSeek()
+  {
+    Object[][] data = new Object[][] {
+      // records, key, key matching strategy, position strategy, expectedRecord, should be found ?
+
+      // no record
+      { records(), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+      { records(), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+      { records(), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+
+      // 1 record
+      { records(1), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+      { records(1), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+      { records(1), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+      { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
+      { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+      // 3 records equal matching
+      { records(1,2,3), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+      { records(1,2,3), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1,2,3), 2, EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
+      { records(1,2,3), 3, EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
+      { records(1,2,3), 4, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+
+      { records(1,2,3), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+      { records(1,2,3), 2, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
+      { records(1,2,3), 3, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+      // 3 records less than or equal matching
+      { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+      { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
+      { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
+      { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
+
+      { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+      { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+      { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
+      { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+      { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+      // 3 records greater or equal matching
+      { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true },
+      { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true },
+
+      { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
+      { records(1,2,3), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+      { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true },
+      { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+      { records(1,2,3), 4, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+
+      // 10 records equal matching
+      { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+      { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
+      { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+      { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+
+      { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+      { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
+      { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+      { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+      { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+
+      // 10 records less than or equal matching
+      { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+      { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
+      { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
+
+      { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+      { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
+      { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+      { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+
+      // 10 records greater or equal matching
+      { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true },
+      { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(7), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false },
+
+      { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true },
+      { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true },
+      { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false },
+
+
+    };
+
+    // For each test case, do a test with various block sizes to ensure algorithm is not broken
+    // on a given size
+    int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 };
+    Object[][] finalData = new Object[sizes.length * data.length][7];
+    for (int i = 0; i < data.length; i++)
+    {
+      for (int j = 0; j < sizes.length; j++)
+      {
+        Object[] a = data[i];
+        // add the block size at beginning of each test case
+        finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4], a[5]};
+      }
+    }
+    return finalData;
+  }
+
+  @Test(dataProvider = "recordsForSeek")
+  public void testSeekToRecord(int blockSize, List<Record<Integer, Integer>> records, int key,
+      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, Record<Integer, Integer> expectedRecord,
+      boolean shouldBeFound) throws Exception
+  {
+    writeRecords(blockSize, records);
+
+    BlockLogReader<Integer, Integer> reader = null;
+    try
+    {
+      reader = newReader(blockSize);
+      Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchingStrategy, positionStrategy);
+
+      assertThat(result.getFirst()).isEqualTo(shouldBeFound);
+      assertThat(result.getSecond()).isEqualTo(expectedRecord);
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  @Test
+  public void testGetClosestMarkerBeforeOrAtPosition() throws Exception
+  {
+    final int blockSize = 10;
+    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(0)).isEqualTo(0);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(5)).isEqualTo(0);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(9)).isEqualTo(0);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(10)).isEqualTo(10);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(15)).isEqualTo(10);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(20)).isEqualTo(20);
+  }
+
+  @Test
+  public void testGetClosestMarkerStrictlyAfterPosition() throws Exception
+  {
+    final int blockSize = 10;
+    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(0)).isEqualTo(10);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(5)).isEqualTo(10);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(10)).isEqualTo(20);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(11)).isEqualTo(20);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(15)).isEqualTo(20);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(20)).isEqualTo(30);
+  }
+
+  @Test
+  public void testSearchClosestMarkerToKey() throws Exception
+  {
+    int blockSize = 20;
+    writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20));
+
+    BlockLogReader<Integer, Integer> reader = null;
+    try
+    {
+      reader = newReader(blockSize);
+
+      assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0);
+      assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0);
+      assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20);
+      assertThat(reader.searchClosestBlockStartToKey(3)).isEqualTo(20);
+      assertThat(reader.searchClosestBlockStartToKey(4)).isEqualTo(40);
+      assertThat(reader.searchClosestBlockStartToKey(5)).isEqualTo(60);
+      assertThat(reader.searchClosestBlockStartToKey(6)).isEqualTo(80);
+      assertThat(reader.searchClosestBlockStartToKey(7)).isEqualTo(80);
+      assertThat(reader.searchClosestBlockStartToKey(8)).isEqualTo(100);
+      assertThat(reader.searchClosestBlockStartToKey(9)).isEqualTo(120);
+      assertThat(reader.searchClosestBlockStartToKey(10)).isEqualTo(140);
+      assertThat(reader.searchClosestBlockStartToKey(19)).isEqualTo(260);
+      assertThat(reader.searchClosestBlockStartToKey(20)).isEqualTo(280);
+      // out of reach keys
+      assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280);
+      assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280);
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  @Test
+  public void testLengthOfStoredRecord() throws Exception
+  {
+    final int blockSize = 100;
+    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+    int recordLength = 10;
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 20)).isEqualTo(recordLength);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 10)).isEqualTo(recordLength);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 9)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+
+    recordLength = 150;
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 60)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 54)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 53)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+
+    recordLength = 200;
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 8)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 7)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
+  }
+
+  /**
+   * This test is intended to be run only manually to check the performance between binary search
+   * and sequential access.
+   * Note that sequential run may be extremely long when using high values.
+   */
+  @Test(enabled=false)
+  public void seekPerformanceComparison() throws Exception
+  {
+    // You may change these values
+    long fileSizeInBytes = 100*1024*1024;
+    int numberOfValuesToSeek = 50000;
+    int blockSize = 256;
+
+    writeRecordsToReachFileSize(blockSize, fileSizeInBytes);
+    BlockLogReader<Integer, Integer> reader = null;
+    try
+    {
+      reader = newReader(blockSize);
+      List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek);
+      System.out.println("File size: " + TEST_FILE.length() + " bytes");
+
+      System.out.println("\n---- BINARY SEARCH");
+      long minTime = Long.MAX_VALUE;
+      long maxTime = Long.MIN_VALUE;
+      final long t0 = System.nanoTime();
+      for (Integer key : keysToSeek)
+      {
+        final long ts = System.nanoTime();
+        Pair<Boolean, Record<Integer, Integer>> result =
+            reader.seekToRecord(key, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+        final long te = System.nanoTime() - ts;
+        if (te < minTime) minTime = te;
+        if (te > maxTime) maxTime = te;
+        // show time for seeks that last more than N microseconds (tune as needed)
+        if (te/1000 > 1000)
+        {
+          System.out.println("TIME! key:" + result.getSecond().getKey() + ", time=" + te/1000 + " microseconds");
+        }
+        assertThat(result.getSecond()).isEqualTo(record(key));
+      }
+      System.out.println("Time taken: " + ((System.nanoTime() - t0)/1000000) + " milliseconds");
+      System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
+      System.out.println("Max time for a search: " + maxTime/1000 + " microseconds");
+      System.out.println("Max difference for a search: " + (maxTime - minTime)/1000 + " microseconds");
+
+      System.out.println("\n---- SEQUENTIAL SEARCH");
+      minTime = Long.MAX_VALUE;
+      maxTime = Long.MIN_VALUE;
+      final long t1 = System.nanoTime();
+      for (Integer val : keysToSeek)
+      {
+        long ts = System.nanoTime();
+        Pair<Boolean, Record<Integer, Integer>> result =
+            reader.positionToKeySequentially(0, val, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+        assertThat(result.getSecond()).isEqualTo(Record.from(val, val));
+        long te = System.nanoTime() - ts;
+        if (te < minTime) minTime = te;
+        if (te > maxTime) maxTime = te;
+      }
+      System.out.println("Time taken: " + ((System.nanoTime() - t1)/1000000) + " milliseconds");
+      System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
+      System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds");
+      System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds");
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  /** Write provided records with the provided block size. */
+  private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException
+  {
+    BlockLogWriter<Integer, Integer> writer = null;
+    try
+    {
+      writer = newWriter(blockSize);
+      for (Record<Integer, Integer> record : records)
+      {
+        writer.write(record);
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writer);
+    }
+  }
+
+  /** Write as many records as needed to reach provided file size. Records goes from 1 up to N. */
+  private void writeRecordsToReachFileSize(int blockSize, long sizeInBytes) throws Exception
+  {
+      final int numberOfValues = (int) sizeInBytes / INT_RECORD_SIZE;
+      final int[] values = new int[numberOfValues];
+      for (int i = 0; i < numberOfValues; i++)
+      {
+        values[i] = i+1;
+      }
+      writeRecords(blockSize, records(values));
+  }
+
+  /** Returns provided number of keys to seek in random order, for a file of provided size. */
+  private List<Integer> getShuffledKeys(long fileSizeInBytes, int numberOfKeys)
+  {
+    final int numberOfValues = (int) fileSizeInBytes / INT_RECORD_SIZE;
+    final List<Integer> values = new ArrayList<Integer>(numberOfValues);
+    for (int i = 0; i < numberOfValues; i++)
+    {
+      values.add(i+1);
+    }
+    Collections.shuffle(values);
+    return values.subList(0, numberOfKeys);
+  }
+
+  private BlockLogWriter<Integer, Integer> newWriter(int sizeOfBlock) throws ChangelogException
+  {
+    return BlockLogWriter.newWriterForTests(new LogWriter(TEST_FILE), RECORD_PARSER, sizeOfBlock);
+  }
+
+  private BlockLogReader<Integer, Integer> newReader(int blockSize) throws FileNotFoundException
+  {
+    return BlockLogReader.newReaderForTests(TEST_FILE, new RandomAccessFile(TEST_FILE, "r"),
+        RECORD_PARSER, blockSize);
+  }
+
+  private BlockLogReader<Integer, Integer> newReaderWithNullFile(int blockSize) throws FileNotFoundException
+  {
+    return BlockLogReader.newReaderForTests(null, null, RECORD_PARSER, blockSize);
+  }
+
+  /** Helper to build a list of records. */
+  private List<Record<Integer, Integer>> records(int...keys)
+  {
+    List<Record<Integer, Integer>> records = new ArrayList<Record<Integer, Integer>>();
+    for (int key : keys)
+    {
+      records.add(Record.from(key, key));
+    }
+    return records;
+  }
+
+  /** Helper to build a record. */
+  private Record<Integer, Integer> record(int key)
+  {
+    return Record.from(key, key);
+  }
+
+  /**
+   * Record parser implementation for records with keys and values as integers to be used in tests.
+   * Using integer allow to know precisely the size of the records (4 bytes for key + 4 bytes for value),
+   * which is useful for some tests.
+   */
+  private static class IntRecordParser implements RecordParser<Integer, Integer>
+  {
+    public Record<Integer, Integer> decodeRecord(final ByteString data) throws DecodingException
+    {
+      ByteSequenceReader reader = data.asReader();
+      int key = reader.getInt();
+      int value = reader.getInt();
+      return Record.from(key, value);
+    }
+
+    public ByteString encodeRecord(Record<Integer, Integer> record)
+    {
+      return new ByteStringBuilder().append((int) record.getKey()).append((int) record.getValue()).toByteString();
+    }
+
+    @Override
+    public Integer decodeKeyFromString(String key) throws ChangelogException
+    {
+      return Integer.valueOf(key);
+    }
+
+    @Override
+    public String encodeKeyToString(Integer key)
+    {
+      return String.valueOf(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Integer getMaxKey()
+    {
+      return Integer.MAX_VALUE;
+    }
+  }
+
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
new file mode 100644
index 0000000..948a221
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -0,0 +1,296 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*;
+import static org.testng.Assert.*;
+
+@SuppressWarnings("javadoc")
+public class FileChangeNumberIndexDBTest extends ReplicationTestCase
+{
+
+  @DataProvider(name = "messages")
+  Object[][] createMessages() throws Exception
+  {
+    CSN[] csns = generateCSNs(1, 0, 3);
+    DN dn1 = DN.valueOf("o=test1");
+    return new Object[][] {
+      { new ChangeNumberIndexRecord(0L, dn1, csns[1]) },
+      { new ChangeNumberIndexRecord(999L, dn1, csns[2]) },
+    };
+  }
+
+  @Test(dataProvider="messages")
+  public void testRecordParser(ChangeNumberIndexRecord msg) throws Exception
+  {
+    RecordParser<Long, ChangeNumberIndexRecord> parser = FileChangeNumberIndexDB.RECORD_PARSER;
+
+    ByteString data = parser.encodeRecord(Record.from(msg.getChangeNumber(), msg));
+    Record<Long, ChangeNumberIndexRecord> record = parser.decodeRecord(data);
+
+    assertThat(record).isNotNull();
+    assertThat(record.getKey()).isEqualTo(msg.getChangeNumber());
+    assertThat(record.getValue().getBaseDN()).isEqualTo(msg.getBaseDN());
+    assertThat(record.getValue().getCSN()).isEqualTo(msg.getCSN());
+  }
+
+  @Test()
+  public void testAddAndReadRecords() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    try
+    {
+      replicationServer = newReplicationServer();
+      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+      changelogDB.setPurgeDelay(0);
+      final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+
+      long[] changeNumbers = addThreeRecords(cnIndexDB);
+      long cn1 = changeNumbers[0];
+      long cn2 = changeNumbers[1];
+      long cn3 = changeNumbers[2];
+
+      // Checks
+      assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
+      assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
+
+      assertEquals(cnIndexDB.count(), 3, "Db count");
+      assertFalse(cnIndexDB.isEmpty());
+
+      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
+      assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
+
+      cursor = cnIndexDB.getCursorFrom(cn2);
+      assertCursorReadsInOrder(cursor, cn2, cn3);
+
+      cursor = cnIndexDB.getCursorFrom(cn3);
+      assertCursorReadsInOrder(cursor, cn3);
+    }
+    finally
+    {
+      remove(replicationServer);
+    }
+  }
+
+  @Test()
+  public void testClear() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    try
+    {
+      replicationServer = newReplicationServer();
+      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+      changelogDB.setPurgeDelay(0);
+      final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+      addThreeRecords(cnIndexDB);
+
+      cnIndexDB.clear();
+
+      assertNull(cnIndexDB.getOldestRecord());
+      assertNull(cnIndexDB.getNewestRecord());
+      assertEquals(cnIndexDB.count(), 0);
+      assertTrue(cnIndexDB.isEmpty());
+    }
+    finally
+    {
+      remove(replicationServer);
+    }
+  }
+
+
+  /**
+   * This test makes basic operations of a ChangeNumberIndexDB:
+   * <ol>
+   * <li>create the db</li>
+   * <li>add records</li>
+   * <li>read them with a cursor</li>
+   * <li>set a very short trim period</li>
+   * <li>wait for the db to be trimmed / here since the changes are not stored
+   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
+   * </ol>
+   */
+  // TODO : this works only if we ensure that there is a rotation of ahead log file
+  // at the right place. First two records are 37 and 76 bytes long,
+  // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
+  // Re-enable this test when max file size is customizable for log
+  @Test(enabled=false)
+  public void testPurge() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    try
+    {
+      replicationServer = newReplicationServer();
+      final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+      changelogDB.setPurgeDelay(0); // disable purging
+
+      // Prepare data to be stored in the db
+      DN baseDN1 = DN.valueOf("o=test1");
+      DN baseDN2 = DN.valueOf("o=test2");
+      DN baseDN3 = DN.valueOf("o=test3");
+
+      CSN[] csns = generateCSNs(1, 0, 3);
+
+      // Add records
+      final FileChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+      long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+                 addRecord(cnIndexDB, baseDN2, csns[1]);
+      long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
+
+      // The ChangeNumber should not get purged
+      final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
+      assertEquals(oldestCN, cn1);
+      assertEquals(cnIndexDB.getNewestRecord().getChangeNumber(), cn3);
+
+      DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
+      try
+      {
+        assertTrue(cursor.next());
+        assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
+        assertTrue(cursor.next());
+        assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
+        assertTrue(cursor.next());
+        assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
+        assertFalse(cursor.next());
+      }
+      finally
+      {
+        StaticUtils.close(cursor);
+      }
+
+      // Now test that purging removes all changes but the last one
+      changelogDB.setPurgeDelay(1);
+      int count = 0;
+      while (cnIndexDB.count() > 1 && count < 100)
+      {
+        Thread.sleep(10);
+        count++;
+      }
+      assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
+    }
+    finally
+    {
+      remove(replicationServer);
+    }
+  }
+
+  private long[] addThreeRecords(FileChangeNumberIndexDB cnIndexDB) throws Exception
+  {
+    // Prepare data to be stored in the db
+    DN baseDN1 = DN.valueOf("o=test1");
+    DN baseDN2 = DN.valueOf("o=test2");
+    DN baseDN3 = DN.valueOf("o=test3");
+
+    CSN[] csns = generateCSNs(1, 0, 3);
+
+    // Add records
+    long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+    long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
+    long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
+    return new long[] { cn1, cn2, cn3 };
+  }
+
+  private long addRecord(FileChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
+  {
+    return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn));
+  }
+
+  private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN)
+  {
+    assertEquals(record.getCSN(), csn);
+    assertEquals(record.getBaseDN(), baseDN);
+  }
+
+  private FileChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
+  {
+    final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
+    final FileChangeNumberIndexDB cnIndexDB =
+        (FileChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
+    assertTrue(cnIndexDB.isEmpty());
+    return cnIndexDB;
+  }
+
+  /**
+   * The newest record is no longer cleared to ensure persistence to the last
+   * generated change number across server restarts.
+   */
+  private void assertOnlyNewestRecordIsLeft(FileChangeNumberIndexDB cnIndexDB,
+      int newestChangeNumber) throws ChangelogException
+  {
+    assertEquals(cnIndexDB.count(), 1);
+    assertFalse(cnIndexDB.isEmpty());
+    final ChangeNumberIndexRecord oldest = cnIndexDB.getOldestRecord();
+    final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord();
+    assertEquals(oldest.getChangeNumber(), newestChangeNumber);
+    assertEquals(oldest.getChangeNumber(), newest.getChangeNumber());
+    assertEquals(oldest.getBaseDN(), newest.getBaseDN());
+    assertEquals(oldest.getCSN(), newest.getCSN());
+  }
+
+  private ReplicationServer newReplicationServer() throws Exception
+  {
+    TestCaseUtils.startServer();
+    final int port = TestCaseUtils.findFreePort();
+    final ReplServerFakeConfiguration cfg =
+        new ReplServerFakeConfiguration(port, null, ReplicationDBImplementation.LOG, 0, 2, 0, 100, null);
+    cfg.setComputeChangeNumber(true);
+    return new ReplicationServer(cfg);
+  }
+
+  private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor,
+      long... cns) throws ChangelogException
+  {
+    try
+    {
+      for (long cn : cns)
+      {
+        assertTrue(cursor.next());
+        assertEquals(cursor.getRecord().getChangeNumber(), cn);
+      }
+      assertFalse(cursor.next());
+    }
+    finally
+    {
+      cursor.close();
+    }
+  }
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
new file mode 100644
index 0000000..96640ca
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -0,0 +1,602 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.assertj.core.api.SoftAssertions;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
+import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.types.DN;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.testng.Assert.*;
+
+/**
+ * Test the FileReplicaDB class
+ */
+@SuppressWarnings("javadoc")
+public class FileReplicaDBTest extends ReplicationTestCase
+{
+  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+  private DN TEST_ROOT_DN;
+
+  /**
+   * Utility - log debug message - highlight it is from the test and not
+   * from the server code. Makes easier to observe the test steps.
+   */
+  private void debugInfo(String tn, String s)
+  {
+    logger.trace("** TEST %s ** %s", tn, s);
+  }
+
+  @BeforeClass
+  public void setup() throws Exception
+  {
+    TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING);
+  }
+
+  @DataProvider(name = "messages")
+  Object[][] createMessages()
+  {
+    CSN[] csns = generateCSNs(1, 0, 2);
+    return new Object[][] {
+      { new DeleteMsg(TEST_ROOT_DN, csns[0], "uid") },
+      { new DeleteMsg(TEST_ROOT_DN, csns[1], "uid") },
+    };
+  }
+
+  @Test(dataProvider="messages")
+  public void testRecordParser(UpdateMsg msg) throws Exception
+  {
+    RecordParser<CSN, UpdateMsg> parser = FileReplicaDB.RECORD_PARSER;
+
+    ByteString data = parser.encodeRecord(Record.from(msg.getCSN(), msg));
+    Record<CSN, UpdateMsg> record = parser.decodeRecord(data);
+
+    assertThat(record).isNotNull();
+    assertThat(record.getKey()).isEqualTo(msg.getCSN());
+    assertThat(record.getValue()).isEqualTo(msg);
+  }
+
+  @Test
+  public void testDomainDNWithForwardSlashes() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100, 5000);
+
+      replicaDB = newReplicaDB(replicationServer);
+      CSN[] csns = generateCSNs(1, 0, 1);
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+
+      waitChangesArePersisted(replicaDB, 1);
+      assertFoundInOrder(replicaDB, csns[0]);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  @Test
+  public void testAddAndReadRecords() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100, 5000);
+
+      replicaDB = newReplicaDB(replicationServer);
+      CSN[] csns = generateCSNs(1, 0, 5);
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
+
+      waitChangesArePersisted(replicaDB, 3);
+
+      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
+      assertLimits(replicaDB, csns[0], csns[2]);
+
+      DeleteMsg update4 = new DeleteMsg(TEST_ROOT_DN, csns[3], "uid");
+      replicaDB.add(update4);
+      waitChangesArePersisted(replicaDB, 4);
+
+      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
+      assertFoundInOrder(replicaDB, csns[2], csns[3]);
+      assertFoundInOrder(replicaDB, csns[3]);
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  @Test
+  public void testGenerateCursorFrom() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100000, 10);
+      replicaDB = newReplicaDB(replicationServer);
+
+      final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
+      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
+      csns2.remove(csns[3]);
+
+      for (CSN csn : csns2)
+      {
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
+      }
+      waitChangesArePersisted(replicaDB, 4);
+
+      for (CSN csn : csns2)
+      {
+        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
+      }
+      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
+
+      for (int i = 0; i < csns2.size() - 1; i++)
+      {
+        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
+      }
+      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  @DataProvider
+  Object[][] dataForTestsWithCursorReinitialization()
+  {
+    return new Object[][] {
+      // the index to use in CSN array for the start key of the cursor
+      { 0 },
+      { 1 },
+      { 4 },
+    };
+  }
+
+  @Test(dataProvider="dataForTestsWithCursorReinitialization")
+  public void testGenerateCursorFromWithCursorReinitialization(int csnIndexForStartKey) throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    DBCursor<UpdateMsg> cursor = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100000, 10);
+      replicaDB = newReplicaDB(replicationServer);
+
+      CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
+
+      cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey],
+          GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+      assertFalse(cursor.next());
+
+      int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
+      for (int i : indicesToAdd)
+      {
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+      }
+      waitChangesArePersisted(replicaDB, 4);
+
+      for (int i = csnIndexForStartKey+1; i < 5; i++)
+      {
+        if (i != 3)
+        {
+          assertTrue(cursor.next());
+          assertEquals(cursor.getRecord().getCSN(), csns[i], "index i=" + i);
+        }
+      }
+      assertFalse(cursor.next());
+    }
+    finally
+    {
+      close(cursor);
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  // TODO : this works only if we ensure that there is a rotation of ahead log file
+  // at right place. Each record takes 54 bytes, so it means : 108 < max file size < 162 to have
+  // the last record alone in the ahead log file
+  // Re-enable this test when max file size is customizable for log
+  @Test(enabled=false)
+  public void testPurge() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100, 5000);
+      replicaDB = newReplicaDB(replicationServer);
+
+      CSN[] csns = generateCSNs(1, 0, 5);
+
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[3], "uid"));
+
+      waitChangesArePersisted(replicaDB, 4);
+
+      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
+
+      int count = 0;
+      boolean purgeSucceeded = false;
+      final CSN expectedNewestCSN = csns[3];
+      do
+      {
+        Thread.sleep(10);
+
+        final CSN oldestCSN = replicaDB.getOldestCSN();
+        final CSN newestCSN = replicaDB.getNewestCSN();
+        purgeSucceeded =
+            oldestCSN.equals(expectedNewestCSN)
+                && newestCSN.equals(expectedNewestCSN);
+        count++;
+      }
+      while (!purgeSucceeded && count < 100);
+      assertTrue(purgeSucceeded);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  /**
+   * Test the feature of clearing a FileReplicaDB used by a replication server.
+   * The clear feature is used when a replication server receives a request to
+   * reset the generationId of a given domain.
+   */
+  @Test
+  public void testClear() throws Exception
+  {
+    ReplicationServer replicationServer = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100, 5000);
+      replicaDB = newReplicaDB(replicationServer);
+
+      CSN[] csns = generateCSNs(1, 0, 3);
+
+      // Add the changes and check they are here
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
+      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[2], "uid"));
+      assertLimits(replicaDB, csns[0], csns[2]);
+
+      // Clear DB and check it is cleared.
+      replicaDB.clear();
+      assertLimits(replicaDB, null, null);
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      remove(replicationServer);
+    }
+  }
+
+  private void assertNextCSN(FileReplicaDB replicaDB, final CSN startCSN,
+      final PositionStrategy positionStrategy, final CSN expectedCSN)
+      throws ChangelogException
+  {
+    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
+    try
+    {
+      final SoftAssertions softly = new SoftAssertions();
+      softly.assertThat(cursor.next()).isTrue();
+      softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
+      softly.assertAll();
+    }
+    finally
+    {
+      close(cursor);
+    }
+  }
+
+  private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN,
+      final PositionStrategy positionStrategy) throws ChangelogException
+  {
+    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
+    try
+    {
+      final SoftAssertions softly = new SoftAssertions();
+      softly.assertThat(cursor.next()).isFalse();
+      softly.assertThat(cursor.getRecord()).isNull();
+      softly.assertAll();
+    }
+    finally
+    {
+      close(cursor);
+    }
+  }
+
+  /**
+   * Test the logic that manages counter records in the FileReplicaDB in order to
+   * optimize the oldest and newest records in the replication changelog db.
+   */
+  @Test(groups = { "opendj-256" })
+  public void testGetOldestNewestCSNs() throws Exception
+  {
+    // It's worth testing with 2 different setting for counterRecord
+    // - a counter record is put every 10 Update msg in the db - just a unit
+    //   setting.
+    // - a counter record is put every 1000 Update msg in the db - something
+    //   closer to real setting.
+    // In both cases, we want to test the counting algorithm,
+    // - when start and stop are before the first counter record,
+    // - when start and stop are before and after the first counter record,
+    // - when start and stop are after the first counter record,
+    // - when start and stop are before and after more than one counter record,
+    // After a purge.
+    // After shutting down/closing and reopening the db.
+
+    // TODO : do we need the management of counter records ?
+    // Use unreachable limits for now because it is not implemented
+    testGetOldestNewestCSNs(40, 100);
+    testGetOldestNewestCSNs(4000, 10000);
+  }
+
+  private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
+  {
+    String tn = "testDBCount("+max+","+counterWindow+")";
+    debugInfo(tn, "Starting test");
+
+    File testRoot = null;
+    ReplicationServer replicationServer = null;
+    ReplicationEnvironment dbEnv = null;
+    FileReplicaDB replicaDB = null;
+    try
+    {
+      TestCaseUtils.startServer();
+      replicationServer = configureReplicationServer(100000, 10);
+
+      testRoot = createCleanDir();
+      dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
+      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
+
+      // Populate the db with 'max' msg
+      int mySeqnum = 1;
+      CSN csns[] = new CSN[2 * (max + 1)];
+      long now = System.currentTimeMillis();
+      for (int i=1; i<=max; i++)
+      {
+        csns[i] = new CSN(now + i, mySeqnum, 1);
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+        mySeqnum+=2;
+      }
+      waitChangesArePersisted(replicaDB, max, counterWindow);
+      assertLimits(replicaDB, csns[1], csns[max]);
+
+      // Now we want to test that after closing and reopening the db, the
+      // counting algo is well reinitialized and when new messages are added
+      // the new counter are correctly generated.
+      debugInfo(tn, "SHUTDOWN replicaDB and recreate");
+      replicaDB.shutdown();
+
+      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
+      assertLimits(replicaDB, csns[1], csns[max]);
+
+      // Populate the db with 'max' msg
+      for (int i=max+1; i<=2 * max; i++)
+      {
+        csns[i] = new CSN(now + i, mySeqnum, 1);
+        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
+        mySeqnum+=2;
+      }
+      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
+      assertLimits(replicaDB, csns[1], csns[2 * max]);
+
+      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
+
+      String testcase = "AFTER PURGE (oldest, newest)=";
+      debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());
+      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Newest=");
+
+      // Clear ...
+      debugInfo(tn,"clear:");
+      replicaDB.clear();
+
+      // Check the db is cleared.
+      assertEquals(null, replicaDB.getOldestCSN());
+      assertEquals(null, replicaDB.getNewestCSN());
+      debugInfo(tn,"Success");
+    }
+    finally
+    {
+      shutdown(replicaDB);
+      if (dbEnv != null)
+      {
+        dbEnv.shutdown();
+      }
+      remove(replicationServer);
+      TestCaseUtils.deleteDirectory(testRoot);
+    }
+  }
+
+  private void assertLimits(FileReplicaDB replicaDB, CSN oldestCSN, CSN newestCSN)
+  {
+    final SoftAssertions softly = new SoftAssertions();
+    softly.assertThat(replicaDB.getOldestCSN()).as("Wrong oldest CSN").isEqualTo(oldestCSN);
+    softly.assertThat(replicaDB.getNewestCSN()).as("Wrong newest CSN").isEqualTo(newestCSN);
+    softly.assertAll();
+  }
+
+  private void shutdown(FileReplicaDB replicaDB)
+  {
+    if (replicaDB != null)
+    {
+      replicaDB.shutdown();
+    }
+  }
+
+  static CSN[] generateCSNs(int serverId, long timestamp, int number)
+  {
+    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
+    CSN[] csns = new CSN[number];
+    for (int i = 0; i < csns.length; i++)
+    {
+      csns[i] = gen.newCSN();
+    }
+    return csns;
+  }
+
+  private void waitChangesArePersisted(FileReplicaDB replicaDB,
+      int nbRecordsInserted) throws Exception
+  {
+    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
+  }
+
+  private void waitChangesArePersisted(FileReplicaDB replicaDB,
+      int nbRecordsInserted, int counterWindow) throws Exception
+  {
+    // one counter record is inserted every time "counterWindow"
+    // records have been inserted
+    int expectedNbRecords = nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
+
+    int count = 0;
+    while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
+    {
+      Thread.sleep(10);
+      count++;
+    }
+    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
+  }
+
+  private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
+      throws IOException, ConfigException
+  {
+    final int changelogPort = findFreePort();
+    final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
+        changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize, windowSize, null);
+    return new ReplicationServer(conf);
+  }
+
+  private FileReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
+  {
+    final FileChangelogDB changelogDB = (FileChangelogDB) rs.getChangelogDB();
+    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
+  }
+
+  private File createCleanDir() throws IOException
+  {
+    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
+            + File.separator + "build");
+    path = path + File.separator + "unit-tests" + File.separator + "FileReplicaDB";
+    final File testRoot = new File(path);
+    TestCaseUtils.deleteDirectory(testRoot);
+    testRoot.mkdirs();
+    return testRoot;
+  }
+
+  private void assertFoundInOrder(FileReplicaDB replicaDB, CSN... csns) throws Exception
+  {
+    if (csns.length == 0)
+    {
+      return;
+    }
+
+    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
+    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
+  }
+
+  private void assertFoundInOrder(FileReplicaDB replicaDB,
+      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
+  {
+    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
+    try
+    {
+      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
+
+      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
+      {
+        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+        final SoftAssertions softly = new SoftAssertions();
+        softly.assertThat(cursor.next()).as(msg).isTrue();
+        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
+        softly.assertAll();
+      }
+      final SoftAssertions softly = new SoftAssertions();
+      softly.assertThat(cursor.next()).isFalse();
+      softly.assertThat(cursor.getRecord()).isNull();
+      softly.assertAll();
+    }
+    finally
+    {
+      close(cursor);
+    }
+  }
+
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
new file mode 100644
index 0000000..a1750d8
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -0,0 +1,441 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
+@SuppressWarnings("javadoc")
+@Test(sequential=true)
+public class LogFileTest extends DirectoryServerTestCase
+{
+  private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+  private static final File TEST_LOG_FILE = new File(TEST_DIRECTORY, Log.HEAD_LOG_FILE_NAME);
+
+  static final StringRecordParser RECORD_PARSER = new StringRecordParser();
+
+  @BeforeClass
+  public void createTestDirectory()
+  {
+    TEST_DIRECTORY.mkdirs();
+  }
+
+  @BeforeMethod
+  /**
+   * Create a new log file with ten records starting from (key01, value1) until (key10, value10).
+   * So log contains keys "key01", "key02", ..., "key10"
+   */
+  public void initialize() throws Exception
+  {
+    if (TEST_LOG_FILE.exists())
+    {
+      TEST_LOG_FILE.delete();
+    }
+    LogFile<String, String> logFile =  null;
+    try
+    {
+      logFile = getLogFile(RECORD_PARSER);
+
+      for (int i = 1; i <= 10; i++)
+      {
+        logFile.append(Record.from(String.format("key%02d", i), "value"+i));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(logFile);
+    }
+  }
+
+  @AfterClass
+  public void cleanTestChangelogDirectory()
+  {
+    StaticUtils.recursiveDelete(TEST_DIRECTORY);
+  }
+
+  private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
+  {
+    return LogFile.newAppendableLogFile(TEST_LOG_FILE, parser);
+  }
+
+  @Test
+  public void testCursor() throws Exception
+  {
+    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = changelog.getCursor();
+
+      assertThatCursorCanBeFullyRead(cursor, 1, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, changelog);
+    }
+  }
+
+  @DataProvider
+  Object[][] cursorPositionTo()
+  {
+    return new Object[][] {
+      // key to position to, key matching strategy, position strategy, position is found ?,
+      //    expected start index of cursor (use -1 if cursor should be exhausted), expected end index of cursor
+
+      // equal
+      { "key00", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
+      { "key02", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
+      { "key05", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
+      { "key050", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
+      { "key07", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
+      { "key10", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
+      { "key11", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
+
+      { "key00", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
+      { "key02", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true,  3, 10},
+      { "key05", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true,  6, 10},
+      { "key050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
+      { "key07", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
+      { "key10", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
+      { "key11", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
+
+      // less than or equal
+
+      // key00 is a special case : position is not found but cursor is positioned on beginning
+      // so it is possible to iterate on it from start to end
+      { "key00", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, 1, 10},
+      { "key02", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
+      { "key05", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
+      { "key050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
+      { "key07", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
+      { "key10", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
+      { "key11", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
+
+      // key00 is a special case : position is not found but cursor is positioned on beginning
+      // so it is possible to iterate on it from 2 to end
+      { "key00", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, 2, 10},
+      { "key02", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10},
+      { "key05", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key07", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
+      { "key10", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
+      { "key11", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
+
+      // greater than or equal
+      { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 1, 10},
+      { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10},
+      { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10},
+      { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 6, 10},
+      { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10},
+      { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10},
+      { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1},
+
+      { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 1, 10},
+      { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10},
+      { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10},
+      { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10},
+      { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1},
+      { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1},
+
+    };
+  }
+
+  /**
+   * Test cursor positioning for a given key, matching strategy and position strategy.
+   * Cursor is fully read from the expected starting index to the expected end index, unless it is expected
+   * to be directly exhausted.
+   */
+  @Test(dataProvider="cursorPositionTo")
+  public void testCursorPositionTo(String key, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy,
+      boolean positionShouldBeFound, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception
+  {
+    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+    LogFileCursor<String, String> cursor = null;
+    try {
+      cursor = changelog.getCursor();
+      boolean success = cursor.positionTo(key, matchingStrategy, positionStrategy);
+
+      assertThat(success).isEqualTo(positionShouldBeFound);
+      if (cursorShouldStartAt >= 0)
+      {
+        assertThatCursorCanBeFullyRead(cursor, cursorShouldStartAt, cursorShouldEndAt);
+      }
+      else
+      {
+        assertThatCursorIsExhausted(cursor);
+      }
+    }
+    finally {
+      StaticUtils.close(cursor, changelog);
+    }
+  }
+
+  @Test
+  public void testGetOldestRecord() throws Exception
+  {
+    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+    try
+    {
+      Record<String, String> record = changelog.getOldestRecord();
+
+      assertThat(record).isEqualTo(Record.from("key01", "value1"));
+    }
+    finally {
+      StaticUtils.close(changelog);
+    }
+  }
+
+  @Test
+  public void testGetNewestRecord() throws Exception
+  {
+    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
+    try
+    {
+      Record<String, String> record = changelog.getNewestRecord();
+
+      assertThat(record).isEqualTo(Record.from("key10", "value10"));
+    }
+    finally {
+      StaticUtils.close(changelog);
+    }
+  }
+
+  @DataProvider(name = "corruptedRecordData")
+  Object[][] corruptedRecordData()
+  {
+    return new Object[][]
+    {
+      // write partial record size (should be 4 bytes)
+      { 1, new ByteStringBuilder().append((byte) 0) },
+      // write partial record size (should be 4 bytes)
+      { 2, new ByteStringBuilder().append((byte) 0).append((byte) 0).append((byte) 0) },
+      // write size only
+      { 3, new ByteStringBuilder().append(10) },
+      // write size + key
+      { 4, new ByteStringBuilder().append(100).append("key") },
+      // write size + key + separator
+      { 5, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR) },
+      // write size + key + separator + partial value
+      { 6, new ByteStringBuilder().append(100).append("key").append(StringRecordParser.STRING_SEPARATOR).append("v") },
+    };
+  }
+
+  @Test(dataProvider="corruptedRecordData")
+  public void testRecoveryOnCorruptedLogFile(
+      @SuppressWarnings("unused") int unusedId,
+      ByteStringBuilder corruptedRecordData) throws Exception
+  {
+    LogFile<String, String> logFile = null;
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      corruptTestLogFile(corruptedRecordData);
+
+      // open the log file: the file should be repaired at this point
+      logFile = getLogFile(RECORD_PARSER);
+
+      // write a new valid record
+      logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11));
+
+      // ensure log can be fully read including the new record
+      cursor = logFile.getCursor();
+      assertThatCursorCanBeFullyRead(cursor, 1, 11);
+    }
+    finally
+    {
+      StaticUtils.close(logFile);
+    }
+  }
+
+  /**
+   * Append some raw data to the TEST_LOG_FILE. Intended to corrupt the log
+   * file.
+   */
+  private void corruptTestLogFile(ByteStringBuilder corruptedRecordData) throws Exception
+  {
+    RandomAccessFile output = null;
+    try {
+      output = new RandomAccessFile(TEST_LOG_FILE, "rwd");
+      output.seek(output.length());
+      output.write(corruptedRecordData.toByteArray());
+    }
+    finally
+    {
+      StaticUtils.close(output);
+    }
+  }
+
+  @Test
+  /**
+   * Test that changes are visible immediately to a reader after a write.
+   */
+  public void testWriteAndReadOnSameLogFile() throws Exception
+  {
+    LogFile<String, String> writeLog = null;
+    LogFile<String, String> readLog = null;
+    try
+    {
+      writeLog = getLogFile(RECORD_PARSER);
+      readLog = getLogFile(RECORD_PARSER);
+
+      for (int i = 1; i <= 100; i++)
+      {
+        Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
+        writeLog.append(record);
+        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
+        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));
+        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
+        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key01", "value1"));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writeLog, readLog);
+    }
+  }
+
+  /**
+   * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
+   * endIndex, using (keyN, valueN) where N is the index.
+   */
+  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+      throws Exception
+  {
+    assertThat(cursor.getRecord()).isNull();
+    for (int i = fromIndex; i <= endIndex; i++)
+    {
+      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
+      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%02d", i), "value" + i));
+    }
+    assertThatCursorIsExhausted(cursor);
+  }
+
+  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
+  {
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  /**
+   * Record parser implementation for records with keys as String and values as
+   * String, to be used in tests.
+   */
+  private static class StringRecordParser implements RecordParser<String, String>
+  {
+    private static final byte STRING_SEPARATOR = 0;
+
+    public Record<String, String> decodeRecord(final ByteString data) throws DecodingException
+    {
+      ByteSequenceReader reader = data.asReader();
+      String key = reader.getString(getNextStringLength(reader));
+      reader.skip(1);
+      String value = reader.getString(getNextStringLength(reader));
+      return key.isEmpty() || value.isEmpty() ? null : Record.from(key, value);
+    }
+
+    /** Returns the length of next string by looking for the zero byte used as separator. */
+    private int getNextStringLength(ByteSequenceReader reader)
+    {
+      int length = 0;
+      while (reader.peek(length) != STRING_SEPARATOR)
+      {
+        length++;
+      }
+      return length;
+    }
+
+    public ByteString encodeRecord(Record<String, String> record)
+    {
+      return new ByteStringBuilder()
+        .append(record.getKey()).append(STRING_SEPARATOR)
+        .append(record.getValue()).append(STRING_SEPARATOR).toByteString();
+    }
+
+    @Override
+    public String decodeKeyFromString(String key) throws ChangelogException
+    {
+      return key;
+    }
+
+    @Override
+    public String encodeKeyToString(String key)
+    {
+      return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String getMaxKey()
+    {
+      // '~' character has the highest ASCII value
+      return "~~~~";
+    }
+  }
+
+  /** A parser that can be set to fail when reading. */
+  static class FailingStringRecordParser extends StringRecordParser
+  {
+    private boolean failToRead = false;
+
+    @Override
+    public Record<String, String> decodeRecord(ByteString data) throws DecodingException
+    {
+      if (failToRead)
+      {
+        throw new DecodingException(LocalizableMessage.raw("Error when parsing record"));
+      }
+      return super.decodeRecord(data);
+    }
+
+    void setFailToRead(boolean shouldFail)
+    {
+      failToRead = shouldFail;
+    }
+  }
+
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
new file mode 100644
index 0000000..b810314
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -0,0 +1,598 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+@Test(sequential=true)
+public class LogTest extends DirectoryServerTestCase
+{
+  // Use a directory dedicated to this test class
+  private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+  @BeforeMethod
+  public void initialize() throws Exception
+  {
+    // Delete any previous log
+    if (LOG_DIRECTORY.exists())
+    {
+      StaticUtils.recursiveDelete(LOG_DIRECTORY);
+    }
+
+    // Build a log with 10 records with String keys and String values
+    // Keys are using the format keyNNN where N is a figure
+    // You should always ensure keys are correctly ordered otherwise tests may break unexpectedly
+    Log<String, String> log = openLog(RECORD_PARSER);
+    for (int i = 1; i <= 10; i++)
+    {
+      log.append(Record.from(String.format("key%03d", i), "value" + i));
+    }
+    log.close();
+  }
+
+  private Log<String, String> openLog(RecordParser<String, String> parser) throws ChangelogException
+  {
+    // Each string record has a length of approximately 18 bytes
+    // This size is set in order to have 2 records per log file before the rotation happens
+    // This allow to ensure rotation mechanism is thoroughly tested
+    // Some tests rely on having 2 records per log file (especially the purge tests), so take care
+    // if this value has to be changed
+    int sizeLimitPerFileInBytes = 30;
+
+    return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
+  }
+
+  @Test
+  public void testCursor() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getCursor();
+
+      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testCursorWhenGivenAnExistingKey() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getCursor("key005");
+
+      assertThatCursorCanBeFullyReadFromStart(cursor, 5, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testCursorWhenGivenAnUnexistingKey() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      // key is between key005 and key006
+      cursor = log.getCursor("key005000");
+
+      assertThat(cursor).isNotNull();
+      assertThat(cursor.getRecord()).isNull();
+      assertThat(cursor.next()).isFalse();
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testCursorWhenGivenANullKey() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getCursor(null);
+
+      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @DataProvider
+  Object[][] cursorData()
+  {
+    return new Object[][] {
+      // 3 first values are input data : key to position to, key matching strategy, position strategy,
+      // 2 last values are expected output :
+      //    first index of cursor (-1 if cursor should be exhausted), last index of cursor
+      { "key000", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+      { "key001", EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
+      { "key004", EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
+      { "key0050", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+      { "key009", EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
+      { "key010", EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
+      { "key011", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+
+      { "key000", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+      { "key001", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
+      { "key004", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
+      { "key0050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+      { "key009", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
+      { "key010", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+      { "key011", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+
+      { "key000", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+      { "key001", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
+      { "key004", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
+      { "key005", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 },
+      { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 },
+      { "key006", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 },
+      { "key009", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
+      { "key010", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
+      { "key011", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
+
+      { "key000", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+      { "key001", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
+      { "key004", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
+      { "key005", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
+      { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
+      { "key006", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 7, 10 },
+      { "key009", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
+      { "key010", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+      { "key011", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+
+      { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
+      { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 },
+      { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 },
+      { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 },
+      { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 },
+      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 },
+      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+
+      { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 },
+      { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 },
+      { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 },
+      { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 },
+      { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 },
+      { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+      { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+    };
+  }
+
+  @Test(dataProvider="cursorData")
+  public void testCursorWithStrategies(String key, KeyMatchingStrategy matchingStrategy,
+      PositionStrategy positionStrategy, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getCursor(key, matchingStrategy, positionStrategy);
+
+      if (cursorShouldStartAt != -1)
+      {
+        assertThatCursorCanBeFullyReadFromStart(cursor, cursorShouldStartAt, cursorShouldEndAt);
+      }
+      else
+      {
+        assertThatCursorIsExhausted(cursor);
+      }
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testCursorMatchingAnyPositioningAnyWhenGivenANullKey() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getCursor(null, null, null);
+
+      assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test(expectedExceptions=ChangelogException.class)
+  public void testCursorWhenParserFailsToRead() throws Exception
+  {
+    FailingStringRecordParser parser = new FailingStringRecordParser();
+    Log<String, String> log = openLog(parser);
+    parser.setFailToRead(true);
+    try {
+      log.getCursor("key");
+    }
+    finally {
+      StaticUtils.close(log);
+    }
+  }
+
+  @Test
+  public void testGetOldestRecord() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    try
+    {
+      Record<String, String> record = log.getOldestRecord();
+
+      assertThat(record).isEqualTo(Record.from("key001", "value1"));
+    }
+    finally {
+      StaticUtils.close(log);
+    }
+  }
+
+  @Test
+  public void testGetNewestRecord() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    try
+    {
+      Record<String, String> record = log.getNewestRecord();
+
+      assertThat(record).isEqualTo(Record.from("key010", "value10"));
+    }
+    finally {
+      StaticUtils.close(log);
+    }
+  }
+
+  /**
+   * Test that changes are visible immediately to a reader after a write.
+   */
+  @Test
+  public void testWriteAndReadOnSameLog() throws Exception
+  {
+    Log<String, String> writeLog = null;
+    Log<String, String> readLog = null;
+    try
+    {
+      writeLog = openLog(LogFileTest.RECORD_PARSER);
+      readLog = openLog(LogFileTest.RECORD_PARSER);
+
+      for (int i = 1; i <= 10; i++)
+      {
+        Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i);
+        writeLog.append(record);
+        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
+        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key001", "value1"));
+        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
+        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1"));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writeLog, readLog);
+    }
+  }
+
+  @Test
+  public void testTwoConcurrentWrite() throws Exception
+  {
+    Log<String, String> writeLog1 = null;
+    Log<String, String> writeLog2 = null;
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      writeLog1 = openLog(LogFileTest.RECORD_PARSER);
+      writeLog2 = openLog(LogFileTest.RECORD_PARSER);
+      writeLog1.append(Record.from("key020", "starting record"));
+      AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>();
+      Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
+      Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
+      write1.run();
+      write2.run();
+
+      write1.join();
+      write2.join();
+      if (exceptionRef.get() != null)
+      {
+        throw exceptionRef.get();
+      }
+      writeLog1.syncToFileSystem();
+      cursor = writeLog1.getCursor("key020");
+      for (int i = 1; i <= 61; i++)
+      {
+         assertThat(cursor.next()).isTrue();
+      }
+      assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
+    }
+    finally
+    {
+      StaticUtils.close(cursor, writeLog1, writeLog2);
+    }
+  }
+
+  /**
+   *  This test should be disabled.
+   *  Enable it locally when you need to have an rough idea of write performance.
+   */
+  @Test(enabled=false)
+  public void logWriteSpeed() throws Exception
+  {
+    Log<String, String> writeLog = null;
+    try
+    {
+      long sizeOf1MB = 1024*1024;
+      writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB);
+
+      for (int i = 1; i < 1000000; i++)
+      {
+        writeLog.append(Record.from(String.format("key%010d", i), "value" + i));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writeLog);
+    }
+  }
+
+  @Test
+  public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+      cursor = log.getCursor();
+      // advance cursor to last record to ensure it is pointing to ahead log file
+      advanceCursorUpTo(cursor, 1, 10);
+
+      // add new records to ensure the ahead log file is rotated
+      for (int i = 11; i <= 20; i++)
+      {
+        log.append(Record.from(String.format("key%03d", i), "value" + i));
+      }
+
+      // check that cursor can fully read the new records
+      assertThatCursorCanBeFullyRead(cursor, 11, 20);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+      cursor1 = log.getCursor();
+      advanceCursorUpTo(cursor1, 1, 1);
+      cursor2 = log.getCursor();
+      advanceCursorUpTo(cursor2, 1, 4);
+      cursor3 = log.getCursor();
+      advanceCursorUpTo(cursor3, 1, 9);
+      cursor4 = log.getCursor();
+      advanceCursorUpTo(cursor4, 1, 10);
+
+      // add new records to ensure the ahead log file is rotated
+      for (int i = 11; i <= 20; i++)
+      {
+        log.append(Record.from(String.format("key%03d", i), "value" + i));
+      }
+
+      // check that cursors can fully read the new records
+      assertThatCursorCanBeFullyRead(cursor1, 2, 20);
+      assertThatCursorCanBeFullyRead(cursor2, 5, 20);
+      assertThatCursorCanBeFullyRead(cursor3, 10, 20);
+      assertThatCursorCanBeFullyRead(cursor4, 11, 20);
+    }
+    finally
+    {
+      StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log);
+    }
+  }
+
+  @Test
+  public void testClear() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+      log.clear();
+
+      cursor = log.getCursor();
+      assertThatCursorIsExhausted(cursor);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  // TODO : Should be re-enabled once the issue with robot functional test replication/totalupdate.txt is solved
+  @Test(enabled=false, expectedExceptions=ChangelogException.class)
+  public void testClearWhenCursorIsOpened() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+      cursor = log.getCursor();
+      log.clear();
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @DataProvider(name = "purgeKeys")
+  Object[][] purgeKeys()
+  {
+    // purge key, first record expected in the cursor, startIndex + endIndex to fully read the cursor
+    return new Object[][]
+    {
+      // lowest key of the read-only log file "key005_key006.log"
+      { "key005", Record.from("key005", "value5"), 6, 10 },
+      // key that is not the lowest of the read-only log file "key005_key006.log"
+      { "key006", Record.from("key005", "value5"), 6, 10 },
+      // lowest key of the ahead log file "ahead.log"
+      { "key009", Record.from("key009", "value9"), 10, 10 },
+      // key that is not the lowest of the ahead log file "ahead.log"
+      { "key010", Record.from("key009", "value9"), 10, 10 },
+
+      // key not present in log, which is between key005 and key006
+      { "key005a", Record.from("key005", "value5"), 6, 10 },
+      // key not present in log, which is between key006 and key007
+      { "key006a", Record.from("key007", "value7"), 8, 10 },
+      // key not present in log, which is lower than oldest key key001
+      { "key000", Record.from("key001", "value1"), 2, 10 },
+      // key not present in log, which is higher than newest key key010
+      // should return the lowest key present in ahead log
+      { "key011", Record.from("key009", "value9"), 10, 10 },
+    };
+  }
+
+  /**
+   * Given a purge key, after purge is done, expects a new cursor to point on first record provided and
+   * then to be fully read starting at provided start index and finishing at provided end index.
+   */
+  @Test(dataProvider="purgeKeys")
+  public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
+      int cursorStartIndex, int cursorEndIndex) throws Exception
+  {
+    Log<String, String> log = null;
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+
+      log.purgeUpTo(purgeKey);
+
+      cursor = log.getCursor();
+      assertThat(cursor.next()).isTrue();
+      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
+      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+      throws Exception
+  {
+    for (int i = fromIndex; i <= endIndex; i++)
+    {
+      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
+      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%03d", i), "value" + i));
+    }
+  }
+
+  /**
+   * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
+   * endIndex, using (keyN, valueN) where N is the index.
+   */
+  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+      throws Exception
+  {
+    advanceCursorUpTo(cursor, fromIndex, endIndex);
+    assertThatCursorIsExhausted(cursor);
+  }
+
+  /**
+   * Read the cursor until exhaustion, beginning at start of cursor.
+   */
+  private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex,
+      int endIndex) throws Exception
+  {
+    assertThat(cursor.getRecord()).isNull();
+    assertThatCursorCanBeFullyRead(cursor, fromIndex, endIndex);
+  }
+
+  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
+  {
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  /** Returns a thread that write N records to the provided log. */
+  private Thread getWriteLogThread(final Log<String, String> writeLog, final String recordPrefix,
+      final AtomicReference<ChangelogException> exceptionRef)
+  {
+    return new Thread() {
+      public void run()
+      {
+        for (int i = 1; i <= 30; i++)
+        {
+          Record<String, String> record = Record.from(
+              String.format("nk%s%03d", recordPrefix, i), "v" + recordPrefix + i);
+          try
+          {
+            writeLog.append(record);
+          }
+          catch (ChangelogException e)
+          {
+            // keep the first exception only
+            exceptionRef.compareAndSet(null, e);
+          }
+        }
+      }
+    };
+  }
+
+}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
new file mode 100644
index 0000000..3eeca60
--- /dev/null
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -0,0 +1,327 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.assertj.core.data.MapEntry;
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.ReplicationEnvironment.*;
+
+@SuppressWarnings("javadoc")
+public class ReplicationEnvironmentTest extends DirectoryServerTestCase
+{
+  private static final int SERVER_ID_1 = 1;
+  private static final int SERVER_ID_2 = 2;
+
+  private static final String DN1_AS_STRING = "cn=test1,dc=company.com";
+  private static final String DN2_AS_STRING = "cn=te::st2,dc=company.com";
+  private static final String DN3_AS_STRING = "cn=test3,dc=company.com";
+
+  private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
+
+  @BeforeClass
+  public void setUp() throws Exception
+  {
+    // This test suite depends on having the schema available for DN decoding.
+    TestCaseUtils.startFakeServer();
+  }
+
+  @AfterClass
+  public void tearDown() throws Exception
+  {
+    TestCaseUtils.shutdownFakeServer();
+  }
+
+  @AfterMethod
+  public void cleanTestChangelogDirectory()
+  {
+    final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+    if (rootPath.exists())
+    {
+      StaticUtils.recursiveDelete(rootPath);
+    }
+  }
+
+  @Test
+  public void testReadChangelogStateWithSingleDN() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.valueOf(DN1_AS_STRING);
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
+
+      final ChangelogState state = environment.readOnDiskChangelogState();
+
+      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1, SERVER_ID_2);
+      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+
+      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB, replicaDB2);
+    }
+  }
+
+  @Test
+  public void testReadChangelogStateWithMultipleDN() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
+    try
+    {
+      File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      List<DN> domainDNs = Arrays.asList(DN.valueOf(DN1_AS_STRING), DN.valueOf(DN2_AS_STRING), DN.valueOf(DN3_AS_STRING));
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      for (int i = 0; i <= 2 ; i++)
+      {
+        for (int j = 1; j <= 10; j++)
+        {
+          // 3 domains, 10 server id each, generation id is different for each domain
+          replicaDBs.add(environment.getOrCreateReplicaDB(domainDNs.get(i), j, i+1));
+        }
+      }
+
+      final ChangelogState state = environment.readOnDiskChangelogState();
+
+      assertThat(state.getDomainToServerIds()).containsKeys(domainDNs.get(0), domainDNs.get(1), domainDNs.get(2));
+      for (int i = 0; i <= 2 ; i++)
+      {
+        assertThat(state.getDomainToServerIds().get(domainDNs.get(i))).containsOnly(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
+      }
+      assertThat(state.getDomainToGenerationId()).containsOnly(
+          MapEntry.entry(domainDNs.get(0), 1L),
+          MapEntry.entry(domainDNs.get(1), 2L),
+          MapEntry.entry(domainDNs.get(2), 3L));
+
+      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB);
+      StaticUtils.close(replicaDBs);
+    }
+  }
+
+  @Test
+  public void testReadChangelogStateWithReplicaOffline() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.valueOf(DN1_AS_STRING);
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+      // put server id 1 offline
+      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
+      environment.notifyReplicaOffline(domainDN, offlineCSN);
+
+      final ChangelogState state = environment.readOnDiskChangelogState();
+
+      assertThat(state.getOfflineReplicas().getSnapshot())
+          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+
+      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test(expectedExceptions=ChangelogException.class)
+  public void testReadChangelogStateWithReplicaOfflineStateFileCorrupted() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.valueOf(DN1_AS_STRING);
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+      File offlineStateFile = new File(environment.getServerIdPath("1", 1), REPLICA_OFFLINE_STATE_FILENAME);
+      offlineStateFile.createNewFile();
+
+      environment.readOnDiskChangelogState();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test
+  public void testReadChangelogStateWithReplicaOfflineSentTwice() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.valueOf(DN1_AS_STRING);
+
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+      // put server id 1 offline twice
+      CSNGenerator csnGenerator = new CSNGenerator(SERVER_ID_1, 100);
+      environment.notifyReplicaOffline(domainDN, csnGenerator.newCSN());
+      CSN lastOfflineCSN = csnGenerator.newCSN();
+      environment.notifyReplicaOffline(domainDN, lastOfflineCSN);
+
+      final ChangelogState state = environment.readOnDiskChangelogState();
+      assertThat(state.getOfflineReplicas().getSnapshot())
+          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(lastOfflineCSN)));
+      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test
+  public void testReadChangelogStateWithReplicaOfflineThenReplicaOnline() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.valueOf(DN1_AS_STRING);
+
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+
+      // put server id 1 offline
+      environment.notifyReplicaOffline(domainDN, new CSN(TimeThread.getTime(), 0, SERVER_ID_1));
+      // put server id 1 online again
+      environment.notifyReplicaOnline(domainDN, SERVER_ID_1);
+
+      final ChangelogState state = environment.readOnDiskChangelogState();
+      assertThat(state.getOfflineReplicas()).isEmpty();
+      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test
+  public void testCreateThenReadChangelogStateWithReplicaOffline() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null;
+    try
+    {
+      final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      final DN domainDN = DN.valueOf(DN1_AS_STRING);
+
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      cnDB = environment.getOrCreateCNIndexDB();
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+      CSN offlineCSN = new CSN(TimeThread.getTime(), 0, SERVER_ID_1);
+      environment.notifyReplicaOffline(domainDN, offlineCSN);
+
+      final ChangelogState state = environment.readOnDiskChangelogState();
+
+      assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+      assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(SERVER_ID_1);
+      assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
+      assertThat(state.getOfflineReplicas().getSnapshot())
+          .containsExactly(MapEntry.entry(domainDN, Arrays.asList(offlineCSN)));
+
+      assertThat(state.isEqualTo(environment.getChangelogState())).isTrue();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB);
+    }
+  }
+
+  @Test(expectedExceptions=ChangelogException.class)
+  public void testMissingDomainDirectory() throws Exception
+  {
+    Log<Long,ChangeNumberIndexRecord> cnDB = null;
+    Log<CSN,UpdateMsg> replicaDB = null, replicaDB2 = null;
+    try
+    {
+      File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
+      DN domainDN = DN.valueOf(DN1_AS_STRING);
+      ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
+      replicaDB = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_1, 1);
+      replicaDB2 = environment.getOrCreateReplicaDB(domainDN, SERVER_ID_2, 1);
+
+      // delete the domain directory created for the 2 replica DBs to break the
+      // consistency with domain state file
+      StaticUtils.recursiveDelete(new File(rootPath, "1.domain"));
+
+      environment.readOnDiskChangelogState();
+    }
+    finally
+    {
+      StaticUtils.close(cnDB, replicaDB, replicaDB2);
+    }
+  }
+}

--
Gitblit v1.10.0