From 01eb7d07467b57c61868c73e9a94bff1d0b2dcd1 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Wed, 21 May 2014 15:56:41 +0000
Subject: [PATCH] OPENDJ-1389 – Add support for replication changelog DB rotation

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java                             |    5 
 opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java                                             |  510 +++-------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java                 |  107 -
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java  |   17 
 opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java                                        |   43 
 opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java                                     |   10 
 opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java                             |   69 +
 opends/src/server/org/opends/server/replication/server/changelog/file/Log.java                                                 | 1091 +++++++++++++++++++++++
 opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java                                       |   55 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java                 |    3 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java                                        |    3 
 opends/src/messages/messages/replication.properties                                                                            |   26 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java                     |  571 ++++++++++++
 opends/src/server/org/opends/server/loggers/MeteredStream.java                                                                 |   15 
 opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java                                 |    7 
 opends/src/server/org/opends/server/replication/common/CSN.java                                                                |    5 
 opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java                                                 |    7 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java |    7 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java           |    7 
 opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java                                                       |    3 
 opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java                              |   54 
 opends/src/server/org/opends/server/loggers/RotatableLogFile.java                                                              |   51 +
 opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java                                           |  112 -
 opends/src/server/org/opends/server/loggers/MultifileTextWriter.java                                                           |   20 
 opends/src/server/org/opends/server/loggers/RotationPolicy.java                                                                |   10 
 opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java                                                       |    3 
 opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java                                                       |    3 
 27 files changed, 2,198 insertions(+), 616 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 40600ff..e030d37 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -118,7 +118,7 @@
 SEVERE_ERR_BAD_HISTORICAL_56=Entry %s was containing some unknown historical \
  information, This may cause some inconsistency for this entry
 MILD_ERR_CANNOT_ADD_CONFLICT_ATTRIBUTE_57=A conflict was detected but the \
- conflict information could not be added. Operation: %s, Result: %s 
+ conflict information could not be added. Operation: %s, Result: %s
 MILD_ERR_CANNOT_RENAME_CONFLICT_ENTRY_58=An error happened trying to \
  rename a conflicting entry. DN: %s, Operation: %s, Result: %s
 MILD_ERR_EXCEPTION_RENAME_CONFLICT_ENTRY_59=An Exception happened when \
@@ -562,7 +562,7 @@
  on log file '%s'
 SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD_254=Could not decode a record from data \
  read in log file '%s'
-SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_255=Could not delete log file(s): '%s'
 SEVERE_ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE_256=Could not create log file '%s'
 SEVERE_WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE_257=The changelog '%s' has been opened in \
  read-only mode, it is not enabled for write
@@ -583,3 +583,25 @@
  Actual domain ids found in file system: '%s'
 SEVERE_ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE_265=Could not create a new domain \
  id %s for domain DN %s and save it in domain state file '%s"
+SEVERE_ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE_266=Could not get reader \
+ position for cursor in log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING_267=Could not decode the key from \
+ string [%s]
+SEVERE_ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG_268=When cleaning log '%s', \
+ found %d cursor(s) still opened on the log
+SEVERE_ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG_269=When closing log '%s', \
+ found %d cursor(s) still opened on the log
+SEVERE_ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG_270=Could not initialize \
+ the log '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE_271=Could not \
+ retrieve key bounds from log file '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST_272=Could not \
+ retrieve read-only log files from log '%s'
+SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING_273=While purging log, could not \
+ delete log file(s): '%s'
+SEVERE_ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING_274 =The following log \
+ '%s' must be released but it is not referenced."
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE_275=Could not rename \
+ head log file from '%s' to '%s'
+INFO_CHANGELOG_LOG_FILE_ROTATION_276=Rotation needed for log file '%s', \
+ size of head log file is %d bytes
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java b/opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java
index 96a8f6d..159863e 100644
--- a/opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java
+++ b/opends/src/server/org/opends/server/loggers/FixedTimeRotationPolicy.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2014 ForgeRock AS
  */
 package org.opends.server.loggers;
 import org.opends.messages.Message;
@@ -112,7 +113,7 @@
   /**
    * {@inheritDoc}
    */
-  public boolean rotateFile(MultifileTextWriter writer)
+  public boolean rotateFile(RotatableLogFile writer)
   {
     Calendar lastRotationTime = writer.getLastRotationTime();
 
diff --git a/opends/src/server/org/opends/server/loggers/MeteredStream.java b/opends/src/server/org/opends/server/loggers/MeteredStream.java
index 59c053d..803284b 100644
--- a/opends/src/server/org/opends/server/loggers/MeteredStream.java
+++ b/opends/src/server/org/opends/server/loggers/MeteredStream.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2014 ForgeRock AS
  */
 package org.opends.server.loggers;
 
@@ -33,7 +34,7 @@
  *  (a) forwards all its output to a target stream
  *  (b) keeps track of how many bytes have been written.
  */
-class MeteredStream extends OutputStream
+public final class MeteredStream extends OutputStream
 {
   OutputStream out;
   long written;
@@ -45,7 +46,7 @@
    * @param out     The target output stream to keep track of.
    * @param written The number of bytes written to the stream.
    */
-  MeteredStream(OutputStream out, long written)
+  public MeteredStream(OutputStream out, long written)
   {
     this.out = out;
     this.written = written;
@@ -111,5 +112,15 @@
   {
     out.close();
   }
+
+  /**
+   * Returns the number of bytes written in this stream.
+   *
+   * @return the number of bytes
+   */
+  public long getBytesWritten()
+  {
+    return written;
+  }
 }
 
diff --git a/opends/src/server/org/opends/server/loggers/MultifileTextWriter.java b/opends/src/server/org/opends/server/loggers/MultifileTextWriter.java
index bfa7f2a..7fd6ba7 100644
--- a/opends/src/server/org/opends/server/loggers/MultifileTextWriter.java
+++ b/opends/src/server/org/opends/server/loggers/MultifileTextWriter.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2014 ForgeRock AS
  */
 package org.opends.server.loggers;
 
@@ -57,7 +58,7 @@
  * new one named in accordance with a specified FileNamingPolicy.
  */
 public class MultifileTextWriter
-    implements ServerShutdownListener, TextWriter,
+    implements ServerShutdownListener, TextWriter, RotatableLogFile,
     ConfigurationChangeListener<SizeLimitLogRotationPolicyCfg>
 {
   /**
@@ -180,7 +181,6 @@
     outputStream = new MeteredStream(stream, file.length());
 
     OutputStreamWriter osw = new OutputStreamWriter(outputStream, encoding);
-    BufferedWriter bw = null;
     if(bufferSize <= 0)
     {
       writer = new BufferedWriter(osw);
@@ -687,11 +687,8 @@
     this.actions = actions;
   }
 
-  /**
-   * Retrieves the number of bytes written to the current log file.
-   *
-   * @return The number of bytes written to the current log file.
-   */
+  /** {@inheritDoc} */
+  @Override
   public long getBytesWritten()
   {
     return outputStream.written;
@@ -719,13 +716,8 @@
     return lastCleanCount;
   }
 
-  /**
-   * Retrieves the last time a log file was rotated in this instance of
-   * Directory Server. If a log rotation never
-   * occurred, this value will be the time the server started.
-   *
-   * @return The last time log rotation occurred.
-   */
+  /** {@inheritDoc} */
+  @Override
   public Calendar getLastRotationTime()
   {
     return lastRotationTime;
diff --git a/opends/src/server/org/opends/server/loggers/RotatableLogFile.java b/opends/src/server/org/opends/server/loggers/RotatableLogFile.java
new file mode 100644
index 0000000..eaf33db
--- /dev/null
+++ b/opends/src/server/org/opends/server/loggers/RotatableLogFile.java
@@ -0,0 +1,51 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.loggers;
+
+import java.util.Calendar;
+
+/**
+ * Represents a file that can be rotated based on size or on time.
+ */
+public interface RotatableLogFile
+{
+
+  /**
+   * Retrieves the number of bytes written to the file.
+   *
+   * @return The number of bytes written to the file.
+   */
+  long getBytesWritten();
+
+  /**
+   * Retrieves the last time the file was rotated. If a file rotation never
+   * occurred, this value will be the time the server started.
+   *
+   * @return The last time file rotation occurred.
+   */
+  Calendar getLastRotationTime();
+
+}
diff --git a/opends/src/server/org/opends/server/loggers/RotationPolicy.java b/opends/src/server/org/opends/server/loggers/RotationPolicy.java
index dcbc8cc..af45956 100644
--- a/opends/src/server/org/opends/server/loggers/RotationPolicy.java
+++ b/opends/src/server/org/opends/server/loggers/RotationPolicy.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2014 ForgeRock AS
  */
 package org.opends.server.loggers;
 
@@ -60,14 +61,13 @@
 
 
   /**
-   * This method indicates if the log file should be
-   * rotated or not.
+   * This method indicates if the log file should be rotated or not.
    *
-   * @param writer The multi file writer writing the file to be
-   *        checked.
+   * @param writer
+   *          the file writer to be checked.
    * @return true if the log file should be rotated, false otherwise.
    */
-  public boolean rotateFile(MultifileTextWriter writer);
+  public boolean rotateFile(RotatableLogFile writer);
 
 
 }
diff --git a/opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java b/opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java
index fc8a213..198bb6d 100644
--- a/opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java
+++ b/opends/src/server/org/opends/server/loggers/SizeBasedRotationPolicy.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2014 ForgeRock AS
  */
 package org.opends.server.loggers;
 import org.opends.messages.Message;
@@ -97,7 +98,7 @@
    * @param writer The multi file text writer writing the log file.
    * @return true if the file needs to be rotated, false otherwise.
   */
-  public boolean rotateFile(MultifileTextWriter writer)
+  public boolean rotateFile(RotatableLogFile writer)
   {
     long fileSize = writer.getBytesWritten();
 
diff --git a/opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java b/opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java
index 3d14e63..a7b50c2 100644
--- a/opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java
+++ b/opends/src/server/org/opends/server/loggers/TimeLimitRotationPolicy.java
@@ -22,6 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
+ *      Portions Copyright 2014 ForgeRock AS
  */
 package org.opends.server.loggers;
 import org.opends.messages.Message;
@@ -90,7 +91,7 @@
    * @param writer The mutli file text writer written the log file.
    * @return true if the file should be rotated, false otherwise.
    */
-  public boolean rotateFile(MultifileTextWriter writer)
+  public boolean rotateFile(RotatableLogFile writer)
   {
     long currInterval = TimeThread.getTime() -
         writer.getLastRotationTime().getTimeInMillis();
diff --git a/opends/src/server/org/opends/server/replication/common/CSN.java b/opends/src/server/org/opends/server/replication/common/CSN.java
index 0460b7e..f31e8f0 100644
--- a/opends/src/server/org/opends/server/replication/common/CSN.java
+++ b/opends/src/server/org/opends/server/replication/common/CSN.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2013 ForgeRock AS.
+ *      Portions Copyright 2013-2014 ForgeRock AS.
  */
 package org.opends.server.replication.common;
 
@@ -61,6 +61,9 @@
    */
   public static final int STRING_ENCODING_LENGTH = 28;
 
+  /** The maximum possible value for a CSN. */
+  public static final CSN MAX_CSN_VALUE = new CSN(Long.MAX_VALUE, Integer.MAX_VALUE, Short.MAX_VALUE);
+
   private static final long serialVersionUID = -8802722277749190740L;
   private final long timeStamp;
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
index 53ee20e..9a91ff9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
@@ -29,6 +29,7 @@
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.opends.server.admin.std.server.MonitorProviderCfg;
 import org.opends.server.api.MonitorProvider;
 import org.opends.server.config.ConfigException;
@@ -38,10 +39,11 @@
 import org.opends.server.replication.server.changelog.api.*;
 import org.opends.server.types.*;
 
+import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
 /**
- * Logfile-based implementation of a ChangeNumberIndexDB.
+ * Implementation of a ChangeNumberIndexDB with a log.
  * <p>
  * This class publishes some monitoring information below <code>
  * cn=monitor</code>.
@@ -57,7 +59,7 @@
   static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
 
   /** The log in which records are persisted. */
-  private final LogFile<Long, ChangeNumberIndexRecord> logFile;
+  private final Log<Long, ChangeNumberIndexRecord> log;
 
   /**
    * The newest changenumber stored in the DB. It is used to avoid purging the
@@ -92,7 +94,7 @@
    */
   FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
   {
-    logFile = replicationEnv.getOrCreateCNIndexDB();
+    log = replicationEnv.getOrCreateCNIndexDB();
     final ChangeNumberIndexRecord newestRecord = readLastRecord();
     newestChangeNumber = getChangeNumber(newestRecord);
     // initialization of the lastGeneratedChangeNumber from the DB content
@@ -106,13 +108,13 @@
 
   private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
   {
-    final Record<Long, ChangeNumberIndexRecord> record = logFile.getNewestRecord();
+    final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord();
     return record == null ? null : record.getValue();
   }
 
   private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
   {
-    final Record<Long, ChangeNumberIndexRecord> record = logFile.getOldestRecord();
+    final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord();
     return record == null ? null : record.getValue();
   }
 
@@ -132,7 +134,7 @@
     final long changeNumber = nextChangeNumber();
     final ChangeNumberIndexRecord newRecord =
         new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN());
-    logFile.addRecord(newRecord.getChangeNumber(), newRecord);
+    log.append(Record.from(newRecord.getChangeNumber(), newRecord));
     newestChangeNumber = changeNumber;
 
     if (debugEnabled())
@@ -177,7 +179,7 @@
    */
   long count() throws ChangelogException
   {
-    return logFile.getNumberOfRecords();
+    return log.getNumberOfRecords();
   }
 
   /**
@@ -197,7 +199,7 @@
   @Override
   public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
   {
-    return new FileChangeNumberIndexDBCursor(logFile.getCursor(startChangeNumber));
+    return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber));
   }
 
   /**
@@ -207,7 +209,7 @@
   {
     if (shutdown.compareAndSet(false, true))
     {
-      logFile.close();
+      log.close();
       DirectoryServer.deregisterMonitorProvider(dbMonitor);
     }
   }
@@ -228,11 +230,8 @@
     {
       return null;
     }
-
-    // TODO : no purge implemented yet as implementation is based on a single-file log.
-    // The purge must be implemented once we handle a log with multiple files.
-    // The purge will only delete whole files.
-    return null;
+    final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
+    return record != null ? record.getValue().getCSN() : null;
   }
 
   /**
@@ -321,7 +320,7 @@
    */
   public void clear() throws ChangelogException
   {
-    logFile.clear();
+    log.clear();
     newestChangeNumber = NO_KEY;
   }
 
@@ -331,15 +330,16 @@
     private static final byte STRING_SEPARATOR = 0;
 
     @Override
-    public ByteString encodeRecord(final Long changeNumber, final ChangeNumberIndexRecord record)
+    public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record)
     {
+      final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
       return new ByteStringBuilder()
-        .append(changeNumber)
-        .append(record.getPreviousCookie())
+        .append(record.getKey())
+        .append(cnIndexRecord.getPreviousCookie())
         .append(STRING_SEPARATOR)
-        .append(record.getBaseDN().toString())
+        .append(cnIndexRecord.getBaseDN().toString())
         .append(STRING_SEPARATOR)
-        .append(record.getCSN().toByteString()).toByteString();
+        .append(cnIndexRecord.getCSN().toByteString()).toByteString();
     }
 
     @Override
@@ -378,6 +378,35 @@
       }
       return length;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public Long decodeKeyFromString(String key) throws ChangelogException
+    {
+      try
+      {
+        return Long.valueOf(key);
+      }
+      catch (NumberFormatException e)
+      {
+        throw new ChangelogException(
+            ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e);
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String encodeKeyToString(Long key)
+    {
+      return key.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Long getMaxKey()
+    {
+      return Long.MAX_VALUE;
+    }
   }
 
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index e2b4f08..36e8a7a 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -49,13 +49,12 @@
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
 import org.opends.server.util.TimeThread;
-
 import com.forgerock.opendj.util.Pair;
 
-import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.util.StaticUtils.*;
+import static org.opends.messages.ReplicationMessages.*;
 
 /**
  * Log file implementation of the ChangelogDB interface.
@@ -106,10 +105,11 @@
 
   /** The local replication server. */
   private final ReplicationServer replicationServer;
+
   private final AtomicBoolean shutdown = new AtomicBoolean();
 
   static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
-      new FileReplicaDBCursor(new LogFile.EmptyLogCursor<CSN, UpdateMsg>(), null);
+      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null);
 
   /**
    * Creates a new changelog DB.
@@ -122,10 +122,10 @@
    *           if a problem occurs opening the supplied directory
    */
   public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
-      throws ConfigException
+     throws ConfigException
   {
-    this.config = config;
     this.replicationServer = replicationServer;
+    this.config = config;
     this.dbDirectory = makeDir(config.getReplicationDBDirectory());
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 1526830..0525996 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -43,7 +43,7 @@
 import org.opends.server.replication.server.ReplicationServerDomain;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
 import org.opends.server.types.Attribute;
 import org.opends.server.types.Attributes;
 import org.opends.server.types.ByteString;
@@ -92,7 +92,7 @@
   private final AtomicBoolean shutdown = new AtomicBoolean(false);
 
   /** The log in which records are persisted. */
-  private final LogFile<CSN, UpdateMsg> logFile;
+  private final Log<CSN, UpdateMsg> log;
 
   /**
    * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -133,7 +133,7 @@
     this.baseDN = baseDN;
     this.replicationServer = replicationServer;
     this.replicationEnv = replicationEnv;
-    this.logFile = createLogFile(replicationEnv);
+    this.log = createLog(replicationEnv);
     this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
 
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -142,17 +142,17 @@
 
   private CSN readOldestCSN() throws ChangelogException
   {
-    final Record<CSN, UpdateMsg> record = logFile.getOldestRecord();
+    final Record<CSN, UpdateMsg> record = log.getOldestRecord();
     return record == null ? null : record.getKey();
   }
 
   private CSN readNewestCSN() throws ChangelogException
   {
-    final Record<CSN, UpdateMsg> record = logFile.getNewestRecord();
+    final Record<CSN, UpdateMsg> record = log.getNewestRecord();
     return record == null ? null : record.getKey();
   }
 
-  private LogFile<CSN, UpdateMsg> createLogFile(final ReplicationEnvironment replicationEnv) throws ChangelogException
+  private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
   {
     final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
     return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
@@ -174,7 +174,7 @@
           ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
               .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
     }
-    logFile.addRecord(Record.from(updateMsg.getCSN(), updateMsg));
+    log.append(Record.from(updateMsg.getCSN(), updateMsg));
     final CSNLimits limits = csnLimits;
     final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
     final boolean updateOld = limits.oldestCSN == null;
@@ -239,7 +239,7 @@
    */
   DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
   {
-    LogCursor<CSN, UpdateMsg> cursor = logFile.getNearestCursor(startAfterCSN);
+    RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN);
     return new FileReplicaDBCursor(cursor, startAfterCSN);
   }
 
@@ -250,7 +250,7 @@
   {
     if (shutdown.compareAndSet(false, true))
     {
-      logFile.close();
+      log.close();
       DirectoryServer.deregisterMonitorProvider(dbMonitor);
     }
   }
@@ -270,10 +270,11 @@
     {
       return;
     }
-
-    // TODO : no purge implemented yet, as we have a single-file log.
-    // The purge must be implemented once we handle a log with multiple files.
-    // The purge will only delete whole files.
+    final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
+    if (oldestRecord != null)
+    {
+      csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
+    }
   }
 
   /**
@@ -346,7 +347,7 @@
   void clear() throws ChangelogException
   {
     // Remove all persisted data and reset generationId to default value
-    logFile.clear();
+    log.clear();
     replicationEnv.resetGenerationId(baseDN);
 
     csnLimits = new CSNLimits(null, null);
@@ -363,7 +364,7 @@
    */
   long getNumberRecords() throws ChangelogException
   {
-    return logFile.getNumberOfRecords();
+    return log.getNumberOfRecords();
   }
 
   /** Parser of records persisted in the ReplicaDB log. */
@@ -372,8 +373,9 @@
     private static final DebugTracer TRACER = getTracer();
 
     @Override
-    public ByteString encodeRecord(CSN key, UpdateMsg message)
+    public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
     {
+      final UpdateMsg message = record.getValue();
       try
       {
         return ByteString.wrap(message.getBytes());
@@ -400,6 +402,27 @@
         throw new DecodingException(e);
       }
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public CSN decodeKeyFromString(String key) throws ChangelogException
+    {
+      return new CSN(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String encodeKeyToString(CSN key)
+    {
+      return key.toString();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public CSN getMaxKey()
+    {
+      return CSN.MAX_CSN_VALUE;
+    }
   }
 
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
index 3f3a2fa..e067b6a 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -29,7 +29,7 @@
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.file.LogFile.LogCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
 
 /**
  * A cursor on ReplicaDB.
@@ -49,7 +49,7 @@
 {
 
   /** The underlying cursor. */
-  private final LogCursor<CSN, UpdateMsg> cursor;
+  private final RepositionableCursor<CSN, UpdateMsg> cursor;
 
   /** The next record to return. */
   private Record<CSN, UpdateMsg> nextRecord;
@@ -66,7 +66,7 @@
    *          The CSN to use as a start point (excluded from cursor, the lowest
    *          CSN higher than this CSN is used as the real start point).
    */
-  FileReplicaDBCursor(LogCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
+  FileReplicaDBCursor(RepositionableCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
     this.cursor = cursor;
     this.lastNonNullCurrentCSN = startAfterCSN;
   }
@@ -90,7 +90,6 @@
     else
     {
       // Exhausted cursor must be able to reinitialize itself
-      cursor.rewind();
       cursor.positionTo(lastNonNullCurrentCSN, true);
 
       nextRecord = cursor.getRecord();
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
new file mode 100644
index 0000000..9667a56
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -0,0 +1,1091 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.opends.messages.ReplicationMessages.*;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.forgerock.util.Reject;
+import org.opends.server.loggers.ErrorLogger;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
+import org.opends.server.util.StaticUtils;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A multi-file log that features appending key-value records and reading them
+ * using a {@code DBCursor}.
+ * The records must be appended to the log in ascending order of the keys.
+ * <p>
+ * A log is defined for a path - the log path - and contains one to many log files:
+ * <ul>
+ * <li>it has always at least one log file, the head log file, named "head.log",
+ * which is used to append the records.</li>
+ * <li>it may have from zero to many read-only log files, which are named after
+ * the pattern '[lowkey]_[highkey}.log' where [lowkey] and [highkey] are respectively
+ * the string representation of lowest and highest key present in the log file.</li>
+ * </ul>
+ * A read-only log file is created each time the head log file has reached the
+ * maximum size limit. The head log file is then rotated to the read-only file and a
+ * new empty head log file is opened. There is no limit on the number of read-only
+ * files, but they can be purged.
+ * <p>
+ * A log is obtained using the {@code Log.openLog()} method and must always be
+ * released using the {@code close()} method.
+ * <p>
+ * Usage example:
+ * <pre>
+ * {@code
+ *   Log<K, V> log = null;
+ *   try
+ *   {
+ *     log = Log.openLog(logPath, parser, maxFileSize);
+ *     log.append(key, value);
+ *     DBCursor<K, V> cursor = log.getCursor(someKey);
+ *     // use cursor, then close cursor
+ *   }
+ *   finally
+ *   {
+ *     log.close();
+ *   }
+ * }
+ * </pre>
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
+ */
+final class Log<K extends Comparable<K>, V> implements Closeable
+{
+  private static final DebugTracer TRACER = getTracer();
+
+  private static final String LOG_FILE_SUFFIX = ".log";
+
+  static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX;
+
+  private static final String LOG_FILE_NAME_SEPARATOR = "_";
+
+  private static final FileFilter READ_ONLY_LOG_FILES_FILTER = new FileFilter()
+  {
+    @Override
+    public boolean accept(File file)
+    {
+      return file.isFile() && file.getName().endsWith(LOG_FILE_SUFFIX) &&
+          !file.getName().equals(HEAD_LOG_FILE_NAME);
+    }
+  };
+
+  /** Map that holds the unique log instance for each log path. */
+  private static final Map<File, Log<?, ?>> logsCache = new HashMap<File, Log<?, ?>>();
+
+  /**
+   * The number of references on this log instance. It is incremented each time
+   * a log is opened on the same log path. The log is effectively closed only
+   * when the {@code close()} method is called and this value is at most 1.
+   */
+  private int referenceCount;
+
+  /** The path of directory for this log, where log files are stored. */
+  private final File logPath;
+
+  /** The parser used for encoding/decoding of records. */
+  private final RecordParser<K, V> recordParser;
+
+  /**
+   * Indicates if this log is closed. When the log is closed, all methods return
+   * immediately with no effect.
+   */
+  private boolean isClosed;
+
+  /**
+   * The log files contained in this log, ordered by key.
+   * <p>
+   * The head log file is always present and is associated with the maximum
+   * possible key, given by the record parser.
+   * <p>
+   * The read-only log files are associated with the highest key they contain.
+   */
+  private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>();
+
+  /**
+   * The list of non-empty cursors opened on this log. Opened cursors may have
+   * to be updated when rotating the head log file.
+   */
+  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
+
+  /**
+   * A log file is rotated once it has exceeded this size limit. The log file can have
+   * a size much larger than this limit if the last record written has a huge size.
+   *
+   * TODO : to be replaced later by a (or a list of) configurable Rotation policy
+   * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
+   */
+  private final long sizeLimitPerLogFileInBytes;
+
+  /**
+   * The exclusive lock used for writes and lifecycle operations on this log:
+   * initialize, clear, sync and close.
+   */
+  private final Lock exclusiveLock;
+
+  /**
+   * The shared lock used for reads and cursor operations on this log.
+   */
+  private final Lock sharedLock;
+
+  /**
+   * Open a log with the provided log path, record parser and maximum size per
+   * log file.
+   * <p>
+   * If no log exists for the provided path, a new one is created.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param logPath
+   *          Path of the log.
+   * @param parser
+   *          Parser for encoding/decoding of records.
+   * @param sizeLimitPerFileInBytes
+   *          Limit in bytes before rotating the head log file of the log.
+   * @return a log
+   * @throws ChangelogException
+   *           If a problem occurs during initialization.
+   */
+  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
+      final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
+  {
+    Reject.ifNull(logPath, parser);
+    @SuppressWarnings("unchecked")
+    Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
+    if (log == null)
+    {
+      log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
+      logsCache.put(logPath, log);
+    }
+    else
+    {
+      // TODO : check that parser and size limit are compatible with the existing one,
+      // and issue a warning if it is not the case
+      log.referenceCount++;
+    }
+    return log;
+  }
+
+  /**
+   * Release a reference to the log corresponding to provided path. The log is
+   * closed if this is the last reference.
+   */
+  private static synchronized void releaseLog(final File logPath)
+  {
+    Log<?, ?> log = logsCache.get(logPath);
+    if (log == null)
+    {
+      // this should never happen
+      ErrorLogger.logError(
+          ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
+      return;
+    }
+    if (log.referenceCount > 1)
+    {
+      log.referenceCount--;
+    }
+    else
+    {
+      log.doClose();
+      logsCache.remove(logPath);
+    }
+  }
+
+  /**
+   * Creates a new log.
+   *
+   * @param logPath
+   *            The directory path of the log.
+   * @param parser
+   *          Parser of records.
+   * @param sizeLimitPerFile
+   *            Limit in bytes before rotating a log file.
+   * @throws ChangelogException
+   *            If a problem occurs during initialization.
+   */
+  private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
+      throws ChangelogException
+  {
+    this.logPath = logPath;
+    this.recordParser = parser;
+    this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
+    this.referenceCount = 1;
+
+    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
+    this.exclusiveLock = lock.writeLock();
+    this.sharedLock = lock.readLock();
+
+    createOrOpenLogFiles();
+  }
+
+  /** Create or open log files used by this log. */
+  private void createOrOpenLogFiles() throws ChangelogException
+  {
+    exclusiveLock.lock();
+    try
+    {
+      createRootDirIfNotExists();
+      openHeadLogFile();
+      for (final File file : getReadOnlyLogFiles())
+      {
+        openReadOnlyLogFile(file);
+      }
+      isClosed = false;
+    }
+    catch (ChangelogException e)
+    {
+      // ensure all log files opened at this point are closed
+      close();
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  private File[] getReadOnlyLogFiles() throws ChangelogException
+  {
+    File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
+    if (files == null)
+    {
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
+    }
+    return files;
+  }
+
+  private void createRootDirIfNotExists() throws ChangelogException
+  {
+    if (!logPath.exists())
+    {
+      if (!logPath.mkdirs())
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
+      }
+    }
+  }
+
+  /**
+   * Returns the path of this log.
+   *
+   * @return the path of this log directory
+   */
+  public File getPath()
+  {
+    return logPath;
+  }
+
+  /**
+   * Add the provided record at the end of this log.
+   * <p>
+   * In order to ensure that record is written out of buffers and persisted
+   * to file system, it is necessary to explicitely call the
+   * {@code syncToFileSystem()} method.
+   *
+   * @param record
+   *          The record to add.
+   * @throws ChangelogException
+   *           If the record can't be added to the log.
+   */
+  public void append(final Record<K, V> record) throws ChangelogException
+  {
+    // If this exclusive lock happens to be a bottleneck :
+    // 1. use a shared lock for appending the record first
+    // 2. switch to an exclusive lock only if rotation is needed
+    // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return;
+      }
+      LogFile<K, V> headLogFile = getHeadLogFile();
+      if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
+      {
+        ErrorLogger.logError(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
+
+        rotateHeadLogFile();
+        headLogFile = getHeadLogFile();
+      }
+      headLogFile.append(record);
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  /**
+   * Synchronize all records added with the file system, ensuring that records
+   * are effectively persisted.
+   * <p>
+   * After a successful call to this method, it is guaranteed that all records
+   * added to the log are persisted to the file system.
+   *
+   * @throws ChangelogException
+   *           If the synchronization fails.
+   */
+  public void syncToFileSystem() throws ChangelogException
+  {
+    exclusiveLock.lock();
+    try
+    {
+      getHeadLogFile().syncToFileSystem();
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  /**
+   * Returns a cursor that allows to retrieve the records from this log,
+   * starting at the first position.
+   * <p>
+   * The returned cursor initially points to record corresponding to the first
+   * key, that is {@code cursor.getRecord()} is equals to the record
+   * corresponding to the first key before any call to {@code cursor.next()}
+   * method.
+   *
+   * @return a cursor on the log records, which is never {@code null}
+   * @throws ChangelogException
+   *           If the cursor can't be created.
+   */
+  public RepositionableCursor<K, V> getCursor() throws ChangelogException
+  {
+    LogCursor<K, V> cursor = null;
+    sharedLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return new EmptyLogCursor<K, V>();
+      }
+      cursor = new LogCursor<K, V>(this);
+      cursor.positionTo(null, false);
+      registerCursor(cursor);
+      return cursor;
+    }
+    catch (ChangelogException e)
+    {
+      StaticUtils.close(cursor);
+      throw e;
+    }
+    finally
+    {
+      sharedLock.unlock();
+    }
+  }
+
+  /**
+   * Returns a cursor that allows to retrieve the records from this log,
+   * starting at the position defined by the provided key.
+   * <p>
+   * The returned cursor initially points to record corresponding to the key,
+   * that is {@code cursor.getRecord()} is equals to the record corresponding to
+   * the key before any call to {@code cursor.next()} method.
+   *
+   * @param key
+   *          Key to use as a start position for the cursor. If key is
+   *          {@code null}, cursor will point at the first record of the log.
+   * @return a cursor on the log records, which is never {@code null}
+   * @throws ChangelogException
+   *           If the cursor can't be created.
+   */
+  public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException
+  {
+    return getCursor(key, false);
+  }
+
+  /**
+   * Returns a cursor that allows to retrieve the records from this log,
+   * starting at the position defined by the smallest key that is higher than
+   * the provided key.
+   * <p>
+   * The returned cursor initially points to record corresponding to the key
+   * found, that is {@code cursor.getRecord()} is equals to the record
+   * corresponding to the key found before any call to {@code cursor.next()}
+   * method.
+   *
+   * @param key
+   *          Key to use as a start position for the cursor. If key is
+   *          {@code null}, cursor will point at the first record of the log.
+   * @return a cursor on the log records, which is never {@code null}
+   * @throws ChangelogException
+   *           If the cursor can't be created.
+   */
+  public RepositionableCursor<K, V> getNearestCursor(final K key) throws ChangelogException
+  {
+    return getCursor(key, true);
+  }
+
+  /** Returns a cursor starting from a key, using the strategy corresponding to provided boolean. */
+  private RepositionableCursor<K, V> getCursor(final K key, boolean findNearest) throws ChangelogException
+  {
+    if (key == null)
+    {
+      return getCursor();
+    }
+    LogCursor<K, V> cursor = null;
+    sharedLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return new EmptyLogCursor<K, V>();
+      }
+      cursor = new LogCursor<K, V>(this);
+      final boolean isFound = cursor.positionTo(key, findNearest);
+      // for nearest case, it is ok if the target is not found
+      if (isFound || findNearest)
+      {
+        registerCursor(cursor);
+        return cursor;
+      }
+      else
+      {
+        StaticUtils.close(cursor);
+        return new EmptyLogCursor<K, V>();
+      }
+    }
+    catch (ChangelogException e)
+    {
+      StaticUtils.close(cursor);
+      throw e;
+    }
+    finally
+    {
+      sharedLock.unlock();
+    }
+  }
+
+
+  /**
+   * Returns the oldest (first) record from this log.
+   *
+   * @return the oldest record, which may be {@code null} if there is no record
+   *         in the log.
+   * @throws ChangelogException
+   *           If an error occurs while retrieving the record.
+   */
+  public Record<K, V> getOldestRecord() throws ChangelogException
+  {
+    return getOldestLogFile().getOldestRecord();
+  }
+
+
+  /**
+   * Returns the newest (last) record from this log.
+   *
+   * @return the newest record, which may be {@code null}
+   * @throws ChangelogException
+   *           If an error occurs while retrieving the record.
+   */
+  public Record<K, V> getNewestRecord() throws ChangelogException
+  {
+    return getHeadLogFile().getNewestRecord();
+  }
+
+  /**
+   * Returns the number of records in the log.
+   *
+   * @return the number of records
+   * @throws ChangelogException
+   *            If a problem occurs.
+   */
+  public long getNumberOfRecords() throws ChangelogException
+  {
+    long count = 0;
+    for (final LogFile<K, V> logFile : logFiles.values())
+    {
+      count += logFile.getNumberOfRecords();
+    }
+    return count;
+  }
+
+  /**
+   * Purge the log up to and excluding the provided key.
+   *
+   * @param purgeKey
+   *            the key up to which purging must happen
+   * @return the oldest non purged record, or {@code null}
+   *         if no record was purged
+   * @throws ChangelogException
+   *           if a database problem occurs.
+   */
+  public Record<K,V> purgeUpTo(final K purgeKey) throws ChangelogException
+  {
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return null;
+      }
+      final SortedMap<K, LogFile<K, V>> logFilesToPurge = logFiles.headMap(purgeKey);
+      if (logFilesToPurge.isEmpty())
+      {
+        return null;
+      }
+      final List<String> undeletableFiles = new ArrayList<String>();
+      final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
+      while (entriesToPurge.hasNext())
+      {
+        final LogFile<K, V> logFile = entriesToPurge.next().getValue();
+        try
+        {
+          logFile.close();
+          logFile.delete();
+          entriesToPurge.remove();
+        }
+        catch (ChangelogException e)
+        {
+          // The deletion of log file on file system has failed
+          undeletableFiles.add(logFile.getFile().getPath());
+        }
+      }
+      if (!undeletableFiles.isEmpty())
+      {
+        throw new ChangelogException(
+            ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
+                StaticUtils.listToString(undeletableFiles, ", ")));
+      }
+      return getOldestRecord();
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+
+  }
+
+  /**
+   * Empties the log, discarding all records it contains.
+   *
+   * @throws ChangelogException
+   *           If cursors are opened on this log, or if a problem occurs during
+   *           clearing operation.
+   */
+  public void clear() throws ChangelogException
+  {
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return;
+      }
+      if (!openCursors.isEmpty())
+      {
+        // TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because
+        // there is one open cursor when clearing.
+        // Switch to logging until this issue is solved
+        // throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it",
+        //    logPath.getPath(), openCursors.size()));
+        ErrorLogger.logError(
+            ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size()));
+      }
+
+      // delete all log files
+      final List<String> undeletableFiles = new ArrayList<String>();
+      for (LogFile<K, V> logFile : logFiles.values())
+      {
+        try
+        {
+          logFile.close();
+          logFile.delete();
+        }
+        catch (ChangelogException e)
+        {
+          undeletableFiles.add(logFile.getFile().getPath());
+        }
+      }
+      if (!undeletableFiles.isEmpty())
+      {
+        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
+            StaticUtils.listToString(undeletableFiles, ", ")));
+      }
+      logFiles.clear();
+
+      // recreate an empty head log file
+      openHeadLogFile();
+    }
+    catch (Exception e)
+    {
+      throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)));
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    releaseLog(logPath);
+  }
+
+  /** Effectively close this log. */
+  private void doClose()
+  {
+    exclusiveLock.lock();
+    try
+    {
+      if (isClosed)
+      {
+        return;
+      }
+      if (!openCursors.isEmpty())
+      {
+        ErrorLogger.logError(
+            ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
+      }
+      StaticUtils.close(logFiles.values());
+      isClosed = true;
+    }
+    finally
+    {
+      exclusiveLock.unlock();
+    }
+  }
+
+  private LogFile<K, V> getHeadLogFile()
+  {
+    return logFiles.lastEntry().getValue();
+  }
+
+  private LogFile<K, V> getOldestLogFile()
+  {
+    return logFiles.firstEntry().getValue();
+  }
+
+  /** Rotate the head log file to a read-only log file, and open a new empty head log file to write in. */
+  private void rotateHeadLogFile() throws ChangelogException
+  {
+    final LogFile<K, V> headLogFile = getHeadLogFile();
+    final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
+    headLogFile.close();
+    renameHeadLogFileTo(readOnlyLogFile);
+
+    openHeadLogFile();
+    openReadOnlyLogFile(readOnlyLogFile);
+    updateCursorsOpenedOnHead();
+  }
+
+  private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
+  {
+    final File headLogFile = new File(logPath, HEAD_LOG_FILE_NAME);
+    try
+    {
+      StaticUtils.renameFile(headLogFile, rotatedLogFile);
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e);
+    }
+  }
+
+  /**
+   * Returns the key bounds for the provided log file.
+   *
+   * @return the pair of (lowest key, highest key) that correspond to records
+   *         stored in the corresponding log file.
+   * @throws ChangelogException
+   *            if an error occurs while retrieving the keys
+   */
+   private Pair<K, K> getKeyBounds(final LogFile<K, V> logFile) throws ChangelogException
+   {
+     try
+     {
+       final String name = logFile.getFile().getName();
+       final String[] keys = name.substring(0, name.length() - Log.LOG_FILE_SUFFIX.length())
+           .split(LOG_FILE_NAME_SEPARATOR);
+       return Pair.of(recordParser.decodeKeyFromString(keys[0]), recordParser.decodeKeyFromString(keys[1]));
+     }
+     catch (Exception e)
+     {
+       throw new ChangelogException(
+           ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
+     }
+   }
+
+   /**
+    * Returns the file name to use for the read-only version of the provided
+    * log file.
+    * <p>
+    * The file name is based on the lowest and highest key in the log file.
+    *
+    * @return the name to use for the read-only version of the log file
+    * @throws ChangelogException
+    *            If an error occurs.
+    */
+  private String generateReadOnlyFileName(final LogFile<K,V> logFile) throws ChangelogException
+  {
+    final K lowestKey = logFile.getOldestRecord().getKey();
+    final K highestKey = logFile.getNewestRecord().getKey();
+    return recordParser.encodeKeyToString(lowestKey) + LOG_FILE_NAME_SEPARATOR
+       + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
+  }
+
+  /** Update the open cursors after a rotation of the head log file. */
+  private void updateCursorsOpenedOnHead() throws ChangelogException
+  {
+    for (LogCursor<K, V> cursor : openCursors)
+    {
+      final CursorState<K, V> state = cursor.getState();
+      // Need to update the cursor only if it is pointing to the head log file
+      if (isHeadLogFile(state.logFile))
+      {
+        updateOpenCursor(cursor, state);
+      }
+    }
+  }
+
+  /**
+   * Update the provided open cursor with the provided state.
+   * <p>
+   * The cursor must report the previous state on the head log file to the same
+   * state (position in file, current record) in the read-only log file just created.
+   */
+  private void updateOpenCursor(final LogCursor<K,V> cursor, final CursorState<K, V> state) throws ChangelogException
+  {
+    final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
+    final LogFile<K, V> logFile = findLogFileFor(previousKey);
+    cursor.reinitializeTo(new CursorState<K, V>(logFile, state.filePosition, state.record));
+  }
+
+  private void openHeadLogFile() throws ChangelogException
+  {
+    logFiles.put(recordParser.getMaxKey(),
+        LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser));
+  }
+
+  private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
+  {
+    final LogFile<K, V> logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
+    final Pair<K, K> bounds = getKeyBounds(logFile);
+    logFiles.put(bounds.getSecond(), logFile);
+  }
+
+  private void registerCursor(final LogCursor<K, V> cursor)
+  {
+    openCursors.add(cursor);
+  }
+
+  private void unregisterCursor(final LogCursor<K, V> cursor)
+  {
+    openCursors.remove(cursor);
+  }
+
+  /**
+   * Returns the log file that is just after the provided log file wrt the order
+   * defined on keys, or {@code null} if the provided log file is the last one
+   * (the head log file).
+   */
+  private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
+  {
+    if (isHeadLogFile(currentLogFile))
+    {
+      return null;
+    }
+    final Pair<K, K> bounds = getKeyBounds(currentLogFile);
+    return logFiles.higherEntry(bounds.getSecond()).getValue();
+  }
+
+  private boolean isHeadLogFile(final LogFile<K, V> logFile)
+  {
+    return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
+  }
+
+  /** Returns the log file that should contain the provided key. */
+  private LogFile<K, V> findLogFileFor(final K key)
+  {
+    if (key == null || logFiles.lowerKey(key) == null)
+    {
+      return getOldestLogFile();
+    }
+    return logFiles.ceilingEntry(key).getValue();
+  }
+
+  /**
+   * Represents a cursor than can be repositioned on a given key.
+   */
+  static interface RepositionableCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
+  {
+    /**
+     * Position the cursor to the record corresponding to the provided key or to
+     * the nearest key (the lowest key higher than the provided key).
+     *
+     * @param key
+     *          Key to use as a start position for the cursor. If key is
+     *          {@code null}, use the key of the first record instead.
+     * @param findNearest
+     *          If {@code true}, start position is the lowest key that is higher
+     *          than the provided key, otherwise start position is the provided
+     *          key.
+     * @return {@code true} if cursor is successfully positionned to the key or
+     *         the nearest key, {@code false} otherwise.
+     * @throws ChangelogException
+     *           If an error occurs when positioning cursor.
+     */
+    boolean positionTo(K key, boolean findNearest) throws ChangelogException;
+  }
+
+  /**
+   * Implements a cursor on the log.
+   * <p>
+   * The cursor initially points to a record, that is {@code cursor.getRecord()}
+   * is equals to the first record available from the cursor before any call to
+   * {@code cursor.next()} method.
+   * <p>
+   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
+   */
+  private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
+  {
+    private final Log<K, V> log;
+
+    private LogFile<K, V> currentLogFile;
+    private LogFileCursor<K, V> currentCursor;
+
+    /**
+     * Creates a cursor on the provided log.
+     *
+     * @param log
+     *           The log on which the cursor read records.
+     * @throws ChangelogException
+     *           If an error occurs when creating the cursor.
+     */
+    private LogCursor(final Log<K, V> log) throws ChangelogException
+    {
+      this.log = log;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Record<K, V> getRecord()
+    {
+      return currentCursor.getRecord();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean next() throws ChangelogException
+    {
+      log.sharedLock.lock();
+      try
+      {
+        final boolean hasNext = currentCursor.next();
+        if (hasNext)
+        {
+          return true;
+        }
+        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
+        if (logFile != null)
+        {
+          switchToLogFile(logFile);
+          return true;
+        }
+        return false;
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close()
+    {
+      log.sharedLock.lock();
+      try
+      {
+        StaticUtils.close(currentCursor);
+        log.unregisterCursor(this);
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
+    {
+      log.sharedLock.lock();
+      try
+      {
+        final LogFile<K, V> logFile = log.findLogFileFor(key);
+        if (logFile != currentLogFile)
+        {
+          switchToLogFile(logFile);
+        }
+        if (key != null)
+        {
+          boolean isFound = currentCursor.positionTo(key, findNearest);
+          if (isFound && getRecord() == null)
+          {
+            // The key to position to may be in the next file, force the switch
+            isFound = next();
+          }
+          return isFound;
+        }
+        return true;
+      }
+      finally
+      {
+        log.sharedLock.unlock();
+      }
+    }
+
+    /** Returns the state of this cursor. */
+    private CursorState<K, V> getState() throws ChangelogException
+    {
+      return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
+    }
+
+    /** Reinitialize this cursor to the provided state. */
+    private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
+    {
+      StaticUtils.close(currentCursor);
+      currentLogFile = cursorState.logFile;
+      currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
+    }
+
+    /** Switch the cursor to the provided log file. */
+    private void switchToLogFile(final LogFile<K, V> logFile) throws ChangelogException
+    {
+      StaticUtils.close(currentCursor);
+      currentLogFile = logFile;
+      currentCursor = currentLogFile.getCursor();
+    }
+  }
+
+  /** An empty cursor, that always return null records and false to {@code next()} method. */
+  static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
+  {
+    /** {@inheritDoc} */
+    @Override
+    public Record<K,V> getRecord()
+    {
+      return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean next()
+    {
+      return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
+    {
+      return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void close()
+    {
+      // nothing to do
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      return "EmptyLogCursor";
+    }
+  }
+
+  /**
+   * Represents the state of a cursor.
+   * <p>
+   * The state is used to update a cursor when rotating the head log file : the
+   * state of cursor on head log file must be reported to the new read-only log
+   * file that is created when rotating.
+   */
+  private static class CursorState<K extends Comparable<K>, V>
+  {
+    /** The log file. */
+    private final LogFile<K, V> logFile;
+
+    /**
+     * The position of the reader on the log file. It is the offset from the
+     * beginning of the file, in bytes, at which the next read occurs.
+     */
+    private final long filePosition;
+
+    /** The record the cursor is pointing to. */
+    private final Record<K,V> record;
+
+    private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
+    {
+      this.logFile = logFile;
+      this.filePosition = filePosition;
+      this.record = record;
+    }
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index 005c8c7..f0bb7e9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -25,8 +25,8 @@
  */
 package org.opends.server.replication.server.changelog.file;
 
+import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
 
 import java.io.BufferedWriter;
 import java.io.Closeable;
@@ -35,24 +35,30 @@
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.forgerock.util.Reject;
 import org.opends.messages.Message;
 import org.opends.server.loggers.debug.DebugTracer;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
 import org.opends.server.types.ByteString;
 import org.opends.server.types.ByteStringBuilder;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
 
-import static org.opends.messages.ReplicationMessages.*;
-
 /**
- * A file-based log that allow to append key-value records and
- * read them using a {@code DBCursor}.
+ * A log file, containing part of a {@code Log}. The log file may be:
+ * <ul>
+ * <li>write-enabled : allowing to append key-value records and read records
+ * from cursors,</li>
+ * <li>read-only : allowing to read records from cursors.</li>
+ * </ul>
+ * <p>
+ * A log file is NOT intended to be used directly, but only has part of a
+ * {@code Log}. In particular, there is no concurrency management and no checks
+ * to ensure that log is not closed when performing any operation on it. Those
+ * are managed at the {@code Log} level.
  *
  * @param <K>
  *          Type of the key of a record, which must be comparable.
@@ -61,16 +67,9 @@
  */
 final class LogFile<K extends Comparable<K>, V> implements Closeable
 {
-
   private static final DebugTracer TRACER = getTracer();
 
-  // Non private for use in tests
-  static final String LOG_FILE_NAME = "current.log";
-
-  /** The path of directory that contains the log file. */
-  private final File rootPath;
-
-  /** The log file containing the records. */
+  /** The file containing the records. */
   private final File logfile;
 
   /** The parser of records, to convert bytes to record and record to bytes. */
@@ -79,30 +78,20 @@
   /** The pool to obtain a reader on the log. */
   private LogReaderPool readerPool;
 
-  /** The writer on the log, which may be {@code null} if log is not write-enabled */
+  /**
+   * The writer on the log file, which may be {@code null} if log file is not
+   * write-enabled
+   */
   private LogWriter writer;
 
   /** Indicates if log is enabled for write. */
   private final boolean isWriteEnabled;
 
-  /** Indicates if the log is closed. */
-  private volatile boolean isClosed;
-
-  /** The exclusive lock used for wide changes on this log file : init, clear, sync and close. */
-  private final Lock exclusiveLock;
-
-  /**
-   * The shared lock used for read, write and flush operations on this log file.
-   * Write and flush operations can be shared because they're synchronized in the underlying writer.
-   * Reads can be done safely when writing because partially written records are handled.
-   */
-  private final Lock sharedLock;
-
   /**
    * Creates a new log file.
    *
-   * @param rootPath
-   *          Path of root directory that contains the log file.
+   * @param logFilePath
+   *          Path of the log file.
    * @param parser
    *          Parser of records.
    * @param isWriteEnabled
@@ -111,17 +100,13 @@
    * @throws ChangelogException
    *            If a problem occurs during initialization.
    */
-  private LogFile(final File rootPath, final RecordParser<K, V> parser, boolean isWriteEnabled)
+  private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
       throws ChangelogException
   {
-    this.rootPath = rootPath;
+    Reject.ifNull(logFilePath, parser);
+    this.logfile = logFilePath;
     this.parser = parser;
     this.isWriteEnabled = isWriteEnabled;
-    this.logfile = new File(rootPath, LOG_FILE_NAME);
-
-    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
-    this.exclusiveLock = lock.writeLock();
-    this.sharedLock = lock.readLock();
 
     initialize();
   }
@@ -133,18 +118,18 @@
    *            Type of the key of a record, which must be comparable.
    * @param <V>
    *            Type of the value of a record.
-   * @param rootPath
-   *          Path of root directory that contains the log file.
+   * @param logFilePath
+   *          Path of the log file.
    * @param parser
    *          Parser of records.
    * @return a read-only log file
    * @throws ChangelogException
    *            If a problem occurs during initialization.
    */
-  public static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File rootPath,
+  static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
       final RecordParser<K, V> parser) throws ChangelogException
   {
-    return new LogFile<K, V>(rootPath, parser, false);
+    return new LogFile<K, V>(logFilePath, parser, false);
   }
 
   /**
@@ -155,18 +140,18 @@
    *          Type of the key of a record, which must be comparable.
    * @param <V>
    *          Type of the value of a record.
-   * @param rootPath
-   *          Path of root directory that contains the log file.
+   * @param logFilePath
+   *          Path of the log file.
    * @param parser
    *          Parser of records.
    * @return a write-enabled log file
    * @throws ChangelogException
    *            If a problem occurs during initialization.
    */
-  public static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File rootPath,
+  static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
       final RecordParser<K, V> parser) throws ChangelogException
   {
-    return new LogFile<K, V>(rootPath, parser, true);
+    return new LogFile<K, V>(logFilePath, parser, true);
   }
 
   /**
@@ -180,89 +165,29 @@
    */
   private void initialize() throws ChangelogException
   {
-    exclusiveLock.lock();
-    try
+    createLogFileIfNotExists();
+    if (isWriteEnabled)
     {
-      createRootDirIfNotExists();
-      createLogFileIfNotExists();
-      isClosed = false;
-      if (isWriteEnabled)
-      {
-        writer = LogWriter.acquireWriter(logfile);
-      }
-      readerPool = new LogReaderPool(logfile);
+      writer = new LogWriter(logfile);
     }
-    finally
-    {
-      exclusiveLock.unlock();
-    }
+    readerPool = new LogReaderPool(logfile);
   }
 
   /**
-   * Returns the name of this log.
+   * Returns the file containing the records.
    *
-   * @return the name, which corresponds to the directory containing the log
+   * @return the file
    */
-  public String getName()
+  File getFile()
   {
-    return logfile.getParent().toString();
-  }
-
-  /**
-   * Empties the log, discarding all records it contains.
-   * <p>
-   * This method should not be called with open cursors or
-   * when multiple instances of the log are opened.
-   *
-   * @throws ChangelogException
-   *            If a problem occurs.
-   */
-  public void clear() throws ChangelogException
-  {
-    checkLogIsEnabledForWrite();
-
-    exclusiveLock.lock();
-    try
-    {
-      if (isClosed)
-      {
-        return;
-      }
-      close();
-      final boolean isDeleted = logfile.delete();
-      if (!isDeleted)
-      {
-        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(logfile.getPath()));
-      }
-      initialize();
-    }
-    catch (Exception e)
-    {
-      throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(getName(), stackTraceToSingleLineString(e)));
-    }
-    finally
-    {
-      exclusiveLock.unlock();
-    }
+    return logfile;
   }
 
   private void checkLogIsEnabledForWrite() throws ChangelogException
   {
     if (!isWriteEnabled)
     {
-      throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(rootPath.getPath()));
-    }
-  }
-
-  private void createRootDirIfNotExists() throws ChangelogException
-  {
-    if (!rootPath.exists())
-    {
-      final boolean created = rootPath.mkdirs();
-      if (!created)
-      {
-        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(rootPath.getPath()));
-      }
+      throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
     }
   }
 
@@ -282,25 +207,6 @@
   }
 
   /**
-   * Add a record at the end of this log from the provided key and value.
-   * <p>
-   * In order to ensure that record is written out of buffers and persisted
-   * to file system, it is necessary to explicitely call the
-   * {@code syncToFileSystem()} method.
-   *
-   * @param key
-   *          The key of the record.
-   * @param value
-   *          The value of the record.
-   * @throws ChangelogException
-   *           If the record can't be added to the log.
-   */
-  public void addRecord(final K key, final V value) throws ChangelogException
-  {
-    addRecord(Record.from(key, value));
-  }
-
-  /**
    * Add the provided record at the end of this log.
    * <p>
    * In order to ensure that record is written out of buffers and persisted
@@ -312,33 +218,22 @@
    * @throws ChangelogException
    *           If the record can't be added to the log.
    */
-  public void addRecord(final Record<K, V> record) throws ChangelogException
+  void append(final Record<K, V> record) throws ChangelogException
   {
     checkLogIsEnabledForWrite();
-
-    sharedLock.lock();
     try
     {
-      if (isClosed)
-      {
-        return;
-      }
       writer.write(encodeRecord(record));
-      writer.flush();
     }
     catch (IOException e)
     {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getName()), e);
-    }
-    finally
-    {
-      sharedLock.unlock();
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
     }
   }
 
   private ByteString encodeRecord(final Record<K, V> record)
   {
-    final ByteString data = parser.encodeRecord(record.getKey(), record.getValue());
+    final ByteString data = parser.encodeRecord(record);
     return new ByteStringBuilder()
       .append(data.length())
       .append(data)
@@ -346,14 +241,14 @@
   }
 
   /**
-   * Dump this log as text file, intended for debugging purpose only.
+   * Dump this log file as a text file, intended for debugging purpose only.
    *
    * @param dumpFile
    *          File that will contains log records using a human-readable format
    * @throws ChangelogException
    *           If an error occurs during dump
    */
-  public void dumpAsTextFile(File dumpFile) throws ChangelogException
+  void dumpAsTextFile(File dumpFile) throws ChangelogException
   {
     DBCursor<Record<K, V>> cursor = getCursor();
     BufferedWriter textWriter = null;
@@ -364,7 +259,7 @@
       {
         Record<K, V> record = cursor.getRecord();
         textWriter.write("key=" + record.getKey());
-        textWriter.write(" -- ");
+        textWriter.write(" | ");
         textWriter.write("value=" + record.getValue());
         textWriter.write('\n');
         cursor.next();
@@ -374,7 +269,7 @@
     {
       // No I18N needed, used for debugging purpose only
       throw new ChangelogException(
-          Message.raw("Error when dumping content of log '%s' in target file : '%s'", getName(), dumpFile), e);
+          Message.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile), e);
     }
     finally
     {
@@ -392,20 +287,16 @@
    * @throws ChangelogException
    *           If the synchronization fails.
    */
-  public void syncToFileSystem() throws ChangelogException
+  void syncToFileSystem() throws ChangelogException
   {
-    exclusiveLock.lock();
+    checkLogIsEnabledForWrite();
     try
     {
       writer.sync();
     }
     catch (Exception e)
     {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getName()), e);
-    }
-    finally
-    {
-      exclusiveLock.unlock();
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
     }
   }
 
@@ -422,21 +313,9 @@
    * @throws ChangelogException
    *           If the cursor can't be created.
    */
-  public LogCursor<K, V> getCursor() throws ChangelogException
+  LogFileCursor<K, V> getCursor() throws ChangelogException
   {
-    sharedLock.lock();
-    try
-    {
-      if (isClosed)
-      {
-        return new EmptyLogCursor<K, V>();
-      }
-      return new LogFileCursor<K, V>(this);
-    }
-    finally
-    {
-      sharedLock.unlock();
-    }
+    return new LogFileCursor<K, V>(this);
   }
 
   /**
@@ -454,7 +333,7 @@
    * @throws ChangelogException
    *           If the cursor can't be created.
    */
-  public LogCursor<K, V> getCursor(final K key) throws ChangelogException
+  LogFileCursor<K, V> getCursor(final K key) throws ChangelogException
   {
     return getCursor(key, false);
   }
@@ -476,13 +355,13 @@
    * @throws ChangelogException
    *           If the cursor can't be created.
    */
-  public LogCursor<K, V> getNearestCursor(final K key) throws ChangelogException
+  LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException
   {
     return getCursor(key, true);
   }
 
   /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */
-  private LogCursor<K, V> getCursor(final K key, boolean findNearest)
+  private LogFileCursor<K, V> getCursor(final K key, boolean findNearest)
       throws ChangelogException
   {
     if (key == null)
@@ -490,13 +369,8 @@
       return getCursor();
     }
     LogFileCursor<K, V> cursor = null;
-    sharedLock.lock();
     try
     {
-      if (isClosed)
-      {
-        return new EmptyLogCursor<K, V>();
-      }
       cursor = new LogFileCursor<K, V>(this);
       cursor.positionTo(key, findNearest);
       // if target is not found, cursor is positioned at end of stream
@@ -506,10 +380,22 @@
       StaticUtils.close(cursor);
       throw e;
     }
-    finally
-    {
-      sharedLock.unlock();
-    }
+  }
+
+  /**
+   * Returns a cursor initialised to the provided record and position in file.
+   *
+   * @param record
+   *            The initial record this cursor points on
+   * @param position
+   *            The file position this cursor points on
+   * @return the cursor
+   * @throws ChangelogException
+   *            If a problem occurs while creating the cursor.
+   */
+  LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
+  {
+    return new LogFileCursor<K, V>(this, record, position);
   }
 
   /**
@@ -520,7 +406,7 @@
    * @throws ChangelogException
    *           If an error occurs while retrieving the record.
    */
-  public Record<K, V> getOldestRecord() throws ChangelogException
+  Record<K, V> getOldestRecord() throws ChangelogException
   {
     DBCursor<Record<K, V>> cursor = null;
     try
@@ -541,7 +427,7 @@
    * @throws ChangelogException
    *           If an error occurs while retrieving the record.
    */
-  public Record<K, V> getNewestRecord() throws ChangelogException
+  Record<K, V> getNewestRecord() throws ChangelogException
   {
     // TODO : need a more efficient way to retrieve it
     DBCursor<Record<K, V>> cursor = null;
@@ -597,45 +483,58 @@
   /** {@inheritDoc} */
   public void close()
   {
-    exclusiveLock.lock();
-    try
+    if (isWriteEnabled)
     {
-      if (isClosed)
+      try
       {
-        return;
+        syncToFileSystem();
       }
+      catch (ChangelogException e)
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      }
+      writer.close();
+    }
+    readerPool.shutdown();
+  }
 
-      if (isWriteEnabled)
-      {
-        try
-        {
-          syncToFileSystem();
-        }
-        catch (ChangelogException e)
-        {
-          TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        }
-        writer.close();
-      }
-      readerPool.shutdown();
-      isClosed = true;
-    }
-    finally
+  /**
+   * Delete this log file (file is physically removed). Should be called only
+   * when log file is closed.
+   *
+   * @throws ChangelogException
+   *            If log file can't be deleted.
+   */
+  void delete() throws ChangelogException
+  {
+    final boolean isDeleted = logfile.delete();
+    if (!isDeleted)
     {
-      exclusiveLock.unlock();
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
     }
   }
 
+  /**
+   * Return the size of this log file in bytes.
+   *
+   * @return the size of log file
+   */
+  long getSizeInBytes()
+  {
+    return writer.getBytesWritten();
+  }
+
+  /** The path of this log file as a String. */
+  private String getPath()
+  {
+    return logfile.getPath();
+  }
+
   /** Read a record from the provided reader. */
   private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
   {
-    sharedLock.lock();
     try
     {
-      if (isClosed)
-      {
-        return null;
-      }
       final ByteString recordData = readEncodedRecord(reader);
       return recordData != null ? parser.decodeRecord(recordData) : null;
     }
@@ -643,10 +542,6 @@
     {
       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
     }
-    finally
-    {
-      sharedLock.unlock();
-    }
   }
 
   private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
@@ -672,26 +567,17 @@
     }
   }
 
-  /** Seek to provided position on the provided reader. */
+  /** Seek to given position on the provided reader. */
   private void seek(RandomAccessFile reader, long position) throws ChangelogException
   {
-    sharedLock.lock();
     try
     {
-      if (isClosed)
-      {
-        return;
-      }
       reader.seek(position);
     }
     catch (IOException e)
     {
       throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
     }
-    finally
-    {
-      sharedLock.unlock();
-    }
   }
 
   /**
@@ -706,66 +592,42 @@
 
   /** Release the provided reader. */
   private void releaseReader(RandomAccessFile reader) {
-    sharedLock.lock();
-    try
-    {
-      if (isClosed)
-      {
-        return;
-      }
-      readerPool.release(reader);
-    }
-    finally
-    {
-      sharedLock.unlock();
-    }
+    readerPool.release(reader);
   }
 
-  /**
-   * A cursor on the log.
-   */
-  static interface LogCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
+
+
+  /** {@inheritDoc} */
+  @Override
+  public int hashCode()
   {
-    /**
-     * Position the cursor to the record corresponding to the provided key or to
-     * the nearest key (the lowest key higher than the provided key).
-     * <p>
-     * The record is only searched forward. To search backward, it is first
-     * necessary to call the {@code rewind()} method to start from beginning of
-     * log file.
-     *
-     * @param key
-     *          Key to use as a start position for the cursor. If key is
-     *          {@code null}, use the key of the first record instead.
-     * @param findNearest
-     *          If {@code true}, start position is the lowest key that is higher
-     *          than the provided key, otherwise start position is the provided
-     *          key.
-     * @return {@code true} if cursor is successfully positionned to the key or
-     *         the the nearest key, {@code false} otherwise.
-     * @throws ChangelogException
-     *           If an error occurs when positioning cursor.
-     */
-    boolean positionTo(K key, boolean findNearest) throws ChangelogException;
+    return logfile.hashCode();
+  }
 
-    /**
-     * Rewind the cursor, positioning it to the beginning of the log file,
-     * pointing to no record initially.
-     *
-     * @throws ChangelogException
-     *          If an error occurs when rewinding to zero.
-     */
-    void rewind() throws ChangelogException;
+  /** {@inheritDoc} */
+  @Override
+  public boolean equals(Object that)
+  {
+    if (this == that)
+    {
+      return true;
+    }
+    if (!(that instanceof LogFile))
+    {
+      return false;
+    }
+    final LogFile<?, ?> other = (LogFile<?, ?>) that;
+    return logfile.equals(other.logfile);
   }
 
   /**
-   * Implements a cursor on the log.
+   * Implements a repositionable cursor on the log file.
    * <p>
    * The cursor initially points to a record, that is {@code cursor.getRecord()}
    * is equals to the first record available from the cursor before any call to
    * {@code cursor.next()} method.
    */
-  private static final class LogFileCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
+  static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
   {
     /** The underlying log on which entries are read. */
     private final LogFile<K, V> logFile;
@@ -784,7 +646,7 @@
      * @throws ChangelogException
      *           If an error occurs when creating the cursor.
      */
-    LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
+    private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
     {
       this.logFile = logFile;
       this.reader = logFile.getReader();
@@ -800,17 +662,19 @@
       }
     }
 
-    /** {@inheritDoc} */
-    public String toString()
+    /**
+     * Creates a cursor on the provided log, initialised to the provided record and
+     * pointing to the provided file position.
+     * <p>
+     * Note: there is no check to ensure that provided record and file position are
+     * consistent. It is the responsability of the caller of this method.
+     */
+    private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
     {
-      return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public Record<K,V> getRecord()
-    {
-      return currentRecord;
+      this.logFile = logFile;
+      this.reader = logFile.getReader();
+      this.currentRecord = record;
+      logFile.seek(reader, filePosition);
     }
 
     /** {@inheritDoc} */
@@ -823,6 +687,13 @@
 
     /** {@inheritDoc} */
     @Override
+    public Record<K,V> getRecord()
+    {
+      return currentRecord;
+    }
+
+    /** {@inheritDoc} */
+    @Override
     public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
       do
       {
@@ -848,64 +719,35 @@
 
     /** {@inheritDoc} */
     @Override
-    public void rewind() throws ChangelogException
-    {
-      logFile.seek(reader, 0);
-      currentRecord = null;
-    }
-
-    /** {@inheritDoc} */
-    @Override
     public void close()
     {
       logFile.releaseReader(reader);
     }
-  }
 
-  /** An empty cursor, that always return null records and false to {@code next()} method. */
-  static final class EmptyLogCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
-  {
-    /** {@inheritDoc} */
-    @Override
-    public Record<K,V> getRecord()
+    /**
+     * Returns the file position this cursor is pointing at.
+     *
+     * @return the position of reader on the log file
+     * @throws ChangelogException
+     *          If an error occurs.
+     */
+    long getFilePosition() throws ChangelogException
     {
-      return null;
+      try
+      {
+        return reader.getFilePointer();
+      }
+      catch (IOException e)
+      {
+        throw new ChangelogException(
+            ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
+      }
     }
 
     /** {@inheritDoc} */
-    @Override
-    public boolean next()
-    {
-      return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
-    {
-      return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void rewind() throws ChangelogException
-    {
-      // nothing to do
-    }
-
-    /** {@inheritDoc} */
-    @Override
-    public void close()
-    {
-      // nothing to do
-    }
-
-    /** {@inheritDoc} */
-    @Override
     public String toString()
     {
-      return "EmptyLogCursor";
+      return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
     }
-
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
index 037c048..44ce685 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -25,18 +25,14 @@
  */
 package org.opends.server.replication.server.changelog.file;
 
-import static org.opends.server.loggers.debug.DebugLogger.*;
-
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.SyncFailedException;
-import java.util.HashMap;
-import java.util.Map;
 
+import org.opends.server.loggers.MeteredStream;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.types.ByteString;
 import org.opends.server.util.StaticUtils;
@@ -45,80 +41,38 @@
 
 /**
  * A writer on a log file.
- * <p>
- * The writer is cached in order to have a single writer per file in the JVM.
  */
 class LogWriter extends OutputStream
 {
-  /** The cache of log writers. There is a single writer per file in the JVM.  */
-  private static final Map<File, LogWriter> logWritersCache = new HashMap<File, LogWriter>();
-
-  /** The exclusive lock used to acquire or close a log writer. */
-  private static final Object lock = new Object();
-
   /** The file to write in. */
   private final File file;
 
-  /** The stream to write data in the file. */
-  private final BufferedOutputStream stream;
+  /** The stream to write data in the file, capable of counting bytes written. */
+  private final MeteredStream stream;
 
   /** The file descriptor on the file. */
   private final FileDescriptor fileDescriptor;
 
-  /** The number of references on this writer. */
-  private int referenceCount;
-
   /**
    * Creates a writer on the provided file.
    *
    * @param file
    *          The file to write.
-   * @param stream
-   *          The stream to write in the file.
-   * @param fileDescriptor
-   *          The descriptor on the file.
+   * @throws ChangelogException
+   *            If a problem occurs at creation.
    */
-  private LogWriter(final File file, BufferedOutputStream stream, FileDescriptor fileDescriptor)
-      throws ChangelogException
+  public LogWriter(final File file) throws ChangelogException
   {
     this.file = file;
-    this.stream = stream;
-    this.fileDescriptor = fileDescriptor;
-    this.referenceCount = 1;
-  }
-
-  /**
-   * Returns a log writer on the provided file, creating it if necessary.
-   *
-   * @param file
-   *            The log file to write in.
-   * @return the log writer
-   * @throws ChangelogException
-   *            If a problem occurs.
-   */
-  public static LogWriter acquireWriter(File file) throws ChangelogException
-  {
-    synchronized (lock)
+    try
     {
-      LogWriter logWriter = logWritersCache.get(file);
-      if (logWriter == null)
-      {
-        try
-        {
-          final FileOutputStream stream = new FileOutputStream(file, true);
-          logWriter = new LogWriter(file, new BufferedOutputStream(stream), stream.getFD());
-        }
-        catch (Exception e)
-        {
-          throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
-        }
-        logWritersCache.put(file, logWriter);
-      }
-      else
-      {
-        logWriter.incrementRefCounter();
-      }
-      return logWriter;
+      FileOutputStream fos = new FileOutputStream(file, true);
+      this.stream = new MeteredStream(fos, file.length());
+      this.fileDescriptor = fos.getFD();
+    }
+    catch (Exception e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
     }
   }
 
@@ -157,11 +111,14 @@
     bs.copyTo(stream);
   }
 
-  /** {@inheritDoc} */
-  @Override
-  public void flush() throws IOException
+  /**
+   * Returns the number of bytes written in the underlying file.
+   *
+   * @return the number of bytes
+   */
+  public long getBytesWritten()
   {
-    stream.flush();
+    return stream.getBytesWritten();
   }
 
   /**
@@ -178,32 +135,7 @@
   @Override
   public void close()
   {
-    synchronized (lock)
-    {
-      LogWriter writer = logWritersCache.get(file);
-      if (writer == null)
-      {
-        // writer is already closed
-        return;
-      }
-      // counter == 0 should never happen
-      if (referenceCount == 0 || referenceCount == 1)
-      {
-        StaticUtils.close(stream);
-        logWritersCache.remove(file);
-        referenceCount = 0;
-      }
-      else
-      {
-        referenceCount--;
-      }
-    }
+    StaticUtils.close(stream);
   }
 
-  private void incrementRefCounter()
-  {
-    referenceCount++;
-  }
-
-
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java b/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
index 94e65de..7009636 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
@@ -25,6 +25,7 @@
  */
 package org.opends.server.replication.server.changelog.file;
 
+import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.types.ByteString;
 
 /**
@@ -40,7 +41,6 @@
  */
 interface RecordParser<K, V>
 {
-
   /**
    * Decode a record from the provided byte array.
    * <p>
@@ -57,16 +57,45 @@
   Record<K, V> decodeRecord(ByteString data) throws DecodingException;
 
   /**
-   * Encode the provided key and value to a byte array.
+   * Encode the provided record to a byte array.
    * <p>
    * The returned array is intended to be stored as provided in the log file.
    *
-   * @param key
-   *          The key of the record.
-   * @param value
-   *          The value of the record.
+   * @param record
+   *          The record to encode.
    * @return the bytes array representing the (key,value) record
    */
-  ByteString encodeRecord(K key, V value);
+  ByteString encodeRecord(Record<K, V> record);
+
+  /**
+   * Read the key from the provided string.
+   *
+   * @param key
+   *          The string representation of key, suitable for use in a filename,
+   *          as written by the {@code encodeKeyToString()} method.
+   * @return the key
+   * @throws ChangelogException
+   *           If key can't be read from the string.
+   */
+  K decodeKeyFromString(String key) throws ChangelogException;
+
+  /**
+   * Returns the provided key as a string that is suitable to be used in a
+   * filename.
+   *
+   * @param key
+   *          The key of a record.
+   * @return a string encoding the key, unambiguously decodable to the original
+   *         key, and suitable for use in a filename. The string should contain
+   *         only ASCII characters and no space.
+   */
+  String encodeKeyToString(K key);
+
+  /**
+   * Returns a key that is guaranted to be always higher than any other key.
+   *
+   * @return the highest possible key
+   */
+  K getMaxKey();
 
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
index 78d4686..ee69dd7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
@@ -55,9 +55,7 @@
 import org.opends.server.types.DirectoryException;
 import org.opends.server.util.StaticUtils;
 
-import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
 import static org.opends.messages.ReplicationMessages.*;
 
 /**
@@ -82,7 +80,9 @@
  * id of the server. Each directory contains the log files for the given server
  * id.</li>
  * </ul>
- * All log files end with the ".log" suffix.
+ * All log files end with the ".log" suffix. Log files always include the "head.log"
+ * file and optionally zero to many read-only log files named after the lowest key
+ * and highest key present in the log file.
  * <p>
  * Layout example with two domains "o=test1" and "o=test2", each having server
  * ids 22 and 33 :
@@ -91,25 +91,29 @@
  * +---changelog
  * |   \---domains.state  [contains mapping: 1 => "o=test1", 2 => "o=test2"]
  * |   \---changenumberindex
- * |      \--- current.log
+ * |      \--- head.log [contains last records written]
+ * |      \--- 1_50.log [contains records with keys in interval [1, 50]]
  * |   \---1.domain
  * |       \---generation1.id
  * |       \---22.server
- * |           \---current.log
+ * |           \---head.log
  * |       \---33.server
- * |           \---current.log
+ * |           \---head.log
  * |   \---2.domain
  * |       \---generation1.id
  * |       \---22.server
- * |           \---current.log
+ * |           \---head.log
  * |       \---33.server
- * |           \---current.log
+ * |           \---head.log
  * </pre>
  */
 class ReplicationEnvironment
 {
   private static final DebugTracer TRACER = getTracer();
 
+  // TODO : to replace by configurable value
+  private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024;
+
   private static final int NO_GENERATION_ID = -1;
 
   private static final String CN_INDEX_DB_DIRNAME = "changenumberindex";
@@ -159,7 +163,7 @@
   private final String replicationRootPath;
 
   /** The list of logs that are in use. */
-  private final List<LogFile<?, ?>> logs = new CopyOnWriteArrayList<LogFile<?, ?>>();
+  private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
 
   /** Maps each domain DN to a domain id that is used to name directory in file system. */
   private final Map<DN, String> domains = new HashMap<DN, String>();
@@ -228,7 +232,7 @@
    * @throws ChangelogException
    *           if an error occurs.
    */
-  LogFile<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
+  Log<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
       throws ChangelogException
   {
     if (debugEnabled())
@@ -276,7 +280,7 @@
    * @throws ChangelogException
    *           when a problem occurs.
    */
-  LogFile<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
+  Log<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
   {
     final File path = getCNIndexDBPath();
     try
@@ -305,24 +309,6 @@
   }
 
   /**
-   * Clears the content of replication database.
-   *
-   * @param log
-   *          The log to clear.
-   */
-  void clearDB(final LogFile<?, ?> log)
-  {
-    try
-    {
-      log.clear();
-    }
-    catch (ChangelogException e)
-    {
-      logError(ERR_ERROR_CLEARING_DB.get(log.getName(), stackTraceToSingleLineString(e)));
-    }
-  }
-
-  /**
    * Clears the generated id associated to the provided domain DN from the state
    * Db.
    * <p>
@@ -506,12 +492,12 @@
   }
 
   /** Open a log from the provided path and record parser. */
-  private <K extends Comparable<K>, V> LogFile<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
+  private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
       throws ChangelogException
   {
     checkShutDownBeforeOpening(serverIdPath);
 
-    final LogFile<K, V> log = LogFile.newAppendableLogFile(serverIdPath, parser);
+    final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
 
     checkShutDownAfterOpening(serverIdPath, log);
 
@@ -519,11 +505,11 @@
     return log;
   }
 
-  private void checkShutDownAfterOpening(final File serverIdPath, final LogFile<?, ?> log) throws ChangelogException
+  private void checkShutDownAfterOpening(final File serverIdPath, final Log<?, ?> log) throws ChangelogException
   {
     if (isShuttingDown.get())
     {
-      closeDB(log);
+      closeLog(log);
       throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(serverIdPath.getPath(),
           replicationServer.getServerId()));
     }
@@ -590,7 +576,7 @@
     return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
   }
 
-  private void closeDB(final LogFile<?, ?> log)
+  private void closeLog(final Log<?, ?> log)
   {
     logs.remove(log);
     log.close();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
index bc3aee3..54ea0c8 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/TestCaseUtils.java
@@ -1101,6 +1101,13 @@
     return new File(testResourceDir, filename);
   }
 
+  public static File getUnitTestRootPath()
+  {
+    final String buildRoot = System.getProperty(PROPERTY_BUILD_ROOT);
+    final String path = System.getProperty(PROPERTY_BUILD_DIR, buildRoot + File.separator + "build");
+    return new File(path, "unit-tests");
+  }
+
   /**
    * Prevent instantiation.
    */
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
index 5fc8214..c56c37f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ChangeNumberControlPluginTestCase.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2008 Sun Microsystems, Inc.
- *      Portions copyright 2013 ForgeRock AS
+ *      Portions Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication;
 
@@ -79,6 +79,7 @@
         + "cn: Replication Server\n"
         + "ds-cfg-replication-port: " + replServerPort + "\n"
         + "ds-cfg-replication-db-directory: ChangeNumberControlDbTest\n"
+        + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
         + "ds-cfg-replication-server-id: 103\n";
 
     // suffix synchronized
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
index bb657c8..96043fe 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ReSyncTest.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2007-2009 Sun Microsystems, Inc.
- *      Portions copyright 2013 ForgeRock AS
+ *      Portions Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication;
 
@@ -100,6 +100,7 @@
         + "cn: Replication Server\n"
         + "ds-cfg-replication-port:" + replServerPort + "\n"
         + "ds-cfg-replication-db-directory: ReSyncTest\n"
+        + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
         + "ds-cfg-replication-server-id: 104\n";
 
     // suffix synchronized
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
index 16926d6..30d4c87 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/HistoricalTest.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2008-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.plugin;
 
@@ -81,6 +81,7 @@
          + "cn: replication Server\n"
          + "ds-cfg-replication-port: " + replServerPort + "\n"
          + "ds-cfg-replication-db-directory: HistoricalTest\n"
+         + "ds-cfg-replication-db-implementation: " + replicationDbImplementation + "\n"
          + "ds-cfg-replication-server-id: 102\n";
 
     // The suffix to be synchronized.
@@ -489,7 +490,7 @@
 
     addEntriesWithHistorical(1, entryCnt);
     // leave a little delay between adding/modifying test entries
-    // and configuring the purge delay. 
+    // and configuring the purge delay.
     Thread.sleep(10);
 
     // set the purge delay to 1 minute
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
index 2873fd4..fe5428e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -80,7 +80,7 @@
   {
     RecordParser<Long, ChangeNumberIndexRecord> parser = FileChangeNumberIndexDB.RECORD_PARSER;
 
-    ByteString data = parser.encodeRecord(msg.getChangeNumber(), msg);
+    ByteString data = parser.encodeRecord(Record.from(msg.getChangeNumber(), msg));
     Record<Long, ChangeNumberIndexRecord> record = parser.decodeRecord(data);
 
     assertThat(record).isNotNull();
@@ -169,7 +169,10 @@
    * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
    * </ol>
    */
-  // TODO :: enable when purge is implemented with multi-files log
+  // TODO : this works only if we ensure that there is a rotation of ahead log file
+  // at the right place. First two records are 37 and 76 bytes long,
+  // so it means : 37 < max file size < 113 to have the last record alone in the ahead log file
+  // Re-enable this test when max file size is customizable for log
   @Test(enabled=false)
   public void testPurge() throws Exception
   {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index 9f7eca7..e70966b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -95,7 +95,7 @@
   {
     RecordParser<CSN, UpdateMsg> parser = FileReplicaDB.RECORD_PARSER;
 
-    ByteString data = parser.encodeRecord(msg.getCSN(), msg);
+    ByteString data = parser.encodeRecord(Record.from(msg.getCSN(), msg));
     Record<CSN, UpdateMsg> record = parser.decodeRecord(data);
 
     assertThat(record).isNotNull();
@@ -264,7 +264,10 @@
     }
   }
 
-  // TODO : enable when purge is enabled with multi-files log implementation
+  // TODO : this works only if we ensure that there is a rotation of ahead log file
+  // at right place. Each record takes 54 bytes, so it means : 108 < max file size < 162 to have
+  // the last record alone in the ahead log file
+  // Re-enable this test when max file size is customizable for log
   @Test(enabled=false)
   public void testPurge() throws Exception
   {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
index 0a0c37c..524d95a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -35,7 +35,8 @@
 import org.opends.server.types.ByteString;
 import org.opends.server.types.ByteStringBuilder;
 import org.opends.server.util.StaticUtils;
-import org.testng.annotations.AfterMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -45,11 +46,11 @@
 @Test(sequential=true)
 public class LogFileTest extends DirectoryServerTestCase
 {
-  private static final String TEST_DIRECTORY_CHANGELOG = "test-output/changelog";
+  private static final String TEST_DIRECTORY_CHANGELOG = "test-output" + File.separator + "changelog";
 
-  private static final StringRecordParser RECORD_PARSER = new StringRecordParser();
+  static final StringRecordParser RECORD_PARSER = new StringRecordParser();
 
-  private static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
+  static final RecordParser<String,String> RECORD_PARSER_FAILING_TO_READ = new StringRecordParser() {
       @Override
       public Record<String, String> decodeRecord(ByteString data) throws DecodingException
       {
@@ -57,11 +58,18 @@
       }
   };
 
+  @BeforeClass
+  public void createTestDirectory()
+  {
+    File logDir = new File(TEST_DIRECTORY_CHANGELOG);
+    logDir.mkdirs();
+  }
+
   @BeforeMethod
   /** Create a new log file with ten records starting from (key1, value1) until (key10, value10). */
   public void initialize() throws Exception
   {
-    File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, LogFile.LOG_FILE_NAME);
+    File theLogFile = new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME);
     if (theLogFile.exists())
     {
       theLogFile.delete();
@@ -70,12 +78,12 @@
 
     for (int i = 1; i <= 10; i++)
     {
-      logFile.addRecord("key"+i, "value"+i);
+      logFile.append(Record.from("key"+i, "value"+i));
     }
     logFile.close();
   }
 
-  @AfterMethod
+  @AfterClass
   public void cleanTestChangelogDirectory()
   {
     final File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
@@ -87,8 +95,7 @@
 
   private LogFile<String, String> getLogFile(RecordParser<String, String> parser) throws ChangelogException
   {
-    LogFile<String, String> logFile = LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG), parser);
-    return logFile;
+    return LogFile.newAppendableLogFile(new File(TEST_DIRECTORY_CHANGELOG, Log.HEAD_LOG_FILE_NAME), parser);
   }
 
   @Test
@@ -266,7 +273,7 @@
       for (int i = 1; i <= 100; i++)
       {
         Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
-        writeLog.addRecord(record);
+        writeLog.append(record);
         assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
         assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1"));
         assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
@@ -279,35 +286,6 @@
     }
   }
 
-  @Test()
-  public void testTwoConcurrentWrite() throws Exception
-  {
-    final LogFile<String, String> writeLog1 = getLogFile(RECORD_PARSER);
-    final LogFile<String, String> writeLog2 = getLogFile(RECORD_PARSER);
-    try
-    {
-      writeLog1.addRecord(Record.from("startkey", "startvalue"));
-      Thread write1 = getWriteLogThread(writeLog1, "a");
-      Thread write2 = getWriteLogThread(writeLog2, "b");
-      write1.run();
-      write2.run();
-
-      write1.join();
-      write2.join();
-      writeLog1.syncToFileSystem();
-      DBCursor<Record<String, String>> cursor = writeLog1.getCursor("startkey");
-      for (int i = 1; i <= 200; i++)
-      {
-         assertThat(cursor.next()).isTrue();
-      }
-      assertThat(cursor.getRecord()).isIn(Record.from("k-b100", "v-b100"), Record.from("k-a100", "v-a100"));
-    }
-    finally
-    {
-      StaticUtils.close(writeLog1, writeLog2);
-    }
-  }
-
   /**
    * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
    * endIndex, using (keyN, valueN) where N is the index.
@@ -320,30 +298,13 @@
       assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
       assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i));
     }
-    assertThat(cursor.next()).isFalse();
-    assertThat(cursor.getRecord()).isNull();
+    assertThatCursorIsExhausted(cursor);
   }
 
-  /** Returns a thread that write 100 records to the provided log. */
-  private Thread getWriteLogThread(final LogFile<String, String> writeLog, final String recordPrefix)
+  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
   {
-    return new Thread() {
-      public void run()
-      {
-        for (int i = 1; i <= 100; i++)
-        {
-          Record<String, String> record = Record.from("k-" + recordPrefix + i, "v-" + recordPrefix + i);
-          try
-          {
-            writeLog.addRecord(record);
-          }
-          catch (ChangelogException e)
-          {
-            e.printStackTrace();
-          }
-        }
-      }
-    };
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
   }
 
   /**
@@ -374,11 +335,31 @@
       return length;
     }
 
-    public ByteString encodeRecord(String key, String value)
+    public ByteString encodeRecord(Record<String, String> record)
     {
       return new ByteStringBuilder()
-        .append(key).append(STRING_SEPARATOR)
-        .append(value).append(STRING_SEPARATOR).toByteString();
+        .append(record.getKey()).append(STRING_SEPARATOR)
+        .append(record.getValue()).append(STRING_SEPARATOR).toByteString();
+    }
+
+    @Override
+    public String decodeKeyFromString(String key) throws ChangelogException
+    {
+      return key;
+    }
+
+    @Override
+    public String encodeKeyToString(String key)
+    {
+      return key;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String getMaxKey()
+    {
+      // '~' character has the highest ASCII value
+      return "~~~~";
     }
   }
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
new file mode 100644
index 0000000..321d256
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -0,0 +1,571 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+@SuppressWarnings("javadoc")
+@Test(sequential=true)
+public class LogTest extends DirectoryServerTestCase
+{
+  // Use a directory dedicated to this test class
+  private static final File LOG_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+  @BeforeMethod
+  public void initialize() throws Exception
+  {
+    // Delete any previous log
+    if (LOG_DIRECTORY.exists())
+    {
+      StaticUtils.recursiveDelete(LOG_DIRECTORY);
+    }
+
+    // Build a log with 10 records with String keys and String values
+    // Keys are using the format keyNNN where N is a figure
+    // You should always ensure keys are correctly ordered otherwise tests may break unexpectedly
+    Log<String, String> log = openLog(RECORD_PARSER);
+    for (int i = 1; i <= 10; i++)
+    {
+      log.append(Record.from(String.format("key%03d", i), "value" + i));
+    }
+    log.close();
+  }
+
+  private Log<String, String> openLog(RecordParser<String, String> parser) throws ChangelogException
+  {
+    // Each string record has a length of approximately 18 bytes
+    // This size is set in order to have 2 records per log file before the rotation happens
+    // This allow to ensure rotation mechanism is thoroughly tested
+    // Some tests rely on having 2 records per log file (especially the purge tests), so take care
+    // if this value has to be changed
+    int sizeLimitPerFileInBytes = 30;
+
+    return Log.openLog(LOG_DIRECTORY, parser, sizeLimitPerFileInBytes);
+  }
+
+  @Test
+  public void testCursor() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getCursor();
+
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+      assertThatCursorCanBeFullyRead(cursor, 2, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testCursorWhenGivenAnExistingKey() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getCursor("key005");
+
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key005", "value5"));
+      assertThatCursorCanBeFullyRead(cursor, 6, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testCursorWhenGivenAnUnexistingKey() throws Exception
+  {
+    Log<String, String> log = openLog(RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      // key is between key005 and key006
+      cursor = log.getCursor("key005000");
+
+      assertThat(cursor).isNotNull();
+      assertThat(cursor.getRecord()).isNull();
+      assertThat(cursor.next()).isFalse();
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testCursorWhenGivenANullKey() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getCursor(null);
+
+      // should start from first record
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+      assertThatCursorCanBeFullyRead(cursor, 2, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testNearestCursorWhenGivenAnExistingKey() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null;
+    try {
+      // this key is the first key of the log file "key1_key2.log"
+      cursor1 = log.getNearestCursor("key001");
+      // lowest higher key is key2
+      assertThat(cursor1.getRecord()).isEqualTo(Record.from("key002", "value2"));
+      assertThatCursorCanBeFullyRead(cursor1, 3, 10);
+
+      // this key is the last key of the log file "key3_key4.log"
+      cursor2 = log.getNearestCursor("key004");
+      // lowest higher key is key5
+      assertThat(cursor2.getRecord()).isEqualTo(Record.from("key005", "value5"));
+      assertThatCursorCanBeFullyRead(cursor2, 6, 10);
+
+      cursor3 = log.getNearestCursor("key009");
+      // lowest higher key is key10
+      assertThat(cursor3.getRecord()).isEqualTo(Record.from("key010", "value10"));
+      assertThatCursorIsExhausted(cursor3);
+    }
+    finally {
+      StaticUtils.close(cursor1, cursor2, cursor3, log);
+    }
+  }
+
+  @Test
+  public void testNearestCursorWhenGivenAnExistingKey_KeyIsTheLastOne() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getNearestCursor("key010");
+
+      // lowest higher key does not exist
+      assertThatCursorIsExhausted(cursor);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testNearestCursorWhenGivenAnUnexistingKey() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      // key is between key005 and key006
+      cursor = log.getNearestCursor("key005000");
+
+      // lowest higher key is key006
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key006", "value6"));
+      assertThatCursorCanBeFullyRead(cursor, 7, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testNearestCursorWhenGivenANullKey() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    DBCursor<Record<String, String>> cursor = null;
+    try {
+      cursor = log.getNearestCursor(null);
+
+      // should start from start
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+      assertThatCursorCanBeFullyRead(cursor, 2, 10);
+    }
+    finally {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test(expectedExceptions=ChangelogException.class)
+  public void testCursorWhenParserFailsToRead() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER_FAILING_TO_READ);
+    try {
+      log.getCursor("key");
+    }
+    finally {
+      StaticUtils.close(log);
+    }
+  }
+
+  @Test
+  public void testGetOldestRecord() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    try
+    {
+      Record<String, String> record = log.getOldestRecord();
+
+      assertThat(record).isEqualTo(Record.from("key001", "value1"));
+    }
+    finally {
+      StaticUtils.close(log);
+    }
+  }
+
+  @Test
+  public void testGetNewestRecord() throws Exception
+  {
+    Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
+    try
+    {
+      Record<String, String> record = log.getNewestRecord();
+
+      assertThat(record).isEqualTo(Record.from("key010", "value10"));
+    }
+    finally {
+      StaticUtils.close(log);
+    }
+  }
+
+  /**
+   * Test that changes are visible immediately to a reader after a write.
+   */
+  @Test
+  public void testWriteAndReadOnSameLog() throws Exception
+  {
+    Log<String, String> writeLog = null;
+    Log<String, String> readLog = null;
+    try
+    {
+      writeLog = openLog(LogFileTest.RECORD_PARSER);
+      readLog = openLog(LogFileTest.RECORD_PARSER);
+
+      for (int i = 1; i <= 10; i++)
+      {
+        Record<String, String> record = Record.from(String.format("nkey%03d", i), "nvalue" + i);
+        writeLog.append(record);
+        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
+        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key001", "value1"));
+        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
+        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key001", "value1"));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writeLog, readLog);
+    }
+  }
+
+  @Test
+  public void testTwoConcurrentWrite() throws Exception
+  {
+    Log<String, String> writeLog1 = null;
+    Log<String, String> writeLog2 = null;
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      writeLog1 = openLog(LogFileTest.RECORD_PARSER);
+      writeLog2 = openLog(LogFileTest.RECORD_PARSER);
+      writeLog1.append(Record.from("key020", "starting record"));
+      AtomicReference<ChangelogException> exceptionRef = new AtomicReference<ChangelogException>();
+      Thread write1 = getWriteLogThread(writeLog1, "a", exceptionRef);
+      Thread write2 = getWriteLogThread(writeLog2, "b", exceptionRef);
+      write1.run();
+      write2.run();
+
+      write1.join();
+      write2.join();
+      if (exceptionRef.get() != null)
+      {
+        throw exceptionRef.get();
+      }
+      writeLog1.syncToFileSystem();
+      cursor = writeLog1.getCursor("key020");
+      for (int i = 1; i <= 60; i++)
+      {
+         assertThat(cursor.next()).isTrue();
+      }
+      assertThat(cursor.getRecord()).isIn(Record.from("nkb030", "vb30"), Record.from("nka030", "va30"));
+    }
+    finally
+    {
+      StaticUtils.close(cursor, writeLog1, writeLog2);
+    }
+  }
+
+  /**
+   *  This test should be disabled.
+   *  Enable it locally when you need to have an rough idea of write performance.
+   */
+  @Test(enabled=false)
+  public void logWriteSpeed() throws Exception
+  {
+    Log<String, String> writeLog = null;
+    try
+    {
+      long sizeOf1MB = 1024*1024;
+      writeLog = Log.openLog(LOG_DIRECTORY, LogFileTest.RECORD_PARSER, sizeOf1MB);
+
+      for (int i = 1; i < 1000000; i++)
+      {
+        writeLog.append(Record.from(String.format("key%010d", i), "value" + i));
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writeLog);
+    }
+  }
+
+  @Test
+  public void testWriteWhenCursorIsOpenedAndAheadLogFileIsRotated() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+      cursor = log.getCursor();
+      // advance cursor to last record to ensure it is pointing to ahead log file
+      advanceCursorFromFirstRecordTo(cursor, 10);
+
+      // add new records to ensure the ahead log file is rotated
+      for (int i = 11; i <= 20; i++)
+      {
+        log.append(Record.from(String.format("key%03d", i), "value" + i));
+      }
+
+      // check that cursor can fully read the new records
+      assertThatCursorCanBeFullyRead(cursor, 11, 20);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @Test
+  public void testWriteWhenMultiplesCursorsAreOpenedAndAheadLogFileIsRotated() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null, cursor4 = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+      cursor1 = log.getCursor();
+      advanceCursorFromFirstRecordTo(cursor1, 1);
+      cursor2 = log.getCursor();
+      advanceCursorFromFirstRecordTo(cursor2, 4);
+      cursor3 = log.getCursor();
+      advanceCursorFromFirstRecordTo(cursor3, 9);
+      cursor4 = log.getCursor();
+      advanceCursorFromFirstRecordTo(cursor4, 10);
+
+      // add new records to ensure the ahead log file is rotated
+      for (int i = 11; i <= 20; i++)
+      {
+        log.append(Record.from(String.format("key%03d", i), "value" + i));
+      }
+
+      // check that cursors can fully read the new records
+      assertThatCursorCanBeFullyRead(cursor1, 2, 20);
+      assertThatCursorCanBeFullyRead(cursor2, 5, 20);
+      assertThatCursorCanBeFullyRead(cursor3, 10, 20);
+      assertThatCursorCanBeFullyRead(cursor4, 11, 20);
+    }
+    finally
+    {
+      StaticUtils.close(cursor1, cursor2, cursor3, cursor4, log);
+    }
+  }
+
+  @Test
+  public void testClear() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+      log.clear();
+
+      cursor = log.getCursor();
+      assertThatCursorIsExhausted(cursor);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  // TODO : Should be re-enabled once the issue with robot functional test replication/totalupdate.txt is solved
+  @Test(enabled=false, expectedExceptions=ChangelogException.class)
+  public void testClearWhenCursorIsOpened() throws Exception
+  {
+    DBCursor<Record<String, String>> cursor = null;
+    Log<String, String> log = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+      cursor = log.getCursor();
+      log.clear();
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  @DataProvider(name = "purgeKeys")
+  Object[][] purgeKeys()
+  {
+    // purge key, first record expected in the cursor, startIndex + endIndex to fully read the cursor
+    return new Object[][]
+    {
+      // lowest key of the read-only log file "key005_key006.log"
+      { "key005", Record.from("key005", "value5"), 6, 10},
+      // key that is not the lowest of the read-only log file "key005_key006.log"
+      { "key006", Record.from("key005", "value5"), 6, 10},
+      // lowest key of the ahead log file "ahead.log"
+      { "key009", Record.from("key009", "value9"), 10, 10},
+      // key that is not the lowest of the ahead log file "ahead.log"
+      { "key010", Record.from("key009", "value9"), 10, 10},
+
+      // key not present in log, which is between key005 and key006
+      { "key005a", Record.from("key005", "value5"), 6, 10},
+      // key not present in log, which is between key006 and key007
+      { "key006a", Record.from("key007", "value7"), 8, 10},
+      // key not present in log, which is lower than oldest key key001
+      { "key000", Record.from("key001", "value1"), 2, 10},
+      // key not present in log, which is higher than newest key key010
+      // should return the lowest key present in ahead log
+      { "key011", Record.from("key009", "value9"), 10, 10},
+    };
+  }
+
+  /**
+   * Given a purge key, after purge is done, expects a new cursor to point on first record provided and
+   * then to be fully read starting at provided start index and finishing at provided end index.
+   */
+  @Test(dataProvider="purgeKeys")
+  public void testPurge(String purgeKey, Record<String,String> firstRecordExpectedAfterPurge,
+      int cursorStartIndex, int cursorEndIndex) throws Exception
+  {
+    Log<String, String> log = null;
+    DBCursor<Record<String, String>> cursor = null;
+    try
+    {
+      log = openLog(LogFileTest.RECORD_PARSER);
+
+      log.purgeUpTo(purgeKey);
+
+      cursor = log.getCursor();
+      assertThat(cursor.getRecord()).isEqualTo(firstRecordExpectedAfterPurge);
+      assertThatCursorCanBeFullyRead(cursor, cursorStartIndex, cursorEndIndex);
+    }
+    finally
+    {
+      StaticUtils.close(cursor, log);
+    }
+  }
+
+  private void advanceCursorFromFirstRecordTo(DBCursor<Record<String, String>> cursor, int endIndex)
+      throws Exception
+  {
+    assertThat(cursor.getRecord()).isEqualTo(Record.from("key001", "value1"));
+    advanceCursorUpTo(cursor, 2, endIndex);
+  }
+
+  private void advanceCursorUpTo(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+      throws Exception
+  {
+    for (int i = fromIndex; i <= endIndex; i++)
+    {
+      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
+      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%03d", i), "value" + i));
+    }
+  }
+
+  /**
+   * Read the cursor until exhaustion, ensuring that its first value is fromIndex and its last value
+   * endIndex, using (keyN, valueN) where N is the index.
+   */
+  private void assertThatCursorCanBeFullyRead(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex)
+      throws Exception
+  {
+    advanceCursorUpTo(cursor, fromIndex, endIndex);
+    assertThatCursorIsExhausted(cursor);
+  }
+
+  private void assertThatCursorIsExhausted(DBCursor<Record<String, String>> cursor) throws Exception
+  {
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  /** Returns a thread that write N records to the provided log. */
+  private Thread getWriteLogThread(final Log<String, String> writeLog, final String recordPrefix,
+      final AtomicReference<ChangelogException> exceptionRef)
+  {
+    return new Thread() {
+      public void run()
+      {
+        for (int i = 1; i <= 30; i++)
+        {
+          Record<String, String> record = Record.from(
+              String.format("nk%s%03d", recordPrefix, i), "v" + recordPrefix + i);
+          try
+          {
+            writeLog.append(record);
+          }
+          catch (ChangelogException e)
+          {
+            // keep the first exception only
+            exceptionRef.compareAndSet(null, e);
+          }
+        }
+      }
+    };
+  }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
index 06ab151..8f3e173 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironmentTest.java
@@ -80,14 +80,15 @@
     final DN domainDN = DN.decode(DN1_AS_STRING);
 
     ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
-    LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
-    LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
-    LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
+    Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
+    Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
+    Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
     StaticUtils.close(cnDB, replicaDB, replicaDB2);
 
     ChangelogState state = environment.readChangelogState();
 
-    assertThat(state.getDomainToServerIds()).containsExactly(MapEntry.entry(domainDN, Arrays.asList(1, 2)));
+    assertThat(state.getDomainToServerIds()).containsKeys(domainDN);
+    assertThat(state.getDomainToServerIds().get(domainDN)).containsOnly(1, 2);
     assertThat(state.getDomainToGenerationId()).containsExactly(MapEntry.entry(domainDN, 1L));
   }
 
@@ -98,8 +99,8 @@
     List<DN> domainDNs = Arrays.asList(DN.decode(DN1_AS_STRING), DN.decode(DN2_AS_STRING), DN.decode(DN3_AS_STRING));
 
     ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
-    LogFile<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
-    List<LogFile<CSN,UpdateMsg>> replicaDBs = new ArrayList<LogFile<CSN,UpdateMsg>>();
+    Log<Long,ChangeNumberIndexRecord> cnDB = environment.getOrCreateCNIndexDB();
+    List<Log<CSN,UpdateMsg>> replicaDBs = new ArrayList<Log<CSN,UpdateMsg>>();
     for (int i = 0; i <= 2 ; i++)
     {
       for (int j = 1; j <= 10; j++)
@@ -129,8 +130,8 @@
     File rootPath = new File(TEST_DIRECTORY_CHANGELOG);
     DN domainDN = DN.decode(DN1_AS_STRING);
     ReplicationEnvironment environment = new ReplicationEnvironment(rootPath.getAbsolutePath(), null);
-    LogFile<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
-    LogFile<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
+    Log<CSN,UpdateMsg> replicaDB = environment.getOrCreateReplicaDB(domainDN, 1, 1);
+    Log<CSN,UpdateMsg> replicaDB2 = environment.getOrCreateReplicaDB(domainDN, 2, 1);
     StaticUtils.close(replicaDB, replicaDB2);
 
     // delete the domain directory created for the 2 replica DBs to break the

--
Gitblit v1.10.0