mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
12.08.2014 3c931b0f1ba72ce655f1fe03295aff77b4bfcf38
OPENDJ-1472 : File based changelog : optimize random seek in each log file
CR-3727

Implements read and write with blocks, to enable binary search.

* New BlockLogReader class: read records sequentially or using
binary search provided a key

* New BlockLogWriter class: write records by blocks

* New BlockLogReaderWriterTest class : tests read/write with blocks

* Update LogFile : delegate responsibility for read/write to BlockLogXXX

* Add method ByteStringBuilder#append(DataInput, int) to avoid
byte array copy

* Minor changes in other files
3 files added
8 files modified
1538 ■■■■ changed files
opends/src/messages/messages/replication.properties 4 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java 536 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java 214 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java 2 ●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java 155 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java 27 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/types/ByteStringBuilder.java 31 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/util/StaticUtils.java 32 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java 490 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java 37 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -612,4 +612,6 @@
SEVERE_ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE_280=Could not read content of \
 replica offline state file '%s' for domain %s
SEVERE_ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE_281=Could not delete replica \
 offline state file '%s' for domain %s and server id %d
 offline state file '%s' for domain %s and server id %d
SEVERE_ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH_282=Could not retrieve file length of \
 file '%s'
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
New file
@@ -0,0 +1,536 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import com.forgerock.opendj.util.Pair;
/**
 * A log reader with binary search support.
 * <p>
 * The log file contains record offsets at fixed block size : given a block size N,
 * an offset is written at every N bytes. The offset contains the number of bytes to
 * reach the beginning of previous record (or next record if offset equals 0).
 * <p>
 * The reader provides both sequential access, using the {@code readRecord()} method,
 * and reasonably fast random access, using the {@code seek(K, boolean)} method.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
class BlockLogReader<K extends Comparable<K>, V> implements Closeable
{
  static final int SIZE_OF_BLOCK_OFFSET = 4;
  static final int SIZE_OF_RECORD_SIZE = 4;
  /**
   * Size of a block, after which an offset to the nearest record is written.
   * <p>
   * This value has been fixed based on performance tests. See
   * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472">
   * OPENDJ-1472</a> for details.
   */
  static final int BLOCK_SIZE = 256;
  private final int blockSize;
  private final RecordParser<K, V> parser;
  private final RandomAccessFile reader;
  private final File file;
  /**
   * Creates a reader for the provided file, file reader and parser.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param file
   *          The log file to read.
   * @param reader
   *          The random access reader on the log file.
   * @param parser
   *          The parser to decode the records read.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser)
  {
    return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE);
  }
  /**
   * Creates a reader for the provided file, file reader, parser and block size.
   * <p>
   * This method is intended for tests only, to allow tuning of the block size.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param file
   *          The log file to read.
   * @param reader
   *          The random access reader on the log file.
   * @param parser
   *          The parser to decode the records read.
   * @param blockSize
   *          The size of each block, or frequency at which the record offset is
   *          present in the log file.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize)
  {
    return new BlockLogReader<K, V>(file, reader, parser, blockSize);
  }
  private BlockLogReader(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
  {
    this.file = file;
    this.reader = reader;
    this.parser = parser;
    this.blockSize = blockSize;
  }
  /**
   * Position the reader to the record corresponding to the provided key or to
   * the nearest key (the lowest key higher than the provided key), and returns
   * the last record read.
   *
   * @param key
   *          Key to use as a start position. Key must not be {@code null}.
   * @param findNextRecord
   *          If {@code true}, start position is the lowest key that is higher
   *          than the provided key, otherwise start position is the provided
   *          key.
   * @return The pair (key_found, last_record_read). key_found is a boolean
   *         indicating if reader is successfully positioned to the key or the
   *         nearest key. last_record_read is the last record that was read.
   *         When key_found is equals to {@code false}, then last_record_read is
   *         always {@code null}. When key_found is equals to {@code true},
   *         last_record_read can be valued or be {@code null}
   * @throws ChangelogException
   *           If an error occurs when seeking the key.
   */
  public Pair<Boolean, Record<K,V>> seekToRecord(final K key, final boolean findNextRecord) throws ChangelogException
  {
    final long markerPosition = searchClosestBlockStartToKey(key);
    if (markerPosition >= 0)
    {
      return positionToKeySequentially(markerPosition, key, findNextRecord);
    }
    return Pair.of(false, null);
  }
  /**
   * Position the reader to the provided file position.
   *
   * @param filePosition
   *            offset from the beginning of the file, in bytes.
   * @throws ChangelogException
   *            If an error occurs.
   */
  public void seekToPosition(final long filePosition) throws ChangelogException
  {
    try
    {
      reader.seek(filePosition);
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e);
    }
  }
  /**
   * Read a record from current position.
   *
   * @return the record read
   * @throws ChangelogException
   *            If an error occurs during read.
   */
  public Record<K,V> readRecord() throws ChangelogException
  {
    return readRecord(-1);
  }
  /**
   * Returns the file position for this reader.
   *
   * @return the position of reader on the log file
   * @throws ChangelogException
   *          If an error occurs.
   */
  public long getFilePosition() throws ChangelogException
  {
    try
    {
      return reader.getFilePointer();
    }
    catch (IOException e)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e);
    }
  }
  /** {@inheritDoc} */
  @Override
  public void close() throws IOException
  {
    reader.close();
  }
  /**
   * Read a record, either from the provided start of block position or from
   * the current position.
   *
   * @param blockStartPosition
   *          The position of start of block, where a record offset is written.
   *          If provided value is -1, then record is read from current position instead.
   * @return the record read
   * @throws ChangelogException
   *           If an error occurs during read.
   */
  private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException
  {
    try
    {
      final ByteString recordData = blockStartPosition == -1 ?
          readNextRecord() : readRecordFromBlockStartPosition(blockStartPosition);
      return recordData != null ? parser.decodeRecord(recordData) : null;
    }
    catch (IOException io)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
    }
    catch (DecodingException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), e);
    }
  }
  /**
   * Reads the first record found after the provided file position of block
   * start.
   *
   * @param blockStartPosition
   *          Position of read pointer in the file, expected to be the start of
   *          a block where a record offset is written.
   * @return the record read
   * @throws IOException
   *           If an error occurs during read.
   */
  private ByteString readRecordFromBlockStartPosition(final long blockStartPosition) throws IOException
  {
    reader.seek(blockStartPosition);
    if (blockStartPosition > 0)
    {
      final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET];
      reader.readFully(offsetData);
      final int offsetToRecord = ByteString.wrap(offsetData).toInt();
      if (offsetToRecord > 0)
      {
        reader.seek(blockStartPosition - offsetToRecord);
      } // if offset is zero, reader is already well positioned
    }
    return readNextRecord();
  }
  /**
   * Reads the next record.
   *
   * @return the bytes of the next record, or {@code null} if no record is available
   * @throws IOException
   *            If an error occurs while reading.
   */
  private ByteString readNextRecord() throws IOException
  {
    try
    {
      // read length of record
      final long filePosition = reader.getFilePointer();
      int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize);
      final int recordLength = readRecordLength(distanceToBlockStart);
      // read the record
      long currentPosition = reader.getFilePointer();
      distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize);
      final ByteStringBuilder recordBytes =
          new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart));
      int remainingBytesToRead = recordLength;
      while (distanceToBlockStart < remainingBytesToRead)
      {
        if (distanceToBlockStart != 0)
        {
          recordBytes.append(reader, distanceToBlockStart);
        }
        // skip the offset
        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
        // next step
        currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET;
        remainingBytesToRead -= distanceToBlockStart;
        distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET;
      }
      if (remainingBytesToRead > 0)
      {
        // last bytes of the record
        recordBytes.append(reader, remainingBytesToRead);
      }
      return recordBytes.toByteString();
    }
    catch(EOFException e)
    {
      // end of stream, no record or uncomplete record
      return null;
    }
  }
  /**
   * Returns the total length in bytes taken by a record when stored in log file,
   * including size taken by block offsets.
   *
   * @param recordLength
   *            The length of record to write.
   * @param distanceToBlockStart
   *            Distance before the next block start.
   * @return the length in bytes necessary to store the record in the log
   */
  int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart)
  {
    int totalLength = recordLength;
    if (recordLength > distanceToBlockStart)
    {
      totalLength += SIZE_OF_BLOCK_OFFSET;
      final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET);
      totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET;
    }
    return totalLength;
  }
  /** Read the length of a record. */
  private int readRecordLength(final int distanceToBlockStart) throws IOException
  {
    final ByteStringBuilder lengthBytes = new ByteStringBuilder();
    if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
    {
      lengthBytes.append(reader, distanceToBlockStart);
      // skip the offset
      reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart);
      return lengthBytes.toByteString().toInt();
    }
    else
    {
      if (distanceToBlockStart == 0)
      {
        // skip the offset
        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
      }
      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE);
      return lengthBytes.toByteString().toInt();
    }
  }
  /**
   * Search the closest block start to the provided key, using binary search.
   * <p>
   * Note that position of reader is modified by this method.
   *
   * @param key
   *          The key to search
   * @return the file position of block start that must be used to find the given key,
   *      or a negative number if no position could be found.
   * @throws ChangelogException
   *          if a problem occurs
   */
  long searchClosestBlockStartToKey(K key) throws ChangelogException
  {
    final long maxPos = getLastPositionInFile();
    long lowPos = 0L;
    long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
    while (lowPos <= highPos)
    {
      final long middlePos = Math.min((lowPos + highPos) / 2, maxPos);
      final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos);
      final Record<K, V> middleRecord = readRecord(middleBlockStartPos);
      if (middleRecord == null)
      {
        return -1;
      }
      final int keyComparison = middleRecord.getKey().compareTo(key);
      if (keyComparison < 0)
      {
        if (middleBlockStartPos <= lowPos)
        {
          return lowPos;
        }
        lowPos = middleBlockStartPos;
      }
      else if (keyComparison > 0)
      {
        if (middleBlockStartPos >= highPos)
        {
          return highPos;
        }
        highPos = middleBlockStartPos;
      }
      else
      {
        return middleBlockStartPos;
      }
    }
    // Unable to find a position where key can be found
    return -1;
  }
  private long getLastPositionInFile() throws ChangelogException
  {
    try
    {
      return reader.length() - 1;
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e);
    }
  }
  /**
   * Position to provided key, starting from provided block start position and
   * reading sequentially until key is found.
   *
   * @param blockStartPosition
   *          Position of read pointer in the file, expected to be the start of
   *          a block where a record offset is written.
   * @param key
   *          The key to find.
   * @param findNextRecord
   *          If {@code true}, position at the end of this method is the lowest
   *          key that is higher than the provided key, otherwise position is
   *          the provided key.
   * @return The pair ({@code true}, last record read) if reader is successfully
   *         positioned to the key or the nearest key (last record may be null
   *         if end of file is reached), ({@code false}, null) otherwise.
   * @throws ChangelogException
   *            If an error occurs.
   */
   Pair<Boolean, Record<K,V>> positionToKeySequentially(
       final long blockStartPosition,
       final K key,
       final boolean findNextRecord)
       throws ChangelogException {
    Record<K,V> record = readRecord(blockStartPosition);
    do {
      if (record != null)
      {
        final int keysComparison = record.getKey().compareTo(key);
        final boolean matches = findNextRecord ? keysComparison >= 0 : record.getKey().equals(key);
        if (matches)
        {
          if (findNextRecord && keysComparison == 0)
          {
            // skip key in order to position on lowest higher key
            record = readRecord();
          }
          return Pair.of(true, record);
        }
      }
      record = readRecord();
    }
    while (record != null);
    return Pair.of(false, null);
  }
  /**
   * Returns the closest start of block which has a position lower than or equal
   * to the provided file position.
   *
   * @param filePosition
   *          The position of reader on file.
   * @return the file position of the block start.
   */
  long getClosestBlockStartBeforeOrAtPosition(final long filePosition)
  {
    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
    return dist == 0 ? filePosition : filePosition + dist - blockSize;
  }
  /**
   * Returns the closest start of block which has a position strictly
   * higher than the provided file position.
   *
   * @param filePosition
   *           The position of reader on file.
   * @return the file position of the block start.
   */
  long getClosestBlockStartStrictlyAfterPosition(final long filePosition)
  {
    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
    return dist == 0 ? filePosition + blockSize : filePosition + dist;
  }
  /**
   * Returns the distance to next block for the provided file position.
   *
   * @param filePosition
   *            offset from the beginning of the file, in bytes.
   * @param blockSize
   *            Size of each block in bytes.
   * @return the distance to next block in bytes
   */
  static int getDistanceToNextBlockStart(final long filePosition, final int blockSize)
  {
    if (filePosition == 0)
    {
      return blockSize;
    }
    final int distance = (int) (filePosition % blockSize);
    return distance == 0 ? 0 : blockSize - distance;
  }
}
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
New file
@@ -0,0 +1,214 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
import java.io.Closeable;
import java.io.IOException;
import java.io.SyncFailedException;
import org.forgerock.util.Reject;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
/**
 * A log writer, using fixed-size blocks to allow fast retrieval when reading.
 * <p>
 * The log file contains record offsets at fixed block size : given block size N,
 * an offset is written at every N bytes. The offset contains the number of bytes to
 * reach the beginning of previous record (or next record if offset equals 0).
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
class BlockLogWriter<K extends Comparable<K>, V> implements Closeable
{
  private final int blockSize;
  private final RecordParser<K, V> parser;
  private final LogWriter writer;
  /**
   * Creates a writer for the provided log writer and parser.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param writer
   *          The writer on the log file.
   * @param parser
   *          The parser to encode the records.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriter(
      final LogWriter writer, final RecordParser<K, V> parser)
  {
    return new BlockLogWriter<K, V>(writer, parser, BLOCK_SIZE);
  }
  /**
   * Creates a writer for the provided log writer, parser and size for blocks.
   * <p>
   * This method is intended for tests only, to allow tuning of the block size.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param writer
   *          The writer on the log file.
   * @param parser
   *          The parser to encode the records.
   * @param blockSize
   *          The size of each block, or frequency at which the record offset is
   *          present in the log file.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriterForTests(
      final LogWriter writer, final RecordParser<K, V> parser, final int blockSize)
  {
    return new BlockLogWriter<K, V>(writer, parser, blockSize);
  }
  /**
   * Creates the writer with an underlying writer, a parser and a size for blocks.
   *
   * @param writer
   *            The writer to the log file.
   * @param parser
   *            The parser to encode the records.
   * @param blockSize
   *            The size of each block.
   */
  private BlockLogWriter(LogWriter writer, RecordParser<K, V> parser, int blockSize)
  {
    Reject.ifNull(writer, parser);
    this.writer = writer;
    this.parser = parser;
    this.blockSize = blockSize;
  }
  /**
   * Writes the provided record to the log file.
   *
   * @param record
   *            The record to write.
   * @throws ChangelogException
   *            If a problem occurs during write.
   */
  public void write(final Record<K, V> record) throws ChangelogException
  {
    try
    {
      write(parser.encodeRecord(record));
      writer.flush();
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(),
          writer.getFile().getPath()), e);
    }
  }
  /**
   * Returns the number of bytes written in the log file.
   *
   * @return the number of bytes
   */
  public long getBytesWritten()
  {
    return writer.getBytesWritten();
  }
  /**
   * Synchronize all modifications to the log file to the underlying device.
   *
   * @throws SyncFailedException
   *           If synchronization fails.
   */
  public void sync() throws SyncFailedException
  {
    writer.sync();
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    writer.close();
  }
  /**
   * Writes the provided byte string to the log file.
   *
   * @param record
   *            The value to write.
   * @throws IOException
   *            If an error occurs while writing
   */
  private void write(final ByteString record) throws IOException
  {
    // Add length of record before writing
    ByteString data = new ByteStringBuilder(SIZE_OF_RECORD_SIZE + record.length()).
        append(record.length()).
        append(record).
        toByteString();
    int distanceToBlockStart = BlockLogReader.getDistanceToNextBlockStart(writer.getBytesWritten(), blockSize);
    int cumulatedDistanceToBeginning = distanceToBlockStart;
    int dataPosition = 0;
    int dataRemaining = data.length();
    final int dataSizeForOneBlock = blockSize - SIZE_OF_BLOCK_OFFSET;
    while (distanceToBlockStart < dataRemaining)
    {
      if (distanceToBlockStart > 0)
      {
        // append part of record
        final int dataEndPosition = dataPosition + distanceToBlockStart;
        writer.write(data.subSequence(dataPosition, dataEndPosition));
        dataPosition = dataEndPosition;
        dataRemaining -= distanceToBlockStart;
      }
      // append the offset to the record
      writer.write(ByteString.valueOf(cumulatedDistanceToBeginning));
      // next step
      distanceToBlockStart = dataSizeForOneBlock;
      cumulatedDistanceToBeginning += blockSize;
    }
    // append the remaining bytes to finish the record
    writer.write(data.subSequence(dataPosition, data.length()));
  }
}
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -158,7 +158,7 @@
  /**
   * The last key appended to the log. In order to keep the ordering of the keys
   * in the log, any attempt to append a record with a key lower or equal to
   * this is rejected (no error but an event is logged).
   * this key is rejected (no error but an event is logged).
   */
  private K lastAppendedKey;
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -30,11 +30,9 @@
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.forgerock.util.Reject;
import org.opends.messages.Message;
@@ -42,11 +40,11 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
/**
 * A log file, containing part of a {@code Log}. The log file may be:
 * <ul>
@@ -72,17 +70,14 @@
  /** The file containing the records. */
  private final File logfile;
  /** The parser of records, to convert bytes to record and record to bytes. */
  private final RecordParser<K, V> parser;
  /** The pool to obtain a reader on the log. */
  private LogReaderPool readerPool;
  private final LogReaderPool<K, V> readerPool;
  /**
   * The writer on the log file, which may be {@code null} if log file is not
   * write-enabled
   * write-enabled.
   */
  private LogWriter writer;
  private final BlockLogWriter<K, V> writer;
  /** Indicates if log is enabled for write. */
  private final boolean isWriteEnabled;
@@ -105,10 +100,11 @@
  {
    Reject.ifNull(logFilePath, parser);
    this.logfile = logFilePath;
    this.parser = parser;
    this.isWriteEnabled = isWriteEnabled;
    initialize();
    createLogFileIfNotExists();
    writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null;
    readerPool = new LogReaderPool<K, V>(logfile, parser);
  }
  /**
@@ -155,25 +151,6 @@
  }
  /**
   * Initialize this log.
   * <p>
   * Create directories and file if necessary, and create a writer
   * and pool of readers.
   *
   * @throws ChangelogException
   *            If an errors occurs during initialization.
   */
  private void initialize() throws ChangelogException
  {
    createLogFileIfNotExists();
    if (isWriteEnabled)
    {
      writer = new LogWriter(logfile);
    }
    readerPool = new LogReaderPool(logfile);
  }
  /**
   * Returns the file containing the records.
   *
   * @return the file
@@ -221,23 +198,7 @@
  void append(final Record<K, V> record) throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    try
    {
      writer.write(encodeRecord(record));
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
    }
  }
  private ByteString encodeRecord(final Record<K, V> record)
  {
    final ByteString data = parser.encodeRecord(record);
    return new ByteStringBuilder()
      .append(data.length())
      .append(data)
      .toByteString();
    writer.write(record);
  }
  /**
@@ -530,73 +491,21 @@
    return logfile.getPath();
  }
  /** Read a record from the provided reader. */
  private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
  {
    try
    {
      final ByteString recordData = readEncodedRecord(reader);
      return recordData != null ? parser.decodeRecord(recordData) : null;
    }
    catch(DecodingException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
    }
  }
  private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
  {
    try
    {
      final byte[] lengthData = new byte[4];
      reader.readFully(lengthData);
      int recordLength = ByteString.wrap(lengthData).toInt();
      final byte[] recordData = new byte[recordLength];
      reader.readFully(recordData);
      return ByteString.wrap(recordData);
    }
    catch(EOFException e)
    {
      // end of stream, no record or uncomplete record
      return null;
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
    }
  }
  /** Seek to given position on the provided reader. */
  private void seek(RandomAccessFile reader, long position) throws ChangelogException
  {
    try
    {
      reader.seek(position);
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
    }
  }
  /**
   * Returns a random access file to read this log.
   * Returns a reader for this log.
   * <p>
   * Assumes that calling methods ensure that log is not closed.
   */
  private RandomAccessFile getReader() throws ChangelogException
  private BlockLogReader<K, V> getReader() throws ChangelogException
  {
    return readerPool.get();
  }
  /** Release the provided reader. */
  private void releaseReader(RandomAccessFile reader) {
  private void releaseReader(BlockLogReader<K, V> reader) {
    readerPool.release(reader);
  }
  /** {@inheritDoc} */
  @Override
  public int hashCode()
@@ -633,7 +542,7 @@
    private final LogFile<K, V> logFile;
    /** To read the records. */
    private final RandomAccessFile reader;
    private final BlockLogReader<K, V> reader;
    /** The current available record, may be {@code null}. */
    private Record<K,V> currentRecord;
@@ -674,14 +583,14 @@
      this.logFile = logFile;
      this.reader = logFile.getReader();
      this.currentRecord = record;
      logFile.seek(reader, filePosition);
      reader.seekToPosition(filePosition);
    }
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      currentRecord = logFile.readRecord(reader);
      currentRecord = reader.readRecord();
      return currentRecord != null;
    }
@@ -695,26 +604,10 @@
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
      do
      {
        if (currentRecord != null)
        {
          final boolean matches = findNearest ?
              currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key);
          if (matches)
          {
            if (findNearest && currentRecord.getKey().equals(key))
            {
              // skip key in order to position on lowest higher key
              next();
            }
            return true;
          }
        }
        next();
      }
      while (currentRecord != null);
      return false;
      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
      final boolean found = result.getFirst();
      currentRecord = found ? result.getSecond() : null;
      return found;
    }
    /** {@inheritDoc} */
@@ -733,15 +626,7 @@
     */
    long getFilePosition() throws ChangelogException
    {
      try
      {
        return reader.getFilePointer();
      }
      catch (IOException e)
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
      }
      return reader.getFilePosition();
    }
    /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
@@ -35,22 +35,33 @@
/**
 * A Pool of readers to a log file.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
// TODO : implement a real pool - reusing readers instead of opening-closing them each time
class LogReaderPool
class LogReaderPool<K extends Comparable<K>, V>
{
  /** The file to read. */
  private final File file;
  private final RecordParser<K, V> parser;
  /**
   * Creates a pool of readers for provided file.
   *
   * @param file
   *            The file to read.
   *          The file to read.
   * @param parser
   *          The parser to decode the records read.
   */
  LogReaderPool(File file)
  LogReaderPool(File file, RecordParser<K, V> parser)
  {
    this.file = file;
    this.parser = parser;
  }
  /**
@@ -63,9 +74,9 @@
   * @throws ChangelogException
   *            If the file can't be found or read.
   */
  RandomAccessFile get() throws ChangelogException
  BlockLogReader<K, V> get() throws ChangelogException
  {
    return getRandomAccess(file);
    return getReader(file);
  }
  /**
@@ -77,17 +88,17 @@
   *          The random access reader to a file previously acquired with this
   *          pool.
   */
  void release(RandomAccessFile reader)
  void release(BlockLogReader<K, V> reader)
  {
    StaticUtils.close(reader);
  }
  /** Returns a random access file to read this log. */
  private RandomAccessFile getRandomAccess(File file) throws ChangelogException
  private BlockLogReader<K, V> getReader(File file) throws ChangelogException
  {
    try
    {
      return new RandomAccessFile(file, "r");
      return BlockLogReader.newReader(file, new RandomAccessFile(file, "r"), parser) ;
    }
    catch (Exception e)
    {
opends/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
@@ -76,6 +76,16 @@
    }
  }
  /**
   * Returns the file used by this writer.
   *
   * @return the file
   */
  File getFile()
  {
    return file;
  }
  /** {@inheritDoc} */
  @Override
  public void write(int b) throws IOException
opends/src/server/org/opends/server/types/ByteStringBuilder.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2012 ForgeRock AS.
 *      Portions copyright 2012-2014 ForgeRock AS.
 */
package org.opends.server.types;
@@ -30,6 +30,8 @@
import static org.opends.server.loggers.debug.DebugLogger.*;
import java.io.DataInput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -566,7 +568,34 @@
    return bytesRead;
  }
  /**
   * Appends the provided {@code DataInput} to this byte string
   * builder.
   *
   * @param stream
   *          The data input stream to be appended to this byte string
   *          builder.
   * @param length
   *          The maximum number of bytes to be appended from {@code
   *          input}.
   * @throws IndexOutOfBoundsException
   *           If {@code length} is less than zero.
   * @throws EOFException
   *           If this stream reaches the end before reading all the bytes.
   * @throws IOException
   *           If an I/O error occurs.
   */
  public void append(DataInput stream, int length) throws IndexOutOfBoundsException, EOFException, IOException
  {
    if (length < 0)
    {
      throw new IndexOutOfBoundsException();
    }
    ensureAdditionalCapacity(length);
    stream.readFully(buffer, this.length, length);
    this.length += length;
  }
  /**
   * Appends the provided {@code ReadableByteChannel} to this byte string
opends/src/server/org/opends/server/util/StaticUtils.java
@@ -3463,32 +3463,36 @@
  /**
   * Attempts to delete the specified file or directory.  If it is a directory,
   * Attempts to delete the specified file or directory. If it is a directory,
   * then any files or subdirectories that it contains will be recursively
   * deleted as well.
   *
   * @param  file  The file or directory to be removed.
   *
   * @return  <CODE>true</CODE> if the specified file and any subordinates are
   *          all successfully removed, or <CODE>false</CODE> if at least one
   *          element in the subtree could not be removed.
   * @param file
   *          The file or directory to be removed.
   * @return {@code true} if the specified file and any subordinates are all
   *         successfully removed, or {@code false} if at least one element in
   *         the subtree could not be removed or file does not exists.
   */
  public static boolean recursiveDelete(File file)
  {
    boolean successful = true;
    if (file.isDirectory())
    if (file.exists())
    {
      File[] childList = file.listFiles();
      if (childList != null)
      boolean successful = true;
      if (file.isDirectory())
      {
        for (File f : childList)
        File[] childList = file.listFiles();
        if (childList != null)
        {
          successful &= recursiveDelete(f);
          for (File f : childList)
          {
            successful &= recursiveDelete(f);
          }
        }
      }
    }
    return (successful & file.delete());
      return (successful & file.delete());
    }
    return false;
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
New file
@@ -0,0 +1,490 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.forgerock.opendj.util.Pair;
@SuppressWarnings("javadoc")
public class BlockLogReaderWriterTest extends DirectoryServerTestCase
{
  private static final File TEST_DIRECTORY = new File(TestCaseUtils.getUnitTestRootPath(), "changelog-unit");
  private static final File TEST_FILE = new File(TEST_DIRECTORY, "file");
  private static final RecordParser<Integer, Integer> RECORD_PARSER = new IntRecordParser();
  private static final int INT_RECORD_SIZE = 12;
  @BeforeClass
  void createTestDirectory()
  {
    TEST_DIRECTORY.mkdirs();
  }
  @BeforeMethod
  void ensureTestFileIsEmpty() throws Exception
  {
    StaticUtils.recursiveDelete(TEST_FILE);
  }
  @AfterClass
  void cleanTestDirectory()
  {
    StaticUtils.recursiveDelete(TEST_DIRECTORY);
  }
  @DataProvider(name = "recordsData")
  Object[][] recordsData()
  {
    return new Object[][]
    {
      // raw size taken by each record is: 4 (record size) + 4 (key) + 4 (value) = 12 bytes
      // size of block, expected size of file after all records are written, records
      { 12, 12, records(1) }, // zero block marker
      { 10, 16, records(1) }, // one block marker
      { 8, 16, records(1) },  // one block marker
      { 7, 20, records(1) },  // two block markers
      { 6, 24, records(1) },  // three block markers
      { 5, 40, records(1) },  // seven block markers
      { 16, 28, records(1,2) }, // one block marker
      { 12, 32, records(1,2) }, // two block markers
      { 10, 36, records(1,2) }, // three block markers
    };
  }
  /**
   * Tests that records can be written then read correctly for different block sizes.
   */
  @Test(dataProvider="recordsData")
  public void testWriteThenRead(int blockSize, int expectedSizeOfFile, List<Record<Integer, Integer>> records)
      throws Exception
  {
    writeRecords(blockSize, records);
    BlockLogReader<Integer, Integer> reader = null;
    try
    {
      reader = newReader(blockSize);
      for (int i = 0; i < records.size(); i++)
      {
         Record<Integer, Integer> record = reader.readRecord();
         assertThat(record).isEqualTo(records.get(i));
      }
      assertThat(reader.readRecord()).isNull();
      assertThat(reader.getFilePosition()).isEqualTo(expectedSizeOfFile);
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  @DataProvider(name = "recordsForSeek")
  Object[][] recordsForSeek()
  {
    Object[][] data = new Object[][] {
      // records, key, findNearest, expectedRecord, expectedFound
      // no record
      { records(), 1, false, null, false },
      // 1 record exact find
      { records(1), 1, false, record(1), true },
      // 1 record nearest find
      { records(1), 1, true, null, true },
      // 2 records
      { records(1,2), 1, false, record(1), true },
      { records(1,2), 2, false, record(2), true },
      { records(1,2), 1, true, record(2), true },
      { records(1,2), 2, true, null, true },
      // 3 records exact find
      { records(1,2,3), 0, false, null, false },
      { records(1,2,3), 1, false, record(1), true },
      { records(1,2,3), 2, false, record(2), true },
      { records(1,2,3), 3, false, record(3), true },
      { records(1,2,3), 4, false, null, false },
      // 3 records nearest find
      { records(1,2,3), 0, true, record(1), true },
      { records(1,2,3), 1, true, record(2), true },
      { records(1,2,3), 2, true, record(3), true },
      { records(1,2,3), 3, true, null, true },
      { records(1,2,3), 4, true, null, false },
      // 10 records exact find
      { records(1,2,3,4,5,6,7,8,9,10), 0, false, null, false },
      { records(1,2,3,4,5,6,7,8,9,10), 1, false, record(1), true },
      { records(1,2,3,4,5,6,7,8,9,10), 5, false, record(5), true },
      { records(1,2,3,4,5,6,7,8,9,10), 10, false, record(10), true },
      { records(1,2,3,4,5,6,7,8,9,10), 11, false, null, false },
      // 10 records nearest find
      { records(1,2,3,4,5,6,7,8,9,10), 0, true, record(1), true },
      { records(1,2,3,4,5,6,7,8,9,10), 1, true, record(2), true },
      { records(1,2,3,4,5,6,7,8,9,10), 5, true, record(6), true },
      { records(1,2,3,4,5,6,7,8,9,10), 10, true, null, true },
      { records(1,2,3,4,5,6,7,8,9,10), 11, true, null, false },
    };
    // For each test case, do a test with various block sizes to ensure algorithm is not broken
    // on a given size
    int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 };
    Object[][] finalData = new Object[sizes.length*data.length][6];
    for (int i = 0; i < data.length; i++)
    {
      for (int j = 0; j < sizes.length; j++)
      {
        Object[] a = data[i];
        // add the block size at beginning of each test case
        finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4]};
      }
    }
    return finalData;
  }
  @Test(dataProvider="recordsForSeek")
  public void testSeek(int blockSize, List<Record<Integer, Integer>> records, int key, boolean findNearest,
      Record<Integer, Integer> expectedRecord, boolean expectedFound) throws Exception
  {
    writeRecords(blockSize, records);
    BlockLogReader<Integer, Integer> reader = null;
    try
    {
      reader = newReader(blockSize);
      Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, findNearest);
      assertThat(result.getFirst()).isEqualTo(expectedFound);
      assertThat(result.getSecond()).isEqualTo(expectedRecord);
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  @Test
  public void testGetClosestMarkerBeforeOrAtPosition() throws Exception
  {
    final int blockSize = 10;
    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(0)).isEqualTo(0);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(5)).isEqualTo(0);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(9)).isEqualTo(0);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(10)).isEqualTo(10);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(15)).isEqualTo(10);
    assertThat(reader.getClosestBlockStartBeforeOrAtPosition(20)).isEqualTo(20);
  }
  @Test
  public void testGetClosestMarkerStrictlyAfterPosition() throws Exception
  {
    final int blockSize = 10;
    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(0)).isEqualTo(10);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(5)).isEqualTo(10);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(10)).isEqualTo(20);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(11)).isEqualTo(20);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(15)).isEqualTo(20);
    assertThat(reader.getClosestBlockStartStrictlyAfterPosition(20)).isEqualTo(30);
  }
  @Test
  public void testSearchClosestMarkerToKey() throws Exception
  {
    int blockSize = 20;
    writeRecords(blockSize, records(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20));
    BlockLogReader<Integer, Integer> reader = null;
    try
    {
      reader = newReader(blockSize);
      assertThat(reader.searchClosestBlockStartToKey(0)).isEqualTo(0);
      assertThat(reader.searchClosestBlockStartToKey(1)).isEqualTo(0);
      assertThat(reader.searchClosestBlockStartToKey(2)).isEqualTo(20);
      assertThat(reader.searchClosestBlockStartToKey(3)).isEqualTo(20);
      assertThat(reader.searchClosestBlockStartToKey(4)).isEqualTo(40);
      assertThat(reader.searchClosestBlockStartToKey(5)).isEqualTo(60);
      assertThat(reader.searchClosestBlockStartToKey(6)).isEqualTo(80);
      assertThat(reader.searchClosestBlockStartToKey(7)).isEqualTo(80);
      assertThat(reader.searchClosestBlockStartToKey(8)).isEqualTo(100);
      assertThat(reader.searchClosestBlockStartToKey(9)).isEqualTo(120);
      assertThat(reader.searchClosestBlockStartToKey(10)).isEqualTo(140);
      assertThat(reader.searchClosestBlockStartToKey(19)).isEqualTo(260);
      assertThat(reader.searchClosestBlockStartToKey(20)).isEqualTo(280);
      // out of reach keys
      assertThat(reader.searchClosestBlockStartToKey(21)).isEqualTo(280);
      assertThat(reader.searchClosestBlockStartToKey(22)).isEqualTo(280);
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  @Test
  public void testLengthOfStoredRecord() throws Exception
  {
    final int blockSize = 100;
    BlockLogReader<Integer, Integer> reader = newReaderWithNullFile(blockSize);
    int recordLength = 10;
    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 20)).isEqualTo(recordLength);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 10)).isEqualTo(recordLength);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 9)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    recordLength = 150;
    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 60)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 54)).isEqualTo(recordLength + SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 53)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
    recordLength = 200;
    assertThat(reader.getLengthOfStoredRecord(recordLength, 99)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 8)).isEqualTo(recordLength + 2 * SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 7)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
    assertThat(reader.getLengthOfStoredRecord(recordLength, 0)).isEqualTo(recordLength + 3 * SIZE_OF_BLOCK_OFFSET);
  }
  /**
   * This test is intended to be run only manually to check the performance between binary search
   * and sequential access.
   * Note that sequential run may be extremely long when using high values.
   */
  @Test(enabled=false)
  public void seekPerformanceComparison() throws Exception
  {
    // You may change these values
    long fileSizeInBytes = 100*1024*1024;
    int numberOfValuesToSeek = 50000;
    int blockSize = 256;
    writeRecordsToReachFileSize(blockSize, fileSizeInBytes);
    BlockLogReader<Integer, Integer> reader = null;
    try
    {
      reader = newReader(blockSize);
      List<Integer> keysToSeek = getShuffledKeys(fileSizeInBytes, numberOfValuesToSeek);
      System.out.println("File size: " + TEST_FILE.length() + " bytes");
      System.out.println("\n---- BINARY SEARCH");
      long minTime = Long.MAX_VALUE;
      long maxTime = Long.MIN_VALUE;
      final long t0 = System.nanoTime();
      for (Integer key : keysToSeek)
      {
        final long ts = System.nanoTime();
        Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, false);
        final long te = System.nanoTime() - ts;
        if (te < minTime) minTime = te;
        if (te > maxTime) maxTime = te;
        // show time for seeks that last more than N microseconds (tune as needed)
        if (te/1000 > 1000)
        {
          System.out.println("TIME! key:" + result.getSecond().getKey() + ", time=" + te/1000 + " microseconds");
        }
        assertThat(result.getSecond()).isEqualTo(record(key));
      }
      System.out.println("Time taken: " + ((System.nanoTime() - t0)/1000000) + " milliseconds");
      System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
      System.out.println("Max time for a search: " + maxTime/1000 + " microseconds");
      System.out.println("Max difference for a search: " + (maxTime - minTime)/1000 + " microseconds");
      System.out.println("\n---- SEQUENTIAL SEARCH");
      minTime = Long.MAX_VALUE;
      maxTime = Long.MIN_VALUE;
      final long t1 = System.nanoTime();
      for (Integer val : keysToSeek)
      {
        long ts = System.nanoTime();
        Pair<Boolean, Record<Integer, Integer>> result = reader.positionToKeySequentially(0, val, false);
        assertThat(result.getSecond()).isEqualTo(Record.from(val, val));
        long te = System.nanoTime() - ts;
        if (te < minTime) minTime = te;
        if (te > maxTime) maxTime = te;
      }
      System.out.println("Time taken: " + ((System.nanoTime() - t1)/1000000) + " milliseconds");
      System.out.println("Min time for a search: " + minTime/1000 + " microseconds");
      System.out.println("Max time for a search: " + maxTime/1000000 + " milliseconds");
      System.out.println("Max difference for a search: " + (maxTime - minTime)/1000000 + " milliseconds");
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  /** Write provided records. */
  private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException
  {
    BlockLogWriter<Integer, Integer> writer = null;
    try
    {
      writer = newWriter(blockSize);
      for (Record<Integer, Integer> record : records)
      {
        writer.write(record);
      }
    }
    finally
    {
      StaticUtils.close(writer);
    }
  }
  /** Write as many records as needed to reach provided file size. Records goes from 1 up to N. */
  private void writeRecordsToReachFileSize(int blockSize, long sizeInBytes) throws Exception
  {
      final int numberOfValues = (int) sizeInBytes / INT_RECORD_SIZE;
      final int[] values = new int[numberOfValues];
      for (int i = 0; i < numberOfValues; i++)
      {
        values[i] = i+1;
      }
      writeRecords(blockSize, records(values));
  }
  /** Returns provided number of keys to seek in random order, for a file of provided size. */
  private List<Integer> getShuffledKeys(long fileSizeInBytes, int numberOfKeys)
  {
    final int numberOfValues = (int) fileSizeInBytes / INT_RECORD_SIZE;
    final List<Integer> values = new ArrayList<Integer>(numberOfValues);
    for (int i = 0; i < numberOfValues; i++)
    {
      values.add(i+1);
    }
    Collections.shuffle(values);
    return values.subList(0, numberOfKeys);
  }
  private BlockLogWriter<Integer, Integer> newWriter(int sizeOfBlock) throws ChangelogException
  {
    return BlockLogWriter.newWriterForTests(new LogWriter(TEST_FILE), RECORD_PARSER, sizeOfBlock);
  }
  private BlockLogReader<Integer, Integer> newReader(int blockSize) throws FileNotFoundException
  {
    return BlockLogReader.newReaderForTests(TEST_FILE, new RandomAccessFile(TEST_FILE, "r"),
        RECORD_PARSER, blockSize);
  }
  private BlockLogReader<Integer, Integer> newReaderWithNullFile(int blockSize) throws FileNotFoundException
  {
    return BlockLogReader.newReaderForTests(null, null, RECORD_PARSER, blockSize);
  }
  /** Helper to build a list of records. */
  private List<Record<Integer, Integer>> records(int...keys)
  {
    List<Record<Integer, Integer>> records = new ArrayList<Record<Integer, Integer>>();
    for (int key : keys)
    {
      records.add(Record.from(key, key));
    }
    return records;
  }
  /** Helper to build a record. */
  private Record<Integer, Integer> record(int key)
  {
    return Record.from(key, key);
  }
  /**
   * Record parser implementation for records with keys and values as integers to be used in tests.
   * Using integer allow to know precisely the size of the records (4 bytes for key + 4 bytes for value),
   * which is useful for some tests.
   */
  private static class IntRecordParser implements RecordParser<Integer, Integer>
  {
    public Record<Integer, Integer> decodeRecord(final ByteString data) throws DecodingException
    {
      ByteSequenceReader reader = data.asReader();
      int key = reader.getInt();
      int value = reader.getInt();
      return Record.from(key, value);
    }
    public ByteString encodeRecord(Record<Integer, Integer> record)
    {
      return new ByteStringBuilder().append(record.getKey()).append(record.getValue()).toByteString();
    }
    @Override
    public Integer decodeKeyFromString(String key) throws ChangelogException
    {
      return Integer.valueOf(key);
    }
    @Override
    public String encodeKeyToString(Integer key)
    {
      return String.valueOf(key);
    }
    /** {@inheritDoc} */
    @Override
    public Integer getMaxKey()
    {
      return Integer.MAX_VALUE;
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogFileTest.java
@@ -58,6 +58,15 @@
      }
  };
  static final RecordParser<String,String> CORRUPTING_RECORD_PARSER = new StringRecordParser() {
    @Override
    public ByteString encodeRecord(Record<String, String> record)
    {
      // write the key only, to corrupt the log file
      return new ByteStringBuilder().append(record.getKey()).toByteString();
    }
  };
  @BeforeClass
  public void createTestDirectory()
  {
@@ -78,7 +87,7 @@
    for (int i = 1; i <= 10; i++)
    {
      logFile.append(Record.from("key"+i, "value"+i));
      logFile.append(Record.from(String.format("key%02d", i), "value"+i));
    }
    logFile.close();
  }
@@ -106,7 +115,7 @@
    try {
      cursor = changelog.getCursor();
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
    }
    finally {
@@ -120,9 +129,9 @@
    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = changelog.getCursor("key5");
      cursor = changelog.getCursor("key05");
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key5", "value5"));
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key05", "value5"));
      assertThatCursorCanBeFullyRead(cursor, 6, 10);
    }
    finally {
@@ -156,7 +165,7 @@
      cursor = changelog.getCursor(null);
      // should start from start
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
    }
    finally {
@@ -170,10 +179,10 @@
    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = changelog.getNearestCursor("key1");
      cursor = changelog.getNearestCursor("key01");
      // lowest higher key is key2
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key2", "value2"));
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key02", "value2"));
      assertThatCursorCanBeFullyRead(cursor, 3, 10);
    }
    finally {
@@ -187,10 +196,10 @@
    LogFile<String, String> changelog = getLogFile(RECORD_PARSER);
    DBCursor<Record<String, String>> cursor = null;
    try {
      cursor = changelog.getNearestCursor("key0");
      cursor = changelog.getNearestCursor("key00");
      // lowest higher key is key1
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
    }
    finally {
@@ -207,7 +216,7 @@
      cursor = changelog.getNearestCursor(null);
      // should start from start
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key1", "value1"));
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key01", "value1"));
      assertThatCursorCanBeFullyRead(cursor, 2, 10);
    }
    finally {
@@ -235,7 +244,7 @@
    {
      Record<String, String> record = changelog.getOldestRecord();
      assertThat(record).isEqualTo(Record.from("key1", "value1"));
      assertThat(record).isEqualTo(Record.from("key01", "value1"));
    }
    finally {
      StaticUtils.close(changelog);
@@ -275,9 +284,9 @@
        Record<String, String> record = Record.from("newkey" + i, "newvalue" + i);
        writeLog.append(record);
        assertThat(writeLog.getNewestRecord()).as("write changelog " + i).isEqualTo(record);
        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key1", "value1"));
        assertThat(writeLog.getOldestRecord()).as("write changelog " + i).isEqualTo(Record.from("key01", "value1"));
        assertThat(readLog.getNewestRecord()).as("read changelog " + i).isEqualTo(record);
        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key1", "value1"));
        assertThat(readLog.getOldestRecord()).as("read changelog " + i).isEqualTo(Record.from("key01", "value1"));
      }
    }
    finally
@@ -296,7 +305,7 @@
    for (int i = fromIndex; i <= endIndex; i++)
    {
      assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
      assertThat(cursor.getRecord()).isEqualTo(Record.from("key" + i, "value" + i));
      assertThat(cursor.getRecord()).isEqualTo(Record.from(String.format("key%02d", i), "value" + i));
    }
    assertThatCursorIsExhausted(cursor);
  }