From 3c931b0f1ba72ce655f1fe03295aff77b4bfcf38 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 12 Jun 2014 15:08:37 +0000
Subject: [PATCH] OPENDJ-1472 : File based changelog : optimize random seek in each log file CR-3727

---
 opends/src/server/org/opends/server/types/ByteStringBuilder.java                                                            |   31 +
 opends/src/server/org/opends/server/util/StaticUtils.java                                                                   |   32 
 opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java                                    |   27 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java |  490 ++++++++++++++++++
 opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java                                   |  536 ++++++++++++++++++++
 opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java                                          |  155 -----
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java              |   37 
 opends/src/server/org/opends/server/replication/server/changelog/file/Log.java                                              |    2 
 opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java                                        |   10 
 opends/src/messages/messages/replication.properties                                                                         |    4 
 opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java                                   |  214 ++++++++
 11 files changed, 1,364 insertions(+), 174 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index cb27792..844c5c1 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -612,4 +612,6 @@
 SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
  replica offline state file '%s' for domain %s
 SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
- offline state file '%s' for domain %s and server id %d
\ No newline at end of file
+ offline state file '%s' for domain %s and server id %d
+SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH_282=Could not retrieve file length of \
+ file '%s'
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
new file mode 100644
index 0000000..5aedc91
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -0,0 +1,536 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * A log reader with binary search support.
+ * <p>
+ * The log file contains record offsets at fixed block size : given a block size N,
+ * an offset is written at every N bytes. The offset contains the number of bytes to
+ * reach the beginning of previous record (or next record if offset equals 0).
+ * <p>
+ * The reader provides both sequential access, using the {@code readRecord()} method,
+ * and reasonably fast random access, using the {@code seek(K, boolean)} method.
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
+ */
+class BlockLogReader<K extends Comparable<K>, V> implements Closeable
+{
+  static final int SIZE_OF_BLOCK_OFFSET = 4;
+
+  static final int SIZE_OF_RECORD_SIZE = 4;
+
+  /**
+   * Size of a block, after which an offset to the nearest record is written.
+   * <p>
+   * This value has been fixed based on performance tests. See
+   * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472">
+   * OPENDJ-1472</a> for details.
+   */
+  static final int BLOCK_SIZE = 256;
+
+  private final int blockSize;
+
+  private final RecordParser<K, V> parser;
+
+  private final RandomAccessFile reader;
+
+  private final File file;
+
+  /**
+   * Creates a reader for the provided file, file reader and parser.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param file
+   *          The log file to read.
+   * @param reader
+   *          The random access reader on the log file.
+   * @param parser
+   *          The parser to decode the records read.
+   * @return a new log reader
+   */
+  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader(
+      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser)
+  {
+    return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE);
+  }
+
+  /**
+   * Creates a reader for the provided file, file reader, parser and block size.
+   * <p>
+   * This method is intended for tests only, to allow tuning of the block size.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param file
+   *          The log file to read.
+   * @param reader
+   *          The random access reader on the log file.
+   * @param parser
+   *          The parser to decode the records read.
+   * @param blockSize
+   *          The size of each block, or frequency at which the record offset is
+   *          present in the log file.
+   * @return a new log reader
+   */
+  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests(
+      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize)
+  {
+    return new BlockLogReader<K, V>(file, reader, parser, blockSize);
+  }
+
+  private BlockLogReader(
+      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
+  {
+    this.file = file;
+    this.reader = reader;
+    this.parser = parser;
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * Position the reader to the record corresponding to the provided key or to
+   * the nearest key (the lowest key higher than the provided key), and returns
+   * the last record read.
+   *
+   * @param key
+   *          Key to use as a start position. Key must not be {@code null}.
+   * @param findNextRecord
+   *          If {@code true}, start position is the lowest key that is higher
+   *          than the provided key, otherwise start position is the provided
+   *          key.
+   * @return The pair (key_found, last_record_read). key_found is a boolean
+   *         indicating if reader is successfully positioned to the key or the
+   *         nearest key. last_record_read is the last record that was read.
+   *         When key_found is equals to {@code false}, then last_record_read is
+   *         always {@code null}. When key_found is equals to {@code true},
+   *         last_record_read can be valued or be {@code null}
+   * @throws ChangelogException
+   *           If an error occurs when seeking the key.
+   */
+  public Pair<Boolean, Record<K,V>> seekToRecord(final K key, final boolean findNextRecord) throws ChangelogException
+  {
+    final long markerPosition = searchClosestBlockStartToKey(key);
+    if (markerPosition >= 0)
+    {
+      return positionToKeySequentially(markerPosition, key, findNextRecord);
+    }
+    return Pair.of(false, null);
+  }
+
+  /**
+   * Position the reader to the provided file position.
+   *
+   * @param filePosition
+   *            offset from the beginning of the file, in bytes.
+   * @throws ChangelogException
+   *            If an error occurs.
+   */
+  public void seekToPosition(final long filePosition) throws ChangelogException
+  {
+    try
+    {
+      reader.seek(filePosition);
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e);
+    }
+  }
+
+  /**
+   * Read a record from current position.
+   *
+   * @return the record read
+   * @throws ChangelogException
+   *            If an error occurs during read.
+   */
+  public Record<K,V> readRecord() throws ChangelogException
+  {
+    return readRecord(-1);
+  }
+
+  /**
+   * Returns the file position for this reader.
+   *
+   * @return the position of reader on the log file
+   * @throws ChangelogException
+   *          If an error occurs.
+   */
+  public long getFilePosition() throws ChangelogException
+  {
+    try
+    {
+      return reader.getFilePointer();
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(
+          ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e);
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close() throws IOException
+  {
+    reader.close();
+  }
+
+  /**
+   * Read a record, either from the provided start of block position or from
+   * the current position.
+   *
+   * @param blockStartPosition
+   *          The position of start of block, where a record offset is written.
+   *          If provided value is -1, then record is read from current position instead.
+   * @return the record read
+   * @throws ChangelogException
+   *           If an error occurs during read.
+   */
+  private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException
+  {
+    try
+    {
+      final ByteString recordData = blockStartPosition == -1 ?
+          readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition);
+      return recordData != null ? parser.decodeRecord(recordData) : null;
+    }
+    catch (IOException io)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
+    }
+    catch (DecodingException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e);
+    }
+  }
+
+  /**
+   * Reads the first record found after the provided file position of block
+   * start.
+   *
+   * @param blockStartPosition
+   *          Position of read pointer in the file, expected to be the start of
+   *          a block where a record offset is written.
+   * @return the record read
+   * @throws IOException
+   *           If an error occurs during read.
+   */
+  private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException
+  {
+    reader.seek(blockStartPosition);
+    if (blockStartPosition > 0)
+    {
+      final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET];
+      reader.readFully(offsetData);
+      final int offsetToRecord = ByteString.wrap(offsetData).toInt();
+      if (offsetToRecord > 0)
+      {
+        reader.seek(blockStartPosition - offsetToRecord);
+      } // if offset is zero, reader is already well positioned
+    }
+    return readNextRecord();
+  }
+
+  /**
+   * Reads the next record.
+   *
+   * @return the bytes of the next record, or {@code null} if no record is available
+   * @throws IOException
+   *            If an error occurs while reading.
+   */
+  private ByteString readNextRecord() throws IOException
+  {
+    try
+    {
+      // read length of record
+      final long filePosition = reader.getFilePointer();
+      int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize);
+      final int recordLength = readRecordLength(distanceToBlockStart);
+
+      // read the record
+      long currentPosition = reader.getFilePointer();
+      distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize);
+      final ByteStringBuilder recordBytes =
+          new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart));
+      int remainingBytesToRead = recordLength;
+      while (distanceToBlockStart < remainingBytesToRead)
+      {
+        if (distanceToBlockStart != 0)
+        {
+          recordBytes.append(reader, distanceToBlockStart);
+        }
+        // skip the offset
+        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+
+        // next step
+        currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET;
+        remainingBytesToRead -= distanceToBlockStart;
+        distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET;
+      }
+      if (remainingBytesToRead > 0)
+      {
+        // last bytes of the record
+        recordBytes.append(reader, remainingBytesToRead);
+      }
+      return recordBytes.toByteString();
+    }
+    catch(EOFException e)
+    {
+      // end of stream, no record or uncomplete record
+      return null;
+    }
+  }
+
+  /**
+   * Returns the total length in bytes taken by a record when stored in log file,
+   * including size taken by block offsets.
+   *
+   * @param recordLength
+   *            The length of record to write.
+   * @param distanceToBlockStart
+   *            Distance before the next block start.
+   * @return the length in bytes necessary to store the record in the log
+   */
+  int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart)
+  {
+    int totalLength = recordLength;
+    if (recordLength > distanceToBlockStart)
+    {
+      totalLength += SIZE_OF_BLOCK_OFFSET;
+      final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET);
+      totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET;
+    }
+    return totalLength;
+  }
+
+  /** Read the length of a record. */
+  private int readRecordLength(final int distanceToBlockStart) throws IOException
+  {
+    final ByteStringBuilder lengthBytes = new ByteStringBuilder();
+    if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
+    {
+      lengthBytes.append(reader, distanceToBlockStart);
+      // skip the offset
+      reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart);
+      return lengthBytes.toByteString().toInt();
+    }
+    else
+    {
+      if (distanceToBlockStart == 0)
+      {
+        // skip the offset
+        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
+      }
+      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE);
+      return lengthBytes.toByteString().toInt();
+    }
+  }
+
+  /**
+   * Search the closest block start to the provided key, using binary search.
+   * <p>
+   * Note that position of reader is modified by this method.
+   *
+   * @param key
+   *          The key to search
+   * @return the file position of block start that must be used to find the given key,
+   *      or a negative number if no position could be found.
+   * @throws ChangelogException
+   *          if a problem occurs
+   */
+  long searchClosestBlockStartToKey(K key) throws ChangelogException
+  {
+    final long maxPos = getLastPositionInFile();
+    long lowPos = 0L;
+    long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
+
+    while (lowPos <= highPos)
+    {
+      final long middlePos = Math.min((lowPos + highPos) / 2, maxPos);
+      final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos);
+      final Record<K, V> middleRecord = readRecord(middleBlockStartPos);
+      if (middleRecord == null)
+      {
+        return -1;
+      }
+
+      final int keyComparison = middleRecord.getKey().compareTo(key);
+      if (keyComparison < 0)
+      {
+        if (middleBlockStartPos <= lowPos)
+        {
+          return lowPos;
+        }
+        lowPos = middleBlockStartPos;
+      }
+      else if (keyComparison > 0)
+      {
+        if (middleBlockStartPos >= highPos)
+        {
+          return highPos;
+        }
+        highPos = middleBlockStartPos;
+      }
+      else
+      {
+        return middleBlockStartPos;
+      }
+    }
+    // Unable to find a position where key can be found
+    return -1;
+  }
+
+  private long getLastPositionInFile() throws ChangelogException
+  {
+    try
+    {
+      return reader.length() - 1;
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e);
+    }
+  }
+
+  /**
+   * Position to provided key, starting from provided block start position and
+   * reading sequentially until key is found.
+   *
+   * @param blockStartPosition
+   *          Position of read pointer in the file, expected to be the start of
+   *          a block where a record offset is written.
+   * @param key
+   *          The key to find.
+   * @param findNextRecord
+   *          If {@code true}, position at the end of this method is the lowest
+   *          key that is higher than the provided key, otherwise position is
+   *          the provided key.
+   * @return The pair ({@code true}, last record read) if reader is successfully
+   *         positioned to the key or the nearest key (last record may be null
+   *         if end of file is reached), ({@code false}, null) otherwise.
+   * @throws ChangelogException
+   *            If an error occurs.
+   */
+   Pair<Boolean, Record<K,V>> positionToKeySequentially(
+       final long blockStartPosition,
+       final K key,
+       final boolean findNextRecord)
+       throws ChangelogException {
+    Record<K,V> record = readRecord(blockStartPosition);
+    do {
+      if (record != null)
+      {
+        final int keysComparison = record.getKey().compareTo(key);
+        final boolean matches = findNextRecord ? keysComparison >= 0 : record.getKey().equals(key);
+        if (matches)
+        {
+          if (findNextRecord && keysComparison == 0)
+          {
+            // skip key in order to position on lowest higher key
+            record = readRecord();
+          }
+          return Pair.of(true, record);
+        }
+      }
+      record = readRecord();
+    }
+    while (record != null);
+    return Pair.of(false, null);
+  }
+
+  /**
+   * Returns the closest start of block which has a position lower than or equal
+   * to the provided file position.
+   *
+   * @param filePosition
+   *          The position of reader on file.
+   * @return the file position of the block start.
+   */
+  long getClosestBlockStartBeforeOrAtPosition(final long filePosition)
+  {
+    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
+    return dist == 0 ? filePosition : filePosition + dist - blockSize;
+  }
+
+  /**
+   * Returns the closest start of block which has a position strictly
+   * higher than the provided file position.
+   *
+   * @param filePosition
+   *           The position of reader on file.
+   * @return the file position of the block start.
+   */
+  long getClosestBlockStartStrictlyAfterPosition(final long filePosition)
+  {
+    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
+    return dist == 0 ? filePosition + blockSize : filePosition + dist;
+  }
+
+  /**
+   * Returns the distance to next block for the provided file position.
+   *
+   * @param filePosition
+   *            offset from the beginning of the file, in bytes.
+   * @param blockSize
+   *            Size of each block in bytes.
+   * @return the distance to next block in bytes
+   */
+  static int getDistanceToNextBlockStart(final long filePosition, final int blockSize)
+  {
+    if (filePosition == 0)
+    {
+      return blockSize;
+    }
+    final int distance = (int) (filePosition % blockSize);
+    return distance == 0 ? 0 : blockSize - distance;
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
new file mode 100644
index 0000000..6fef3f8
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
@@ -0,0 +1,214 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.SyncFailedException;
+
+import org.forgerock.util.Reject;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
+/**
+ * A log writer, using fixed-size blocks to allow fast retrieval when reading.
+ * <p>
+ * The log file contains record offsets at fixed block size : given block size N,
+ * an offset is written at every N bytes. The offset contains the number of bytes to
+ * reach the beginning of previous record (or next record if offset equals 0).
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
+ */
+class BlockLogWriter<K extends Comparable<K>, V> implements Closeable
+{
+  private final int blockSize;
+
+  private final RecordParser<K, V> parser;
+
+  private final LogWriter writer;
+
+  /**
+   * Creates a writer for the provided log writer and parser.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param writer
+   *          The writer on the log file.
+   * @param parser
+   *          The parser to encode the records.
+   * @return a new log reader
+   */
+  static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriter(
+      final LogWriter writer, final RecordParser<K, V> parser)
+  {
+    return new BlockLogWriter<K, V>(writer, parser, BLOCK_SIZE);
+  }
+
+  /**
+   * Creates a writer for the provided log writer, parser and size for blocks.
+   * <p>
+   * This method is intended for tests only, to allow tuning of the block size.
+   *
+   * @param <K>
+   *          Type of the key of a record, which must be comparable.
+   * @param <V>
+   *          Type of the value of a record.
+   * @param writer
+   *          The writer on the log file.
+   * @param parser
+   *          The parser to encode the records.
+   * @param blockSize
+   *          The size of each block, or frequency at which the record offset is
+   *          present in the log file.
+   * @return a new log reader
+   */
+  static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriterForTests(
+      final LogWriter writer, final RecordParser<K, V> parser, final int blockSize)
+  {
+    return new BlockLogWriter<K, V>(writer, parser, blockSize);
+  }
+
+  /**
+   * Creates the writer with an underlying writer, a parser and a size for blocks.
+   *
+   * @param writer
+   *            The writer to the log file.
+   * @param parser
+   *            The parser to encode the records.
+   * @param blockSize
+   *            The size of each block.
+   */
+  private BlockLogWriter(LogWriter writer, RecordParser<K, V> parser, int blockSize)
+  {
+    Reject.ifNull(writer, parser);
+    this.writer = writer;
+    this.parser = parser;
+    this.blockSize = blockSize;
+  }
+
+  /**
+   * Writes the provided record to the log file.
+   *
+   * @param record
+   *            The record to write.
+   * @throws ChangelogException
+   *            If a problem occurs during write.
+   */
+  public void write(final Record<K, V> record) throws ChangelogException
+  {
+    try
+    {
+      write(parser.encodeRecord(record));
+      writer.flush();
+    }
+    catch (IOException e)
+    {
+      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(),
+          writer.getFile().getPath()), e);
+    }
+  }
+
+  /**
+   * Returns the number of bytes written in the log file.
+   *
+   * @return the number of bytes
+   */
+  public long getBytesWritten()
+  {
+    return writer.getBytesWritten();
+  }
+
+  /**
+   * Synchronize all modifications to the log file to the underlying device.
+   *
+   * @throws SyncFailedException
+   *           If synchronization fails.
+   */
+  public void sync() throws SyncFailedException
+  {
+    writer.sync();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    writer.close();
+  }
+
+  /**
+   * Writes the provided byte string to the log file.
+   *
+   * @param record
+   *            The value to write.
+   * @throws IOException
+   *            If an error occurs while writing
+   */
+  private void write(final ByteString record) throws IOException
+  {
+    // Add length of record before writing
+    ByteString data = new ByteStringBuilder(SIZE_OF_RECORD_SIZE + record.length()).
+        append(record.length()).
+        append(record).
+        toByteString();
+
+    int distanceToBlockStart = BlockLogReader.getDistanceToNextBlockStart(writer.getBytesWritten(), blockSize);
+    int cumulatedDistanceToBeginning = distanceToBlockStart;
+    int dataPosition = 0;
+    int dataRemaining = data.length();
+    final int dataSizeForOneBlock = blockSize - SIZE_OF_BLOCK_OFFSET;
+
+    while (distanceToBlockStart < dataRemaining)
+    {
+      if (distanceToBlockStart > 0)
+      {
+        // append part of record
+        final int dataEndPosition = dataPosition + distanceToBlockStart;
+        writer.write(data.subSequence(dataPosition, dataEndPosition));
+        dataPosition = dataEndPosition;
+        dataRemaining -= distanceToBlockStart;
+      }
+      // append the offset to the record
+      writer.write(ByteString.valueOf(cumulatedDistanceToBeginning));
+
+      // next step
+      distanceToBlockStart = dataSizeForOneBlock;
+      cumulatedDistanceToBeginning += blockSize;
+    }
+    // append the remaining bytes to finish the record
+    writer.write(data.subSequence(dataPosition, data.length()));
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 20522bd..28f0366 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -158,7 +158,7 @@
   /**
    * The last key appended to the log. In order to keep the ordering of the keys
    * in the log, any attempt to append a record with a key lower or equal to
-   * this is rejected (no error but an event is logged).
+   * this key is rejected (no error but an event is logged).
    */
   private K lastAppendedKey;
 
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index f0bb7e9..b27c78e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -30,11 +30,9 @@
 
 import java.io.BufferedWriter;
 import java.io.Closeable;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 
 import org.forgerock.util.Reject;
 import org.opends.messages.Message;
@@ -42,11 +40,11 @@
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
 import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.ByteStringBuilder;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
 
+import com.forgerock.opendj.util.Pair;
+
 /**
  * A log file, containing part of a {@code Log}. The log file may be:
  * <ul>
@@ -72,17 +70,14 @@
   /** The file containing the records. */
   private final File logfile;
 
-  /** The parser of records, to convert bytes to record and record to bytes. */
-  private final RecordParser<K, V> parser;
-
   /** The pool to obtain a reader on the log. */
-  private LogReaderPool readerPool;
+  private final LogReaderPool<K, V> readerPool;
 
   /**
    * The writer on the log file, which may be {@code null} if log file is not
-   * write-enabled
+   * write-enabled.
    */
-  private LogWriter writer;
+  private final BlockLogWriter<K, V> writer;
 
   /** Indicates if log is enabled for write. */
   private final boolean isWriteEnabled;
@@ -105,10 +100,11 @@
   {
     Reject.ifNull(logFilePath, parser);
     this.logfile = logFilePath;
-    this.parser = parser;
     this.isWriteEnabled = isWriteEnabled;
 
-    initialize();
+    createLogFileIfNotExists();
+    writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null;
+    readerPool = new LogReaderPool<K, V>(logfile, parser);
   }
 
   /**
@@ -155,25 +151,6 @@
   }
 
   /**
-   * Initialize this log.
-   * <p>
-   * Create directories and file if necessary, and create a writer
-   * and pool of readers.
-   *
-   * @throws ChangelogException
-   *            If an errors occurs during initialization.
-   */
-  private void initialize() throws ChangelogException
-  {
-    createLogFileIfNotExists();
-    if (isWriteEnabled)
-    {
-      writer = new LogWriter(logfile);
-    }
-    readerPool = new LogReaderPool(logfile);
-  }
-
-  /**
    * Returns the file containing the records.
    *
    * @return the file
@@ -221,23 +198,7 @@
   void append(final Record<K, V> record) throws ChangelogException
   {
     checkLogIsEnabledForWrite();
-    try
-    {
-      writer.write(encodeRecord(record));
-    }
-    catch (IOException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
-    }
-  }
-
-  private ByteString encodeRecord(final Record<K, V> record)
-  {
-    final ByteString data = parser.encodeRecord(record);
-    return new ByteStringBuilder()
-      .append(data.length())
-      .append(data)
-      .toByteString();
+    writer.write(record);
   }
 
   /**
@@ -530,73 +491,21 @@
     return logfile.getPath();
   }
 
-  /** Read a record from the provided reader. */
-  private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
-  {
-    try
-    {
-      final ByteString recordData = readEncodedRecord(reader);
-      return recordData != null ? parser.decodeRecord(recordData) : null;
-    }
-    catch(DecodingException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
-    }
-  }
-
-  private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
-  {
-    try
-    {
-      final byte[] lengthData = new byte[4];
-      reader.readFully(lengthData);
-      int recordLength = ByteString.wrap(lengthData).toInt();
-
-      final byte[] recordData = new byte[recordLength];
-      reader.readFully(recordData);
-      return ByteString.wrap(recordData);
-    }
-    catch(EOFException e)
-    {
-      // end of stream, no record or uncomplete record
-      return null;
-    }
-    catch (IOException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
-    }
-  }
-
-  /** Seek to given position on the provided reader. */
-  private void seek(RandomAccessFile reader, long position) throws ChangelogException
-  {
-    try
-    {
-      reader.seek(position);
-    }
-    catch (IOException e)
-    {
-      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
-    }
-  }
-
   /**
-   * Returns a random access file to read this log.
+   * Returns a reader for this log.
    * <p>
    * Assumes that calling methods ensure that log is not closed.
    */
-  private RandomAccessFile getReader() throws ChangelogException
+  private BlockLogReader<K, V> getReader() throws ChangelogException
   {
     return readerPool.get();
   }
 
   /** Release the provided reader. */
-  private void releaseReader(RandomAccessFile reader) {
+  private void releaseReader(BlockLogReader<K, V> reader) {
     readerPool.release(reader);
   }
 
-
-
   /** {@inheritDoc} */
   @Override
   public int hashCode()
@@ -633,7 +542,7 @@
     private final LogFile<K, V> logFile;
 
     /** To read the records. */
-    private final RandomAccessFile reader;
+    private final BlockLogReader<K, V> reader;
 
     /** The current available record, may be {@code null}. */
     private Record<K,V> currentRecord;
@@ -674,14 +583,14 @@
       this.logFile = logFile;
       this.reader = logFile.getReader();
       this.currentRecord = record;
-      logFile.seek(reader, filePosition);
+      reader.seekToPosition(filePosition);
     }
 
     /** {@inheritDoc} */
     @Override
     public boolean next() throws ChangelogException
     {
-      currentRecord = logFile.readRecord(reader);
+      currentRecord = reader.readRecord();
       return currentRecord != null;
     }
 
@@ -695,26 +604,10 @@
     /** {@inheritDoc} */
     @Override
     public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
-      do
-      {
-        if (currentRecord != null)
-        {
-          final boolean matches = findNearest ?
-              currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key);
-          if (matches)
-          {
-            if (findNearest && currentRecord.getKey().equals(key))
-            {
-              // skip key in order to position on lowest higher key
-              next();
-            }
-            return true;
-          }
-        }
-        next();
-      }
-      while (currentRecord != null);
-      return false;
+      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
+      final boolean found = result.getFirst();
+      currentRecord = found ? result.getSecond() : null;
+      return found;
     }
 
     /** {@inheritDoc} */
@@ -733,15 +626,7 @@
      */
     long getFilePosition() throws ChangelogException
     {
-      try
-      {
-        return reader.getFilePointer();
-      }
-      catch (IOException e)
-      {
-        throw new ChangelogException(
-            ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
-      }
+      return reader.getFilePosition();
     }
 
     /** {@inheritDoc} */
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
index b5aed97..5300ffd 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
@@ -35,22 +35,33 @@
 
 /**
  * A Pool of readers to a log file.
+
+ *
+ * @param <K>
+ *          Type of the key of a record, which must be comparable.
+ * @param <V>
+ *          Type of the value of a record.
  */
 // TODO : implement a real pool - reusing readers instead of opening-closing them each time
-class LogReaderPool
+class LogReaderPool<K extends Comparable<K>, V>
 {
   /** The file to read. */
   private final File file;
 
+  private final RecordParser<K, V> parser;
+
   /**
    * Creates a pool of readers for provided file.
    *
    * @param file
-   *            The file to read.
+   *          The file to read.
+   * @param parser
+   *          The parser to decode the records read.
    */
-  LogReaderPool(File file)
+  LogReaderPool(File file, RecordParser<K, V> parser)
   {
     this.file = file;
+    this.parser = parser;
   }
 
   /**
@@ -63,9 +74,9 @@
    * @throws ChangelogException
    *            If the file can't be found or read.
    */
-  RandomAccessFile get() throws ChangelogException
+  BlockLogReader<K, V> get() throws ChangelogException
   {
-    return getRandomAccess(file);
+    return getReader(file);
   }
 
   /**
@@ -77,17 +88,17 @@
    *          The random access reader to a file previously acquired with this
    *          pool.
    */
-  void release(RandomAccessFile reader)
+  void release(BlockLogReader<K, V> reader)
   {
     StaticUtils.close(reader);
   }
 
   /** Returns a random access file to read this log. */
-  private RandomAccessFile getRandomAccess(File file) throws ChangelogException
+  private BlockLogReader<K, V> getReader(File file) throws ChangelogException
   {
     try
     {
-      return new RandomAccessFile(file, "r");
+      return BlockLogReader.newReader(file, new RandomAccessFile(file, "r"), parser) ;
     }
     catch (Exception e)
     {
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
index 44ce685..5ec5f2b 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -76,6 +76,16 @@
     }
   }
 
+  /**
+   * Returns the file used by this writer.
+   *
+   * @return the file
+   */
+  File getFile()
+  {
+    return file;
+  }
+
   /** {@inheritDoc} */
   @Override
   public void write(int b) throws IOException
diff --git a/opends/src/server/org/opends/server/types/ByteStringBuilder.java b/opends/src/server/org/opends/server/types/ByteStringBuilder.java
index e6010a3..fd18874 100644
--- a/opends/src/server/org/opends/server/types/ByteStringBuilder.java
+++ b/opends/src/server/org/opends/server/types/ByteStringBuilder.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions copyright 2012 ForgeRock AS.
+ *      Portions copyright 2012-2014 ForgeRock AS.
  */
 package org.opends.server.types;
 
@@ -30,6 +30,8 @@
 
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
+import java.io.DataInput;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -566,7 +568,34 @@
     return bytesRead;
   }
 
+  /**
+   * Appends the provided {@code DataInput} to this byte string
+   * builder.
+   *
+   * @param stream
+   *          The data input stream to be appended to this byte string
+   *          builder.
+   * @param length
+   *          The maximum number of bytes to be appended from {@code
+   *          input}.
+   * @throws IndexOutOfBoundsException
+   *           If {@code length} is less than zero.
+   * @throws EOFException
+   *           If this stream reaches the end before reading all the bytes.
+   * @throws IOException
+   *           If an I/O error occurs.
+   */
+  public void append(DataInput stream, int length) throws IndexOutOfBoundsException, EOFException, IOException
+  {
+    if (length < 0)
+    {
+      throw new IndexOutOfBoundsException();
+    }
 
+    ensureAdditionalCapacity(length);
+    stream.readFully(buffer, this.length, length);
+    this.length += length;
+  }
 
   /**
    * Appends the provided {@code ReadableByteChannel} to this byte string
diff --git a/opends/src/server/org/opends/server/util/StaticUtils.java b/opends/src/server/org/opends/server/util/StaticUtils.java
index bc27bed..b3cfa18 100644
--- a/opends/src/server/org/opends/server/util/StaticUtils.java
+++ b/opends/src/server/org/opends/server/util/StaticUtils.java
@@ -3463,32 +3463,36 @@
 
 
   /**
-   * Attempts to delete the specified file or directory.  If it is a directory,
+   * Attempts to delete the specified file or directory. If it is a directory,
    * then any files or subdirectories that it contains will be recursively
    * deleted as well.
    *
-   * @param  file  The file or directory to be removed.
-   *
-   * @return  <CODE>true</CODE> if the specified file and any subordinates are
-   *          all successfully removed, or <CODE>false</CODE> if at least one
-   *          element in the subtree could not be removed.
+   * @param file
+   *          The file or directory to be removed.
+   * @return {@code true} if the specified file and any subordinates are all
+   *         successfully removed, or {@code false} if at least one element in
+   *         the subtree could not be removed or file does not exists.
    */
   public static boolean recursiveDelete(File file)
   {
-    boolean successful = true;
-    if (file.isDirectory())
+    if (file.exists())
     {
-      File[] childList = file.listFiles();
-      if (childList != null)
+      boolean successful = true;
+      if (file.isDirectory())
       {
-        for (File f : childList)
+        File[] childList = file.listFiles();
+        if (childList != null)
         {
-          successful &= recursiveDelete(f);
+          for (File f : childList)
+          {
+            successful &= recursiveDelete(f);
+          }
         }
       }
-    }
 
-    return (successful & file.delete());
+      return (successful & file.delete());
+    }
+    return false;
   }
 
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
new file mode 100644
index 0000000..7561613
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
@@ -0,0 +1,490 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.replication.server.changelog.file;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+import org.opends.server.util.StaticUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.forgerock.opendj.util.Pair;
+
+@SuppressWarnings("javadoc")
+public class BlockLogReaderWriterTest extends DirectoryServerTestCase
+{
+  private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
+
+  private static final File TEST_FILE = new File(TEST_DIRECTORY, "file");
+
+  private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser();
+
+  private static final int INT_RECORD_SIZE = 12;
+
+  @BeforeClass
+  void createTestDirectory()
+  {
+    TEST_DIRECTORY.mkdirs();
+  }
+
+  @BeforeMethod
+  void ensureTestFileIsEmpty() throws Exception
+  {
+    StaticUtils.recursiveDelete(TEST_FILE);
+  }
+
+  @AfterClass
+  void cleanTestDirectory()
+  {
+    StaticUtils.recursiveDelete(TEST_DIRECTORY);
+  }
+
+  @DataProvider(name = "recordsData")
+  Object[][] recordsData()
+  {
+    return new Object[][]
+    {
+      // raw size taken by each record is: 4 (record size) + 4 (key) + 4 (value) = 12 bytes
+
+      // size of block, expected size of file after all records are written, records
+      { 12, 12, records(1) }, // zero block marker
+      { 10, 16, records(1) }, // one block marker
+      { 8, 16, records(1) },  // one block marker
+      { 7, 20, records(1) },  // two block markers
+      { 6, 24, records(1) },  // three block markers
+      { 5, 40, records(1) },  // seven block markers
+
+      { 16, 28, records(1,2) }, // one block marker
+      { 12, 32, records(1,2) }, // two block markers
+      { 10, 36, records(1,2) }, // three block markers
+    };
+  }
+
+  /**
+   * Tests that records can be written then read correctly for different block sizes.
+   */
+  @Test(dataProvider="recordsData")
+  public void testWriteThenRead(int blockSize, int expectedSizeOfFile, List<Record<Integer, Integer>> records)
+      throws Exception
+  {
+    writeRecords(blockSize, records);
+
+    BlockLogReader<Integer, Integer> reader = null;
+    try
+    {
+      reader = newReader(blockSize);
+      for (int i = 0; i < records.size(); i++)
+      {
+         Record<Integer, Integer> record = reader.readRecord();
+         assertThat(record).isEqualTo(records.get(i));
+      }
+      assertThat(reader.readRecord()).isNull();
+      assertThat(reader.getFilePosition()).isEqualTo(expectedSizeOfFile);
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  @DataProvider(name = "recordsForSeek")
+  Object[][] recordsForSeek()
+  {
+    Object[][] data = new Object[][] {
+      // records, key, findNearest, expectedRecord, expectedFound
+
+      // no record
+      { records(), 1, false, null, false },
+
+      // 1 record exact find
+      { records(1), 1, false, record(1), true },
+
+      // 1 record nearest find
+      { records(1), 1, true, null, true },
+
+      // 2 records
+      { records(1,2), 1, false, record(1), true },
+      { records(1,2), 2, false, record(2), true },
+      { records(1,2), 1, true, record(2), true },
+      { records(1,2), 2, true, null, true },
+
+      // 3 records exact find
+      { records(1,2,3), 0, false, null, false },
+      { records(1,2,3), 1, false, record(1), true },
+      { records(1,2,3), 2, false, record(2), true },
+      { records(1,2,3), 3, false, record(3), true },
+      { records(1,2,3), 4, false, null, false },
+
+      // 3 records nearest find
+      { records(1,2,3), 0, true, record(1), true },
+      { records(1,2,3), 1, true, record(2), true },
+      { records(1,2,3), 2, true, record(3), true },
+      { records(1,2,3), 3, true, null, true },
+      { records(1,2,3), 4, true, null, false },
+
+      // 10 records exact find
+      { records(1,2,3,4,5,6,7,8,9,10), 0, false, null, false },
+      { records(1,2,3,4,5,6,7,8,9,10), 1, false, record(1), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 5, false, record(5), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 10, false, record(10), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 11, false, null, false },
+
+      // 10 records nearest find
+      { records(1,2,3,4,5,6,7,8,9,10), 0, true, record(1), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 1, true, record(2), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 5, true, record(6), true },
+      { records(1,2,3,4,5,6,7,8,9,10), 10, true, null, true },
+      { records(1,2,3,4,5,6,7,8,9,10), 11, true, null, false },
+    };
+
+    // For each test case, do a test with various block sizes to ensure algorithm is not broken
+    // on a given size
+    int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 };
+    Object[][] finalData = new Object[sizes.length*data.length][6];
+    for (int i = 0; i < data.length; i++)
+    {
+      for (int j = 0; j < sizes.length; j++)
+      {
+        Object[] a = data[i];
+        // add the block size at beginning of each test case
+        finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4]};
+      }
+    }
+    return finalData;
+  }
+
+  @Test(dataProvider="recordsForSeek")
+  public void testSeek(int blockSize, List<Record<Integer, Integer>> records, int key, boolean findNearest,
+      Record<Integer, Integer> expectedRecord, boolean expectedFound) throws Exception
+  {
+    writeRecords(blockSize, records);
+
+    BlockLogReader<Integer, Integer> reader = null;
+    try
+    {
+      reader = newReader(blockSize);
+      Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, findNearest);
+
+      assertThat(result.getFirst()).isEqualTo(expectedFound);
+      assertThat(result.getSecond()).isEqualTo(expectedRecord);
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  @Test
+  public void testGetClosestMarkerBeforeOrAtPosition() throws Exception
+  {
+    final int blockSize = 10;
+    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(0)).isEqualTo(0);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(5)).isEqualTo(0);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(9)).isEqualTo(0);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(10)).isEqualTo(10);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(15)).isEqualTo(10);
+    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(20)).isEqualTo(20);
+  }
+
+  @Test
+  public void testGetClosestMarkerStrictlyAfterPosition() throws Exception
+  {
+    final int blockSize = 10;
+    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(0)).isEqualTo(10);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(5)).isEqualTo(10);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(10)).isEqualTo(20);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(11)).isEqualTo(20);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(15)).isEqualTo(20);
+    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(20)).isEqualTo(30);
+  }
+
+  @Test
+  public void testSearchClosestMarkerToKey() throws Exception
+  {
+    int blockSize = 20;
+    writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20));
+
+    BlockLogReader<Integer, Integer> reader = null;
+    try
+    {
+      reader = newReader(blockSize);
+
+      assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0);
+      assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0);
+      assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20);
+      assertThat(reader.searchClosestBlockStartToKey(3)).isEqualTo(20);
+      assertThat(reader.searchClosestBlockStartToKey(4)).isEqualTo(40);
+      assertThat(reader.searchClosestBlockStartToKey(5)).isEqualTo(60);
+      assertThat(reader.searchClosestBlockStartToKey(6)).isEqualTo(80);
+      assertThat(reader.searchClosestBlockStartToKey(7)).isEqualTo(80);
+      assertThat(reader.searchClosestBlockStartToKey(8)).isEqualTo(100);
+      assertThat(reader.searchClosestBlockStartToKey(9)).isEqualTo(120);
+      assertThat(reader.searchClosestBlockStartToKey(10)).isEqualTo(140);
+      assertThat(reader.searchClosestBlockStartToKey(19)).isEqualTo(260);
+      assertThat(reader.searchClosestBlockStartToKey(20)).isEqualTo(280);
+      // out of reach keys
+      assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280);
+      assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280);
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  @Test
+  public void testLengthOfStoredRecord() throws Exception
+  {
+    final int blockSize = 100;
+    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
+
+    int recordLength = 10;
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 20)).isEqualTo(recordLength);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 10)).isEqualTo(recordLength);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 9)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+
+    recordLength = 150;
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 60)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 54)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 53)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+
+    recordLength = 200;
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 8)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 7)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
+    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
+  }
+
+  /**
+   * This test is intended to be run only manually to check the performance between binary search
+   * and sequential access.
+   * Note that sequential run may be extremely long when using high values.
+   */
+  @Test(enabled=false)
+  public void seekPerformanceComparison() throws Exception
+  {
+    // You may change these values
+    long fileSizeInBytes = 100*1024*1024;
+    int numberOfValuesToSeek = 50000;
+    int blockSize = 256;
+
+    writeRecordsToReachFileSize(blockSize, fileSizeInBytes);
+    BlockLogReader<Integer, Integer> reader = null;
+    try
+    {
+      reader = newReader(blockSize);
+      List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek);
+      System.out.println("File size: " + TEST_FILE.length() + " bytes");
+
+      System.out.println("\n---- BINARY SEARCH");
+      long minTime = Long.MAX_VALUE;
+      long maxTime = Long.MIN_VALUE;
+      final long t0 = System.nanoTime();
+      for (Integer key : keysToSeek)
+      {
+        final long ts = System.nanoTime();
+        Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, false);
+        final long te = System.nanoTime() - ts;
+        if (te < minTime) minTime = te;
+        if (te > maxTime) maxTime = te;
+        // show time for seeks that last more than N microseconds (tune as needed)
+        if (te/1000 > 1000)
+        {
+          System.out.println("TIME! key:" + result.getSecond().getKey() + ", time=" + te/1000 + " microseconds");
+        }
+        assertThat(result.getSecond()).isEqualTo(record(key));
+      }
+      System.out.println("Time taken: " + ((System.nanoTime() - t0)/1000000) + " milliseconds");
+      System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
+      System.out.println("Max time for a search: " + maxTime/1000 + " microseconds");
+      System.out.println("Max difference for a search: " + (maxTime - minTime)/1000 + " microseconds");
+
+      System.out.println("\n---- SEQUENTIAL SEARCH");
+      minTime = Long.MAX_VALUE;
+      maxTime = Long.MIN_VALUE;
+      final long t1 = System.nanoTime();
+      for (Integer val : keysToSeek)
+      {
+        long ts = System.nanoTime();
+        Pair<Boolean, Record<Integer, Integer>> result = reader.positionToKeySequentially(0, val, false);
+        assertThat(result.getSecond()).isEqualTo(Record.from(val, val));
+        long te = System.nanoTime() - ts;
+        if (te < minTime) minTime = te;
+        if (te > maxTime) maxTime = te;
+      }
+      System.out.println("Time taken: " + ((System.nanoTime() - t1)/1000000) + " milliseconds");
+      System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
+      System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds");
+      System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds");
+    }
+    finally
+    {
+      StaticUtils.close(reader);
+    }
+  }
+
+  /** Write provided records. */
+  private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException
+  {
+    BlockLogWriter<Integer, Integer> writer = null;
+    try
+    {
+      writer = newWriter(blockSize);
+      for (Record<Integer, Integer> record : records)
+      {
+        writer.write(record);
+      }
+    }
+    finally
+    {
+      StaticUtils.close(writer);
+    }
+  }
+
+  /** Write as many records as needed to reach provided file size. Records goes from 1 up to N. */
+  private void writeRecordsToReachFileSize(int blockSize, long sizeInBytes) throws Exception
+  {
+      final int numberOfValues = (int) sizeInBytes / INT_RECORD_SIZE;
+      final int[] values = new int[numberOfValues];
+      for (int i = 0; i < numberOfValues; i++)
+      {
+        values[i] = i+1;
+      }
+      writeRecords(blockSize, records(values));
+  }
+
+  /** Returns provided number of keys to seek in random order, for a file of provided size. */
+  private List<Integer> getShuffledKeys(long fileSizeInBytes, int numberOfKeys)
+  {
+    final int numberOfValues = (int) fileSizeInBytes / INT_RECORD_SIZE;
+    final List<Integer> values = new ArrayList<Integer>(numberOfValues);
+    for (int i = 0; i < numberOfValues; i++)
+    {
+      values.add(i+1);
+    }
+    Collections.shuffle(values);
+    return values.subList(0, numberOfKeys);
+  }
+
+  private BlockLogWriter<Integer, Integer> newWriter(int sizeOfBlock) throws ChangelogException
+  {
+    return BlockLogWriter.newWriterForTests(new LogWriter(TEST_FILE), RECORD_PARSER, sizeOfBlock);
+  }
+
+  private BlockLogReader<Integer, Integer> newReader(int blockSize) throws FileNotFoundException
+  {
+    return BlockLogReader.newReaderForTests(TEST_FILE, new RandomAccessFile(TEST_FILE, "r"),
+        RECORD_PARSER, blockSize);
+  }
+
+  private BlockLogReader<Integer, Integer> newReaderWithNullFile(int blockSize) throws FileNotFoundException
+  {
+    return BlockLogReader.newReaderForTests(null, null, RECORD_PARSER, blockSize);
+  }
+
+  /** Helper to build a list of records. */
+  private List<Record<Integer, Integer>> records(int...keys)
+  {
+    List<Record<Integer, Integer>> records = new ArrayList<Record<Integer, Integer>>();
+    for (int key : keys)
+    {
+      records.add(Record.from(key, key));
+    }
+    return records;
+  }
+
+  /** Helper to build a record. */
+  private Record<Integer, Integer> record(int key)
+  {
+    return Record.from(key, key);
+  }
+
+  /**
+   * Record parser implementation for records with keys and values as integers to be used in tests.
+   * Using integer allow to know precisely the size of the records (4 bytes for key + 4 bytes for value),
+   * which is useful for some tests.
+   */
+  private static class IntRecordParser implements RecordParser<Integer, Integer>
+  {
+    public Record<Integer, Integer> decodeRecord(final ByteString data) throws DecodingException
+    {
+      ByteSequenceReader reader = data.asReader();
+      int key = reader.getInt();
+      int value = reader.getInt();
+      return Record.from(key, value);
+    }
+
+    public ByteString encodeRecord(Record<Integer, Integer> record)
+    {
+      return new ByteStringBuilder().append(record.getKey()).append(record.getValue()).toByteString();
+    }
+
+    @Override
+    public Integer decodeKeyFromString(String key) throws ChangelogException
+    {
+      return Integer.valueOf(key);
+    }
+
+    @Override
+    public String encodeKeyToString(Integer key)
+    {
+      return String.valueOf(key);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Integer getMaxKey()
+    {
+      return Integer.MAX_VALUE;
+    }
+  }
+
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
index 524d95a..2af0e8c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -58,6 +58,15 @@
       }
   };
 
+  static final RecordParser<String,String> CORRUPTING_RECORD_PARSER = new StringRecordParser() {
+    @Override
+    public ByteString encodeRecord(Record<String, String> record)
+    {
+      // write the key only, to corrupt the log file
+      return new ByteStringBuilder().append(record.getKey()).toByteString();
+    }
+  };
+
   @BeforeClass
   public void createTestDirectory()
   {
@@ -78,7 +87,7 @@
 
     for (int i = 1; i <= 10; i++)
     {
-      logFile.append(Record.from("key"+i, "value"+i));
+      logFile.append(Record.from(String.format("key%02d", i), "value"+i));
     }
     logFile.close();
   }
@@ -106,7 +115,7 @@
     try {
       cursor = changelog.getCursor();
 
-      assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
       assertThatCursorCanBeFullyRead(cursor, 2, 10);
     }
     finally {
@@ -120,9 +129,9 @@
     LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
     DBCursor<Record<String, String>> cursor = null;
     try {
-      cursor = changelog.getCursor("key5");
+      cursor = changelog.getCursor("key05");
 
-      assertThat(cursor.getRecord()).isEqualTo(Record.from("key5", "value5"));
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
       assertThatCursorCanBeFullyRead(cursor, 6, 10);
     }
     finally {
@@ -156,7 +165,7 @@
       cursor = changelog.getCursor(null);
 
       // should start from start
-      assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
       assertThatCursorCanBeFullyRead(cursor, 2, 10);
     }
     finally {
@@ -170,10 +179,10 @@
     LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
     DBCursor<Record<String, String>> cursor = null;
     try {
-      cursor = changelog.getNearestCursor("key1");
+      cursor = changelog.getNearestCursor("key01");
 
       // lowest higher key is key2
-      assertThat(cursor.getRecord()).isEqualTo(Record.from("key2", "value2"));
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key02", "value2"));
       assertThatCursorCanBeFullyRead(cursor, 3, 10);
     }
     finally {
@@ -187,10 +196,10 @@
     LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
     DBCursor<Record<String, String>> cursor = null;
     try {
-      cursor = changelog.getNearestCursor("key0");
+      cursor = changelog.getNearestCursor("key00");
 
       // lowest higher key is key1
-      assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
       assertThatCursorCanBeFullyRead(cursor, 2, 10);
     }
     finally {
@@ -207,7 +216,7 @@
       cursor = changelog.getNearestCursor(null);
 
       // should start from start
-      assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
+      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
       assertThatCursorCanBeFullyRead(cursor, 2, 10);
     }
     finally {
@@ -235,7 +244,7 @@
     {
       Record<String, String> record = changelog.getOldestRecord();
 
-      assertThat(record).isEqualTo(Record.from("key1", "value1"));
+      assertThat(record).isEqualTo(Record.from("key01", "value1"));
     }
     finally {
       StaticUtils.close(changelog);
@@ -275,9 +284,9 @@
         Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
         writeLog.append(record);
         assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
-        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1"));
+        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));
         assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
-        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key1", "value1"));
+        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key01", "value1"));
       }
     }
     finally
@@ -296,7 +305,7 @@
     for (int i = fromIndex; i <= endIndex; i++)
     {
       assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
-      assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i));
+      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%02d", i), "value" + i));
     }
     assertThatCursorIsExhausted(cursor);
   }

--
Gitblit v1.10.0