From 3b08e64ccf908fa4c57e6ee13aa8901efcc53ee2 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 22 Sep 2014 15:35:12 +0000
Subject: [PATCH] Actual merge of complete changelog.file package which contains implementation of file-based changelog

---
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java                  |  101 
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java             |   72 
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java             |    2 
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java                |  632 ++++++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java                       |  607 +++++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java                |  214 ++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java                        |  130 +
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java        |  860 ++++++++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java           |  144 +
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java                 |  409 ++++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java               |  982 ++++++++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java                     |  151 +
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java                           | 1187 +++++++++++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java                 |  118 +
 opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java                                       |   24 
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java       |  394 +++
 opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java |   79 
 17 files changed, 6,008 insertions(+), 98 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java b/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
index 12d6b8f..2ba1c94 100644
--- a/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -36,6 +36,7 @@
 import org.opends.server.core.ModifyDNOperation;
 import org.opends.server.core.ModifyOperation;
 import org.opends.server.core.SearchOperation;
+import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
 import org.opends.server.types.AttributeType;
@@ -285,4 +286,27 @@
     throw new RuntimeException("Not implemented");
   }
 
+  /**
+   * Get the instance of backend.
+   *
+   * @return the instance
+   */
+  public static ChangelogBackend getInstance()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /**
+   * Notify.
+   *
+   * @param baseDN
+   *            The base DN
+   * @param updateMsg
+   *            The update msg
+   */
+  public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg)
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
 }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
new file mode 100644
index 0000000..1290261
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -0,0 +1,632 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Reject;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A log reader with binary search support.
+ * <p>
+ * The log file contains record offsets at fixed block size : given a block size N,
+ * an offset is written at every N bytes. The offset contains the number of bytes to
+ * reach the beginning of previous record (or next record if offset equals 0).
+ * <p>
+ * The reader provides both sequential access, using the {@code readRecord()} method,
+ * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method.
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
+ */
+class BlockLogReader<K extends Comparable<K>, V> implements Closeable
+{
+  static final int SIZE_OF_BLOCK_OFFSET = 4;
+
+  static final int SIZE_OF_RECORD_SIZE = 4;
+
+  /**
+   * Size of a block, after which an offset to the nearest record is written.
+   * <p>
+   * This value has been fixed based on performance tests. See
+   * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472">
+   * OPENDJ-1472</a> for details.
+   */
+  static final int BLOCK_SIZE = 256;
+
+  private final int blockSize;
+
+  private final RecordParser<K, V> parser;
+
+  private final RandomAccessFile reader;
+
+  private final File file;
+
+  /**
+   * Creates a reader for the provided file, file reader and parser.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param file
+   *          The log file to read.
+   * @param reader
+   *          The random access reader on the log file.
+   * @param parser
+   *          The parser to decode the records read.
+   * @return a new log reader
+   */
+  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader(
+      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser)
+  {
+    return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE);
+  }
+
+  /**
+   * Creates a reader for the provided file, file reader, parser and block size.
+   * <p>
+   * This method is intended for tests only, to allow tuning of the block size.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param file
+   *          The log file to read.
+   * @param reader
+   *          The random access reader on the log file.
+   * @param parser
+   *          The parser to decode the records read.
+   * @param blockSize
+   *          The size of each block, or frequency at which the record offset is
+   *          present in the log file.
+   * @return a new log reader
+   */
+  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests(
+      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize)
+  {
+    return new BlockLogReader<K, V>(file, reader, parser, blockSize);
+  }
+
+  private BlockLogReader(
+      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
+  {
+    this.file = file;
+    this.reader = reader;
+    this.parser = parser;
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * Position the reader to the record corresponding to the provided key and
+   * matching and positioning strategies. Returns the last record read.
+   *
+   * @param key
+   *          Key to use as a start position. Key must not be {@code null}.
+   * @param matchStrategy
+   *          The key matching strategy.
+   * @param positionStrategy
+   *          The positioning strategy.
+   * @return The pair (key_found, last_record_read). key_found is a boolean
+   *         indicating if reader is successfully positioned. last_record_read
+   *         is the last record that was read. When key_found is equals to
+   *         {@code false}, then last_record_read is always {@code null}. When
+   *         key_found is equals to {@code true}, last_record_read can be valued
+   *         or be {@code null}
+   * @throws ChangelogException
+   *           If an error occurs when seeking the key.
+   */
+  public Pair<Boolean, Record<K,V>> seekToRecord(
+      final K key,
+      final KeyMatchingStrategy matchStrategy,
+      final PositionStrategy positionStrategy)
+          throws ChangelogException
+  {
+    Reject.ifNull(key);
+    final long markerPosition = searchClosestBlockStartToKey(key);
+    if (markerPosition >= 0)
+    {
+      return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy);
+    }
+    return Pair.of(false, null);
+  }
+
+  /**
+   * Position the reader to the provided file position.
+   *
+   * @param filePosition
+   *            offset from the beginning of the file, in bytes.
+   * @throws ChangelogException
+   *            If an error occurs.
+   */
+  public void seekToPosition(final long filePosition) throws ChangelogException
+  {
+    try
+    {
+      reader.seek(filePosition);
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e);
+    }
+  }
+
+  /**
+   * Read a record from current position.
+   *
+   * @return the record read
+   * @throws ChangelogException
+   *            If an error occurs during read.
+   */
+  public Record<K,V> readRecord() throws ChangelogException
+  {
+    return readRecord(-1);
+  }
+
+  /**
+   * Returns the file position for this reader.
+   *
+   * @return the position of reader on the log file
+   * @throws ChangelogException
+   *          If an error occurs.
+   */
+  public long getFilePosition() throws ChangelogException
+  {
+    try
+    {
+      return reader.getFilePointer();
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close() throws IOException
+  {
+    reader.close();
+  }
+
+  /**
+   * Read a record, either from the provided start of block position or from
+   * the current position.
+   *
+   * @param blockStartPosition
+   *          The position of start of block, where a record offset is written.
+   *          If provided value is -1, then record is read from current position instead.
+   * @return the record read
+   * @throws ChangelogException
+   *           If an error occurs during read.
+   */
+  private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException
+  {
+    try
+    {
+      if (blockStartPosition != -1)
+      {
+        positionToRecordFromBlockStart(blockStartPosition);
+      }
+      final ByteString recordData = readNextRecord();
+      return recordData != null ? parser.decodeRecord(recordData) : null;
+    }
+    catch (Exception io)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
+    }
+  }
+
+  /**
+   * Position to the record given by the offset read from provided block
+   * start.
+   *
+   * @param blockStartPosition
+   *          Position of read pointer in the file, expected to be the start of
+   *          a block where a record offset is written.
+   * @throws IOException
+   *           If an error occurs during read.
+   */
+  private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
+  {
+    reader.seek(blockStartPosition);
+    if (blockStartPosition > 0)
+    {
+      final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET];
+      reader.readFully(offsetData);
+      final int offsetToRecord = ByteString.wrap(offsetData).toInt();
+      if (offsetToRecord > 0)
+      {
+        reader.seek(blockStartPosition - offsetToRecord);
+      } // if offset is zero, reader is already well positioned
+    }
+  }
+
+  /**
+   * Reads the next record.
+   *
+   * @return the bytes of the next record, or {@code null} if no record is available
+   * @throws IOException
+   *            If an error occurs while reading.
+   */
+  private ByteString readNextRecord() throws IOException
+  {
+    try
+    {
+      // read length of record
+      final long filePosition = reader.getFilePointer();
+      int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize);
+      final int recordLength = readRecordLength(distanceToBlockStart);
+
+      // read the record
+      long currentPosition = reader.getFilePointer();
+      distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize);
+      final ByteStringBuilder recordBytes =
+          new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart));
+      int remainingBytesToRead = recordLength;
+      while (distanceToBlockStart < remainingBytesToRead)
+      {
+        if (distanceToBlockStart != 0)
+        {
+          recordBytes.append(reader, distanceToBlockStart);
+        }
+        // skip the offset
+        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+
+        // next step
+        currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET;
+        remainingBytesToRead -= distanceToBlockStart;
+        distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET;
+      }
+      if (remainingBytesToRead > 0)
+      {
+        // last bytes of the record
+        recordBytes.append(reader, remainingBytesToRead);
+      }
+      return recordBytes.toByteString();
+    }
+    catch (EOFException e)
+    {
+      // end of stream, no record or uncomplete record
+      return null;
+    }
+  }
+
+  /**
+   * Returns the total length in bytes taken by a record when stored in log file,
+   * including size taken by block offsets.
+   *
+   * @param recordLength
+   *            The length of record to write.
+   * @param distanceToBlockStart
+   *            Distance before the next block start.
+   * @return the length in bytes necessary to store the record in the log
+   */
+  int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart)
+  {
+    int totalLength = recordLength;
+    if (recordLength > distanceToBlockStart)
+    {
+      totalLength += SIZE_OF_BLOCK_OFFSET;
+      final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET);
+      totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET;
+    }
+    return totalLength;
+  }
+
+  /** Read the length of a record. */
+  private int readRecordLength(final int distanceToBlockStart) throws IOException
+  {
+    final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
+    if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
+    {
+      lengthBytes.append(reader, distanceToBlockStart);
+      // skip the offset
+      reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart);
+      return lengthBytes.toByteString().toInt();
+    }
+    else
+    {
+      if (distanceToBlockStart == 0)
+      {
+        // skip the offset
+        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+      }
+      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE);
+      return lengthBytes.toByteString().toInt();
+    }
+  }
+
+  /**
+   * Search the closest block start to the provided key, using binary search.
+   * <p>
+   * Note that position of reader is modified by this method.
+   *
+   * @param key
+   *          The key to search
+   * @return the file position of block start that must be used to find the given key,
+   *      or a negative number if no position could be found.
+   * @throws ChangelogException
+   *          if a problem occurs
+   */
+  long searchClosestBlockStartToKey(K key) throws ChangelogException
+  {
+    final long maxPos = getFileLength() - 1;
+    long lowPos = 0L;
+    long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
+
+    while (lowPos <= highPos)
+    {
+      final long middlePos = Math.min((lowPos + highPos) / 2, maxPos);
+      final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos);
+      final Record<K, V> middleRecord = readRecord(middleBlockStartPos);
+      if (middleRecord == null)
+      {
+        return -1;
+      }
+
+      final int keyComparison = middleRecord.getKey().compareTo(key);
+      if (keyComparison < 0)
+      {
+        if (middleBlockStartPos <= lowPos)
+        {
+          return lowPos;
+        }
+        lowPos = middleBlockStartPos;
+      }
+      else if (keyComparison > 0)
+      {
+        if (middleBlockStartPos >= highPos)
+        {
+          return highPos;
+        }
+        highPos = middleBlockStartPos;
+      }
+      else
+      {
+        return middleBlockStartPos;
+      }
+    }
+    // Unable to find a position where key can be found
+    return -1;
+  }
+
+  private long getFileLength() throws ChangelogException
+  {
+    try
+    {
+      return reader.length();
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e);
+    }
+  }
+
+  /**
+   * Position before, at or after provided key, starting from provided block
+   * start position and reading sequentially until key is found according to
+   * matching and positioning strategies.
+   *
+   * @param blockStartPosition
+   *          Position of read pointer in the file, expected to be the start of
+   *          a block where a record offset is written
+   * @param key
+   *          The key to find
+   * @param matchStrategy
+   *          The key matching strategy
+   * @param positionStrategy
+   *          The positioning strategy
+   * @return The pair ({@code true}, selected record) if reader is successfully
+   *         positioned (selected record may be null if end of file is reached),
+   *         ({@code false}, null) otherwise.
+   * @throws ChangelogException
+   *           If an error occurs.
+   */
+   Pair<Boolean, Record<K,V>> positionToKeySequentially(final long blockStartPosition, final K key,
+       final KeyMatchingStrategy matchStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+   {
+    Record<K,V> record = readRecord(blockStartPosition);
+    Record<K,V> previousRecord = null;
+    long previousPosition = blockStartPosition;
+    boolean matchingKeyIsLowerThanAnyRecord = true;
+    while (record != null)
+    {
+      final int keysComparison = record.getKey().compareTo(key);
+      if (keysComparison <= 0)
+      {
+        matchingKeyIsLowerThanAnyRecord = false;
+      }
+      if ((keysComparison == 0 && matchStrategy == EQUAL_TO_KEY)
+          || (keysComparison >= 0 && matchStrategy != EQUAL_TO_KEY))
+      {
+        return getMatchingRecord(matchStrategy, positionStrategy, keysComparison, matchingKeyIsLowerThanAnyRecord,
+            record, previousRecord, previousPosition);
+      }
+      previousRecord = record;
+      previousPosition = getFilePosition();
+      record = readRecord();
+    }
+
+    if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
+    {
+      return getRecordNoMatchForLessStrategy(positionStrategy, previousRecord, previousPosition);
+    }
+    return Pair.of(false, null);
+  }
+
+  private Pair<Boolean,Record<K,V>> getMatchingRecord(KeyMatchingStrategy matchStrategy,
+      PositionStrategy positionStrategy, int keysComparison, boolean matchKeyIsLowerThanAnyRecord,
+      Record<K, V> currentRecord, Record<K, V> previousRecord, long previousPosition)
+          throws ChangelogException
+  {
+    Record<K, V> record = currentRecord;
+
+    if (positionStrategy == AFTER_MATCHING_KEY)
+    {
+      if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && matchKeyIsLowerThanAnyRecord)
+      {
+        return Pair.of(false, null);
+      }
+      if (keysComparison == 0)
+      {
+        // skip matching key
+        record = readRecord();
+      }
+    }
+    else if (positionStrategy == ON_MATCHING_KEY && matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && keysComparison > 0)
+    {
+      seekToPosition(previousPosition);
+      return Pair.of(previousRecord != null, previousRecord);
+    }
+    return Pair.of(true, record);
+  }
+
+  private Pair<Boolean, Record<K, V>> getRecordNoMatchForLessStrategy(
+      final PositionStrategy positionStrategy, final Record<K, V> previousRecord, final long previousPosition)
+          throws ChangelogException
+  {
+    if (positionStrategy == ON_MATCHING_KEY)
+    {
+      seekToPosition(previousPosition);
+      return Pair.of(previousRecord != null, previousRecord);
+    }
+    else
+    {
+      return Pair.of(true, null);
+    }
+  }
+
+  /**
+   * Returns the closest start of block which has a position lower than or equal
+   * to the provided file position.
+   *
+   * @param filePosition
+   *          The position of reader on file.
+   * @return the file position of the block start.
+   */
+  long getClosestBlockStartBeforeOrAtPosition(final long filePosition)
+  {
+    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
+    return dist == 0 ? filePosition : filePosition + dist - blockSize;
+  }
+
+  /**
+   * Returns the closest start of block which has a position strictly
+   * higher than the provided file position.
+   *
+   * @param filePosition
+   *           The position of reader on file.
+   * @return the file position of the block start.
+   */
+  long getClosestBlockStartStrictlyAfterPosition(final long filePosition)
+  {
+    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
+    return dist == 0 ? filePosition + blockSize : filePosition + dist;
+  }
+
+  /**
+   * Returns the distance to next block for the provided file position.
+   *
+   * @param filePosition
+   *            offset from the beginning of the file, in bytes.
+   * @param blockSize
+   *            Size of each block in bytes.
+   * @return the distance to next block in bytes
+   */
+  static int getDistanceToNextBlockStart(final long filePosition, final int blockSize)
+  {
+    if (filePosition == 0)
+    {
+      return blockSize;
+    }
+    final int distance = (int) (filePosition % blockSize);
+    return distance == 0 ? 0 : blockSize - distance;
+  }
+
+  /**
+   * Check if the log file is valid, by reading the latest records at the end of
+   * the log file.
+   * <p>
+   * The intent of this method is to allow self-recovery in case a partial
+   * record as been written at the end of file (eg, after a server crash).
+   * <p>
+   * Any unexpected exception is considered as a severe error where
+   * self-recovery is not appropriate and thus will lead to a
+   * ChangelogException.
+   *
+   * @return -1 if log is valid, or a positive number if log is invalid, where
+   *         the number represents the last valid position in the log file.
+   * @throws ChangelogException
+   *           if an error occurs while checking the log
+   */
+ long checkLogIsValid() throws ChangelogException
+ {
+   try
+   {
+     final long fileSize = getFileLength();
+     final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
+     positionToRecordFromBlockStart(lastBlockStart);
+
+     long lastValidPosition = lastBlockStart;
+     for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
+       parser.decodeRecord(recordData);
+       lastValidPosition = reader.getFilePointer();
+     }
+
+     final boolean isFileValid = lastValidPosition == fileSize;
+     return isFileValid ? -1 : lastValidPosition;
+   }
+   catch (Exception e)
+   {
+     throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
+         file.getPath(),
+         StaticUtils.stackTraceToSingleLineString(e)));
+   }
+ }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
new file mode 100644
index 0000000..d0cd37c
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
@@ -0,0 +1,214 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.SyncFailedException;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Reject;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+
+/**
+ * A log writer, using fixed-size blocks to allow fast retrieval when reading.
+ * <p>
+ * The log file contains record offsets at fixed block size : given block size N,
+ * an offset is written at every N bytes. The offset contains the number of bytes to
+ * reach the beginning of previous record (or next record if offset equals 0).
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
+ */
+class BlockLogWriter<K extends Comparable<K>, V> implements Closeable
+{
+  private final int blockSize;
+
+  private final RecordParser<K, V> parser;
+
+  private final LogWriter writer;
+
+  /**
+   * Creates a writer for the provided log writer and parser.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param writer
+   *          The writer on the log file.
+   * @param parser
+   *          The parser to encode the records.
+   * @return a new log reader
+   */
+  static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriter(
+      final LogWriter writer, final RecordParser<K, V> parser)
+  {
+    return new BlockLogWriter<K, V>(writer, parser, BLOCK_SIZE);
+  }
+
+  /**
+   * Creates a writer for the provided log writer, parser and size for blocks.
+   * <p>
+   * This method is intended for tests only, to allow tuning of the block size.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param writer
+   *          The writer on the log file.
+   * @param parser
+   *          The parser to encode the records.
+   * @param blockSize
+   *          The size of each block, or frequency at which the record offset is
+   *          present in the log file.
+   * @return a new log reader
+   */
+  static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriterForTests(
+      final LogWriter writer, final RecordParser<K, V> parser, final int blockSize)
+  {
+    return new BlockLogWriter<K, V>(writer, parser, blockSize);
+  }
+
+  /**
+   * Creates the writer with an underlying writer, a parser and a size for blocks.
+   *
+   * @param writer
+   *            The writer to the log file.
+   * @param parser
+   *            The parser to encode the records.
+   * @param blockSize
+   *            The size of each block.
+   */
+  private BlockLogWriter(LogWriter writer, RecordParser<K, V> parser, int blockSize)
+  {
+    Reject.ifNull(writer, parser);
+    this.writer = writer;
+    this.parser = parser;
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * Writes the provided record to the log file.
+   *
+   * @param record
+   *            The record to write.
+   * @throws ChangelogException
+   *            If a problem occurs during write.
+   */
+  public void write(final Record<K, V> record) throws ChangelogException
+  {
+    try
+    {
+      write(parser.encodeRecord(record));
+      writer.flush();
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(),
+          writer.getFile().getPath()), e);
+    }
+  }
+
+  /**
+   * Returns the number of bytes written in the log file.
+   *
+   * @return the number of bytes
+   */
+  public long getBytesWritten()
+  {
+    return writer.getBytesWritten();
+  }
+
+  /**
+   * Synchronize all modifications to the log file to the underlying device.
+   *
+   * @throws SyncFailedException
+   *           If synchronization fails.
+   */
+  public void sync() throws SyncFailedException
+  {
+    writer.sync();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    writer.close();
+  }
+
+  /**
+   * Writes the provided byte string to the log file.
+   *
+   * @param record
+   *            The value to write.
+   * @throws IOException
+   *            If an error occurs while writing
+   */
+  private void write(final ByteString record) throws IOException
+  {
+    // Add length of record before writing
+    ByteString data = new ByteStringBuilder(SIZE_OF_RECORD_SIZE + record.length()).
+        append(record.length()).
+        append(record).
+        toByteString();
+
+    int distanceToBlockStart = BlockLogReader.getDistanceToNextBlockStart(writer.getBytesWritten(), blockSize);
+    int cumulatedDistanceToBeginning = distanceToBlockStart;
+    int dataPosition = 0;
+    int dataRemaining = data.length();
+    final int dataSizeForOneBlock = blockSize - SIZE_OF_BLOCK_OFFSET;
+
+    while (distanceToBlockStart < dataRemaining)
+    {
+      if (distanceToBlockStart > 0)
+      {
+        // append part of record
+        final int dataEndPosition = dataPosition + distanceToBlockStart;
+        writer.write(data.subSequence(dataPosition, dataEndPosition));
+        dataPosition = dataEndPosition;
+        dataRemaining -= distanceToBlockStart;
+      }
+      // append the offset to the record
+      writer.write(ByteString.valueOf(cumulatedDistanceToBeginning));
+
+      // next step
+      distanceToBlockStart = dataSizeForOneBlock;
+      cumulatedDistanceToBeginning += blockSize;
+    }
+    // append the remaining bytes to finish the record
+    writer.write(data.subSequence(dataPosition, data.length()));
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java
new file mode 100644
index 0000000..897bb4a
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java
@@ -0,0 +1,72 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+
+/**
+ * Exception thrown when a record can't be decoded properly.
+ */
+public class DecodingException extends ChangelogException
+{
+  private static final long serialVersionUID = 5629692522662643737L;
+
+  /**
+   * Creates a new decoding exception with the provided information.
+   *
+   * @param message
+   *          The message that explains the problem that occurred.
+   */
+  public DecodingException(LocalizableMessage message)
+  {
+    super(message);
+  }
+
+  /**
+   * Creates a new decoding exception with the provided information.
+   *
+   * @param cause
+   *          The underlying cause that triggered this exception.
+   */
+  public DecodingException(Throwable cause)
+  {
+    super(cause);
+  }
+
+  /**
+   * Creates a new decoding exception with the provided information.
+   *
+   * @param message
+   *          The message that explains the problem that occurred.
+   * @param cause
+   *          The underlying cause that triggered this exception.
+   */
+  public DecodingException(LocalizableMessage message, Throwable cause)
+  {
+    super(message, cause);
+  }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
new file mode 100644
index 0000000..e4ebf0e
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -0,0 +1,394 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.types.*;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * Implementation of a ChangeNumberIndexDB with a log.
+ * <p>
+ * This class publishes some monitoring information below <code>
+ * cn=monitor</code>.
+ */
+class FileChangeNumberIndexDB implements ChangeNumberIndexDB
+{
+  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+  private static final int NO_KEY = 0;
+
+  /** The parser of records stored in this ChangeNumberIndexDB. */
+  static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
+
+  /** The log in which records are persisted. */
+  private final Log<Long, ChangeNumberIndexRecord> log;
+
+  /**
+   * The newest changenumber stored in the DB. It is used to avoid purging the
+   * record with the newest changenumber. The newest record in the changenumber
+   * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
+   * then retrieved on server startup.
+   */
+  private volatile long newestChangeNumber = NO_KEY;
+
+  /**
+   * The last generated value for the change number. It is kept separate from
+   * the {@link #newestChangeNumber} because there is an opportunity for a race
+   * condition between:
+   * <ol>
+   * <li>this atomic long being incremented for a new record ('recordB')</li>
+   * <li>the current newest record ('recordA') being purged from the DB</li>
+   * <li>'recordB' failing to be inserted in the DB</li>
+   * </ol>
+   */
+  private final AtomicLong lastGeneratedChangeNumber;
+
+  private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
+
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+  /**
+   * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
+   *
+   * @param replicationEnv the Database Env to use to create the ReplicationServer DB.
+   * server for this domain.
+   * @throws ChangelogException If a database problem happened
+   */
+  FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
+  {
+    log = replicationEnv.getOrCreateCNIndexDB();
+    final ChangeNumberIndexRecord newestRecord = readLastRecord();
+    newestChangeNumber = getChangeNumber(newestRecord);
+    // initialization of the lastGeneratedChangeNumber from the DB content
+    // if DB is empty => last record does not exist => default to 0
+    lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
+
+    // Monitoring registration
+    DirectoryServer.deregisterMonitorProvider(dbMonitor);
+    DirectoryServer.registerMonitorProvider(dbMonitor);
+  }
+
+  private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
+  {
+    final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord();
+    return record == null ? null : record.getValue();
+  }
+
+  private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
+  {
+    final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord();
+    return record == null ? null : record.getValue();
+  }
+
+  private long getChangeNumber(final ChangeNumberIndexRecord record) throws ChangelogException
+  {
+    if (record != null)
+    {
+      return record.getChangeNumber();
+    }
+    return NO_KEY;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long addRecord(final ChangeNumberIndexRecord record) throws ChangelogException
+  {
+    final long changeNumber = nextChangeNumber();
+    final ChangeNumberIndexRecord newRecord =
+        new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN());
+    log.append(Record.from(newRecord.getChangeNumber(), newRecord));
+    newestChangeNumber = changeNumber;
+
+    if (logger.isTraceEnabled())
+    {
+      logger.trace("In FileChangeNumberIndexDB.addRecord, added: " + newRecord);
+    }
+    return changeNumber;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException
+  {
+    return readFirstRecord();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException
+  {
+    return readLastRecord();
+  }
+
+  private long nextChangeNumber()
+  {
+    return lastGeneratedChangeNumber.incrementAndGet();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getLastGeneratedChangeNumber()
+  {
+    return lastGeneratedChangeNumber.get();
+  }
+
+  /**
+   * Get the number of changes.
+   *
+   * @return Returns the number of changes.
+   * @throws ChangelogException
+   *            If a problem occurs.
+   */
+  long count() throws ChangelogException
+  {
+    return log.getNumberOfRecords();
+  }
+
+  /**
+   * Returns whether this database is empty.
+   *
+   * @return <code>true</code> if this database is empty, <code>false</code>
+   *         otherwise
+   * @throws ChangelogException
+   *           if a problem occurs.
+   */
+  boolean isEmpty() throws ChangelogException
+  {
+    return getNewestRecord() == null;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
+  {
+    return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber));
+  }
+
+  /**
+   * Shutdown this DB.
+   */
+  void shutdown()
+  {
+    if (shutdown.compareAndSet(false, true))
+    {
+      log.close();
+      DirectoryServer.deregisterMonitorProvider(dbMonitor);
+    }
+  }
+
+  /**
+   * Synchronously purges the change number index DB up to and excluding the
+   * provided timestamp.
+   *
+   * @param purgeCSN
+   *          the timestamp up to which purging must happen
+   * @return the oldest non purged CSN.
+   * @throws ChangelogException
+   *           if a database problem occurs.
+   */
+  CSN purgeUpTo(final CSN purgeCSN) throws ChangelogException
+  {
+    if (isEmpty() || purgeCSN == null)
+    {
+      return null;
+    }
+    final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
+    return record != null ? record.getValue().getCSN() : null;
+  }
+
+  /**
+   * Implements the Monitoring capabilities of the FileChangeNumberIndexDB.
+   */
+  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
+  {
+    /** {@inheritDoc} */
+    @Override
+    public List<Attribute> getMonitorData()
+    {
+      final List<Attribute> attributes = new ArrayList<Attribute>();
+      attributes.add(createChangeNumberAttribute(true));
+      attributes.add(createChangeNumberAttribute(false));
+      long numberOfChanges = 0;
+      try
+      {
+         numberOfChanges = count();
+      }
+      catch (ChangelogException e)
+      {
+        logger.traceException(e);
+      }
+      attributes.add(Attributes.create("count", Long.toString(numberOfChanges)));
+      return attributes;
+    }
+
+    private Attribute createChangeNumberAttribute(final boolean isFirst)
+    {
+      final String attributeName = isFirst ? "first-draft-changenumber" : "last-draft-changenumber";
+      final String changeNumber = String.valueOf(readChangeNumber(isFirst));
+      return Attributes.create(attributeName, changeNumber);
+    }
+
+    private long readChangeNumber(final boolean isFirst)
+    {
+      try
+      {
+        return getChangeNumber(isFirst ? readFirstRecord() : readLastRecord());
+      }
+      catch (ChangelogException e)
+      {
+        logger.traceException(e);
+        return NO_KEY;
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String getMonitorInstanceName()
+    {
+      return "ChangeNumber Index Database";
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void initializeMonitorProvider(MonitorProviderCfg configuration)
+                            throws ConfigException, InitializationException
+    {
+      // Nothing to do for now
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + ", newestChangeNumber=" + newestChangeNumber;
+  }
+
+  /**
+   * Clear the changes from this DB (from both memory cache and DB storage).
+   *
+   * @throws ChangelogException
+   *           if a database problem occurs.
+   */
+  public void clear() throws ChangelogException
+  {
+    log.clear();
+    newestChangeNumber = NO_KEY;
+  }
+
+  /** Parser of records persisted in the FileChangeNumberIndex log. */
+  private static class ChangeNumberIndexDBParser implements RecordParser<Long, ChangeNumberIndexRecord>
+  {
+    private static final byte STRING_SEPARATOR = 0;
+
+    @Override
+    public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record)
+    {
+      final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
+      return new ByteStringBuilder()
+        .append(record.getKey())
+        .append(cnIndexRecord.getBaseDN().toString())
+        .append(STRING_SEPARATOR)
+        .append(cnIndexRecord.getCSN().toByteString()).toByteString();
+    }
+
+    @Override
+    public Record<Long, ChangeNumberIndexRecord> decodeRecord(final ByteString data) throws DecodingException
+    {
+      try
+      {
+        ByteSequenceReader reader = data.asReader();
+        final long changeNumber = reader.getLong();
+        final DN baseDN = DN.valueOf(reader.getString(getNextStringLength(reader)));
+        reader.skip(1);
+        final CSN csn = CSN.valueOf(reader.getByteString(reader.remaining()));
+
+        return Record.from(changeNumber, new ChangeNumberIndexRecord(changeNumber, baseDN, csn));
+      }
+      catch (Exception e)
+      {
+        throw new DecodingException(e);
+      }
+    }
+
+    /** Returns the length of next string by looking for the zero byte used as separator. */
+    private int getNextStringLength(ByteSequenceReader reader)
+    {
+      int length = 0;
+      while (reader.peek(length) != STRING_SEPARATOR)
+      {
+        length++;
+      }
+      return length;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Long decodeKeyFromString(String key) throws ChangelogException
+    {
+      try
+      {
+        return Long.valueOf(key);
+      }
+      catch (NumberFormatException e)
+      {
+        throw new ChangelogException(
+            ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e);
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String encodeKeyToString(Long key)
+    {
+      return key.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Long getMaxKey()
+    {
+      return Long.MAX_VALUE;
+    }
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
new file mode 100644
index 0000000..d492576
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
@@ -0,0 +1,79 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+/**
+ * A cursor on ChangeNumberIndexDB.
+ *
+ * \@NotThreadSafe
+ */
+class FileChangeNumberIndexDBCursor implements DBCursor<ChangeNumberIndexRecord>
+{
+
+  /** The underlying cursor. */
+  private final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor;
+
+  /**
+   * Creates the cursor from provided cursor.
+   *
+   * @param cursor
+   *          The underlying cursor to read log.
+   * @throws ChangelogException
+   *            If an error occurs.
+   */
+  FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor)
+      throws ChangelogException
+  {
+    this.cursor = cursor;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ChangeNumberIndexRecord getRecord()
+  {
+    final Record<Long, ChangeNumberIndexRecord> record = cursor.getRecord();
+    return record != null ? record.getValue() : null;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean next() throws ChangelogException
+  {
+    return cursor.next();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    cursor.close();
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index d735b38..4de0aba 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -21,17 +21,36 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2014 ForgeRock AS.
+ *      Copyright 2014 ForgeRock AS
  */
 package org.opends.server.replication.server.changelog.file;
 
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
+import org.forgerock.i18n.LocalizableMessageBuilder;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
 import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.api.DirectoryThread;
+import org.opends.server.backends.ChangelogBackend;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
@@ -40,43 +59,25 @@
 import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
 import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
 import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
+import org.opends.server.replication.server.changelog.je.DomainDBCursor;
 import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
+import org.opends.server.replication.server.changelog.je.ReplicaCursor;
 import org.opends.server.types.DN;
+import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
+
+import com.forgerock.opendj.util.Pair;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
 
 /**
- * File-based implementation of the ChangelogDB interface.
+ * Log file implementation of the ChangelogDB interface.
  */
 public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
 {
-
-  /**
-   * Creates the changelog DB.
-   *
-   * @param replicationServer
-   *            replication server
-   * @param cfg
-   *            configuration
-   */
-  public FileChangelogDB(ReplicationServer replicationServer,
-      ReplicationServerCfg cfg)
-  {
-    throw new RuntimeException("Not implemented");
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public ServerState getDomainOldestCSNs(DN baseDN)
-  {
-    throw new RuntimeException("Not implemented");
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public ServerState getDomainNewestCSNs(DN baseDN)
-  {
-    throw new RuntimeException("Not implemented");
-  }
-
   /** {@inheritDoc} */
   @Override
   public long getDomainLatestTrimDate(DN baseDN)
@@ -84,136 +85,923 @@
     throw new RuntimeException("Not implemented");
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public void removeDomain(DN baseDN) throws ChangelogException
+  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+  /**
+   * This map contains the List of updates received from each LDAP server.
+   * <p>
+   * When removing a domainMap, code:
+   * <ol>
+   * <li>first get the domainMap</li>
+   * <li>synchronized on the domainMap</li>
+   * <li>remove the domainMap</li>
+   * <li>then check it's not null</li>
+   * <li>then close all inside</li>
+   * </ol>
+   * When creating a replicaDB, synchronize on the domainMap to avoid
+   * concurrent shutdown.
+   */
+  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
+      new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
+  /**
+   * \@GuardedBy("itself")
+   */
+  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
+      new HashMap<DN, List<DomainDBCursor>>();
+  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
+      new CopyOnWriteArrayList<MultiDomainDBCursor>();
+  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
+      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
+  private ReplicationEnvironment replicationEnv;
+  private final ReplicationServerCfg config;
+  private final File dbDirectory;
+
+  /**
+   * The handler of the changelog database, the database stores the relation
+   * between a change number and the associated cookie.
+   * <p>
+   * @GuardedBy("cnIndexDBLock")
+   */
+  private FileChangeNumberIndexDB cnIndexDB;
+  private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>();
+
+  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
+  private final Object cnIndexDBLock = new Object();
+
+  /**
+   * The purge delay (in milliseconds). Records in the changelog DB that are
+   * older than this delay might be removed.
+   */
+  private long purgeDelayInMillis;
+  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
+
+  /** The local replication server. */
+  private final ReplicationServer replicationServer;
+  private final AtomicBoolean shutdown = new AtomicBoolean();
+
+  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
+      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
+
+  /**
+   * Creates a new changelog DB.
+   *
+   * @param replicationServer
+   *          the local replication server.
+   * @param config
+   *          the replication server configuration
+   * @throws ConfigException
+   *           if a problem occurs opening the supplied directory
+   */
+  public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
+      throws ConfigException
   {
-    throw new RuntimeException("Not implemented");
+    this.config = config;
+    this.replicationServer = replicationServer;
+    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
-      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
-      throws ChangelogException
+  private File makeDir(final String dbDirName) throws ConfigException
   {
-    throw new RuntimeException("Not implemented");
+    // Check that this path exists or create it.
+    final File dbDirectory = getFileForPath(dbDirName);
+    try
+    {
+      if (!dbDirectory.exists())
+      {
+        dbDirectory.mkdir();
+      }
+      return dbDirectory;
+    }
+    catch (Exception e)
+    {
+      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(e.getLocalizedMessage()).append(" ")
+          .append(String.valueOf(dbDirectory));
+      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
+    }
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
-      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy,
-      Set<DN> excludedDomainDns) throws ChangelogException
+  private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN)
   {
-    throw new RuntimeException("Not implemented");
+    final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+    if (domainMap != null)
+    {
+      return domainMap;
+    }
+    return Collections.emptyMap();
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState,
-      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
-      throws ChangelogException
+  private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId)
   {
-    throw new RuntimeException("Not implemented");
+    return getDomainMap(baseDN).get(serverId);
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
-      CSN startCSN, KeyMatchingStrategy matchingStrategy,
-      PositionStrategy positionStrategy) throws ChangelogException
+  /**
+   * Returns a {@link FileReplicaDB}, possibly creating it.
+   *
+   * @param baseDN
+   *          the baseDN for which to create a ReplicaDB
+   * @param serverId
+   *          the serverId for which to create a ReplicaDB
+   * @param server
+   *          the ReplicationServer
+   * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
+   * @throws ChangelogException
+   *           if a problem occurred with the database
+   */
+  Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
+      final ReplicationServer server) throws ChangelogException
   {
-    throw new RuntimeException("Not implemented");
+    while (!shutdown.get())
+    {
+      final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
+      final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
+      if (result != null)
+      {
+        final Boolean dbWasCreated = result.getSecond();
+        if (dbWasCreated)
+        { // new replicaDB => update all cursors with it
+          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+          if (cursors != null && !cursors.isEmpty())
+          {
+            for (DomainDBCursor cursor : cursors)
+            {
+              cursor.addReplicaDB(serverId, null);
+            }
+          }
+        }
+
+        return result;
+      }
+    }
+    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public void unregisterCursor(DBCursor<?> cursor)
+  private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
   {
-    throw new RuntimeException("Not implemented");
+    // happy path: the domainMap already exists
+    final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
+    if (currentValue != null)
+    {
+      return currentValue;
+    }
+
+    // unlucky, the domainMap does not exist: take the hit and create the
+    // newValue, even though the same could be done concurrently by another thread
+    final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<Integer, FileReplicaDB>();
+    final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+    if (previousValue != null)
+    {
+      // there was already a value associated to the key, let's use it
+      return previousValue;
+    }
+
+    // we just created a new domain => update all cursors
+    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
+    {
+      cursor.addDomain(baseDN, null);
+    }
+    return newValue;
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
-      throws ChangelogException
+  private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
+      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
   {
-    throw new RuntimeException("Not implemented");
-  }
+    // happy path: the replicaDB already exists
+    FileReplicaDB currentValue = domainMap.get(serverId);
+    if (currentValue != null)
+    {
+      return Pair.of(currentValue, false);
+    }
 
-  /** {@inheritDoc} */
-  @Override
-  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
-      throws ChangelogException
-  {
-    throw new RuntimeException("Not implemented");
+    // unlucky, the replicaDB does not exist: take the hit and synchronize
+    // on the domainMap to create a new ReplicaDB
+    synchronized (domainMap)
+    {
+      currentValue = domainMap.get(serverId);
+      if (currentValue != null)
+      {
+        return Pair.of(currentValue, false);
+      }
+
+      if (domainToReplicaDBs.get(baseDN) != domainMap)
+      {
+        // The domainMap could have been concurrently removed because
+        // 1) a shutdown was initiated or 2) an initialize was called.
+        // Return will allow the code to:
+        // 1) shutdown properly or 2) lazily recreate the replicaDB
+        return null;
+      }
+
+      final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, replicationEnv);
+      domainMap.put(serverId, newDB);
+      return Pair.of(newDB, true);
+    }
   }
 
   /** {@inheritDoc} */
   @Override
   public void initializeDB()
   {
-    throw new RuntimeException("Not implemented");
+    try
+    {
+      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
+      replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
+      final ChangelogState changelogState = replicationEnv.getChangelogState();
+      initializeToChangelogState(changelogState);
+      if (config.isComputeChangeNumber())
+      {
+        startIndexer(changelogState);
+      }
+      setPurgeDelay(replicationServer.getPurgeDelay());
+    }
+    catch (ChangelogException e)
+    {
+      logger.traceException(e);
+      logger.error(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()));
+    }
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public void setPurgeDelay(long delayInMillis)
-  {
-    throw new RuntimeException("Not implemented");
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void setComputeChangeNumber(boolean computeChangeNumber)
+  private void initializeToChangelogState(final ChangelogState changelogState)
       throws ChangelogException
   {
-    throw new RuntimeException("Not implemented");
+    for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
+    {
+      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
+    }
+    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
+    {
+      for (int serverId : entry.getValue())
+      {
+        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
+      }
+    }
+  }
+
+  private void shutdownChangeNumberIndexDB() throws ChangelogException
+  {
+    synchronized (cnIndexDBLock)
+    {
+      if (cnIndexDB != null)
+      {
+        cnIndexDB.shutdown();
+      }
+    }
   }
 
   /** {@inheritDoc} */
   @Override
   public void shutdownDB() throws ChangelogException
   {
-    throw new RuntimeException("Not implemented");
+    if (!this.shutdown.compareAndSet(false, true))
+    { // shutdown has already been initiated
+      return;
+    }
+
+    // Remember the first exception because :
+    // - we want to try to remove everything we want to remove
+    // - then throw the first encountered exception
+    ChangelogException firstException = null;
+
+    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
+    if (indexer != null)
+    {
+      indexer.initiateShutdown();
+    }
+    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+    if (purger != null)
+    {
+      purger.initiateShutdown();
+    }
+
+    // wait for shutdown of the threads holding cursors
+    try
+    {
+      if (indexer != null)
+      {
+        indexer.join();
+      }
+      if (purger != null)
+      {
+        purger.join();
+      }
+    }
+    catch (InterruptedException e)
+    {
+      // do nothing: we are already shutting down
+    }
+
+    // now we can safely shutdown all DBs
+    try
+    {
+      shutdownChangeNumberIndexDB();
+    }
+    catch (ChangelogException e)
+    {
+      firstException = e;
+    }
+    for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
+        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
+    {
+      final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next();
+      synchronized (domainMap)
+      {
+        it.remove();
+        for (FileReplicaDB replicaDB : domainMap.values())
+        {
+          replicaDB.shutdown();
+        }
+      }
+    }
+    if (replicationEnv != null)
+    {
+      replicationEnv.shutdown();
+    }
+
+    if (firstException != null)
+    {
+      throw firstException;
+    }
+  }
+
+  /**
+   * Clears all records from the changelog (does not remove the changelog itself).
+   *
+   * @throws ChangelogException
+   *           If an error occurs when clearing the changelog.
+   */
+  public void clearDB() throws ChangelogException
+  {
+    if (!dbDirectory.exists())
+    {
+      return;
+    }
+
+    // Remember the first exception because :
+    // - we want to try to remove everything we want to remove
+    // - then throw the first encountered exception
+    ChangelogException firstException = null;
+
+    for (DN baseDN : this.domainToReplicaDBs.keySet())
+    {
+      removeDomain(baseDN);
+    }
+
+    synchronized (cnIndexDBLock)
+    {
+      if (cnIndexDB != null)
+      {
+        try
+        {
+          cnIndexDB.clear();
+        }
+        catch (ChangelogException e)
+        {
+          firstException = e;
+        }
+
+        try
+        {
+          shutdownChangeNumberIndexDB();
+        }
+        catch (ChangelogException e)
+        {
+          if (firstException == null)
+          {
+            firstException = e;
+          }
+          else if (logger.isTraceEnabled())
+          {
+            logger.traceException(e);
+          }
+        }
+
+        cnIndexDB = null;
+      }
+    }
+
+    if (firstException != null)
+    {
+      throw firstException;
+    }
   }
 
   /** {@inheritDoc} */
   @Override
   public void removeDB() throws ChangelogException
   {
-    throw new RuntimeException("Not implemented");
+    shutdownDB();
+    StaticUtils.recursiveDelete(dbDirectory);
   }
 
   /** {@inheritDoc} */
   @Override
-  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
+  public ServerState getDomainOldestCSNs(DN baseDN)
+  {
+    final ServerState result = new ServerState();
+    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
+    {
+      result.update(replicaDB.getOldestCSN());
+    }
+    return result;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ServerState getDomainNewestCSNs(DN baseDN)
+  {
+    final ServerState result = new ServerState();
+    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
+    {
+      result.update(replicaDB.getNewestCSN());
+    }
+    return result;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void removeDomain(DN baseDN) throws ChangelogException
+  {
+    // Remember the first exception because :
+    // - we want to try to remove everything we want to remove
+    // - then throw the first encountered exception
+    ChangelogException firstException = null;
+
+    // 1- clear the replica DBs
+    Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+    if (domainMap != null)
+    {
+      final ChangeNumberIndexer indexer = this.cnIndexer.get();
+      if (indexer != null)
+      {
+        indexer.clear(baseDN);
+      }
+      synchronized (domainMap)
+      {
+        domainMap = domainToReplicaDBs.remove(baseDN);
+        for (FileReplicaDB replicaDB : domainMap.values())
+        {
+          try
+          {
+            replicaDB.clear();
+          }
+          catch (ChangelogException e)
+          {
+            firstException = e;
+          }
+          replicaDB.shutdown();
+        }
+      }
+    }
+
+
+    // 2- clear the changelogstate DB
+    try
+    {
+      replicationEnv.clearGenerationId(baseDN);
+    }
+    catch (ChangelogException e)
+    {
+      if (firstException == null)
+      {
+        firstException = e;
+      }
+      else if (logger.isTraceEnabled())
+      {
+        logger.traceException(e);
+      }
+    }
+
+    if (firstException != null)
+    {
+      throw firstException;
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setPurgeDelay(final long purgeDelayInMillis)
+  {
+    this.purgeDelayInMillis = purgeDelayInMillis;
+    final ChangelogDBPurger purger;
+    if (purgeDelayInMillis > 0)
+    {
+      purger = new ChangelogDBPurger();
+      if (cnPurger.compareAndSet(null, purger))
+      {
+        purger.start();
+      } // otherwise a purger was already running
+    }
+    else
+    {
+      purger = cnPurger.getAndSet(null);
+      if (purger != null)
+      {
+        purger.initiateShutdown();
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setComputeChangeNumber(final boolean computeChangeNumber)
       throws ChangelogException
   {
-    throw new RuntimeException("Not implemented");
+    if (computeChangeNumber)
+    {
+      startIndexer(replicationEnv.getChangelogState());
+    }
+    else
+    {
+      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
+      if (indexer != null)
+      {
+        indexer.initiateShutdown();
+      }
+    }
+  }
+
+  private void startIndexer(final ChangelogState changelogState)
+  {
+    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
+    if (cnIndexer.compareAndSet(null, indexer))
+    {
+      indexer.start();
+    }
   }
 
   /** {@inheritDoc} */
   @Override
   public ChangeNumberIndexDB getChangeNumberIndexDB()
   {
-    throw new RuntimeException("Not implemented");
+    synchronized (cnIndexDBLock)
+    {
+      if (cnIndexDB == null)
+      {
+        try
+        {
+          cnIndexDB = new FileChangeNumberIndexDB(replicationEnv);
+        }
+        catch (Exception e)
+        {
+          logger.traceException(e);
+          logger.error(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
+        }
+      }
+      return cnIndexDB;
+    }
   }
 
   /** {@inheritDoc} */
   @Override
   public ReplicationDomainDB getReplicationDomainDB()
   {
-    throw new RuntimeException("Not implemented");
+    return this;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+      KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+  {
+    final Set<DN> excludedDomainDns = Collections.emptySet();
+    return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
+      final Set<DN> excludedDomainDns)
+          throws ChangelogException
+  {
+    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
+    registeredMultiDomainCursors.add(cursor);
+    for (DN baseDN : domainToReplicaDBs.keySet())
+    {
+      if (!excludedDomainDns.contains(baseDN))
+      {
+        cursor.addDomain(baseDN, startState.getServerState(baseDN));
+      }
+    }
+    return cursor;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
+      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+  {
+    final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy);
+    for (int serverId : getDomainMap(baseDN).keySet())
+    {
+      // get the last already sent CSN from that server to get a cursor
+      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
+      cursor.addReplicaDB(serverId, lastCSN);
+    }
+    return cursor;
+  }
+
+  private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
+      final PositionStrategy positionStrategy)
+  {
+    synchronized (registeredDomainCursors)
+    {
+      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
+      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
+      if (cursors == null)
+      {
+        cursors = new ArrayList<DomainDBCursor>();
+        registeredDomainCursors.put(baseDN, cursors);
+      }
+      cursors.add(cursor);
+      return cursor;
+    }
+  }
+
+  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
+  {
+    final MultiDomainServerState offlineReplicas =
+        replicationEnv.getChangelogState().getOfflineReplicas();
+    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
+    if (offlineCSN != null
+        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
+    {
+      return offlineCSN;
+    }
+    return null;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
+      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
+  {
+    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
+    if (replicaDB != null)
+    {
+      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
+      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
+      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
+      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+
+      synchronized (replicaCursors)
+      {
+        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
+        if (cursors == null)
+        {
+          cursors = new ArrayList<ReplicaCursor>();
+          replicaCursors.put(replicaID, cursors);
+        }
+        cursors.add(replicaCursor);
+      }
+
+      return replicaCursor;
+    }
+    return EMPTY_CURSOR_REPLICA_DB;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void unregisterCursor(final DBCursor<?> cursor)
+  {
+    if (cursor instanceof MultiDomainDBCursor)
+    {
+      registeredMultiDomainCursors.remove(cursor);
+    }
+    else if (cursor instanceof DomainDBCursor)
+    {
+      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
+      synchronized (registeredMultiDomainCursors)
+      {
+        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
+        if (cursors != null)
+        {
+          cursors.remove(cursor);
+        }
+      }
+    }
+    else if (cursor instanceof ReplicaCursor)
+    {
+      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
+      synchronized (replicaCursors)
+      {
+        final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+        if (cursors != null)
+        {
+          cursors.remove(cursor);
+        }
+      }
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
+  {
+    final CSN csn = updateMsg.getCSN();
+    final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
+        csn.getServerId(), replicationServer);
+    final FileReplicaDB replicaDB = pair.getFirst();
+    replicaDB.add(updateMsg);
+
+    ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
+
+    final ChangeNumberIndexer indexer = cnIndexer.get();
+    if (indexer != null)
+    {
+      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
+      indexer.publishUpdateMsg(baseDN, updateMsg);
+    }
+    return pair.getSecond(); // replica DB was created
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
+  {
+    final ChangeNumberIndexer indexer = cnIndexer.get();
+    if (indexer != null)
+    {
+      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
+      indexer.publishHeartbeat(baseDN, heartbeatCSN);
+    }
+  }
+
+  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
+      throws ChangelogException
+  {
+    if (indexer.isReplicaOffline(baseDN, serverId))
+    {
+      replicationEnv.notifyReplicaOnline(baseDN, serverId);
+    }
+    updateCursorsWithOfflineCSN(baseDN, serverId, null);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
+  {
+    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
+    final ChangeNumberIndexer indexer = cnIndexer.get();
+    if (indexer != null)
+    {
+      indexer.replicaOffline(baseDN, offlineCSN);
+    }
+    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
+  }
+
+  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
+  {
+    synchronized (replicaCursors)
+    {
+      final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
+      if (cursors != null)
+      {
+        for (ReplicaCursor cursor : cursors)
+        {
+          cursor.setOfflineCSN(offlineCSN);
+        }
+      }
+    }
   }
 
   /**
-   * Clear the database.
+   * The thread purging the changelogDB on a regular interval. Records are
+   * purged from the changelogDB if they are older than a delay specified in
+   * seconds. The purge process works in two steps:
+   * <ol>
+   * <li>first purge the changeNumberIndexDB and retrieve information to drive
+   * replicaDBs purging</li>
+   * <li>proceed to purge each replicaDBs based on the information collected
+   * when purging the changeNumberIndexDB</li>
+   * </ol>
    */
-  public void clearDB()
+  private final class ChangelogDBPurger extends DirectoryThread
   {
-    throw new RuntimeException("Not implemented");
-  }
+    private static final int DEFAULT_SLEEP = 500;
 
+    protected ChangelogDBPurger()
+    {
+      super("changelog DB purger");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void run()
+    {
+      // initialize CNIndexDB
+      getChangeNumberIndexDB();
+      while (!isShutdownInitiated())
+      {
+        try
+        {
+          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
+          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+          final CSN oldestNotPurgedCSN;
+
+          // next code assumes that the compute-change-number config
+          // never changes during the life time of an RS
+          if (!config.isComputeChangeNumber())
+          {
+            oldestNotPurgedCSN = purgeCSN;
+          }
+          else
+          {
+            final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+            if (localCNIndexDB == null)
+            { // shutdown has been initiated
+              return;
+            }
+
+            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
+            if (oldestNotPurgedCSN == null)
+            { // shutdown may have been initiated...
+              // ... or the change number index DB is empty,
+              // wait for new changes to come in.
+
+              // Note we cannot sleep for as long as the purge delay
+              // (3 days default), because we might receive late updates
+              // that will have to be purged before the purge delay elapses.
+              // This can particularly happen in case of network partitions.
+              jeFriendlySleep(DEFAULT_SLEEP);
+              continue;
+            }
+          }
+
+          for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
+          {
+            for (final FileReplicaDB replicaDB : domainMap.values())
+            {
+              replicaDB.purgeUpTo(oldestNotPurgedCSN);
+            }
+          }
+
+          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+        }
+        catch (InterruptedException e)
+        {
+          // shutdown initiated?
+        }
+        catch (Exception e)
+        {
+          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e)));
+          if (replicationServer != null)
+          {
+            replicationServer.shutdown();
+          }
+        }
+      }
+    }
+
+    /**
+     * This method implements a sleep() that is friendly to Berkeley JE.
+     * <p>
+     * Originally, {@link Thread#sleep(long)} was used , but waking up a
+     * sleeping threads required calling {@link Thread#interrupt()}, and JE
+     * threw exceptions when invoked on interrupted threads.
+     * <p>
+     * The solution is to replace:
+     * <ol>
+     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
+     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
+     * </ol>
+     */
+    private void jeFriendlySleep(long millis) throws InterruptedException
+    {
+      if (!isShutdownInitiated())
+      {
+        synchronized (this)
+        {
+          if (!isShutdownInitiated())
+          {
+            wait(millis);
+          }
+        }
+      }
+    }
+
+    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
+    {
+      final long nextPurgeTime = notPurgedCSN.getTime();
+      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
+      if (currentPurgeTime <= nextPurgeTime)
+      {
+        // sleep till the next CSN to purge,
+        return nextPurgeTime - currentPurgeTime;
+      }
+      // wait a bit before purging more
+      return DEFAULT_SLEEP;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void initiateShutdown()
+    {
+      super.initiateShutdown();
+      synchronized (this)
+      {
+        notify(); // wake up the purger thread for faster shutdown
+      }
+    }
+  }
 }
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
new file mode 100644
index 0000000..69cd5ea
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -0,0 +1,409 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.admin.std.server.MonitorProviderCfg;
+import org.opends.server.api.MonitorProvider;
+import org.opends.server.core.DirectoryServer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ProtocolVersion;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.ReplicationServerDomain;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.DN;
+import org.opends.server.types.InitializationException;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * Represents a replication server database for one server in the topology.
+ * <p>
+ * It is responsible for efficiently saving the updates that is received from
+ * each master server into stable storage.
+ * <p>
+ * It is also able to generate a {@link DBCursor} that can be used to
+ * read all changes from a given {@link CSN}.
+ * <p>
+ * It publishes some monitoring information below cn=monitor.
+ */
+class FileReplicaDB
+{
+
+  /** The parser of records stored in Replica DB. */
+  static final RecordParser<CSN, UpdateMsg> RECORD_PARSER = new ReplicaDBParser();
+
+  /**
+   * Class that allows atomically setting oldest and newest CSNs without
+   * synchronization.
+   *
+   * @Immutable
+   */
+  private static final class CSNLimits
+  {
+    private final CSN oldestCSN;
+    private final CSN newestCSN;
+
+    public CSNLimits(CSN oldestCSN, CSN newestCSN)
+    {
+      this.oldestCSN = oldestCSN;
+      this.newestCSN = newestCSN;
+    }
+  }
+
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
+
+  /** The log in which records are persisted. */
+  private final Log<CSN, UpdateMsg> log;
+
+  /**
+   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
+   *
+   * @NonNull
+   */
+  private volatile CSNLimits csnLimits;
+  private final int serverId;
+  private final DN baseDN;
+  private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
+  private final ReplicationServer replicationServer;
+  private final ReplicationEnvironment replicationEnv;
+
+  /**
+   * Creates a new ReplicaDB associated to a given LDAP server.
+   *
+   * @param serverId
+   *          Id of this server.
+   * @param baseDN
+   *          the replication domain baseDN.
+   * @param replicationServer
+   *          The ReplicationServer that creates this ReplicaDB.
+   * @param replicationEnv
+   *          the Database Env to use to create the ReplicationServer DB. server
+   *          for this domain.
+   * @throws ChangelogException
+   *           If a database problem happened
+   */
+  FileReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer,
+      final ReplicationEnvironment replicationEnv) throws ChangelogException
+  {
+    this.serverId = serverId;
+    this.baseDN = baseDN;
+    this.replicationServer = replicationServer;
+    this.replicationEnv = replicationEnv;
+    this.log = createLog(replicationEnv);
+    this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
+
+    DirectoryServer.deregisterMonitorProvider(dbMonitor);
+    DirectoryServer.registerMonitorProvider(dbMonitor);
+  }
+
+  private CSN readOldestCSN() throws ChangelogException
+  {
+    final Record<CSN, UpdateMsg> record = log.getOldestRecord();
+    return record == null ? null : record.getKey();
+  }
+
+  private CSN readNewestCSN() throws ChangelogException
+  {
+    final Record<CSN, UpdateMsg> record = log.getNewestRecord();
+    return record == null ? null : record.getKey();
+  }
+
+  private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
+  {
+    final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
+    return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
+  }
+
+  /**
+   * Adds a new message.
+   *
+   * @param updateMsg
+   *          The update message to add.
+   * @throws ChangelogException
+   *           If an error occurs when trying to add the message.
+   */
+  void add(final UpdateMsg updateMsg) throws ChangelogException
+  {
+    if (shutdown.get())
+    {
+      throw new ChangelogException(
+          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
+              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
+    }
+
+    log.append(Record.from(updateMsg.getCSN(), updateMsg));
+
+    final CSNLimits limits = csnLimits;
+    final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
+    final boolean updateOld = limits.oldestCSN == null;
+    if (updateOld || updateNew)
+    {
+      csnLimits = new CSNLimits(
+          updateOld ? updateMsg.getCSN() : limits.oldestCSN,
+          updateNew ? updateMsg.getCSN() : limits.newestCSN);
+    }
+  }
+
+  /**
+   * Get the oldest CSN that has not been purged yet.
+   *
+   * @return the oldest CSN that has not been purged yet.
+   */
+  CSN getOldestCSN()
+  {
+    return csnLimits.oldestCSN;
+  }
+
+  /**
+   * Get the newest CSN that has not been purged yet.
+   *
+   * @return the newest CSN that has not been purged yet.
+   */
+  CSN getNewestCSN()
+  {
+    return csnLimits.newestCSN;
+  }
+
+  /**
+   * Returns a cursor that allows to retrieve the update messages from this DB.
+   * The actual starting position is defined by the provided CSN, the key
+   * matching strategy and the positioning strategy.
+   *
+   * @param startCSN
+   *          The position where the cursor must start. If null, start from the
+   *          oldest CSN
+   * @param matchingStrategy
+   *          Cursor key matching strategy
+   * @param positionStrategy
+   *          Cursor position strategy
+   * @return a new {@link DBCursor} to retrieve update messages.
+   * @throws ChangelogException
+   *           if a database problem happened
+   */
+  DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
+      final PositionStrategy positionStrategy) throws ChangelogException
+  {
+    RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy);
+    return new FileReplicaDBCursor(cursor, startCSN, positionStrategy);
+  }
+
+  /**
+   * Shutdown this ReplicaDB.
+   */
+  void shutdown()
+  {
+    if (shutdown.compareAndSet(false, true))
+    {
+      log.close();
+      DirectoryServer.deregisterMonitorProvider(dbMonitor);
+    }
+  }
+
+  /**
+   * Synchronously purge changes older than purgeCSN from this replicaDB.
+   *
+   * @param purgeCSN
+   *          The CSN up to which changes can be purged. No purging happens when
+   *          it is {@code null}.
+   * @throws ChangelogException
+   *           In case of database problem.
+   */
+  void purgeUpTo(final CSN purgeCSN) throws ChangelogException
+  {
+    if (purgeCSN == null)
+    {
+      return;
+    }
+    final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
+    if (oldestRecord != null)
+    {
+      csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
+    }
+  }
+
+  /**
+   * Implements monitoring capabilities of the ReplicaDB.
+   */
+  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
+  {
+    /** {@inheritDoc} */
+    @Override
+    public List<Attribute> getMonitorData()
+    {
+      final List<Attribute> attributes = new ArrayList<Attribute>();
+      create(attributes, "replicationServer-database",String.valueOf(serverId));
+      create(attributes, "domain-name", baseDN.toNormalizedString());
+      final CSNLimits limits = csnLimits;
+      if (limits.oldestCSN != null)
+      {
+        create(attributes, "first-change", encode(limits.oldestCSN));
+      }
+      if (limits.newestCSN != null)
+      {
+        create(attributes, "last-change", encode(limits.newestCSN));
+      }
+      return attributes;
+    }
+
+    private void create(final List<Attribute> attributes, final String name, final String value)
+    {
+      attributes.add(Attributes.create(name, value));
+    }
+
+    private String encode(final CSN csn)
+    {
+      return csn + " " + new Date(csn.getTime());
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String getMonitorInstanceName()
+    {
+      ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN);
+      return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void initializeMonitorProvider(MonitorProviderCfg configuration)
+        throws ConfigException, InitializationException
+    {
+      // Nothing to do for now
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    final CSNLimits limits = csnLimits;
+    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
+        + limits.oldestCSN + " " + limits.newestCSN;
+  }
+
+  /**
+   * Clear the changes from this DB, from both memory cache and persistent
+   * storage.
+   *
+   * @throws ChangelogException
+   *           If an error occurs while removing the changes from the DB.
+   */
+  void clear() throws ChangelogException
+  {
+    // Remove all persisted data and reset generationId to default value
+    log.clear();
+    replicationEnv.resetGenerationId(baseDN);
+
+    csnLimits = new CSNLimits(null, null);
+  }
+
+  /**
+   * Return the number of records of this replicaDB.
+   * <p>
+   * For test purpose.
+   *
+   * @return The number of records of this replicaDB.
+   * @throws ChangelogException
+   *            If an error occurs
+   */
+  long getNumberRecords() throws ChangelogException
+  {
+    return log.getNumberOfRecords();
+  }
+
+  /**
+   * Dump this DB as text files, intended for debugging purpose only.
+   *
+   * @throws ChangelogException
+   *           If an error occurs during dump
+   */
+  void dumpAsTextFiles() throws ChangelogException {
+    log.dumpAsTextFile(log.getPath());
+  }
+
+  /** Parser of records persisted in the ReplicaDB log. */
+  private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
+  {
+
+    @Override
+    public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
+    {
+      final UpdateMsg message = record.getValue();
+      return ByteString.wrap(message.getBytes());
+    }
+
+    @Override
+    public Record<CSN, UpdateMsg> decodeRecord(final ByteString data) throws DecodingException
+    {
+      try
+      {
+        final UpdateMsg msg =
+            (UpdateMsg) UpdateMsg.generateMsg(data.toByteArray(), ProtocolVersion.REPLICATION_PROTOCOL_V7);
+        return Record.from(msg.getCSN(), msg);
+      }
+      catch (Exception e)
+      {
+        throw new DecodingException(e);
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CSN decodeKeyFromString(String key) throws ChangelogException
+    {
+      return new CSN(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String encodeKeyToString(CSN key)
+    {
+      return key.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CSN getMaxKey()
+    {
+      return CSN.MAX_CSN_VALUE;
+    }
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
new file mode 100644
index 0000000..d48b18e
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -0,0 +1,144 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
+
+/**
+ * A cursor on ReplicaDB, which can re-initialize itself after exhaustion.
+ * <p>
+ * The cursor provides a java.sql.ResultSet like API :
+ * <pre>
+ *  FileReplicaDBCursor cursor = ...;
+ *  try {
+ *    while (cursor.next()) {
+ *      Record record = cursor.getRecord();
+ *      // ... can call cursor.getRecord() again: it will return the same result
+ *    }
+ *  }
+ *  finally {
+ *    close(cursor);
+ *  }
+ * }
+ * </pre>
+ * <p>
+ * The cursor automatically re-initializes itself if it is exhausted: if a
+ * record is newly available, a subsequent call to the {@code next()} method will
+ * return {@code true} and the record will be available by calling {@code getRecord()}
+ * method.
+ *
+ * \@NotThreadSafe
+ */
+class FileReplicaDBCursor implements DBCursor<UpdateMsg>
+{
+  /** The underlying cursor. */
+  private final RepositionableCursor<CSN, UpdateMsg> cursor;
+
+  /** The next record to return. */
+  private Record<CSN, UpdateMsg> nextRecord;
+
+  /**  The CSN to re-start with in case the cursor is exhausted. */
+  private CSN lastNonNullCurrentCSN;
+
+  private PositionStrategy positionStrategy;
+
+  /**
+   * Creates the cursor from provided log cursor and start CSN.
+   *
+   * @param cursor
+   *          The underlying log cursor to read log.
+   * @param startCSN
+   *          The CSN to use as a start point (excluded from cursor, the lowest
+   *          CSN higher than this CSN is used as the real start point).
+   * @param positionStrategy
+   *          Cursor position strategy, which allow to choose if cursor must
+   *          start from the provided CSN or just after the provided CSN.
+   */
+  FileReplicaDBCursor(
+      final RepositionableCursor<CSN, UpdateMsg> cursor,
+      final CSN startCSN,
+      final PositionStrategy positionStrategy) {
+    this.cursor = cursor;
+    this.lastNonNullCurrentCSN = startCSN;
+    this.positionStrategy = positionStrategy;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public UpdateMsg getRecord()
+  {
+    return nextRecord == null ? null : nextRecord.getValue();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean next() throws ChangelogException
+  {
+    if (cursor.next())
+    {
+      nextRecord = cursor.getRecord();
+      final int nextCSNCompare = nextRecord.getKey().compareTo(lastNonNullCurrentCSN);
+      if (nextCSNCompare > 0 || (nextCSNCompare == 0 && positionStrategy == ON_MATCHING_KEY))
+      {
+        // start CSN is found, switch to position strategy that always find the next
+        lastNonNullCurrentCSN = nextRecord.getKey();
+        positionStrategy = AFTER_MATCHING_KEY;
+        return true;
+      }
+    }
+    // either cursor is exhausted or we still have not reached the start CSN
+    return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned();
+  }
+
+  /** Re-initialize the cursor after the last non null CSN. */
+  private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException
+  {
+    final boolean found = cursor.positionTo(lastNonNullCurrentCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
+    if (found && cursor.next())
+    {
+      nextRecord = cursor.getRecord();
+      lastNonNullCurrentCSN = nextRecord.getKey();
+      return true;
+    }
+    nextRecord = null;
+    return false;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    cursor.close();
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
new file mode 100644
index 0000000..3ffc90f
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -0,0 +1,1187 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.util.Reject;
+import org.forgerock.util.Utils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A multi-file log that features appending key-value records and reading them
+ * using a {@code DBCursor}.
+ * The records must be appended to the log in ascending order of the keys.
+ * <p>
+ * A log is defined for a path - the log path - and contains one to many log files:
+ * <ul>
+ * <li>it has always at least one log file, the head log file, named "head.log",
+ * which is used to append the records.</li>
+ * <li>it may have from zero to many read-only log files, which are named after
+ * the pattern '[lowkey]_[highkey}.log' where [lowkey] and [highkey] are respectively
+ * the string representation of lowest and highest key present in the log file.</li>
+ * </ul>
+ * A read-only log file is created each time the head log file has reached the
+ * maximum size limit. The head log file is then rotated to the read-only file and a
+ * new empty head log file is opened. There is no limit on the number of read-only
+ * files, but they can be purged.
+ * <p>
+ * A log is obtained using the {@code Log.openLog()} method and must always be
+ * released using the {@code close()} method.
+ * <p>
+ * Usage example:
+ * <pre>
+ * {@code
+ *   Log<K, V> log = null;
+ *   try
+ *   {
+ *     log = Log.openLog(logPath, parser, maxFileSize);
+ *     log.append(key, value);
+ *     DBCursor<K, V> cursor = log.getCursor(someKey);
+ *     // use cursor, then close cursor
+ *   }
+ *   finally
+ *   {
+ *     log.close();
+ *   }
+ * }
+ * </pre>
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
+ */
+final class Log<K extends Comparable<K>, V> implements Closeable
+{
+  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+  private static final String LOG_FILE_SUFFIX = ".log";
+
+  static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX;
+
+  private static final String LOG_FILE_NAME_SEPARATOR = "_";
+
+  private static final FileFilter READ_ONLY_LOG_FILES_FILTER = new FileFilter()
+  {
+    @Override
+    public boolean accept(File file)
+    {
+      return file.isFile() && file.getName().endsWith(LOG_FILE_SUFFIX) &&
+          !file.getName().equals(HEAD_LOG_FILE_NAME);
+    }
+  };
+
+  /** Map that holds the unique log instance for each log path. */
+  private static final Map<File, Log<?, ?>> logsCache = new HashMap<File, Log<?, ?>>();
+
+  /**
+   * The number of references on this log instance. It is incremented each time
+   * a log is opened on the same log path. The log is effectively closed only
+   * when the {@code close()} method is called and this value is at most 1.
+   */
+  private int referenceCount;
+
+  /** The path of directory for this log, where log files are stored. */
+  private final File logPath;
+
+  /** The parser used for encoding/decoding of records. */
+  private final RecordParser<K, V> recordParser;
+
+  /**
+   * Indicates if this log is closed. When the log is closed, all methods return
+   * immediately with no effect.
+   */
+  private boolean isClosed;
+
+  /**
+   * The log files contained in this log, ordered by key.
+   * <p>
+   * The head log file is always present and is associated with the maximum
+   * possible key, given by the record parser.
+   * <p>
+   * The read-only log files are associated with the highest key they contain.
+   */
+  private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>();
+
+  /**
+   * The last key appended to the log. In order to keep the ordering of the keys
+   * in the log, any attempt to append a record with a key lower or equal to
+   * this key is rejected (no error but an event is logged).
+   */
+  private K lastAppendedKey;
+
+  /**
+   * The list of non-empty cursors opened on this log. Opened cursors may have
+   * to be updated when rotating the head log file.
+   */
+  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
+
+  /**
+   * A log file is rotated once it has exceeded this size limit. The log file can have
+   * a size much larger than this limit if the last record written has a huge size.
+   *
+   * TODO : to be replaced later by a (or a list of) configurable Rotation policy
+   * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
+   */
+  private final long sizeLimitPerLogFileInBytes;
+
+  /**
+   * The exclusive lock used for writes and lifecycle operations on this log:
+   * initialize, clear, sync and close.
+   */
+  private final Lock exclusiveLock;
+
+  /**
+   * The shared lock used for reads and cursor operations on this log.
+   */
+  private final Lock sharedLock;
+
+  /**
+   * Open a log with the provided log path, record parser and maximum size per
+   * log file.
+   * <p>
+   * If no log exists for the provided path, a new one is created.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param logPath
+   *          Path of the log.
+   * @param parser
+   *          Parser for encoding/decoding of records.
+   * @param sizeLimitPerFileInBytes
+   *          Limit in bytes before rotating the head log file of the log.
+   * @return a log
+   * @throws ChangelogException
+   *           If a problem occurs during initialization.
+   */
+  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
+      final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
+  {
+    Reject.ifNull(logPath, parser);
+    @SuppressWarnings("unchecked")
+    Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
+    if (log == null)
+    {
+      log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
+      logsCache.put(logPath, log);
+    }
+    else
+    {
+      // TODO : check that parser and size limit are compatible with the existing one,
+      // and issue a warning if it is not the case
+      log.referenceCount++;
+    }
+    return log;
+  }
+
+  /**
+   * Release a reference to the log corresponding to provided path. The log is
+   * closed if this is the last reference.
+   */
+  private static synchronized void releaseLog(final File logPath)
+  {
+    Log<?, ?> log = logsCache.get(logPath);
+    if (log == null)
+    {
+      // this should never happen
+      logger.error(ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
+      return;
+    }
+    if (log.referenceCount > 1)
+    {
+      log.referenceCount--;
+    }
+    else
+    {
+      log.doClose();
+      logsCache.remove(logPath);
+    }
+  }
+
+  /**
+   * Creates a new log.
+   *
+   * @param logPath
+   *            The directory path of the log.
+   * @param parser
+   *          Parser of records.
+   * @param sizeLimitPerFile
+   *            Limit in bytes before rotating a log file.
+   * @throws ChangelogException
+   *            If a problem occurs during initialization.
+   */
+  private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
+      throws ChangelogException
+  {
+    this.logPath = logPath;
+    this.recordParser = parser;
+    this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
+    this.referenceCount = 1;
+
+    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
+    this.exclusiveLock = lock.writeLock();
+    this.sharedLock = lock.readLock();
+
+    createOrOpenLogFiles();
+  }
+
+  /** Create or open log files used by this log. */
+  private void createOrOpenLogFiles() throws ChangelogException
+  {
+    exclusiveLock.lock();
+    try
+    {
+      createRootDirIfNotExists();
+      openHeadLogFile();
+      for (final File file : getReadOnlyLogFiles())
+      {
+        openReadOnlyLogFile(file);
+      }
+      isClosed = false;
+    }
+    catch (ChangelogException e)
+    {
+      // ensure all log files opened at this point are closed
+      close();
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  private File[] getReadOnlyLogFiles() throws ChangelogException
+  {
+    File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
+    if (files == null)
+    {
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
+    }
+    return files;
+  }
+
+  private void createRootDirIfNotExists() throws ChangelogException
+  {
+    if (!logPath.exists())
+    {
+      if (!logPath.mkdirs())
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
+      }
+    }
+  }
+
+  /**
+   * Returns the path of this log.
+   *
+   * @return the path of this log directory
+   */
+  public File getPath()
+  {
+    return logPath;
+  }
+
+  /**
+   * Add the provided record at the end of this log.
+   * <p>
+   * The record must have a key strictly higher than the key
+   * of the last record added. If it is not the case, the record is not
+   * appended and the method returns immediately.
+   * <p>
+   * In order to ensure that record is written out of buffers and persisted
+   * to file system, it is necessary to explicitely call the
+   * {@code syncToFileSystem()} method.
+   *
+   * @param record
+   *          The record to add.
+   * @throws ChangelogException
+   *           If an error occurs while adding the record to the log.
+   */
+  public void append(final Record<K, V> record) throws ChangelogException
+  {
+    // If this exclusive lock happens to be a bottleneck :
+    // 1. use a shared lock for appending the record first
+    // 2. switch to an exclusive lock only if rotation is needed
+    // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return;
+      }
+      if (recordIsBreakingKeyOrdering(record))
+      {
+        logger.info(LocalizableMessage.raw(
+            "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
+            lastAppendedKey != null ? lastAppendedKey : "null"));
+        return;
+      }
+      LogFile<K, V> headLogFile = getHeadLogFile();
+      if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+      {
+        logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+
+        rotateHeadLogFile();
+        headLogFile = getHeadLogFile();
+      }
+      headLogFile.append(record);
+      lastAppendedKey = record.getKey();
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  /**
+   * Indicates if the provided record has a key that would break the key
+   * ordering in the log.
+   */
+  private boolean recordIsBreakingKeyOrdering(final Record<K, V> record)
+  {
+    return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
+  }
+
+  /**
+   * Synchronize all records added with the file system, ensuring that records
+   * are effectively persisted.
+   * <p>
+   * After a successful call to this method, it is guaranteed that all records
+   * added to the log are persisted to the file system.
+   *
+   * @throws ChangelogException
+   *           If the synchronization fails.
+   */
+  public void syncToFileSystem() throws ChangelogException
+  {
+    exclusiveLock.lock();
+    try
+    {
+      getHeadLogFile().syncToFileSystem();
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  /**
+   * Returns a cursor that allows to retrieve the records from this log,
+   * starting at the first position.
+   *
+   * @return a cursor on the log records, which is never {@code null}
+   * @throws ChangelogException
+   *           If the cursor can't be created.
+   */
+  public RepositionableCursor<K, V> getCursor() throws ChangelogException
+  {
+    LogCursor<K, V> cursor = null;
+    sharedLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return new EmptyLogCursor<K, V>();
+      }
+      cursor = new LogCursor<K, V>(this);
+      cursor.positionTo(null, null, null);
+      registerCursor(cursor);
+      return cursor;
+    }
+    catch (ChangelogException e)
+    {
+      StaticUtils.close(cursor);
+      throw e;
+    }
+    finally
+    {
+      sharedLock.unlock();
+    }
+  }
+
+  /**
+   * Returns a cursor that allows to retrieve the records from this log,
+   * starting at the position defined by the provided key.
+   *
+   * @param key
+   *          Key to use as a start position for the cursor. If key is
+   *          {@code null}, cursor will point at the first record of the log.
+   * @return a cursor on the log records, which is never {@code null}
+   * @throws ChangelogException
+   *           If the cursor can't be created.
+   */
+  public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException
+  {
+    return getCursor(key, EQUAL_TO_KEY, ON_MATCHING_KEY);
+  }
+
+  /**
+   * Returns a cursor that allows to retrieve the records from this log. The
+   * starting position is defined by the provided key, cursor matching strategy
+   * and cursor positioning strategy.
+   *
+   * @param key
+   *          Key to use as a start position for the cursor. If key is
+   *          {@code null}, cursor will point at the first record of the log.
+   * @param matchingStrategy
+   *          Cursor key matching strategy.
+   * @param positionStrategy
+   *          The cursor positioning strategy.
+   * @return a cursor on the log records, which is never {@code null}
+   * @throws ChangelogException
+   *           If the cursor can't be created.
+   */
+  public RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy,
+      final PositionStrategy positionStrategy) throws ChangelogException
+  {
+    if (key == null)
+    {
+      return getCursor();
+    }
+    LogCursor<K, V> cursor = null;
+    sharedLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return new EmptyLogCursor<K, V>();
+      }
+      cursor = new LogCursor<K, V>(this);
+      final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy);
+      // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy
+      if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
+      {
+        registerCursor(cursor);
+        return cursor;
+      }
+      else
+      {
+        StaticUtils.close(cursor);
+        return new EmptyLogCursor<K, V>();
+      }
+    }
+    catch (ChangelogException e)
+    {
+      StaticUtils.close(cursor);
+      throw e;
+    }
+    finally
+    {
+      sharedLock.unlock();
+    }
+  }
+
+
+  /**
+   * Returns the oldest (first) record from this log.
+   *
+   * @return the oldest record, which may be {@code null} if there is no record
+   *         in the log.
+   * @throws ChangelogException
+   *           If an error occurs while retrieving the record.
+   */
+  public Record<K, V> getOldestRecord() throws ChangelogException
+  {
+    return getOldestLogFile().getOldestRecord();
+  }
+
+
+  /**
+   * Returns the newest (last) record from this log.
+   *
+   * @return the newest record, which may be {@code null}
+   * @throws ChangelogException
+   *           If an error occurs while retrieving the record.
+   */
+  public Record<K, V> getNewestRecord() throws ChangelogException
+  {
+    return getHeadLogFile().getNewestRecord();
+  }
+
+  /**
+   * Returns the number of records in the log.
+   *
+   * @return the number of records
+   * @throws ChangelogException
+   *            If a problem occurs.
+   */
+  public long getNumberOfRecords() throws ChangelogException
+  {
+    long count = 0;
+    for (final LogFile<K, V> logFile : logFiles.values())
+    {
+      count += logFile.getNumberOfRecords();
+    }
+    return count;
+  }
+
+  /**
+   * Purge the log up to and excluding the provided key.
+   *
+   * @param purgeKey
+   *            the key up to which purging must happen
+   * @return the oldest non purged record, or {@code null}
+   *         if no record was purged
+   * @throws ChangelogException
+   *           if a database problem occurs.
+   */
+  public Record<K,V> purgeUpTo(final K purgeKey) throws ChangelogException
+  {
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return null;
+      }
+      final SortedMap<K, LogFile<K, V>> logFilesToPurge = logFiles.headMap(purgeKey);
+      if (logFilesToPurge.isEmpty())
+      {
+        return null;
+      }
+      final List<String> undeletableFiles = new ArrayList<String>();
+      final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
+      while (entriesToPurge.hasNext())
+      {
+        final LogFile<K, V> logFile = entriesToPurge.next().getValue();
+        try
+        {
+          logFile.close();
+          logFile.delete();
+          entriesToPurge.remove();
+        }
+        catch (ChangelogException e)
+        {
+          // The deletion of log file on file system has failed
+          undeletableFiles.add(logFile.getFile().getPath());
+        }
+      }
+      if (!undeletableFiles.isEmpty())
+      {
+        throw new ChangelogException(
+            ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
+                Utils.joinAsString(", ", undeletableFiles)));
+      }
+      return getOldestRecord();
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+
+  }
+
+  /**
+   * Empties the log, discarding all records it contains.
+   *
+   * @throws ChangelogException
+   *           If cursors are opened on this log, or if a problem occurs during
+   *           clearing operation.
+   */
+  public void clear() throws ChangelogException
+  {
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return;
+      }
+      if (!openCursors.isEmpty())
+      {
+        // Allow opened cursors at this point, but turn them into empty cursors.
+        // This behavior is needed by the change number indexer thread.
+        switchCursorsOpenedIntoEmptyCursors();
+      }
+
+      // delete all log files
+      final List<String> undeletableFiles = new ArrayList<String>();
+      for (LogFile<K, V> logFile : logFiles.values())
+      {
+        try
+        {
+          logFile.close();
+          logFile.delete();
+        }
+        catch (ChangelogException e)
+        {
+          undeletableFiles.add(logFile.getFile().getPath());
+        }
+      }
+      if (!undeletableFiles.isEmpty())
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
+            Utils.joinAsString(", ", undeletableFiles)));
+      }
+      logFiles.clear();
+
+      // recreate an empty head log file
+      openHeadLogFile();
+    }
+    catch (Exception e)
+    {
+      throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)));
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  /**
+   * Dump this log as a text files, intended for debugging purpose only.
+   *
+   * @param dumpDirectory
+   *          Directory that will contains log files with text format
+   *          and ".txt" extensions
+   * @throws ChangelogException
+   *           If an error occurs during dump
+   */
+  void dumpAsTextFile(File dumpDirectory) throws ChangelogException
+  {
+    for (LogFile<K, V> logFile : logFiles.values())
+    {
+      logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    releaseLog(logPath);
+  }
+
+  /** Effectively close this log. */
+  private void doClose()
+  {
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return;
+      }
+      if (!openCursors.isEmpty())
+      {
+        logger.error(ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
+      }
+      StaticUtils.close(logFiles.values());
+      isClosed = true;
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  private LogFile<K, V> getHeadLogFile()
+  {
+    return logFiles.lastEntry().getValue();
+  }
+
+  private LogFile<K, V> getOldestLogFile()
+  {
+    return logFiles.firstEntry().getValue();
+  }
+
+  /**
+   * Rotate the head log file to a read-only log file, and open a new empty head
+   * log file to write in.
+   * <p>
+   * All cursors opened on this log are temporarily disabled (closing underlying resources)
+   * and then re-open with their previous state.
+   */
+  private void rotateHeadLogFile() throws ChangelogException
+  {
+    // Temporarily disable cursors opened on head, saving their state
+    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
+
+    final LogFile<K, V> headLogFile = getHeadLogFile();
+    final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
+    headLogFile.close();
+    renameHeadLogFileTo(readOnlyLogFile);
+
+    openHeadLogFile();
+    openReadOnlyLogFile(readOnlyLogFile);
+
+    // Re-enable cursors previously opened on head, with the saved state
+    updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
+  }
+
+  private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
+  {
+    final File headLogFile = new File(logPath, HEAD_LOG_FILE_NAME);
+    try
+    {
+      StaticUtils.renameFile(headLogFile, rotatedLogFile);
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e);
+    }
+  }
+
+  /**
+   * Returns the key bounds for the provided log file.
+   *
+   * @return the pair of (lowest key, highest key) that correspond to records
+   *         stored in the corresponding log file.
+   * @throws ChangelogException
+   *            if an error occurs while retrieving the keys
+   */
+   private Pair<K, K> getKeyBounds(final LogFile<K, V> logFile) throws ChangelogException
+   {
+     try
+     {
+       final String name = logFile.getFile().getName();
+       final String[] keys = name.substring(0, name.length() - Log.LOG_FILE_SUFFIX.length())
+           .split(LOG_FILE_NAME_SEPARATOR);
+       return Pair.of(recordParser.decodeKeyFromString(keys[0]), recordParser.decodeKeyFromString(keys[1]));
+     }
+     catch (Exception e)
+     {
+       throw new ChangelogException(
+           ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
+     }
+   }
+
+   /**
+    * Returns the file name to use for the read-only version of the provided
+    * log file.
+    * <p>
+    * The file name is based on the lowest and highest key in the log file.
+    *
+    * @return the name to use for the read-only version of the log file
+    * @throws ChangelogException
+    *            If an error occurs.
+    */
+  private String generateReadOnlyFileName(final LogFile<K,V> logFile) throws ChangelogException
+  {
+    final K lowestKey = logFile.getOldestRecord().getKey();
+    final K highestKey = logFile.getNewestRecord().getKey();
+    return recordParser.encodeKeyToString(lowestKey) + LOG_FILE_NAME_SEPARATOR
+       + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
+  }
+
+  /** Update the cursors that were pointing to head after a rotation of the head log file. */
+  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
+      throws ChangelogException
+  {
+    for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
+    {
+      final CursorState<K, V> cursorState = pair.getSecond();
+
+      // Need to update the cursor only if it is pointing to the head log file
+      if (isHeadLogFile(cursorState.logFile))
+      {
+        final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
+        final LogFile<K, V> logFile = findLogFileFor(previousKey);
+        final LogCursor<K, V> cursor = pair.getFirst();
+        cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
+      }
+    }
+  }
+
+  private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
+  {
+    for (LogCursor<K, V> cursor : openCursors)
+    {
+      cursor.actAsEmptyCursor();
+    }
+    openCursors.clear();
+  }
+
+  /**
+   * Disable the cursors opened on the head log file log, by closing their underlying cursor.
+   * Returns the state of each cursor just before the close operation.
+   *
+   * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
+   * @throws ChangelogException
+   *           If an error occurs.
+   */
+  private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
+  {
+    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
+        new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
+    for (LogCursor<K, V> cursor : openCursors)
+    {
+      if (isHeadLogFile(cursor.currentLogFile))
+      {
+        openCursorsStates.add(Pair.of(cursor, cursor.getState()));
+        cursor.closeUnderlyingCursor();
+      }
+    }
+    return openCursorsStates;
+  }
+
+  private void openHeadLogFile() throws ChangelogException
+  {
+    final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser);
+    final Record<K,V> newestRecord = head.getNewestRecord();
+    lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
+    logFiles.put(recordParser.getMaxKey(), head);
+  }
+
+  private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
+  {
+    final LogFile<K, V> logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
+    final Pair<K, K> bounds = getKeyBounds(logFile);
+    logFiles.put(bounds.getSecond(), logFile);
+  }
+
+  private void registerCursor(final LogCursor<K, V> cursor)
+  {
+    openCursors.add(cursor);
+  }
+
+  private void unregisterCursor(final LogCursor<K, V> cursor)
+  {
+    openCursors.remove(cursor);
+  }
+
+  /**
+   * Returns the log file that is just after the provided log file wrt the order
+   * defined on keys, or {@code null} if the provided log file is the last one
+   * (the head log file).
+   */
+  private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
+  {
+    if (isHeadLogFile(currentLogFile))
+    {
+      return null;
+    }
+    final Pair<K, K> bounds = getKeyBounds(currentLogFile);
+    return logFiles.higherEntry(bounds.getSecond()).getValue();
+  }
+
+  private boolean isHeadLogFile(final LogFile<K, V> logFile)
+  {
+    return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
+  }
+
+  /** Returns the log file that should contain the provided key. */
+  private LogFile<K, V> findLogFileFor(final K key)
+  {
+    if (key == null || logFiles.lowerKey(key) == null)
+    {
+      return getOldestLogFile();
+    }
+    return logFiles.ceilingEntry(key).getValue();
+  }
+
+  /**
+   * Represents a DB Cursor than can be repositioned on a given key.
+   * <p>
+   * Note that as a DBCursor, it provides a java.sql.ResultSet like API.
+   */
+  static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>>
+  {
+    /**
+     * Position the cursor to the record corresponding to the provided key and
+     * provided matching and positioning strategies.
+     *
+     * @param key
+     *          Key to use as a start position for the cursor. If key is
+     *          {@code null}, use the key of the first record instead.
+     * @param matchStrategy
+     *          The cursor key matching strategy.
+     * @param positionStrategy
+     *          The cursor positioning strategy.
+     * @return {@code true} if cursor is successfully positioned, or
+     *         {@code false} otherwise.
+     * @throws ChangelogException
+     *           If an error occurs when positioning cursor.
+     */
+    boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
+        throws ChangelogException;
+  }
+
+  /**
+   * Implements a cursor on the log.
+   * <p>
+   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
+   * <p>
+   * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
+   * method.
+   */
+  private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+  {
+    private final Log<K, V> log;
+
+    private LogFile<K, V> currentLogFile;
+    private LogFileCursor<K, V> currentCursor;
+    private boolean actAsEmptyCursor;
+
+    /**
+     * Creates a cursor on the provided log.
+     *
+     * @param log
+     *           The log on which the cursor read records.
+     * @throws ChangelogException
+     *           If an error occurs when creating the cursor.
+     */
+    private LogCursor(final Log<K, V> log) throws ChangelogException
+    {
+      this.log = log;
+      this.actAsEmptyCursor = false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Record<K, V> getRecord()
+    {
+      return currentCursor != null ? currentCursor.getRecord() : null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean next() throws ChangelogException
+    {
+      if (actAsEmptyCursor)
+      {
+        return false;
+      }
+      log.sharedLock.lock();
+      try
+      {
+        final boolean hasNext = currentCursor.next();
+        if (hasNext)
+        {
+          return true;
+        }
+        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+        if (logFile != null)
+        {
+          switchToLogFile(logFile);
+          return currentCursor.next();
+        }
+        return false;
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close()
+    {
+      log.sharedLock.lock();
+      try
+      {
+        StaticUtils.close(currentCursor);
+        log.unregisterCursor(this);
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionTo(
+        final K key,
+        final KeyMatchingStrategy matchStrategy,
+        final PositionStrategy positionStrategy)
+            throws ChangelogException
+    {
+      if (actAsEmptyCursor)
+      {
+        return false;
+      }
+      log.sharedLock.lock();
+      try
+      {
+        final LogFile<K, V> logFile = log.findLogFileFor(key);
+        if (logFile != currentLogFile)
+        {
+          switchToLogFile(logFile);
+        }
+        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    /** Returns the state of this cursor. */
+    private CursorState<K, V> getState() throws ChangelogException
+    {
+      return !actAsEmptyCursor ?
+          new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
+    }
+
+    private void closeUnderlyingCursor()
+    {
+      StaticUtils.close(currentCursor);
+    }
+
+    /** Reinitialize this cursor to the provided state. */
+    private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+    {
+      if (!actAsEmptyCursor)
+      {
+        currentLogFile = cursorState.logFile;
+        currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
+      }
+    }
+
+    /** Turn this cursor into an empty cursor, with no actual resource used. */
+    private void actAsEmptyCursor()
+    {
+      currentLogFile = null;
+      currentCursor = null;
+      actAsEmptyCursor = true;
+    }
+
+    /** Switch the cursor to the provided log file. */
+    private void switchToLogFile(final LogFile<K, V> logFile) throws ChangelogException
+    {
+      StaticUtils.close(currentCursor);
+      currentLogFile = logFile;
+      currentCursor = currentLogFile.getCursor();
+    }
+
+    /** {@inheritDoc} */
+    public String toString()
+    {
+      return actAsEmptyCursor ?
+          String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
+          String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
+              log.logPath, currentLogFile.getFile().getName(), currentCursor);
+    }
+  }
+
+  /** An empty cursor, that always return null records and false to {@code next()} method. */
+  static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+  {
+    /** {@inheritDoc} */
+    @Override
+    public Record<K,V> getRecord()
+    {
+      return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean next()
+    {
+      return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
+    {
+      return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close()
+    {
+      // nothing to do
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      return "EmptyLogCursor";
+    }
+  }
+
+  /**
+   * Represents the state of a cursor.
+   * <p>
+   * The state is used to update a cursor when rotating the head log file : the
+   * state of cursor on head log file must be reported to the new read-only log
+   * file that is created when rotating.
+   */
+  private static class CursorState<K extends Comparable<K>, V>
+  {
+    /** The log file. */
+    private final LogFile<K, V> logFile;
+
+    /**
+     * The position of the reader on the log file. It is the offset from the
+     * beginning of the file, in bytes, at which the next read occurs.
+     */
+    private final long filePosition;
+
+    /** The record the cursor is pointing to. */
+    private final Record<K,V> record;
+
+    private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
+    {
+      this.logFile = logFile;
+      this.filePosition = filePosition;
+      this.record = record;
+    }
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
new file mode 100644
index 0000000..9124076
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -0,0 +1,607 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+import java.io.BufferedWriter;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.forgerock.i18n.LocalizableMessage;
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.util.Reject;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A log file, containing part of a {@code Log}. The log file may be:
+ * <ul>
+ * <li>write-enabled : allowing to append key-value records and read records
+ * from cursors,</li>
+ * <li>read-only : allowing to read records from cursors.</li>
+ * </ul>
+ * <p>
+ * A log file is NOT intended to be used directly, but only has part of a
+ * {@code Log}. In particular, there is no concurrency management and no checks
+ * to ensure that log is not closed when performing any operation on it. Those
+ * are managed at the {@code Log} level.
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
+ */
+final class LogFile<K extends Comparable<K>, V> implements Closeable
+{
+  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+  /** The file containing the records. */
+  private final File logfile;
+
+  /** The pool to obtain a reader on the log. */
+  private final LogReaderPool<K, V> readerPool;
+
+  /**
+   * The writer on the log file, which may be {@code null} if log file is not
+   * write-enabled.
+   */
+  private final BlockLogWriter<K, V> writer;
+
+  /** Indicates if log is enabled for write. */
+  private final boolean isWriteEnabled;
+
+  /**
+   * Creates a new log file.
+   *
+   * @param logFilePath
+   *          Path of the log file.
+   * @param parser
+   *          Parser of records.
+   * @param isWriteEnabled
+   *          {@code true} if this changelog is write-enabled, {@code false}
+   *          otherwise.
+   * @throws ChangelogException
+   *            If a problem occurs during initialization.
+   */
+  private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
+      throws ChangelogException
+  {
+    Reject.ifNull(logFilePath, parser);
+    this.logfile = logFilePath;
+    this.isWriteEnabled = isWriteEnabled;
+
+    createLogFileIfNotExists();
+    if (isWriteEnabled)
+    {
+      ensureLogFileIsValid(parser);
+      writer = BlockLogWriter.newWriter(new LogWriter(logfile), parser);
+    }
+    else
+    {
+      writer = null;
+    }
+    readerPool = new LogReaderPool<K, V>(logfile, parser);
+  }
+
+  /**
+   * Creates a read-only log file with the provided root path and record parser.
+   *
+   * @param <K>
+   *            Type of the key of a record, which must be comparable.
+   * @param <V>
+   *            Type of the value of a record.
+   * @param logFilePath
+   *          Path of the log file.
+   * @param parser
+   *          Parser of records.
+   * @return a read-only log file
+   * @throws ChangelogException
+   *            If a problem occurs during initialization.
+   */
+  static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
+      final RecordParser<K, V> parser) throws ChangelogException
+  {
+    return new LogFile<K, V>(logFilePath, parser, false);
+  }
+
+  /**
+   * Creates a write-enabled log file that appends records to the end of file,
+   * with the provided root path and record parser.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param logFilePath
+   *          Path of the log file.
+   * @param parser
+   *          Parser of records.
+   * @return a write-enabled log file
+   * @throws ChangelogException
+   *            If a problem occurs during initialization.
+   */
+  static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
+      final RecordParser<K, V> parser) throws ChangelogException
+  {
+    return new LogFile<K, V>(logFilePath, parser, true);
+  }
+
+  /**
+   * Returns the file containing the records.
+   *
+   * @return the file
+   */
+  File getFile()
+  {
+    return logfile;
+  }
+
+  private void checkLogIsEnabledForWrite() throws ChangelogException
+  {
+    if (!isWriteEnabled)
+    {
+      throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
+    }
+  }
+
+  private void createLogFileIfNotExists() throws ChangelogException
+  {
+    try
+    {
+      if (!logfile.exists())
+      {
+        logfile.createNewFile();
+      }
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE.get(logfile.getPath()), e);
+    }
+  }
+
+  /**
+   * Ensure that log file is not corrupted, by checking it is valid and cleaning
+   * the end of file if necessary, to remove a partially written record.
+   * <p>
+   * If log file is cleaned to remove a partially written record, then a message
+   * is logged for information.
+   *
+   * @throws ChangelogException
+   *           If an error occurs or if log file is corrupted and can't be
+   *           cleaned
+   */
+  private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
+  {
+    BlockLogReader<K, V> reader = null;
+    try
+    {
+      final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
+      reader = BlockLogReader.newReader(logfile, readerWriter, parser) ;
+      final long lastValidPosition = reader.checkLogIsValid();
+      if (lastValidPosition != -1)
+      {
+          // truncate the file to point where last valid record has been read
+          readerWriter.setLength(lastValidPosition);
+          logger.error(INFO_CHANGELOG_LOG_FILE_RECOVERED.get(logfile.getPath()));
+      }
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
+          logfile.getPath(),
+          StaticUtils.stackTraceToSingleLineString(e)));
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  /**
+   * Add the provided record at the end of this log.
+   * <p>
+   * In order to ensure that record is written out of buffers and persisted
+   * to file system, it is necessary to explicitely call the
+   * {@code syncToFileSystem()} method.
+   *
+   * @param record
+   *          The record to add.
+   * @throws ChangelogException
+   *           If the record can't be added to the log.
+   */
+  void append(final Record<K, V> record) throws ChangelogException
+  {
+    checkLogIsEnabledForWrite();
+    writer.write(record);
+  }
+
+  /**
+   * Dump this log file as a text file, intended for debugging purpose only.
+   *
+   * @param dumpFile
+   *          File that will contains log records using a human-readable format
+   * @throws ChangelogException
+   *           If an error occurs during dump
+   */
+  void dumpAsTextFile(File dumpFile) throws ChangelogException
+  {
+    DBCursor<Record<K, V>> cursor = getCursor();
+    BufferedWriter textWriter = null;
+    try
+    {
+      textWriter = new BufferedWriter(new FileWriter(dumpFile));
+      while (cursor.getRecord() != null)
+      {
+        Record<K, V> record = cursor.getRecord();
+        textWriter.write("key=" + record.getKey());
+        textWriter.write(" | ");
+        textWriter.write("value=" + record.getValue());
+        textWriter.write('\n');
+        cursor.next();
+      }
+    }
+    catch (IOException e)
+    {
+      // No I18N needed, used for debugging purpose only
+      throw new ChangelogException(
+          LocalizableMessage.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile),
+          e);
+    }
+    finally
+    {
+      StaticUtils.close(textWriter);
+    }
+  }
+
+  /**
+   * Synchronize all records added with the file system, ensuring that records
+   * are effectively persisted.
+   * <p>
+   * After a successful call to this method, it is guaranteed that all records
+   * added to the log are persisted to the file system.
+   *
+   * @throws ChangelogException
+   *           If the synchronization fails.
+   */
+  void syncToFileSystem() throws ChangelogException
+  {
+    checkLogIsEnabledForWrite();
+    try
+    {
+      writer.sync();
+    }
+    catch (Exception e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
+    }
+  }
+
+  /**
+   * Returns a cursor that allows to retrieve the records from this log,
+   * starting at the first position.
+   *
+   * @return a cursor on the log records, which is never {@code null}
+   * @throws ChangelogException
+   *           If the cursor can't be created.
+   */
+  LogFileCursor<K, V> getCursor() throws ChangelogException
+  {
+    return new LogFileCursor<K, V>(this);
+  }
+
+  /**
+   * Returns a cursor initialised to the provided record and position in file.
+   *
+   * @param record
+   *            The initial record this cursor points on
+   * @param position
+   *            The file position this cursor points on
+   * @return the cursor
+   * @throws ChangelogException
+   *            If a problem occurs while creating the cursor.
+   */
+  LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
+  {
+    return new LogFileCursor<K, V>(this, record, position);
+  }
+
+  /**
+   * Returns the oldest (first) record from this log.
+   *
+   * @return the oldest record, which may be {@code null} if there is no record
+   *         in the log.
+   * @throws ChangelogException
+   *           If an error occurs while retrieving the record.
+   */
+  Record<K, V> getOldestRecord() throws ChangelogException
+  {
+    DBCursor<Record<K, V>> cursor = null;
+    try
+    {
+      cursor = getCursor();
+      return cursor.next() ? cursor.getRecord() : null;
+    }
+    finally
+    {
+      StaticUtils.close(cursor);
+    }
+  }
+
+  /**
+   * Returns the newest (last) record from this log.
+   *
+   * @return the newest record, which may be {@code null}
+   * @throws ChangelogException
+   *           If an error occurs while retrieving the record.
+   */
+  Record<K, V> getNewestRecord() throws ChangelogException
+  {
+    // TODO : need a more efficient way to retrieve it
+    DBCursor<Record<K, V>> cursor = null;
+    try
+    {
+      cursor = getCursor();
+      Record<K, V> record = null;
+      while (cursor.next())
+      {
+        record = cursor.getRecord();
+      }
+      return record;
+    }
+    finally
+    {
+      StaticUtils.close(cursor);
+    }
+  }
+
+  /**
+   * Returns the number of records in the log.
+   *
+   * @return the number of records
+   * @throws ChangelogException
+   *            If an error occurs.
+   */
+  long getNumberOfRecords() throws ChangelogException
+  {
+    // TODO  : need a more efficient way to retrieve it
+    DBCursor<Record<K, V>> cursor = null;
+    try
+    {
+      cursor = getCursor();
+      long counter = 0L;
+      while (cursor.next())
+      {
+        counter++;
+      }
+      return counter;
+    }
+    finally
+    {
+      StaticUtils.close(cursor);
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void close()
+  {
+    if (isWriteEnabled)
+    {
+      try
+      {
+        syncToFileSystem();
+      }
+      catch (ChangelogException e)
+      {
+        logger.traceException(e);
+      }
+      writer.close();
+    }
+    readerPool.shutdown();
+  }
+
+  /**
+   * Delete this log file (file is physically removed). Should be called only
+   * when log file is closed.
+   *
+   * @throws ChangelogException
+   *            If log file can't be deleted.
+   */
+  void delete() throws ChangelogException
+  {
+    final boolean isDeleted = logfile.delete();
+    if (!isDeleted)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
+    }
+  }
+
+  /**
+   * Return the size of this log file in bytes.
+   *
+   * @return the size of log file
+   */
+  long getSizeInBytes()
+  {
+    return writer.getBytesWritten();
+  }
+
+  /** The path of this log file as a String. */
+  private String getPath()
+  {
+    return logfile.getPath();
+  }
+
+  /**
+   * Returns a reader for this log.
+   * <p>
+   * Assumes that calling methods ensure that log is not closed.
+   */
+  private BlockLogReader<K, V> getReader() throws ChangelogException
+  {
+    return readerPool.get();
+  }
+
+  /** Release the provided reader. */
+  private void releaseReader(BlockLogReader<K, V> reader) {
+    readerPool.release(reader);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode()
+  {
+    return logfile.hashCode();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object that)
+  {
+    if (this == that)
+    {
+      return true;
+    }
+    if (!(that instanceof LogFile))
+    {
+      return false;
+    }
+    final LogFile<?, ?> other = (LogFile<?, ?>) that;
+    return logfile.equals(other.logfile);
+  }
+
+  /** Implements a repositionable cursor on the log file. */
+  static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+  {
+    /** The underlying log on which entries are read. */
+    private final LogFile<K, V> logFile;
+
+    /** To read the records. */
+    private final BlockLogReader<K, V> reader;
+
+    /** The current available record, may be {@code null}. */
+    private Record<K,V> currentRecord;
+
+    /**
+     * The initial record when starting from a given key. It must be
+     * stored because it is read in advance.
+     */
+    private Record<K,V> initialRecord;
+
+    /**
+     * Creates a cursor on the provided log.
+     *
+     * @param logFile
+     *           The log on which the cursor read records.
+     * @throws ChangelogException
+     *           If an error occurs when creating the cursor.
+     */
+    private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
+    {
+      this.logFile = logFile;
+      this.reader = logFile.getReader();
+    }
+
+    /**
+     * Creates a cursor on the provided log, initialised to the provided record and
+     * pointing to the provided file position.
+     * <p>
+     * Note: there is no check to ensure that provided record and file position are
+     * consistent. It is the responsability of the caller of this method.
+     */
+    private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
+    {
+      this.logFile = logFile;
+      this.reader = logFile.getReader();
+      this.currentRecord = record;
+      reader.seekToPosition(filePosition);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean next() throws ChangelogException
+    {
+      if (initialRecord != null)
+      {
+        // initial record is used only once
+        currentRecord = initialRecord;
+        initialRecord = null;
+        return true;
+      }
+      currentRecord = reader.readRecord();
+      return currentRecord != null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Record<K,V> getRecord()
+    {
+      return currentRecord;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
+        throws ChangelogException {
+      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
+      final boolean found = result.getFirst();
+      initialRecord = found ? result.getSecond() : null;
+      return found;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close()
+    {
+      logFile.releaseReader(reader);
+    }
+
+    /**
+     * Returns the file position this cursor is pointing at.
+     *
+     * @return the position of reader on the log file
+     * @throws ChangelogException
+     *          If an error occurs.
+     */
+    long getFilePosition() throws ChangelogException
+    {
+      return reader.getFilePosition();
+    }
+
+    /** {@inheritDoc} */
+    public String toString()
+    {
+      return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
+    }
+  }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
new file mode 100644
index 0000000..5300ffd
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
@@ -0,0 +1,118 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * A Pool of readers to a log file.
+
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
+ */
+// TODO : implement a real pool - reusing readers instead of opening-closing them each time
+class LogReaderPool<K extends Comparable<K>, V>
+{
+  /** The file to read. */
+  private final File file;
+
+  private final RecordParser<K, V> parser;
+
+  /**
+   * Creates a pool of readers for provided file.
+   *
+   * @param file
+   *          The file to read.
+   * @param parser
+   *          The parser to decode the records read.
+   */
+  LogReaderPool(File file, RecordParser<K, V> parser)
+  {
+    this.file = file;
+    this.parser = parser;
+  }
+
+  /**
+   * Returns a random access reader on the provided file.
+   * <p>
+   * The acquired reader must be released with the {@code release()}
+   * method.
+   *
+   * @return a random access reader
+   * @throws ChangelogException
+   *            If the file can't be found or read.
+   */
+  BlockLogReader<K, V> get() throws ChangelogException
+  {
+    return getReader(file);
+  }
+
+  /**
+   * Release the provided reader.
+   * <p>
+   * Once released, this reader must not be used any more.
+   *
+   * @param reader
+   *          The random access reader to a file previously acquired with this
+   *          pool.
+   */
+  void release(BlockLogReader<K, V> reader)
+  {
+    StaticUtils.close(reader);
+  }
+
+  /** Returns a random access file to read this log. */
+  private BlockLogReader<K, V> getReader(File file) throws ChangelogException
+  {
+    try
+    {
+      return BlockLogReader.newReader(file, new RandomAccessFile(file, "r"), parser) ;
+    }
+    catch (Exception e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_READER_ON_LOG_FILE.get(file.getPath()), e);
+    }
+  }
+
+  /**
+   * Shutdown this pool, releasing all files handles opened
+   * on the file.
+   */
+  void shutdown()
+  {
+    // Nothing to do yet as no file handle is kept opened.
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
new file mode 100644
index 0000000..7ff6aa7
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -0,0 +1,151 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.File;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.SyncFailedException;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.loggers.MeteredStream;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * A writer on a log file.
+ */
+class LogWriter extends OutputStream
+{
+  /** The file to write in. */
+  private final File file;
+
+  /** The stream to write data in the file, capable of counting bytes written. */
+  private final MeteredStream stream;
+
+  /** The file descriptor on the file. */
+  private final FileDescriptor fileDescriptor;
+
+  /**
+   * Creates a writer on the provided file.
+   *
+   * @param file
+   *          The file to write.
+   * @throws ChangelogException
+   *            If a problem occurs at creation.
+   */
+  public LogWriter(final File file) throws ChangelogException
+  {
+    this.file = file;
+    try
+    {
+      FileOutputStream fos = new FileOutputStream(file, true);
+      this.stream = new MeteredStream(fos, file.length());
+      this.fileDescriptor = fos.getFD();
+    }
+    catch (Exception e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
+    }
+  }
+
+  /**
+   * Returns the file used by this writer.
+   *
+   * @return the file
+   */
+  File getFile()
+  {
+    return file;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(int b) throws IOException
+  {
+    stream.write(b);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(byte[] b) throws IOException
+  {
+    stream.write(b);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void write(byte[] b, int off, int len) throws IOException
+  {
+    stream.write(b, off, len);
+  }
+
+  /**
+   * Writes the provided byte string to the underlying output stream of this writer.
+   *
+   * @param bs
+   *            The byte string to write.
+   * @throws IOException
+   *           if an I/O error occurs. In particular, an IOException may be
+   *           thrown if the output stream has been closed.
+   */
+  public void write(ByteString bs) throws IOException
+  {
+    bs.copyTo(stream);
+  }
+
+  /**
+   * Returns the number of bytes written in the underlying file.
+   *
+   * @return the number of bytes
+   */
+  public long getBytesWritten()
+  {
+    return stream.getBytesWritten();
+  }
+
+  /**
+   * Synchronize all modifications to the file to the underlying device.
+   *
+   * @throws SyncFailedException
+   *            If synchronization fails.
+   */
+  void sync() throws SyncFailedException {
+    fileDescriptor.sync();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    StaticUtils.close(stream);
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
new file mode 100644
index 0000000..24b25d2
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
@@ -0,0 +1,130 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+/**
+ * Represents a record, which is a pair of key-value.
+ *
+ * @param <K>
+ *         The type of a key.
+ * @param <V>
+ *         The type of a value.
+ */
+class Record<K, V>
+{
+  private final K key;
+  private final V value;
+
+  /**
+   * Creates a record from provided key and value.
+   *
+   * @param key
+   *          The key.
+   * @param value
+   *          The value.
+   */
+  private Record(final K key, final V value)
+  {
+    this.key = key;
+    this.value = value;
+  }
+
+  /**
+   * Create a record from provided key and value.
+   *
+   * @param <K>
+   *          The type of the key.
+   * @param <V>
+   *          The type of the value.
+   * @param key
+   *          The key.
+   * @param value
+   *          The value.
+   * @return a record
+   */
+  static <K, V> Record<K, V> from(final K key, final V value) {
+    return new Record<K, V>(key, value);
+  }
+
+  /**
+   * Returns the key of this record.
+   *
+   * @return the key
+   */
+  K getKey()
+  {
+    return key;
+  }
+
+  /**
+   * Returns the value of this record.
+   *
+   * @return the value
+   */
+  V getValue()
+  {
+    return value;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((key == null) ? 0 : key.hashCode());
+    result = prime * result + ((value == null) ? 0 : value.hashCode());
+    return result;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object that)
+  {
+    if (this == that)
+    {
+      return true;
+    }
+    if (!(that instanceof Record))
+    {
+      return false;
+    }
+    Record<?, ?> other = (Record<?, ?>) that;
+    final boolean keyEquals = key == null ?  other.key == null : key.equals(other.key);
+    if (!keyEquals)
+    {
+      return false;
+    }
+    return value == null ?  other.value == null : value.equals(other.value);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+      return "Record [" + key + ":" + value + "]";
+  }
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
new file mode 100644
index 0000000..b62d80d
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
@@ -0,0 +1,101 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import org.forgerock.opendj.ldap.ByteString;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+
+/**
+ * Parser of a log record.
+ * <p>
+ * The parser allows to convert from a object to its binary representation
+ * and to convert back from the binary representation to the object.
+ *
+ * @param <K>
+ *          Type of the key of the record.
+ * @param <V>
+ *          Type of the value of the record.
+ */
+interface RecordParser<K, V>
+{
+  /**
+   * Decode a record from the provided byte array.
+   * <p>
+   * The record is expected to have been encoded using the {@code writeRecord()}
+   * method.
+   *
+   * @param data
+   *          The raw data to read the record from.
+   * @return the decoded record, or {@code null} if there is no more record to
+   *         read, or only an incomplete record
+   * @throws DecodingException
+   *           If an error occurs while decoding the record.
+   */
+  Record<K, V> decodeRecord(ByteString data) throws DecodingException;
+
+  /**
+   * Encode the provided record to a byte array.
+   * <p>
+   * The returned array is intended to be stored as provided in the log file.
+   *
+   * @param record
+   *          The record to encode.
+   * @return the bytes array representing the (key,value) record
+   */
+  ByteString encodeRecord(Record<K, V> record);
+
+  /**
+   * Read the key from the provided string.
+   *
+   * @param key
+   *          The string representation of key, suitable for use in a filename,
+   *          as written by the {@code encodeKeyToString()} method.
+   * @return the key
+   * @throws ChangelogException
+   *           If key can't be read from the string.
+   */
+  K decodeKeyFromString(String key) throws ChangelogException;
+
+  /**
+   * Returns the provided key as a string that is suitable to be used in a
+   * filename.
+   *
+   * @param key
+   *          The key of a record.
+   * @return a string encoding the key, unambiguously decodable to the original
+   *         key, and suitable for use in a filename. The string should contain
+   *         only ASCII characters and no space.
+   */
+  String encodeKeyToString(K key);
+
+  /**
+   * Returns a key that is guaranted to be always higher than any other key.
+   *
+   * @return the highest possible key
+   */
+  K getMaxKey();
+
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
new file mode 100644
index 0000000..624cda1
--- /dev/null
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -0,0 +1,860 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.UnsupportedEncodingException;
+import java.io.Writer;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ChangelogState;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.util.StaticUtils;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+/**
+ * Represents the replication environment, which allows to manage the lifecycle
+ * of the replication changelog.
+ * <p>
+ * A changelog has a root directory, under which the following directories and files are
+ * created :
+ * <ul>
+ * <li>A "changenumberindex" directory containing the log files for
+ * ChangeNumberIndexDB</li>
+ * <li>A "domains.state" file containing a mapping of each domain DN to an id. The
+ * id is used to name the corresponding domain directory.</li>
+ * <li>One directory per domain, named after "[id].domain" where [id] is the id
+ * assigned to the domain, as specified in the "domains.state" file.</li>
+ * </ul>
+ * <p>
+ * Each domain directory contains the following directories and files :
+ * <ul>
+ * <li>A "generation_[id].id" file, where [id] is the generation id</li>
+ * <li>One directory per server id, named after "[id].server" where [id] is the
+ * id of the server.</li>
+ * </ul>
+ * Each server id directory contains the following files :
+ * <ul>
+ * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
+ * <li>Zero to many read-only log files named after the lowest key
+ * and highest key present in the log file (they all end with the ".log" suffix.</li>
+ * <li>Optionally, a "offline.state" file that indicates that this particular server id
+ *  of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
+ * </ul>
+ * See {@code Log} class for details on the log files.
+ *
+ * <p>
+ * Layout example with two domains "o=test1" and "o=test2", each having server
+ * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
+ *
+ * <pre>
+ * +---changelog
+ * |   \---domains.state  [contains mapping: 1 => "o=test1", 2 => "o=test2"]
+ * |   \---changenumberindex
+ * |      \--- head.log [contains last records written]
+ * |      \--- 1_50.log [contains records with keys in interval [1, 50]]
+ * |   \---1.domain
+ * |       \---generation1.id
+ * |       \---22.server
+ * |           \---head.log [contains last records written]
+ * |       \---33.server
+ * |           \---head.log [contains last records written]
+ *             \---offline.state
+ * |   \---2.domain
+ * |       \---generation1.id
+ * |       \---22.server
+ * |           \---head.log [contains last records written]
+ * |       \---33.server
+ * |           \---head.log [contains last records written]
+ * </pre>
+ */
+class ReplicationEnvironment
+{
+  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+
+  private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024*1024;
+
+  private static final int NO_GENERATION_ID = -1;
+
+  private static final String CN_INDEX_DB_DIRNAME = "changenumberindex";
+
+  private static final String DOMAINS_STATE_FILENAME = "domains.state";
+
+  static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
+
+  private static final String DOMAIN_STATE_SEPARATOR = ":";
+
+  private static final String DOMAIN_SUFFIX = ".domain";
+
+  private static final String SERVER_ID_SUFFIX = ".server";
+
+  private static final String GENERATION_ID_FILE_PREFIX = "generation";
+
+  private static final String GENERATION_ID_FILE_SUFFIX = ".id";
+
+  private static final String UTF8_ENCODING = "UTF-8";
+
+  private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
+  {
+    @Override
+    public boolean accept(File file)
+    {
+      return file.isDirectory() && file.getName().endsWith(DOMAIN_SUFFIX);
+    }
+  };
+
+  private static final FileFilter SERVER_ID_FILE_FILTER = new FileFilter()
+  {
+    @Override
+    public boolean accept(File file)
+    {
+      return file.isDirectory() && file.getName().endsWith(SERVER_ID_SUFFIX);
+    }
+  };
+
+  private static final FileFilter GENERATION_ID_FILE_FILTER = new FileFilter()
+  {
+    @Override
+    public boolean accept(File file)
+    {
+      return file.isFile()
+          && file.getName().startsWith(GENERATION_ID_FILE_PREFIX)
+          && file.getName().endsWith(GENERATION_ID_FILE_SUFFIX);
+    }
+  };
+
+  /** Root path where the replication log is stored. */
+  private final String replicationRootPath;
+  /**
+   * The current changelogState. This is in-memory version of what is inside the
+   * on-disk changelogStateDB. It improves performances in case the
+   * changelogState is read often.
+   *
+   * @GuardedBy("domainsLock")
+   */
+  private final ChangelogState changelogState;
+
+  /** The list of logs that are in use. */
+  private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
+
+  /**
+   * Maps each domain DN to a domain id that is used to name directory in file system.
+   *
+   * @GuardedBy("domainsLock")
+   */
+  private final Map<DN, String> domains = new HashMap<DN, String>();
+
+  /**
+   * Exclusive lock to synchronize:
+   * <ul>
+   * <li>the domains mapping</li>
+   * <li>changes to the in-memory changelogState</li>
+   * <li>changes to the on-disk state of a domain</li>
+   */
+  private final Object domainsLock = new Object();
+
+  /** The underlying replication server. */
+  private final ReplicationServer replicationServer;
+
+  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+
+  /**
+   * Creates the replication environment.
+   *
+   * @param rootPath
+   *          Root path where replication log is stored.
+   * @param replicationServer
+   *          The underlying replication server.
+   * @throws ChangelogException
+   *           If an error occurs during initialization.
+   */
+  ReplicationEnvironment(final String rootPath,
+      final ReplicationServer replicationServer) throws ChangelogException
+  {
+    this.replicationRootPath = rootPath;
+    this.replicationServer = replicationServer;
+    this.changelogState = readOnDiskChangelogState();
+  }
+
+  /**
+   * Returns the state of the replication changelog.
+   *
+   * @return the {@link ChangelogState} read from the changelogState DB
+   * @throws ChangelogException
+   *           if a database problem occurs
+   */
+  ChangelogState readOnDiskChangelogState() throws ChangelogException
+  {
+    final ChangelogState state = new ChangelogState();
+    final File changelogPath = new File(replicationRootPath);
+    synchronized (domainsLock)
+    {
+      readDomainsStateFile();
+      checkDomainDirectories(changelogPath);
+      for (final Entry<DN, String> domainEntry : domains.entrySet())
+      {
+        readStateForDomain(domainEntry, state);
+      }
+    }
+    return state;
+  }
+
+  /**
+   * Returns the current state of the replication changelog.
+   *
+   * @return the current {@link ChangelogState}
+   */
+  ChangelogState getChangelogState()
+  {
+    return changelogState;
+  }
+
+  /**
+   * Finds or creates the log used to store changes from the replication server
+   * with the given serverId and the given baseDN.
+   *
+   * @param domainDN
+   *          The DN that identifies the domain.
+   * @param serverId
+   *          The server id that identifies the server.
+   * @param generationId
+   *          The generationId associated to this domain.
+   * @return the log.
+   * @throws ChangelogException
+   *           if an error occurs.
+   */
+  Log<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
+      throws ChangelogException
+  {
+    if (logger.isTraceEnabled())
+    {
+      logger.trace("ReplicationEnvironment.getOrCreateReplicaDB(%s, %s, %s)", domainDN, serverId, generationId);
+    }
+
+    try
+    {
+      ensureRootDirectoryExists();
+
+      String domainId = null;
+      synchronized (domainsLock)
+      {
+        domainId = domains.get(domainDN);
+        if (domainId == null)
+        {
+          domainId = createDomainId(domainDN);
+        }
+
+        final File serverIdPath = getServerIdPath(domainId, serverId);
+        ensureServerIdDirectoryExists(serverIdPath);
+        changelogState.addServerIdToDomain(serverId, domainDN);
+
+        final File generationIdPath = getGenerationIdPath(domainId, generationId);
+        ensureGenerationIdFileExists(generationIdPath);
+        changelogState.setDomainGenerationId(domainDN, generationId);
+
+        return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
+      }
+    }
+    catch (Exception e)
+    {
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_CREATE_REPLICA_DB.get(domainDN.toString(), serverId, generationId), e);
+    }
+  }
+
+  /**
+   * Find or create the log to manage integer change number associated to
+   * multidomain server state.
+   * <p>
+   * TODO: ECL how to manage compatibility of this db
+   * with new domains added or removed ?
+   *
+   * @return the log.
+   * @throws ChangelogException
+   *           when a problem occurs.
+   */
+  Log<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
+  {
+    final File path = getCNIndexDBPath();
+    try
+    {
+      return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER);
+    }
+    catch (Exception e)
+    {
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_CREATE_CN_INDEX_DB.get(replicationRootPath, path.getPath()), e);
+    }
+  }
+
+  /**
+   * Shutdown the environment.
+   * <p>
+   * The log DBs are not closed by this method. It assumes they are already
+   * closed.
+   */
+  void shutdown()
+  {
+    if (isShuttingDown.compareAndSet(false, true))
+    {
+      logs.clear();
+    }
+  }
+
+  /**
+   * Clears the generated id associated to the provided domain DN from the state
+   * Db.
+   * <p>
+   * If generation id can't be found, it is not considered as an error, the
+   * method will just return.
+   *
+   * @param domainDN
+   *          The domain DN for which the generationID must be cleared.
+   * @throws ChangelogException
+   *           If a problem occurs during clearing.
+   */
+  void clearGenerationId(final DN domainDN) throws ChangelogException
+  {
+    synchronized (domainsLock)
+    {
+      final String domainId = domains.get(domainDN);
+      if (domainId == null)
+      {
+        return; // unknown domain => no-op
+      }
+      final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
+      if (idFile != null)
+      {
+        final boolean isDeleted = idFile.delete();
+        if (!isDeleted)
+        {
+          throw new ChangelogException(
+              ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
+        }
+      }
+      changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
+    }
+  }
+
+  /**
+   * Reset the generationId to the default value used when there is no
+   * generation id.
+   *
+   * @param baseDN
+   *          The baseDN for which the generationID must be reset.
+   * @throws ChangelogException
+   *           If a problem occurs during reset.
+   */
+  void resetGenerationId(final DN baseDN) throws ChangelogException
+  {
+    synchronized (domainsLock)
+    {
+      clearGenerationId(baseDN);
+      final String domainId = domains.get(baseDN);
+      if (domainId == null)
+      {
+        return; // unknown domain => no-op
+      }
+      final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
+      ensureGenerationIdFileExists(generationIdPath);
+      changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
+    }
+  }
+
+  /**
+   * Notify that the replica corresponding to provided domain and provided CSN
+   * is offline.
+   *
+   * @param domainDN
+   *          the domain of the offline replica
+   * @param offlineCSN
+   *          the offline replica serverId and offline timestamp
+   * @throws ChangelogException
+   *           if a problem occurs
+   */
+  void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
+  {
+    synchronized (domainsLock)
+    {
+      final String domainId = domains.get(domainDN);
+      if (domainId == null)
+      {
+        return; // unknown domain => no-op
+      }
+      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
+      if (!serverIdPath.exists())
+      {
+        return; // no serverId anymore => no-op
+      }
+      final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+      Writer writer = null;
+      try
+      {
+        // Overwrite file, only the last sent offline CSN is kept
+        writer = newFileWriter(offlineFile);
+        writer.write(offlineCSN.toString());
+        changelogState.addOfflineReplica(domainDN, offlineCSN);
+      }
+      catch (IOException e)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
+            domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
+      }
+      finally
+      {
+        StaticUtils.close(writer);
+      }
+    }
+  }
+
+  /**
+   * Notify that the replica corresponding to provided domain and server id
+   * is online.
+   *
+   * @param domainDN
+   *          the domain of the replica
+   * @param serverId
+   *          the replica serverId
+   * @throws ChangelogException
+   *           if a problem occurs
+   */
+  void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
+  {
+    synchronized (domainsLock)
+    {
+      final String domainId = domains.get(domainDN);
+      if (domainId == null)
+      {
+        return; // unknown domain => no-op
+      }
+      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
+      if (offlineFile.exists())
+      {
+        final boolean isDeleted = offlineFile.delete();
+        if (!isDeleted)
+        {
+          throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
+              offlineFile.getPath(), domainDN.toString(), serverId));
+        }
+      }
+      changelogState.removeOfflineReplica(domainDN, serverId);
+    }
+  }
+
+  /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
+  private void readDomainsStateFile() throws ChangelogException
+  {
+    final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
+    if (domainsStateFile.exists())
+    {
+      BufferedReader reader = null;
+      String line = null;
+      try
+      {
+        reader = newFileReader(domainsStateFile);
+        while ((line = reader.readLine()) != null)
+        {
+          final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
+          final String domainId = line.substring(0, separatorPos);
+          final DN domainDN = DN.valueOf(line.substring(separatorPos+1));
+          domains.put(domainDN, domainId);
+        }
+      }
+      catch(DirectoryException e)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE.get(
+            domainsStateFile.getPath(), line), e);
+      }
+      catch(Exception e)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE.get(
+            domainsStateFile.getPath()), e);
+      }
+      finally {
+        StaticUtils.close(reader);
+      }
+    }
+  }
+
+  /**
+   * Checks that domain directories in file system are consistent with
+   * information from domains mapping.
+   */
+  private void checkDomainDirectories(final File changelogPath) throws ChangelogException
+  {
+    final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
+    if (dnDirectories != null)
+    {
+      final Set<String> domainIdsFromFileSystem = new HashSet<String>();
+      for (final File dnDir : dnDirectories)
+      {
+        final String fileName = dnDir.getName();
+        final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
+        domainIdsFromFileSystem.add(domainId);
+      }
+
+      final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
+      if (!domainIdsFromFileSystem.equals(expectedDomainIds))
+      {
+        throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
+            domainIdsFromFileSystem.toString()));
+      }
+    }
+  }
+
+  /**
+   * Update the changelog state with the state corresponding to the provided
+   * domain DN.
+   */
+  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
+      throws ChangelogException
+  {
+    final File domainDirectory = getDomainPath(domainEntry.getValue());
+    final DN domainDN = domainEntry.getKey();
+    final String generationId = retrieveGenerationId(domainDirectory);
+    if (generationId != null)
+    {
+      state.setDomainGenerationId(domainDN, toGenerationId(generationId));
+    }
+
+    final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
+    if (serverIds == null)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY.get(
+          replicationRootPath, domainDirectory.getPath()));
+    }
+    for (final File serverId : serverIds)
+    {
+      readStateForServerId(domainDN, serverId, state);
+    }
+  }
+
+  private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
+  {
+    state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
+
+    final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
+    if (offlineFile.exists())
+    {
+      final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
+      state.addOfflineReplica(domainDN, offlineCSN);
+    }
+  }
+
+  private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
+  {
+    BufferedReader reader = null;
+    try
+    {
+      reader = newFileReader(offlineFile);
+      String line = reader.readLine();
+      if (line == null || reader.readLine() != null)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
+            domainDN.toString(), offlineFile.getPath()));
+      }
+      return new CSN(line);
+    }
+    catch(IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
+          domainDN.toString(), offlineFile.getPath()), e);
+    }
+    finally {
+      StaticUtils.close(reader);
+    }
+  }
+
+  private String createDomainId(final DN domainDN) throws ChangelogException
+  {
+    final String nextDomainId = findNextDomainId();
+    domains.put(domainDN, nextDomainId);
+    final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
+    Writer writer = null;
+    try
+    {
+      writer = newFileWriter(domainsStateFile);
+      for (final Entry<DN, String> entry : domains.entrySet())
+      {
+        writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
+      }
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
+          domainDN.toString(), domainsStateFile.getPath()), e);
+    }
+    finally
+    {
+      StaticUtils.close(writer);
+    }
+    return nextDomainId;
+  }
+
+  /** Find the next domain id to use. This is the lowest integer that is higher than all existing ids. */
+  private String findNextDomainId()
+  {
+    int nextId = 1;
+    for (final String domainId : domains.values())
+    {
+      final Integer id = Integer.valueOf(domainId);
+      if (nextId <= id)
+      {
+        nextId = id + 1;
+      }
+    }
+    return String.valueOf(nextId);
+  }
+
+  /** Open a log from the provided path and record parser. */
+  private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
+      throws ChangelogException
+  {
+    checkShutDownBeforeOpening(serverIdPath);
+
+    final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
+
+    checkShutDownAfterOpening(serverIdPath, log);
+
+    logs.add(log);
+    return log;
+  }
+
+  private void checkShutDownAfterOpening(final File serverIdPath, final Log<?, ?> log) throws ChangelogException
+  {
+    if (isShuttingDown.get())
+    {
+      closeLog(log);
+      throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(serverIdPath.getPath(),
+          replicationServer.getServerId()));
+    }
+  }
+
+  private void checkShutDownBeforeOpening(final File serverIdPath) throws ChangelogException
+  {
+    if (isShuttingDown.get())
+    {
+      throw new ChangelogException(
+          WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(
+              serverIdPath.getPath(), replicationServer.getServerId()));
+    }
+  }
+
+  /**
+   * Retrieve the generation id from the provided directory.
+   *
+   * @return the generation id or {@code null} if the corresponding file can't
+   *         be found
+   */
+  private String retrieveGenerationId(final File directory)
+  {
+    final File generationId = retrieveGenerationIdFile(directory);
+    if (generationId != null)
+    {
+      String filename = generationId.getName();
+      return filename.substring(GENERATION_ID_FILE_PREFIX.length(),
+         filename.length() - GENERATION_ID_FILE_SUFFIX.length());
+    }
+    return null;
+  }
+
+  /**
+   * Retrieve the file named after the generation id from the provided
+   * directory.
+   *
+   * @return the generation id file or {@code null} if the corresponding file
+   *         can't be found
+   */
+  private File retrieveGenerationIdFile(final File directory)
+  {
+    File[] generationIds = directory.listFiles(GENERATION_ID_FILE_FILTER);
+    return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null;
+  }
+
+  private File getDomainPath(final String domainId)
+  {
+    return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
+  }
+
+  /**
+   * Return the path for the provided domain id and server id.
+   * Package private to be usable in tests.
+   *
+   * @param domainId
+   *            The id corresponding to a domain DN
+   * @param serverId
+   *            The server id to retrieve
+   * @return the path
+   */
+  File getServerIdPath(final String domainId, final int serverId)
+  {
+    return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
+  }
+
+  private File getGenerationIdPath(final String domainId, final long generationId)
+  {
+    return new File(getDomainPath(domainId), GENERATION_ID_FILE_PREFIX + generationId + GENERATION_ID_FILE_SUFFIX);
+  }
+
+  private File getCNIndexDBPath()
+  {
+    return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
+  }
+
+  private void closeLog(final Log<?, ?> log)
+  {
+    logs.remove(log);
+    log.close();
+  }
+
+  private void ensureRootDirectoryExists() throws ChangelogException
+  {
+    final File rootDir = new File(replicationRootPath);
+    if (!rootDir.exists())
+    {
+      final boolean created = rootDir.mkdirs();
+      if (!created)
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(replicationRootPath));
+      }
+    }
+  }
+
+  private void ensureServerIdDirectoryExists(final File serverIdPath) throws ChangelogException
+  {
+    if (!serverIdPath.exists())
+    {
+      boolean created = false;
+      try
+      {
+        created = serverIdPath.mkdirs();
+      }
+      catch (Exception e)
+      {
+        // nothing to do
+      }
+
+      if (!created)
+      {
+        throw new ChangelogException(
+            ERR_CHANGELOG_UNABLE_TO_CREATE_SERVER_ID_DIRECTORY.get(serverIdPath.getPath(), 0));
+      }
+    }
+  }
+
+  private void ensureGenerationIdFileExists(final File generationIdPath)
+      throws ChangelogException
+  {
+    if (!generationIdPath.exists())
+    {
+      try
+      {
+        boolean isCreated = generationIdPath.createNewFile();
+        if (!isCreated)
+        {
+          throw new ChangelogException(
+              ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()));
+        }
+      }
+      catch (IOException e)
+      {
+        throw new ChangelogException(
+            ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()));
+      }
+    }
+  }
+
+  private void debug(String message)
+  {
+    // Replication server may be null when testing
+    String monitorInstanceName = replicationServer != null ? replicationServer.getMonitorInstanceName() :
+      "no monitor [test]";
+    logger.trace("In %s, %s", monitorInstanceName, message);
+  }
+
+  private int toServerId(final String serverIdName) throws ChangelogException
+  {
+    try
+    {
+      String serverId = serverIdName.substring(0, serverIdName.length() - SERVER_ID_SUFFIX.length());
+      return Integer.parseInt(serverId);
+    }
+    catch (NumberFormatException e)
+    {
+      // should never happen
+      throw new ChangelogException(ERR_CHANGELOG_SERVER_ID_FILENAME_WRONG_FORMAT.get(serverIdName), e);
+    }
+  }
+
+  private long toGenerationId(final String data) throws ChangelogException
+  {
+    try
+    {
+      return Long.parseLong(data);
+    }
+    catch (NumberFormatException e)
+    {
+      // should never happen
+      throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
+    }
+  }
+
+  /** Returns a buffered writer on the provided file. */
+  private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
+  {
+    return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
+  }
+
+  /** Returns a buffered reader on the provided file. */
+  private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
+  {
+    return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
+  }
+}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index d29c707..02a107f 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -115,7 +115,7 @@
    * @param changelogState
    *          the changelog state used for initialization
    */
-  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
+  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
   {
     this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
   }

--
Gitblit v1.10.0