mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
22.35.2014 3b08e64ccf908fa4c57e6ee13aa8901efcc53ee2
Actual merge of complete changelog.file package which contains implementation
of file-based changelog
14 files added
3 files modified
6106 ■■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java 24 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java 632 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java 214 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java 72 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java 394 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java 79 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 982 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java 409 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java 144 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java 1187 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java 607 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java 118 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java 151 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java 130 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java 101 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java 860 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -36,6 +36,7 @@
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.types.AttributeType;
@@ -285,4 +286,27 @@
    throw new RuntimeException("Not implemented");
  }
  /**
   * Get the instance of backend.
   *
   * @return the instance
   */
  public static ChangelogBackend getInstance()
  {
    throw new RuntimeException("Not implemented");
  }
  /**
   * Notify.
   *
   * @param baseDN
   *            The base DN
   * @param updateMsg
   *            The update msg
   */
  public void notifyCookieEntryAdded(DN baseDN, UpdateMsg updateMsg)
  {
    throw new RuntimeException("Not implemented");
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
New file
@@ -0,0 +1,632 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.forgerock.util.Reject;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
/**
 * A log reader with binary search support.
 * <p>
 * The log file contains record offsets at fixed block size : given a block size N,
 * an offset is written at every N bytes. The offset contains the number of bytes to
 * reach the beginning of previous record (or next record if offset equals 0).
 * <p>
 * The reader provides both sequential access, using the {@code readRecord()} method,
 * and reasonably fast random access, using the {@code seekToRecord(K, boolean)} method.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
class BlockLogReader<K extends Comparable<K>, V> implements Closeable
{
  static final int SIZE_OF_BLOCK_OFFSET = 4;
  static final int SIZE_OF_RECORD_SIZE = 4;
  /**
   * Size of a block, after which an offset to the nearest record is written.
   * <p>
   * This value has been fixed based on performance tests. See
   * <a href="https://bugster.forgerock.org/jira/browse/OPENDJ-1472">
   * OPENDJ-1472</a> for details.
   */
  static final int BLOCK_SIZE = 256;
  private final int blockSize;
  private final RecordParser<K, V> parser;
  private final RandomAccessFile reader;
  private final File file;
  /**
   * Creates a reader for the provided file, file reader and parser.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param file
   *          The log file to read.
   * @param reader
   *          The random access reader on the log file.
   * @param parser
   *          The parser to decode the records read.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReader(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser)
  {
    return new BlockLogReader<K, V>(file, reader, parser, BLOCK_SIZE);
  }
  /**
   * Creates a reader for the provided file, file reader, parser and block size.
   * <p>
   * This method is intended for tests only, to allow tuning of the block size.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param file
   *          The log file to read.
   * @param reader
   *          The random access reader on the log file.
   * @param parser
   *          The parser to decode the records read.
   * @param blockSize
   *          The size of each block, or frequency at which the record offset is
   *          present in the log file.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogReader<K, V> newReaderForTests(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, int blockSize)
  {
    return new BlockLogReader<K, V>(file, reader, parser, blockSize);
  }
  private BlockLogReader(
      final File file, final RandomAccessFile reader, final RecordParser<K, V> parser, final int blockSize)
  {
    this.file = file;
    this.reader = reader;
    this.parser = parser;
    this.blockSize = blockSize;
  }
  /**
   * Position the reader to the record corresponding to the provided key and
   * matching and positioning strategies. Returns the last record read.
   *
   * @param key
   *          Key to use as a start position. Key must not be {@code null}.
   * @param matchStrategy
   *          The key matching strategy.
   * @param positionStrategy
   *          The positioning strategy.
   * @return The pair (key_found, last_record_read). key_found is a boolean
   *         indicating if reader is successfully positioned. last_record_read
   *         is the last record that was read. When key_found is equals to
   *         {@code false}, then last_record_read is always {@code null}. When
   *         key_found is equals to {@code true}, last_record_read can be valued
   *         or be {@code null}
   * @throws ChangelogException
   *           If an error occurs when seeking the key.
   */
  public Pair<Boolean, Record<K,V>> seekToRecord(
      final K key,
      final KeyMatchingStrategy matchStrategy,
      final PositionStrategy positionStrategy)
          throws ChangelogException
  {
    Reject.ifNull(key);
    final long markerPosition = searchClosestBlockStartToKey(key);
    if (markerPosition >= 0)
    {
      return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy);
    }
    return Pair.of(false, null);
  }
  /**
   * Position the reader to the provided file position.
   *
   * @param filePosition
   *            offset from the beginning of the file, in bytes.
   * @throws ChangelogException
   *            If an error occurs.
   */
  public void seekToPosition(final long filePosition) throws ChangelogException
  {
    try
    {
      reader.seek(filePosition);
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(filePosition, file.getPath()), e);
    }
  }
  /**
   * Read a record from current position.
   *
   * @return the record read
   * @throws ChangelogException
   *            If an error occurs during read.
   */
  public Record<K,V> readRecord() throws ChangelogException
  {
    return readRecord(-1);
  }
  /**
   * Returns the file position for this reader.
   *
   * @return the position of reader on the log file
   * @throws ChangelogException
   *          If an error occurs.
   */
  public long getFilePosition() throws ChangelogException
  {
    try
    {
      return reader.getFilePointer();
    }
    catch (IOException e)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(file.getPath()), e);
    }
  }
  /** {@inheritDoc} */
  @Override
  public void close() throws IOException
  {
    reader.close();
  }
  /**
   * Read a record, either from the provided start of block position or from
   * the current position.
   *
   * @param blockStartPosition
   *          The position of start of block, where a record offset is written.
   *          If provided value is -1, then record is read from current position instead.
   * @return the record read
   * @throws ChangelogException
   *           If an error occurs during read.
   */
  private Record<K,V> readRecord(final long blockStartPosition) throws ChangelogException
  {
    try
    {
      if (blockStartPosition != -1)
      {
        positionToRecordFromBlockStart(blockStartPosition);
      }
      final ByteString recordData = readNextRecord();
      return recordData != null ? parser.decodeRecord(recordData) : null;
    }
    catch (Exception io)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(reader.toString()), io);
    }
  }
  /**
   * Position to the record given by the offset read from provided block
   * start.
   *
   * @param blockStartPosition
   *          Position of read pointer in the file, expected to be the start of
   *          a block where a record offset is written.
   * @throws IOException
   *           If an error occurs during read.
   */
  private void positionToRecordFromBlockStart(final long blockStartPosition) throws IOException
  {
    reader.seek(blockStartPosition);
    if (blockStartPosition > 0)
    {
      final byte[] offsetData = new byte[SIZE_OF_BLOCK_OFFSET];
      reader.readFully(offsetData);
      final int offsetToRecord = ByteString.wrap(offsetData).toInt();
      if (offsetToRecord > 0)
      {
        reader.seek(blockStartPosition - offsetToRecord);
      } // if offset is zero, reader is already well positioned
    }
  }
  /**
   * Reads the next record.
   *
   * @return the bytes of the next record, or {@code null} if no record is available
   * @throws IOException
   *            If an error occurs while reading.
   */
  private ByteString readNextRecord() throws IOException
  {
    try
    {
      // read length of record
      final long filePosition = reader.getFilePointer();
      int distanceToBlockStart = getDistanceToNextBlockStart(filePosition, blockSize);
      final int recordLength = readRecordLength(distanceToBlockStart);
      // read the record
      long currentPosition = reader.getFilePointer();
      distanceToBlockStart = getDistanceToNextBlockStart(currentPosition, blockSize);
      final ByteStringBuilder recordBytes =
          new ByteStringBuilder(getLengthOfStoredRecord(recordLength, distanceToBlockStart));
      int remainingBytesToRead = recordLength;
      while (distanceToBlockStart < remainingBytesToRead)
      {
        if (distanceToBlockStart != 0)
        {
          recordBytes.append(reader, distanceToBlockStart);
        }
        // skip the offset
        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
        // next step
        currentPosition += distanceToBlockStart + SIZE_OF_BLOCK_OFFSET;
        remainingBytesToRead -= distanceToBlockStart;
        distanceToBlockStart = blockSize - SIZE_OF_BLOCK_OFFSET;
      }
      if (remainingBytesToRead > 0)
      {
        // last bytes of the record
        recordBytes.append(reader, remainingBytesToRead);
      }
      return recordBytes.toByteString();
    }
    catch (EOFException e)
    {
      // end of stream, no record or uncomplete record
      return null;
    }
  }
  /**
   * Returns the total length in bytes taken by a record when stored in log file,
   * including size taken by block offsets.
   *
   * @param recordLength
   *            The length of record to write.
   * @param distanceToBlockStart
   *            Distance before the next block start.
   * @return the length in bytes necessary to store the record in the log
   */
  int getLengthOfStoredRecord(int recordLength, int distanceToBlockStart)
  {
    int totalLength = recordLength;
    if (recordLength > distanceToBlockStart)
    {
      totalLength += SIZE_OF_BLOCK_OFFSET;
      final int remainingBlocks = (recordLength - distanceToBlockStart -1) / (blockSize - SIZE_OF_BLOCK_OFFSET);
      totalLength += remainingBlocks * SIZE_OF_BLOCK_OFFSET;
    }
    return totalLength;
  }
  /** Read the length of a record. */
  private int readRecordLength(final int distanceToBlockStart) throws IOException
  {
    final ByteStringBuilder lengthBytes = new ByteStringBuilder(SIZE_OF_RECORD_SIZE);
    if (distanceToBlockStart > 0 && distanceToBlockStart < SIZE_OF_RECORD_SIZE)
    {
      lengthBytes.append(reader, distanceToBlockStart);
      // skip the offset
      reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE - distanceToBlockStart);
      return lengthBytes.toByteString().toInt();
    }
    else
    {
      if (distanceToBlockStart == 0)
      {
        // skip the offset
        reader.skipBytes(SIZE_OF_BLOCK_OFFSET);
      }
      lengthBytes.append(reader, SIZE_OF_RECORD_SIZE);
      return lengthBytes.toByteString().toInt();
    }
  }
  /**
   * Search the closest block start to the provided key, using binary search.
   * <p>
   * Note that position of reader is modified by this method.
   *
   * @param key
   *          The key to search
   * @return the file position of block start that must be used to find the given key,
   *      or a negative number if no position could be found.
   * @throws ChangelogException
   *          if a problem occurs
   */
  long searchClosestBlockStartToKey(K key) throws ChangelogException
  {
    final long maxPos = getFileLength() - 1;
    long lowPos = 0L;
    long highPos = getClosestBlockStartStrictlyAfterPosition(maxPos);
    while (lowPos <= highPos)
    {
      final long middlePos = Math.min((lowPos + highPos) / 2, maxPos);
      final long middleBlockStartPos = getClosestBlockStartBeforeOrAtPosition(middlePos);
      final Record<K, V> middleRecord = readRecord(middleBlockStartPos);
      if (middleRecord == null)
      {
        return -1;
      }
      final int keyComparison = middleRecord.getKey().compareTo(key);
      if (keyComparison < 0)
      {
        if (middleBlockStartPos <= lowPos)
        {
          return lowPos;
        }
        lowPos = middleBlockStartPos;
      }
      else if (keyComparison > 0)
      {
        if (middleBlockStartPos >= highPos)
        {
          return highPos;
        }
        highPos = middleBlockStartPos;
      }
      else
      {
        return middleBlockStartPos;
      }
    }
    // Unable to find a position where key can be found
    return -1;
  }
  private long getFileLength() throws ChangelogException
  {
    try
    {
      return reader.length();
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RETRIEVE_FILE_LENGTH.get(file.getPath()), e);
    }
  }
  /**
   * Position before, at or after provided key, starting from provided block
   * start position and reading sequentially until key is found according to
   * matching and positioning strategies.
   *
   * @param blockStartPosition
   *          Position of read pointer in the file, expected to be the start of
   *          a block where a record offset is written
   * @param key
   *          The key to find
   * @param matchStrategy
   *          The key matching strategy
   * @param positionStrategy
   *          The positioning strategy
   * @return The pair ({@code true}, selected record) if reader is successfully
   *         positioned (selected record may be null if end of file is reached),
   *         ({@code false}, null) otherwise.
   * @throws ChangelogException
   *           If an error occurs.
   */
   Pair<Boolean, Record<K,V>> positionToKeySequentially(final long blockStartPosition, final K key,
       final KeyMatchingStrategy matchStrategy, final PositionStrategy positionStrategy) throws ChangelogException
   {
    Record<K,V> record = readRecord(blockStartPosition);
    Record<K,V> previousRecord = null;
    long previousPosition = blockStartPosition;
    boolean matchingKeyIsLowerThanAnyRecord = true;
    while (record != null)
    {
      final int keysComparison = record.getKey().compareTo(key);
      if (keysComparison <= 0)
      {
        matchingKeyIsLowerThanAnyRecord = false;
      }
      if ((keysComparison == 0 && matchStrategy == EQUAL_TO_KEY)
          || (keysComparison >= 0 && matchStrategy != EQUAL_TO_KEY))
      {
        return getMatchingRecord(matchStrategy, positionStrategy, keysComparison, matchingKeyIsLowerThanAnyRecord,
            record, previousRecord, previousPosition);
      }
      previousRecord = record;
      previousPosition = getFilePosition();
      record = readRecord();
    }
    if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
    {
      return getRecordNoMatchForLessStrategy(positionStrategy, previousRecord, previousPosition);
    }
    return Pair.of(false, null);
  }
  private Pair<Boolean,Record<K,V>> getMatchingRecord(KeyMatchingStrategy matchStrategy,
      PositionStrategy positionStrategy, int keysComparison, boolean matchKeyIsLowerThanAnyRecord,
      Record<K, V> currentRecord, Record<K, V> previousRecord, long previousPosition)
          throws ChangelogException
  {
    Record<K, V> record = currentRecord;
    if (positionStrategy == AFTER_MATCHING_KEY)
    {
      if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && matchKeyIsLowerThanAnyRecord)
      {
        return Pair.of(false, null);
      }
      if (keysComparison == 0)
      {
        // skip matching key
        record = readRecord();
      }
    }
    else if (positionStrategy == ON_MATCHING_KEY && matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && keysComparison > 0)
    {
      seekToPosition(previousPosition);
      return Pair.of(previousRecord != null, previousRecord);
    }
    return Pair.of(true, record);
  }
  private Pair<Boolean, Record<K, V>> getRecordNoMatchForLessStrategy(
      final PositionStrategy positionStrategy, final Record<K, V> previousRecord, final long previousPosition)
          throws ChangelogException
  {
    if (positionStrategy == ON_MATCHING_KEY)
    {
      seekToPosition(previousPosition);
      return Pair.of(previousRecord != null, previousRecord);
    }
    else
    {
      return Pair.of(true, null);
    }
  }
  /**
   * Returns the closest start of block which has a position lower than or equal
   * to the provided file position.
   *
   * @param filePosition
   *          The position of reader on file.
   * @return the file position of the block start.
   */
  long getClosestBlockStartBeforeOrAtPosition(final long filePosition)
  {
    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
    return dist == 0 ? filePosition : filePosition + dist - blockSize;
  }
  /**
   * Returns the closest start of block which has a position strictly
   * higher than the provided file position.
   *
   * @param filePosition
   *           The position of reader on file.
   * @return the file position of the block start.
   */
  long getClosestBlockStartStrictlyAfterPosition(final long filePosition)
  {
    final int dist = getDistanceToNextBlockStart(filePosition, blockSize);
    return dist == 0 ? filePosition + blockSize : filePosition + dist;
  }
  /**
   * Returns the distance to next block for the provided file position.
   *
   * @param filePosition
   *            offset from the beginning of the file, in bytes.
   * @param blockSize
   *            Size of each block in bytes.
   * @return the distance to next block in bytes
   */
  static int getDistanceToNextBlockStart(final long filePosition, final int blockSize)
  {
    if (filePosition == 0)
    {
      return blockSize;
    }
    final int distance = (int) (filePosition % blockSize);
    return distance == 0 ? 0 : blockSize - distance;
  }
  /**
   * Check if the log file is valid, by reading the latest records at the end of
   * the log file.
   * <p>
   * The intent of this method is to allow self-recovery in case a partial
   * record as been written at the end of file (eg, after a server crash).
   * <p>
   * Any unexpected exception is considered as a severe error where
   * self-recovery is not appropriate and thus will lead to a
   * ChangelogException.
   *
   * @return -1 if log is valid, or a positive number if log is invalid, where
   *         the number represents the last valid position in the log file.
   * @throws ChangelogException
   *           if an error occurs while checking the log
   */
 long checkLogIsValid() throws ChangelogException
 {
   try
   {
     final long fileSize = getFileLength();
     final long lastBlockStart = getClosestBlockStartBeforeOrAtPosition(fileSize);
     positionToRecordFromBlockStart(lastBlockStart);
     long lastValidPosition = lastBlockStart;
     for (ByteString recordData = readNextRecord(); recordData != null; recordData = readNextRecord()) {
       parser.decodeRecord(recordData);
       lastValidPosition = reader.getFilePointer();
     }
     final boolean isFileValid = lastValidPosition == fileSize;
     return isFileValid ? -1 : lastValidPosition;
   }
   catch (Exception e)
   {
     throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
         file.getPath(),
         StaticUtils.stackTraceToSingleLineString(e)));
   }
 }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/BlockLogWriter.java
New file
@@ -0,0 +1,214 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
import java.io.Closeable;
import java.io.IOException;
import java.io.SyncFailedException;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.forgerock.util.Reject;
import org.opends.server.replication.server.changelog.api.ChangelogException;
/**
 * A log writer, using fixed-size blocks to allow fast retrieval when reading.
 * <p>
 * The log file contains record offsets at fixed block size : given block size N,
 * an offset is written at every N bytes. The offset contains the number of bytes to
 * reach the beginning of previous record (or next record if offset equals 0).
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
class BlockLogWriter<K extends Comparable<K>, V> implements Closeable
{
  private final int blockSize;
  private final RecordParser<K, V> parser;
  private final LogWriter writer;
  /**
   * Creates a writer for the provided log writer and parser.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param writer
   *          The writer on the log file.
   * @param parser
   *          The parser to encode the records.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriter(
      final LogWriter writer, final RecordParser<K, V> parser)
  {
    return new BlockLogWriter<K, V>(writer, parser, BLOCK_SIZE);
  }
  /**
   * Creates a writer for the provided log writer, parser and size for blocks.
   * <p>
   * This method is intended for tests only, to allow tuning of the block size.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param writer
   *          The writer on the log file.
   * @param parser
   *          The parser to encode the records.
   * @param blockSize
   *          The size of each block, or frequency at which the record offset is
   *          present in the log file.
   * @return a new log reader
   */
  static <K extends Comparable<K>, V> BlockLogWriter<K,V> newWriterForTests(
      final LogWriter writer, final RecordParser<K, V> parser, final int blockSize)
  {
    return new BlockLogWriter<K, V>(writer, parser, blockSize);
  }
  /**
   * Creates the writer with an underlying writer, a parser and a size for blocks.
   *
   * @param writer
   *            The writer to the log file.
   * @param parser
   *            The parser to encode the records.
   * @param blockSize
   *            The size of each block.
   */
  private BlockLogWriter(LogWriter writer, RecordParser<K, V> parser, int blockSize)
  {
    Reject.ifNull(writer, parser);
    this.writer = writer;
    this.parser = parser;
    this.blockSize = blockSize;
  }
  /**
   * Writes the provided record to the log file.
   *
   * @param record
   *            The record to write.
   * @throws ChangelogException
   *            If a problem occurs during write.
   */
  public void write(final Record<K, V> record) throws ChangelogException
  {
    try
    {
      write(parser.encodeRecord(record));
      writer.flush();
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(),
          writer.getFile().getPath()), e);
    }
  }
  /**
   * Returns the number of bytes written in the log file.
   *
   * @return the number of bytes
   */
  public long getBytesWritten()
  {
    return writer.getBytesWritten();
  }
  /**
   * Synchronize all modifications to the log file to the underlying device.
   *
   * @throws SyncFailedException
   *           If synchronization fails.
   */
  public void sync() throws SyncFailedException
  {
    writer.sync();
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    writer.close();
  }
  /**
   * Writes the provided byte string to the log file.
   *
   * @param record
   *            The value to write.
   * @throws IOException
   *            If an error occurs while writing
   */
  private void write(final ByteString record) throws IOException
  {
    // Add length of record before writing
    ByteString data = new ByteStringBuilder(SIZE_OF_RECORD_SIZE + record.length()).
        append(record.length()).
        append(record).
        toByteString();
    int distanceToBlockStart = BlockLogReader.getDistanceToNextBlockStart(writer.getBytesWritten(), blockSize);
    int cumulatedDistanceToBeginning = distanceToBlockStart;
    int dataPosition = 0;
    int dataRemaining = data.length();
    final int dataSizeForOneBlock = blockSize - SIZE_OF_BLOCK_OFFSET;
    while (distanceToBlockStart < dataRemaining)
    {
      if (distanceToBlockStart > 0)
      {
        // append part of record
        final int dataEndPosition = dataPosition + distanceToBlockStart;
        writer.write(data.subSequence(dataPosition, dataEndPosition));
        dataPosition = dataEndPosition;
        dataRemaining -= distanceToBlockStart;
      }
      // append the offset to the record
      writer.write(ByteString.valueOf(cumulatedDistanceToBeginning));
      // next step
      distanceToBlockStart = dataSizeForOneBlock;
      cumulatedDistanceToBeginning += blockSize;
    }
    // append the remaining bytes to finish the record
    writer.write(data.subSequence(dataPosition, data.length()));
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/DecodingException.java
New file
@@ -0,0 +1,72 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.file;
import org.forgerock.i18n.LocalizableMessage;
import org.opends.server.replication.server.changelog.api.ChangelogException;
/**
 * Exception thrown when a record can't be decoded properly.
 */
public class DecodingException extends ChangelogException
{
  private static final long serialVersionUID = 5629692522662643737L;
  /**
   * Creates a new decoding exception with the provided information.
   *
   * @param message
   *          The message that explains the problem that occurred.
   */
  public DecodingException(LocalizableMessage message)
  {
    super(message);
  }
  /**
   * Creates a new decoding exception with the provided information.
   *
   * @param cause
   *          The underlying cause that triggered this exception.
   */
  public DecodingException(Throwable cause)
  {
    super(cause);
  }
  /**
   * Creates a new decoding exception with the provided information.
   *
   * @param message
   *          The message that explains the problem that occurred.
   * @param cause
   *          The underlying cause that triggered this exception.
   */
  public DecodingException(LocalizableMessage message, Throwable cause)
  {
    super(message, cause);
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDB.java
New file
@@ -0,0 +1,394 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.file;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequenceReader;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.*;
import static org.opends.messages.ReplicationMessages.*;
/**
 * Implementation of a ChangeNumberIndexDB with a log.
 * <p>
 * This class publishes some monitoring information below <code>
 * cn=monitor</code>.
 */
class FileChangeNumberIndexDB implements ChangeNumberIndexDB
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final int NO_KEY = 0;
  /** The parser of records stored in this ChangeNumberIndexDB. */
  static final RecordParser<Long, ChangeNumberIndexRecord> RECORD_PARSER = new ChangeNumberIndexDBParser();
  /** The log in which records are persisted. */
  private final Log<Long, ChangeNumberIndexRecord> log;
  /**
   * The newest changenumber stored in the DB. It is used to avoid purging the
   * record with the newest changenumber. The newest record in the changenumber
   * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
   * then retrieved on server startup.
   */
  private volatile long newestChangeNumber = NO_KEY;
  /**
   * The last generated value for the change number. It is kept separate from
   * the {@link #newestChangeNumber} because there is an opportunity for a race
   * condition between:
   * <ol>
   * <li>this atomic long being incremented for a new record ('recordB')</li>
   * <li>the current newest record ('recordA') being purged from the DB</li>
   * <li>'recordB' failing to be inserted in the DB</li>
   * </ol>
   */
  private final AtomicLong lastGeneratedChangeNumber;
  private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  /**
   * Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
   *
   * @param replicationEnv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @throws ChangelogException If a database problem happened
   */
  FileChangeNumberIndexDB(ReplicationEnvironment replicationEnv) throws ChangelogException
  {
    log = replicationEnv.getOrCreateCNIndexDB();
    final ChangeNumberIndexRecord newestRecord = readLastRecord();
    newestChangeNumber = getChangeNumber(newestRecord);
    // initialization of the lastGeneratedChangeNumber from the DB content
    // if DB is empty => last record does not exist => default to 0
    lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
    // Monitoring registration
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }
  private ChangeNumberIndexRecord readLastRecord() throws ChangelogException
  {
    final Record<Long, ChangeNumberIndexRecord> record = log.getNewestRecord();
    return record == null ? null : record.getValue();
  }
  private ChangeNumberIndexRecord readFirstRecord() throws ChangelogException
  {
    final Record<Long, ChangeNumberIndexRecord> record = log.getOldestRecord();
    return record == null ? null : record.getValue();
  }
  private long getChangeNumber(final ChangeNumberIndexRecord record) throws ChangelogException
  {
    if (record != null)
    {
      return record.getChangeNumber();
    }
    return NO_KEY;
  }
  /** {@inheritDoc} */
  @Override
  public long addRecord(final ChangeNumberIndexRecord record) throws ChangelogException
  {
    final long changeNumber = nextChangeNumber();
    final ChangeNumberIndexRecord newRecord =
        new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN());
    log.append(Record.from(newRecord.getChangeNumber(), newRecord));
    newestChangeNumber = changeNumber;
    if (logger.isTraceEnabled())
    {
      logger.trace("In FileChangeNumberIndexDB.addRecord, added: " + newRecord);
    }
    return changeNumber;
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexRecord getOldestRecord() throws ChangelogException
  {
    return readFirstRecord();
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexRecord getNewestRecord() throws ChangelogException
  {
    return readLastRecord();
  }
  private long nextChangeNumber()
  {
    return lastGeneratedChangeNumber.incrementAndGet();
  }
  /** {@inheritDoc} */
  @Override
  public long getLastGeneratedChangeNumber()
  {
    return lastGeneratedChangeNumber.get();
  }
  /**
   * Get the number of changes.
   *
   * @return Returns the number of changes.
   * @throws ChangelogException
   *            If a problem occurs.
   */
  long count() throws ChangelogException
  {
    return log.getNumberOfRecords();
  }
  /**
   * Returns whether this database is empty.
   *
   * @return <code>true</code> if this database is empty, <code>false</code>
   *         otherwise
   * @throws ChangelogException
   *           if a problem occurs.
   */
  boolean isEmpty() throws ChangelogException
  {
    return getNewestRecord() == null;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<ChangeNumberIndexRecord> getCursorFrom(final long startChangeNumber) throws ChangelogException
  {
    return new FileChangeNumberIndexDBCursor(log.getCursor(startChangeNumber));
  }
  /**
   * Shutdown this DB.
   */
  void shutdown()
  {
    if (shutdown.compareAndSet(false, true))
    {
      log.close();
      DirectoryServer.deregisterMonitorProvider(dbMonitor);
    }
  }
  /**
   * Synchronously purges the change number index DB up to and excluding the
   * provided timestamp.
   *
   * @param purgeCSN
   *          the timestamp up to which purging must happen
   * @return the oldest non purged CSN.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  CSN purgeUpTo(final CSN purgeCSN) throws ChangelogException
  {
    if (isEmpty() || purgeCSN == null)
    {
      return null;
    }
    final Record<Long, ChangeNumberIndexRecord> record = log.purgeUpTo(purgeCSN.getTime());
    return record != null ? record.getValue().getCSN() : null;
  }
  /**
   * Implements the Monitoring capabilities of the FileChangeNumberIndexDB.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
    /** {@inheritDoc} */
    @Override
    public List<Attribute> getMonitorData()
    {
      final List<Attribute> attributes = new ArrayList<Attribute>();
      attributes.add(createChangeNumberAttribute(true));
      attributes.add(createChangeNumberAttribute(false));
      long numberOfChanges = 0;
      try
      {
         numberOfChanges = count();
      }
      catch (ChangelogException e)
      {
        logger.traceException(e);
      }
      attributes.add(Attributes.create("count", Long.toString(numberOfChanges)));
      return attributes;
    }
    private Attribute createChangeNumberAttribute(final boolean isFirst)
    {
      final String attributeName = isFirst ? "first-draft-changenumber" : "last-draft-changenumber";
      final String changeNumber = String.valueOf(readChangeNumber(isFirst));
      return Attributes.create(attributeName, changeNumber);
    }
    private long readChangeNumber(final boolean isFirst)
    {
      try
      {
        return getChangeNumber(isFirst ? readFirstRecord() : readLastRecord());
      }
      catch (ChangelogException e)
      {
        logger.traceException(e);
        return NO_KEY;
      }
    }
    /** {@inheritDoc} */
    @Override
    public String getMonitorInstanceName()
    {
      return "ChangeNumber Index Database";
    }
    /** {@inheritDoc} */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
                            throws ConfigException, InitializationException
    {
      // Nothing to do for now
    }
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + ", newestChangeNumber=" + newestChangeNumber;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   *
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public void clear() throws ChangelogException
  {
    log.clear();
    newestChangeNumber = NO_KEY;
  }
  /** Parser of records persisted in the FileChangeNumberIndex log. */
  private static class ChangeNumberIndexDBParser implements RecordParser<Long, ChangeNumberIndexRecord>
  {
    private static final byte STRING_SEPARATOR = 0;
    @Override
    public ByteString encodeRecord(final Record<Long, ChangeNumberIndexRecord> record)
    {
      final ChangeNumberIndexRecord cnIndexRecord = record.getValue();
      return new ByteStringBuilder()
        .append(record.getKey())
        .append(cnIndexRecord.getBaseDN().toString())
        .append(STRING_SEPARATOR)
        .append(cnIndexRecord.getCSN().toByteString()).toByteString();
    }
    @Override
    public Record<Long, ChangeNumberIndexRecord> decodeRecord(final ByteString data) throws DecodingException
    {
      try
      {
        ByteSequenceReader reader = data.asReader();
        final long changeNumber = reader.getLong();
        final DN baseDN = DN.valueOf(reader.getString(getNextStringLength(reader)));
        reader.skip(1);
        final CSN csn = CSN.valueOf(reader.getByteString(reader.remaining()));
        return Record.from(changeNumber, new ChangeNumberIndexRecord(changeNumber, baseDN, csn));
      }
      catch (Exception e)
      {
        throw new DecodingException(e);
      }
    }
    /** Returns the length of next string by looking for the zero byte used as separator. */
    private int getNextStringLength(ByteSequenceReader reader)
    {
      int length = 0;
      while (reader.peek(length) != STRING_SEPARATOR)
      {
        length++;
      }
      return length;
    }
    /** {@inheritDoc} */
    @Override
    public Long decodeKeyFromString(String key) throws ChangelogException
    {
      try
      {
        return Long.valueOf(key);
      }
      catch (NumberFormatException e)
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_DECODE_KEY_FROM_STRING.get(key), e);
      }
    }
    /** {@inheritDoc} */
    @Override
    public String encodeKeyToString(Long key)
    {
      return key.toString();
    }
    /** {@inheritDoc} */
    @Override
    public Long getMaxKey()
    {
      return Long.MAX_VALUE;
    }
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBCursor.java
New file
@@ -0,0 +1,79 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
/**
 * A cursor on ChangeNumberIndexDB.
 *
 * \@NotThreadSafe
 */
class FileChangeNumberIndexDBCursor implements DBCursor<ChangeNumberIndexRecord>
{
  /** The underlying cursor. */
  private final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor;
  /**
   * Creates the cursor from provided cursor.
   *
   * @param cursor
   *          The underlying cursor to read log.
   * @throws ChangelogException
   *            If an error occurs.
   */
  FileChangeNumberIndexDBCursor(final DBCursor<Record<Long, ChangeNumberIndexRecord>> cursor)
      throws ChangelogException
  {
    this.cursor = cursor;
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexRecord getRecord()
  {
    final Record<Long, ChangeNumberIndexRecord> record = cursor.getRecord();
    return record != null ? record.getValue() : null;
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
    return cursor.next();
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    cursor.close();
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -21,17 +21,36 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.backends.ChangelogBackend;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
@@ -40,43 +59,25 @@
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.replication.server.changelog.je.ReplicaCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * File-based implementation of the ChangelogDB interface.
 * Log file implementation of the ChangelogDB interface.
 */
public class FileChangelogDB implements ChangelogDB, ReplicationDomainDB
{
  /**
   * Creates the changelog DB.
   *
   * @param replicationServer
   *            replication server
   * @param cfg
   *            configuration
   */
  public FileChangelogDB(ReplicationServer replicationServer,
      ReplicationServerCfg cfg)
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public ServerState getDomainOldestCSNs(DN baseDN)
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public ServerState getDomainNewestCSNs(DN baseDN)
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public long getDomainLatestTrimDate(DN baseDN)
@@ -84,136 +85,923 @@
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void removeDomain(DN baseDN) throws ChangelogException
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /**
   * This map contains the List of updates received from each LDAP server.
   * <p>
   * When removing a domainMap, code:
   * <ol>
   * <li>first get the domainMap</li>
   * <li>synchronized on the domainMap</li>
   * <li>remove the domainMap</li>
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a replicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final ConcurrentMap<DN, ConcurrentMap<Integer, FileReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, ConcurrentMap<Integer, FileReplicaDB>>();
  /**
   * \@GuardedBy("itself")
   */
  private final Map<DN, List<DomainDBCursor>> registeredDomainCursors =
      new HashMap<DN, List<DomainDBCursor>>();
  private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
      new CopyOnWriteArrayList<MultiDomainDBCursor>();
  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
  private ReplicationEnvironment replicationEnv;
  private final ReplicationServerCfg config;
  private final File dbDirectory;
  /**
   * The handler of the changelog database, the database stores the relation
   * between a change number and the associated cookie.
   * <p>
   * @GuardedBy("cnIndexDBLock")
   */
  private FileChangeNumberIndexDB cnIndexDB;
  private final AtomicReference<ChangeNumberIndexer> cnIndexer = new AtomicReference<ChangeNumberIndexer>();
  /** Used for protecting {@link ChangeNumberIndexDB} related state. */
  private final Object cnIndexDBLock = new Object();
  /**
   * The purge delay (in milliseconds). Records in the changelog DB that are
   * older than this delay might be removed.
   */
  private long purgeDelayInMillis;
  private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>();
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
      new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
  /**
   * Creates a new changelog DB.
   *
   * @param replicationServer
   *          the local replication server.
   * @param config
   *          the replication server configuration
   * @throws ConfigException
   *           if a problem occurs opening the supplied directory
   */
  public FileChangelogDB(final ReplicationServer replicationServer, final ReplicationServerCfg config)
      throws ConfigException
  {
    throw new RuntimeException("Not implemented");
    this.config = config;
    this.replicationServer = replicationServer;
    this.dbDirectory = makeDir(config.getReplicationDBDirectory());
  }
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
      throws ChangelogException
  private File makeDir(final String dbDirName) throws ConfigException
  {
    throw new RuntimeException("Not implemented");
    // Check that this path exists or create it.
    final File dbDirectory = getFileForPath(dbDirName);
    try
    {
      if (!dbDirectory.exists())
      {
        dbDirectory.mkdir();
      }
      return dbDirectory;
    }
    catch (Exception e)
    {
      final LocalizableMessageBuilder mb = new LocalizableMessageBuilder(e.getLocalizedMessage()).append(" ")
          .append(String.valueOf(dbDirectory));
      throw new ConfigException(ERR_FILE_CHECK_CREATE_FAILED.get(mb.toString()), e);
    }
  }
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy,
      Set<DN> excludedDomainDns) throws ChangelogException
  private Map<Integer, FileReplicaDB> getDomainMap(final DN baseDN)
  {
    throw new RuntimeException("Not implemented");
    final Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      return domainMap;
    }
    return Collections.emptyMap();
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState,
      KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
      throws ChangelogException
  private FileReplicaDB getReplicaDB(final DN baseDN, final int serverId)
  {
    throw new RuntimeException("Not implemented");
    return getDomainMap(baseDN).get(serverId);
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
      CSN startCSN, KeyMatchingStrategy matchingStrategy,
      PositionStrategy positionStrategy) throws ChangelogException
  /**
   * Returns a {@link FileReplicaDB}, possibly creating it.
   *
   * @param baseDN
   *          the baseDN for which to create a ReplicaDB
   * @param serverId
   *          the serverId for which to create a ReplicaDB
   * @param server
   *          the ReplicationServer
   * @return a Pair with the FileReplicaDB and a boolean indicating whether it has been created
   * @throws ChangelogException
   *           if a problem occurred with the database
   */
  Pair<FileReplicaDB, Boolean> getOrCreateReplicaDB(final DN baseDN, final int serverId,
      final ReplicationServer server) throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
    while (!shutdown.get())
    {
      final ConcurrentMap<Integer, FileReplicaDB> domainMap = getExistingOrNewDomainMap(baseDN);
      final Pair<FileReplicaDB, Boolean> result = getExistingOrNewReplicaDB(domainMap, serverId, baseDN, server);
      if (result != null)
      {
        final Boolean dbWasCreated = result.getSecond();
        if (dbWasCreated)
        { // new replicaDB => update all cursors with it
          final List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
          if (cursors != null && !cursors.isEmpty())
          {
            for (DomainDBCursor cursor : cursors)
            {
              cursor.addReplicaDB(serverId, null);
            }
          }
        }
        return result;
      }
    }
    throw new ChangelogException(ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
  }
  /** {@inheritDoc} */
  @Override
  public void unregisterCursor(DBCursor<?> cursor)
  private ConcurrentMap<Integer, FileReplicaDB> getExistingOrNewDomainMap(final DN baseDN)
  {
    throw new RuntimeException("Not implemented");
    // happy path: the domainMap already exists
    final ConcurrentMap<Integer, FileReplicaDB> currentValue = domainToReplicaDBs.get(baseDN);
    if (currentValue != null)
    {
      return currentValue;
    }
    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another thread
    final ConcurrentMap<Integer, FileReplicaDB> newValue = new ConcurrentHashMap<Integer, FileReplicaDB>();
    final ConcurrentMap<Integer, FileReplicaDB> previousValue = domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
    {
      // there was already a value associated to the key, let's use it
      return previousValue;
    }
    // we just created a new domain => update all cursors
    for (MultiDomainDBCursor cursor : registeredMultiDomainCursors)
    {
      cursor.addDomain(baseDN, null);
    }
    return newValue;
  }
  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(DN baseDN, UpdateMsg updateMsg)
      throws ChangelogException
  private Pair<FileReplicaDB, Boolean> getExistingOrNewReplicaDB(final ConcurrentMap<Integer, FileReplicaDB> domainMap,
      final int serverId, final DN baseDN, final ReplicationServer server) throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
  }
    // happy path: the replicaDB already exists
    FileReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }
  /** {@inheritDoc} */
  @Override
  public void replicaHeartbeat(DN baseDN, CSN heartbeatCSN)
      throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
    // unlucky, the replicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
      currentValue = domainMap.get(serverId);
      if (currentValue != null)
      {
        return Pair.of(currentValue, false);
      }
      if (domainToReplicaDBs.get(baseDN) != domainMap)
      {
        // The domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the replicaDB
        return null;
      }
      final FileReplicaDB newDB = new FileReplicaDB(serverId, baseDN, server, replicationEnv);
      domainMap.put(serverId, newDB);
      return Pair.of(newDB, true);
    }
  }
  /** {@inheritDoc} */
  @Override
  public void initializeDB()
  {
    throw new RuntimeException("Not implemented");
    try
    {
      final File dbDir = getFileForPath(config.getReplicationDBDirectory());
      replicationEnv = new ReplicationEnvironment(dbDir.getAbsolutePath(), replicationServer);
      final ChangelogState changelogState = replicationEnv.getChangelogState();
      initializeToChangelogState(changelogState);
      if (config.isComputeChangeNumber())
      {
        startIndexer(changelogState);
      }
      setPurgeDelay(replicationServer.getPurgeDelay());
    }
    catch (ChangelogException e)
    {
      logger.traceException(e);
      logger.error(ERR_COULD_NOT_READ_DB.get(this.dbDirectory.getAbsolutePath(), e.getLocalizedMessage()));
    }
  }
  /** {@inheritDoc} */
  @Override
  public void setPurgeDelay(long delayInMillis)
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void setComputeChangeNumber(boolean computeChangeNumber)
  private void initializeToChangelogState(final ChangelogState changelogState)
      throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
    for (Map.Entry<DN, Long> entry : changelogState.getDomainToGenerationId().entrySet())
    {
      replicationServer.getReplicationServerDomain(entry.getKey(), true).initGenerationID(entry.getValue());
    }
    for (Map.Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
    {
      for (int serverId : entry.getValue())
      {
        getOrCreateReplicaDB(entry.getKey(), serverId, replicationServer);
      }
    }
  }
  private void shutdownChangeNumberIndexDB() throws ChangelogException
  {
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        cnIndexDB.shutdown();
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public void shutdownDB() throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
    if (!this.shutdown.compareAndSet(false, true))
    { // shutdown has already been initiated
      return;
    }
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
    if (indexer != null)
    {
      indexer.initiateShutdown();
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
    }
    // wait for shutdown of the threads holding cursors
    try
    {
      if (indexer != null)
      {
        indexer.join();
      }
      if (purger != null)
      {
        purger.join();
      }
    }
    catch (InterruptedException e)
    {
      // do nothing: we are already shutting down
    }
    // now we can safely shutdown all DBs
    try
    {
      shutdownChangeNumberIndexDB();
    }
    catch (ChangelogException e)
    {
      firstException = e;
    }
    for (Iterator<ConcurrentMap<Integer, FileReplicaDB>> it =
        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
    {
      final ConcurrentMap<Integer, FileReplicaDB> domainMap = it.next();
      synchronized (domainMap)
      {
        it.remove();
        for (FileReplicaDB replicaDB : domainMap.values())
        {
          replicaDB.shutdown();
        }
      }
    }
    if (replicationEnv != null)
    {
      replicationEnv.shutdown();
    }
    if (firstException != null)
    {
      throw firstException;
    }
  }
  /**
   * Clears all records from the changelog (does not remove the changelog itself).
   *
   * @throws ChangelogException
   *           If an error occurs when clearing the changelog.
   */
  public void clearDB() throws ChangelogException
  {
    if (!dbDirectory.exists())
    {
      return;
    }
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    for (DN baseDN : this.domainToReplicaDBs.keySet())
    {
      removeDomain(baseDN);
    }
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB != null)
      {
        try
        {
          cnIndexDB.clear();
        }
        catch (ChangelogException e)
        {
          firstException = e;
        }
        try
        {
          shutdownChangeNumberIndexDB();
        }
        catch (ChangelogException e)
        {
          if (firstException == null)
          {
            firstException = e;
          }
          else if (logger.isTraceEnabled())
          {
            logger.traceException(e);
          }
        }
        cnIndexDB = null;
      }
    }
    if (firstException != null)
    {
      throw firstException;
    }
  }
  /** {@inheritDoc} */
  @Override
  public void removeDB() throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
    shutdownDB();
    StaticUtils.recursiveDelete(dbDirectory);
  }
  /** {@inheritDoc} */
  @Override
  public void notifyReplicaOffline(DN baseDN, CSN offlineCSN)
  public ServerState getDomainOldestCSNs(DN baseDN)
  {
    final ServerState result = new ServerState();
    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      result.update(replicaDB.getOldestCSN());
    }
    return result;
  }
  /** {@inheritDoc} */
  @Override
  public ServerState getDomainNewestCSNs(DN baseDN)
  {
    final ServerState result = new ServerState();
    for (FileReplicaDB replicaDB : getDomainMap(baseDN).values())
    {
      result.update(replicaDB.getNewestCSN());
    }
    return result;
  }
  /** {@inheritDoc} */
  @Override
  public void removeDomain(DN baseDN) throws ChangelogException
  {
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
    ChangelogException firstException = null;
    // 1- clear the replica DBs
    Map<Integer, FileReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
      final ChangeNumberIndexer indexer = this.cnIndexer.get();
      if (indexer != null)
      {
        indexer.clear(baseDN);
      }
      synchronized (domainMap)
      {
        domainMap = domainToReplicaDBs.remove(baseDN);
        for (FileReplicaDB replicaDB : domainMap.values())
        {
          try
          {
            replicaDB.clear();
          }
          catch (ChangelogException e)
          {
            firstException = e;
          }
          replicaDB.shutdown();
        }
      }
    }
    // 2- clear the changelogstate DB
    try
    {
      replicationEnv.clearGenerationId(baseDN);
    }
    catch (ChangelogException e)
    {
      if (firstException == null)
      {
        firstException = e;
      }
      else if (logger.isTraceEnabled())
      {
        logger.traceException(e);
      }
    }
    if (firstException != null)
    {
      throw firstException;
    }
  }
  /** {@inheritDoc} */
  @Override
  public void setPurgeDelay(final long purgeDelayInMillis)
  {
    this.purgeDelayInMillis = purgeDelayInMillis;
    final ChangelogDBPurger purger;
    if (purgeDelayInMillis > 0)
    {
      purger = new ChangelogDBPurger();
      if (cnPurger.compareAndSet(null, purger))
      {
        purger.start();
      } // otherwise a purger was already running
    }
    else
    {
      purger = cnPurger.getAndSet(null);
      if (purger != null)
      {
        purger.initiateShutdown();
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public void setComputeChangeNumber(final boolean computeChangeNumber)
      throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
    if (computeChangeNumber)
    {
      startIndexer(replicationEnv.getChangelogState());
    }
    else
    {
      final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
      if (indexer != null)
      {
        indexer.initiateShutdown();
      }
    }
  }
  private void startIndexer(final ChangelogState changelogState)
  {
    final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState);
    if (cnIndexer.compareAndSet(null, indexer))
    {
      indexer.start();
    }
  }
  /** {@inheritDoc} */
  @Override
  public ChangeNumberIndexDB getChangeNumberIndexDB()
  {
    throw new RuntimeException("Not implemented");
    synchronized (cnIndexDBLock)
    {
      if (cnIndexDB == null)
      {
        try
        {
          cnIndexDB = new FileChangeNumberIndexDB(replicationEnv);
        }
        catch (Exception e)
        {
          logger.traceException(e);
          logger.error(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
        }
      }
      return cnIndexDB;
    }
  }
  /** {@inheritDoc} */
  @Override
  public ReplicationDomainDB getReplicationDomainDB()
  {
    throw new RuntimeException("Not implemented");
    return this;
  }
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
  {
    final Set<DN> excludedDomainDns = Collections.emptySet();
    return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns);
  }
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
      final Set<DN> excludedDomainDns)
          throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      if (!excludedDomainDns.contains(baseDN))
      {
        cursor.addDomain(baseDN, startState.getServerState(baseDN));
      }
    }
    return cursor;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
  {
    final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy);
    for (int serverId : getDomainMap(baseDN).keySet())
    {
      // get the last already sent CSN from that server to get a cursor
      final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
      cursor.addReplicaDB(serverId, lastCSN);
    }
    return cursor;
  }
  private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy)
  {
    synchronized (registeredDomainCursors)
    {
      final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
      List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
      if (cursors == null)
      {
        cursors = new ArrayList<DomainDBCursor>();
        registeredDomainCursors.put(baseDN, cursors);
      }
      cursors.add(cursor);
      return cursor;
    }
  }
  private CSN getOfflineCSN(DN baseDN, int serverId, CSN startAfterCSN)
  {
    final MultiDomainServerState offlineReplicas =
        replicationEnv.getChangelogState().getOfflineReplicas();
    final CSN offlineCSN = offlineReplicas.getCSN(baseDN, serverId);
    if (offlineCSN != null
        && (startAfterCSN == null || startAfterCSN.isOlderThan(offlineCSN)))
    {
      return offlineCSN;
    }
    return null;
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
      final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
  {
    final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
    if (replicaDB != null)
    {
      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
      final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
      synchronized (replicaCursors)
      {
        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
        if (cursors == null)
        {
          cursors = new ArrayList<ReplicaCursor>();
          replicaCursors.put(replicaID, cursors);
        }
        cursors.add(replicaCursor);
      }
      return replicaCursor;
    }
    return EMPTY_CURSOR_REPLICA_DB;
  }
  /** {@inheritDoc} */
  @Override
  public void unregisterCursor(final DBCursor<?> cursor)
  {
    if (cursor instanceof MultiDomainDBCursor)
    {
      registeredMultiDomainCursors.remove(cursor);
    }
    else if (cursor instanceof DomainDBCursor)
    {
      final DomainDBCursor domainCursor = (DomainDBCursor) cursor;
      synchronized (registeredMultiDomainCursors)
      {
        final List<DomainDBCursor> cursors = registeredDomainCursors.get(domainCursor.getBaseDN());
        if (cursors != null)
        {
          cursors.remove(cursor);
        }
      }
    }
    else if (cursor instanceof ReplicaCursor)
    {
      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
      synchronized (replicaCursors)
      {
        final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
        if (cursors != null)
        {
          cursors.remove(cursor);
        }
      }
    }
  }
  /** {@inheritDoc} */
  @Override
  public boolean publishUpdateMsg(final DN baseDN, final UpdateMsg updateMsg) throws ChangelogException
  {
    final CSN csn = updateMsg.getCSN();
    final Pair<FileReplicaDB, Boolean> pair = getOrCreateReplicaDB(baseDN,
        csn.getServerId(), replicationServer);
    final FileReplicaDB replicaDB = pair.getFirst();
    replicaDB.add(updateMsg);
    ChangelogBackend.getInstance().notifyCookieEntryAdded(baseDN, updateMsg);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, csn.getServerId());
      indexer.publishUpdateMsg(baseDN, updateMsg);
    }
    return pair.getSecond(); // replica DB was created
  }
  /** {@inheritDoc} */
  @Override
  public void replicaHeartbeat(final DN baseDN, final CSN heartbeatCSN) throws ChangelogException
  {
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      notifyReplicaOnline(indexer, baseDN, heartbeatCSN.getServerId());
      indexer.publishHeartbeat(baseDN, heartbeatCSN);
    }
  }
  private void notifyReplicaOnline(final ChangeNumberIndexer indexer, final DN baseDN, final int serverId)
      throws ChangelogException
  {
    if (indexer.isReplicaOffline(baseDN, serverId))
    {
      replicationEnv.notifyReplicaOnline(baseDN, serverId);
    }
    updateCursorsWithOfflineCSN(baseDN, serverId, null);
  }
  /** {@inheritDoc} */
  @Override
  public void notifyReplicaOffline(final DN baseDN, final CSN offlineCSN) throws ChangelogException
  {
    replicationEnv.notifyReplicaOffline(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
  }
  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
  {
    synchronized (replicaCursors)
    {
      final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
      if (cursors != null)
      {
        for (ReplicaCursor cursor : cursors)
        {
          cursor.setOfflineCSN(offlineCSN);
        }
      }
    }
  }
  /**
   * Clear the database.
   * The thread purging the changelogDB on a regular interval. Records are
   * purged from the changelogDB if they are older than a delay specified in
   * seconds. The purge process works in two steps:
   * <ol>
   * <li>first purge the changeNumberIndexDB and retrieve information to drive
   * replicaDBs purging</li>
   * <li>proceed to purge each replicaDBs based on the information collected
   * when purging the changeNumberIndexDB</li>
   * </ol>
   */
  public void clearDB()
  private final class ChangelogDBPurger extends DirectoryThread
  {
    throw new RuntimeException("Not implemented");
  }
    private static final int DEFAULT_SLEEP = 500;
    protected ChangelogDBPurger()
    {
      super("changelog DB purger");
    }
    /** {@inheritDoc} */
    @Override
    public void run()
    {
      // initialize CNIndexDB
      getChangeNumberIndexDB();
      while (!isShutdownInitiated())
      {
        try
        {
          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
          final CSN oldestNotPurgedCSN;
          // next code assumes that the compute-change-number config
          // never changes during the life time of an RS
          if (!config.isComputeChangeNumber())
          {
            oldestNotPurgedCSN = purgeCSN;
          }
          else
          {
            final FileChangeNumberIndexDB localCNIndexDB = cnIndexDB;
            if (localCNIndexDB == null)
            { // shutdown has been initiated
              return;
            }
            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
            if (oldestNotPurgedCSN == null)
            { // shutdown may have been initiated...
              // ... or the change number index DB is empty,
              // wait for new changes to come in.
              // Note we cannot sleep for as long as the purge delay
              // (3 days default), because we might receive late updates
              // that will have to be purged before the purge delay elapses.
              // This can particularly happen in case of network partitions.
              jeFriendlySleep(DEFAULT_SLEEP);
              continue;
            }
          }
          for (final Map<Integer, FileReplicaDB> domainMap : domainToReplicaDBs.values())
          {
            for (final FileReplicaDB replicaDB : domainMap.values())
            {
              replicaDB.purgeUpTo(oldestNotPurgedCSN);
            }
          }
          jeFriendlySleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
        }
        catch (InterruptedException e)
        {
          // shutdown initiated?
        }
        catch (Exception e)
        {
          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH.get(stackTraceToSingleLineString(e)));
          if (replicationServer != null)
          {
            replicationServer.shutdown();
          }
        }
      }
    }
    /**
     * This method implements a sleep() that is friendly to Berkeley JE.
     * <p>
     * Originally, {@link Thread#sleep(long)} was used , but waking up a
     * sleeping threads required calling {@link Thread#interrupt()}, and JE
     * threw exceptions when invoked on interrupted threads.
     * <p>
     * The solution is to replace:
     * <ol>
     * <li> {@link Thread#sleep()} with {@link Object#wait(long)}</li>
     * <li> {@link Thread#interrupt()} with {@link Object#notify()}</li>
     * </ol>
     */
    private void jeFriendlySleep(long millis) throws InterruptedException
    {
      if (!isShutdownInitiated())
      {
        synchronized (this)
        {
          if (!isShutdownInitiated())
          {
            wait(millis);
          }
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
      if (currentPurgeTime <= nextPurgeTime)
      {
        // sleep till the next CSN to purge,
        return nextPurgeTime - currentPurgeTime;
      }
      // wait a bit before purging more
      return DEFAULT_SLEEP;
    }
    /** {@inheritDoc} */
    @Override
    public void initiateShutdown()
    {
      super.initiateShutdown();
      synchronized (this)
      {
        notify(); // wake up the purger thread for faster shutdown
      }
    }
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
New file
@@ -0,0 +1,409 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.file;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
/**
 * Represents a replication server database for one server in the topology.
 * <p>
 * It is responsible for efficiently saving the updates that is received from
 * each master server into stable storage.
 * <p>
 * It is also able to generate a {@link DBCursor} that can be used to
 * read all changes from a given {@link CSN}.
 * <p>
 * It publishes some monitoring information below cn=monitor.
 */
class FileReplicaDB
{
  /** The parser of records stored in Replica DB. */
  static final RecordParser<CSN, UpdateMsg> RECORD_PARSER = new ReplicaDBParser();
  /**
   * Class that allows atomically setting oldest and newest CSNs without
   * synchronization.
   *
   * @Immutable
   */
  private static final class CSNLimits
  {
    private final CSN oldestCSN;
    private final CSN newestCSN;
    public CSNLimits(CSN oldestCSN, CSN newestCSN)
    {
      this.oldestCSN = oldestCSN;
      this.newestCSN = newestCSN;
    }
  }
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  /** The log in which records are persisted. */
  private final Log<CSN, UpdateMsg> log;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
   *
   * @NonNull
   */
  private volatile CSNLimits csnLimits;
  private final int serverId;
  private final DN baseDN;
  private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private final ReplicationServer replicationServer;
  private final ReplicationEnvironment replicationEnv;
  /**
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
   * @param serverId
   *          Id of this server.
   * @param baseDN
   *          the replication domain baseDN.
   * @param replicationServer
   *          The ReplicationServer that creates this ReplicaDB.
   * @param replicationEnv
   *          the Database Env to use to create the ReplicationServer DB. server
   *          for this domain.
   * @throws ChangelogException
   *           If a database problem happened
   */
  FileReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer,
      final ReplicationEnvironment replicationEnv) throws ChangelogException
  {
    this.serverId = serverId;
    this.baseDN = baseDN;
    this.replicationServer = replicationServer;
    this.replicationEnv = replicationEnv;
    this.log = createLog(replicationEnv);
    this.csnLimits = new CSNLimits(readOldestCSN(), readNewestCSN());
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }
  private CSN readOldestCSN() throws ChangelogException
  {
    final Record<CSN, UpdateMsg> record = log.getOldestRecord();
    return record == null ? null : record.getKey();
  }
  private CSN readNewestCSN() throws ChangelogException
  {
    final Record<CSN, UpdateMsg> record = log.getNewestRecord();
    return record == null ? null : record.getKey();
  }
  private Log<CSN, UpdateMsg> createLog(final ReplicationEnvironment replicationEnv) throws ChangelogException
  {
    final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN, true);
    return replicationEnv.getOrCreateReplicaDB(baseDN, serverId, domain.getGenerationId());
  }
  /**
   * Adds a new message.
   *
   * @param updateMsg
   *          The update message to add.
   * @throws ChangelogException
   *           If an error occurs when trying to add the message.
   */
  void add(final UpdateMsg updateMsg) throws ChangelogException
  {
    if (shutdown.get())
    {
      throw new ChangelogException(
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }
    log.append(Record.from(updateMsg.getCSN(), updateMsg));
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
    if (updateOld || updateNew)
    {
      csnLimits = new CSNLimits(
          updateOld ? updateMsg.getCSN() : limits.oldestCSN,
          updateNew ? updateMsg.getCSN() : limits.newestCSN);
    }
  }
  /**
   * Get the oldest CSN that has not been purged yet.
   *
   * @return the oldest CSN that has not been purged yet.
   */
  CSN getOldestCSN()
  {
    return csnLimits.oldestCSN;
  }
  /**
   * Get the newest CSN that has not been purged yet.
   *
   * @return the newest CSN that has not been purged yet.
   */
  CSN getNewestCSN()
  {
    return csnLimits.newestCSN;
  }
  /**
   * Returns a cursor that allows to retrieve the update messages from this DB.
   * The actual starting position is defined by the provided CSN, the key
   * matching strategy and the positioning strategy.
   *
   * @param startCSN
   *          The position where the cursor must start. If null, start from the
   *          oldest CSN
   * @param matchingStrategy
   *          Cursor key matching strategy
   * @param positionStrategy
   *          Cursor position strategy
   * @return a new {@link DBCursor} to retrieve update messages.
   * @throws ChangelogException
   *           if a database problem happened
   */
  DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy);
    return new FileReplicaDBCursor(cursor, startCSN, positionStrategy);
  }
  /**
   * Shutdown this ReplicaDB.
   */
  void shutdown()
  {
    if (shutdown.compareAndSet(false, true))
    {
      log.close();
      DirectoryServer.deregisterMonitorProvider(dbMonitor);
    }
  }
  /**
   * Synchronously purge changes older than purgeCSN from this replicaDB.
   *
   * @param purgeCSN
   *          The CSN up to which changes can be purged. No purging happens when
   *          it is {@code null}.
   * @throws ChangelogException
   *           In case of database problem.
   */
  void purgeUpTo(final CSN purgeCSN) throws ChangelogException
  {
    if (purgeCSN == null)
    {
      return;
    }
    final Record<CSN, UpdateMsg> oldestRecord = log.purgeUpTo(purgeCSN);
    if (oldestRecord != null)
    {
      csnLimits = new CSNLimits(oldestRecord.getKey(), csnLimits.newestCSN);
    }
  }
  /**
   * Implements monitoring capabilities of the ReplicaDB.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
    /** {@inheritDoc} */
    @Override
    public List<Attribute> getMonitorData()
    {
      final List<Attribute> attributes = new ArrayList<Attribute>();
      create(attributes, "replicationServer-database",String.valueOf(serverId));
      create(attributes, "domain-name", baseDN.toNormalizedString());
      final CSNLimits limits = csnLimits;
      if (limits.oldestCSN != null)
      {
        create(attributes, "first-change", encode(limits.oldestCSN));
      }
      if (limits.newestCSN != null)
      {
        create(attributes, "last-change", encode(limits.newestCSN));
      }
      return attributes;
    }
    private void create(final List<Attribute> attributes, final String name, final String value)
    {
      attributes.add(Attributes.create(name, value));
    }
    private String encode(final CSN csn)
    {
      return csn + " " + new Date(csn.getTime());
    }
    /** {@inheritDoc} */
    @Override
    public String getMonitorInstanceName()
    {
      ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN);
      return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName();
    }
    /** {@inheritDoc} */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
        throws ConfigException, InitializationException
    {
      // Nothing to do for now
    }
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    final CSNLimits limits = csnLimits;
    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
        + limits.oldestCSN + " " + limits.newestCSN;
  }
  /**
   * Clear the changes from this DB, from both memory cache and persistent
   * storage.
   *
   * @throws ChangelogException
   *           If an error occurs while removing the changes from the DB.
   */
  void clear() throws ChangelogException
  {
    // Remove all persisted data and reset generationId to default value
    log.clear();
    replicationEnv.resetGenerationId(baseDN);
    csnLimits = new CSNLimits(null, null);
  }
  /**
   * Return the number of records of this replicaDB.
   * <p>
   * For test purpose.
   *
   * @return The number of records of this replicaDB.
   * @throws ChangelogException
   *            If an error occurs
   */
  long getNumberRecords() throws ChangelogException
  {
    return log.getNumberOfRecords();
  }
  /**
   * Dump this DB as text files, intended for debugging purpose only.
   *
   * @throws ChangelogException
   *           If an error occurs during dump
   */
  void dumpAsTextFiles() throws ChangelogException {
    log.dumpAsTextFile(log.getPath());
  }
  /** Parser of records persisted in the ReplicaDB log. */
  private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
  {
    @Override
    public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
    {
      final UpdateMsg message = record.getValue();
      return ByteString.wrap(message.getBytes());
    }
    @Override
    public Record<CSN, UpdateMsg> decodeRecord(final ByteString data) throws DecodingException
    {
      try
      {
        final UpdateMsg msg =
            (UpdateMsg) UpdateMsg.generateMsg(data.toByteArray(), ProtocolVersion.REPLICATION_PROTOCOL_V7);
        return Record.from(msg.getCSN(), msg);
      }
      catch (Exception e)
      {
        throw new DecodingException(e);
      }
    }
    /** {@inheritDoc} */
    @Override
    public CSN decodeKeyFromString(String key) throws ChangelogException
    {
      return new CSN(key);
    }
    /** {@inheritDoc} */
    @Override
    public String encodeKeyToString(CSN key)
    {
      return key.toString();
    }
    /** {@inheritDoc} */
    @Override
    public CSN getMaxKey()
    {
      return CSN.MAX_CSN_VALUE;
    }
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
New file
@@ -0,0 +1,144 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
/**
 * A cursor on ReplicaDB, which can re-initialize itself after exhaustion.
 * <p>
 * The cursor provides a java.sql.ResultSet like API :
 * <pre>
 *  FileReplicaDBCursor cursor = ...;
 *  try {
 *    while (cursor.next()) {
 *      Record record = cursor.getRecord();
 *      // ... can call cursor.getRecord() again: it will return the same result
 *    }
 *  }
 *  finally {
 *    close(cursor);
 *  }
 * }
 * </pre>
 * <p>
 * The cursor automatically re-initializes itself if it is exhausted: if a
 * record is newly available, a subsequent call to the {@code next()} method will
 * return {@code true} and the record will be available by calling {@code getRecord()}
 * method.
 *
 * \@NotThreadSafe
 */
class FileReplicaDBCursor implements DBCursor<UpdateMsg>
{
  /** The underlying cursor. */
  private final RepositionableCursor<CSN, UpdateMsg> cursor;
  /** The next record to return. */
  private Record<CSN, UpdateMsg> nextRecord;
  /**  The CSN to re-start with in case the cursor is exhausted. */
  private CSN lastNonNullCurrentCSN;
  private PositionStrategy positionStrategy;
  /**
   * Creates the cursor from provided log cursor and start CSN.
   *
   * @param cursor
   *          The underlying log cursor to read log.
   * @param startCSN
   *          The CSN to use as a start point (excluded from cursor, the lowest
   *          CSN higher than this CSN is used as the real start point).
   * @param positionStrategy
   *          Cursor position strategy, which allow to choose if cursor must
   *          start from the provided CSN or just after the provided CSN.
   */
  FileReplicaDBCursor(
      final RepositionableCursor<CSN, UpdateMsg> cursor,
      final CSN startCSN,
      final PositionStrategy positionStrategy) {
    this.cursor = cursor;
    this.lastNonNullCurrentCSN = startCSN;
    this.positionStrategy = positionStrategy;
  }
  /** {@inheritDoc} */
  @Override
  public UpdateMsg getRecord()
  {
    return nextRecord == null ? null : nextRecord.getValue();
  }
  /** {@inheritDoc} */
  @Override
  public boolean next() throws ChangelogException
  {
    if (cursor.next())
    {
      nextRecord = cursor.getRecord();
      final int nextCSNCompare = nextRecord.getKey().compareTo(lastNonNullCurrentCSN);
      if (nextCSNCompare > 0 || (nextCSNCompare == 0 && positionStrategy == ON_MATCHING_KEY))
      {
        // start CSN is found, switch to position strategy that always find the next
        lastNonNullCurrentCSN = nextRecord.getKey();
        positionStrategy = AFTER_MATCHING_KEY;
        return true;
      }
    }
    // either cursor is exhausted or we still have not reached the start CSN
    return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned();
  }
  /** Re-initialize the cursor after the last non null CSN. */
  private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException
  {
    final boolean found = cursor.positionTo(lastNonNullCurrentCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
    if (found && cursor.next())
    {
      nextRecord = cursor.getRecord();
      lastNonNullCurrentCSN = nextRecord.getKey();
      return true;
    }
    nextRecord = null;
    return false;
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    cursor.close();
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Log.java
New file
@@ -0,0 +1,1187 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
/**
 * A multi-file log that features appending key-value records and reading them
 * using a {@code DBCursor}.
 * The records must be appended to the log in ascending order of the keys.
 * <p>
 * A log is defined for a path - the log path - and contains one to many log files:
 * <ul>
 * <li>it has always at least one log file, the head log file, named "head.log",
 * which is used to append the records.</li>
 * <li>it may have from zero to many read-only log files, which are named after
 * the pattern '[lowkey]_[highkey}.log' where [lowkey] and [highkey] are respectively
 * the string representation of lowest and highest key present in the log file.</li>
 * </ul>
 * A read-only log file is created each time the head log file has reached the
 * maximum size limit. The head log file is then rotated to the read-only file and a
 * new empty head log file is opened. There is no limit on the number of read-only
 * files, but they can be purged.
 * <p>
 * A log is obtained using the {@code Log.openLog()} method and must always be
 * released using the {@code close()} method.
 * <p>
 * Usage example:
 * <pre>
 * {@code
 *   Log<K, V> log = null;
 *   try
 *   {
 *     log = Log.openLog(logPath, parser, maxFileSize);
 *     log.append(key, value);
 *     DBCursor<K, V> cursor = log.getCursor(someKey);
 *     // use cursor, then close cursor
 *   }
 *   finally
 *   {
 *     log.close();
 *   }
 * }
 * </pre>
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
final class Log<K extends Comparable<K>, V> implements Closeable
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final String LOG_FILE_SUFFIX = ".log";
  static final String HEAD_LOG_FILE_NAME = "head" + LOG_FILE_SUFFIX;
  private static final String LOG_FILE_NAME_SEPARATOR = "_";
  private static final FileFilter READ_ONLY_LOG_FILES_FILTER = new FileFilter()
  {
    @Override
    public boolean accept(File file)
    {
      return file.isFile() && file.getName().endsWith(LOG_FILE_SUFFIX) &&
          !file.getName().equals(HEAD_LOG_FILE_NAME);
    }
  };
  /** Map that holds the unique log instance for each log path. */
  private static final Map<File, Log<?, ?>> logsCache = new HashMap<File, Log<?, ?>>();
  /**
   * The number of references on this log instance. It is incremented each time
   * a log is opened on the same log path. The log is effectively closed only
   * when the {@code close()} method is called and this value is at most 1.
   */
  private int referenceCount;
  /** The path of directory for this log, where log files are stored. */
  private final File logPath;
  /** The parser used for encoding/decoding of records. */
  private final RecordParser<K, V> recordParser;
  /**
   * Indicates if this log is closed. When the log is closed, all methods return
   * immediately with no effect.
   */
  private boolean isClosed;
  /**
   * The log files contained in this log, ordered by key.
   * <p>
   * The head log file is always present and is associated with the maximum
   * possible key, given by the record parser.
   * <p>
   * The read-only log files are associated with the highest key they contain.
   */
  private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<K, LogFile<K, V>>();
  /**
   * The last key appended to the log. In order to keep the ordering of the keys
   * in the log, any attempt to append a record with a key lower or equal to
   * this key is rejected (no error but an event is logged).
   */
  private K lastAppendedKey;
  /**
   * The list of non-empty cursors opened on this log. Opened cursors may have
   * to be updated when rotating the head log file.
   */
  private final List<LogCursor<K, V>> openCursors = new CopyOnWriteArrayList<LogCursor<K, V>>();
  /**
   * A log file is rotated once it has exceeded this size limit. The log file can have
   * a size much larger than this limit if the last record written has a huge size.
   *
   * TODO : to be replaced later by a (or a list of) configurable Rotation policy
   * eg, List<RotationPolicy> rotationPolicies = new ArrayList<RotationPolicy>();
   */
  private final long sizeLimitPerLogFileInBytes;
  /**
   * The exclusive lock used for writes and lifecycle operations on this log:
   * initialize, clear, sync and close.
   */
  private final Lock exclusiveLock;
  /**
   * The shared lock used for reads and cursor operations on this log.
   */
  private final Lock sharedLock;
  /**
   * Open a log with the provided log path, record parser and maximum size per
   * log file.
   * <p>
   * If no log exists for the provided path, a new one is created.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param logPath
   *          Path of the log.
   * @param parser
   *          Parser for encoding/decoding of records.
   * @param sizeLimitPerFileInBytes
   *          Limit in bytes before rotating the head log file of the log.
   * @return a log
   * @throws ChangelogException
   *           If a problem occurs during initialization.
   */
  static synchronized <K extends Comparable<K>, V> Log<K, V> openLog(final File logPath,
      final RecordParser<K, V> parser, final long sizeLimitPerFileInBytes) throws ChangelogException
  {
    Reject.ifNull(logPath, parser);
    @SuppressWarnings("unchecked")
    Log<K, V> log = (Log<K, V>) logsCache.get(logPath);
    if (log == null)
    {
      log = new Log<K, V>(logPath, parser, sizeLimitPerFileInBytes);
      logsCache.put(logPath, log);
    }
    else
    {
      // TODO : check that parser and size limit are compatible with the existing one,
      // and issue a warning if it is not the case
      log.referenceCount++;
    }
    return log;
  }
  /**
   * Release a reference to the log corresponding to provided path. The log is
   * closed if this is the last reference.
   */
  private static synchronized void releaseLog(final File logPath)
  {
    Log<?, ?> log = logsCache.get(logPath);
    if (log == null)
    {
      // this should never happen
      logger.error(ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
      return;
    }
    if (log.referenceCount > 1)
    {
      log.referenceCount--;
    }
    else
    {
      log.doClose();
      logsCache.remove(logPath);
    }
  }
  /**
   * Creates a new log.
   *
   * @param logPath
   *            The directory path of the log.
   * @param parser
   *          Parser of records.
   * @param sizeLimitPerFile
   *            Limit in bytes before rotating a log file.
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  private Log(final File logPath, final RecordParser<K, V> parser, final long sizeLimitPerFile)
      throws ChangelogException
  {
    this.logPath = logPath;
    this.recordParser = parser;
    this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
    this.referenceCount = 1;
    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
    this.exclusiveLock = lock.writeLock();
    this.sharedLock = lock.readLock();
    createOrOpenLogFiles();
  }
  /** Create or open log files used by this log. */
  private void createOrOpenLogFiles() throws ChangelogException
  {
    exclusiveLock.lock();
    try
    {
      createRootDirIfNotExists();
      openHeadLogFile();
      for (final File file : getReadOnlyLogFiles())
      {
        openReadOnlyLogFile(file);
      }
      isClosed = false;
    }
    catch (ChangelogException e)
    {
      // ensure all log files opened at this point are closed
      close();
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  private File[] getReadOnlyLogFiles() throws ChangelogException
  {
    File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
    if (files == null)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
    }
    return files;
  }
  private void createRootDirIfNotExists() throws ChangelogException
  {
    if (!logPath.exists())
    {
      if (!logPath.mkdirs())
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
      }
    }
  }
  /**
   * Returns the path of this log.
   *
   * @return the path of this log directory
   */
  public File getPath()
  {
    return logPath;
  }
  /**
   * Add the provided record at the end of this log.
   * <p>
   * The record must have a key strictly higher than the key
   * of the last record added. If it is not the case, the record is not
   * appended and the method returns immediately.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * {@code syncToFileSystem()} method.
   *
   * @param record
   *          The record to add.
   * @throws ChangelogException
   *           If an error occurs while adding the record to the log.
   */
  public void append(final Record<K, V> record) throws ChangelogException
  {
    // If this exclusive lock happens to be a bottleneck :
    // 1. use a shared lock for appending the record first
    // 2. switch to an exclusive lock only if rotation is needed
    // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      if (recordIsBreakingKeyOrdering(record))
      {
        logger.info(LocalizableMessage.raw(
            "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
            lastAppendedKey != null ? lastAppendedKey : "null"));
        return;
      }
      LogFile<K, V> headLogFile = getHeadLogFile();
      if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
      {
        logger.error(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
        rotateHeadLogFile();
        headLogFile = getHeadLogFile();
      }
      headLogFile.append(record);
      lastAppendedKey = record.getKey();
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
   * Indicates if the provided record has a key that would break the key
   * ordering in the log.
   */
  private boolean recordIsBreakingKeyOrdering(final Record<K, V> record)
  {
    return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
  }
  /**
   * Synchronize all records added with the file system, ensuring that records
   * are effectively persisted.
   * <p>
   * After a successful call to this method, it is guaranteed that all records
   * added to the log are persisted to the file system.
   *
   * @throws ChangelogException
   *           If the synchronization fails.
   */
  public void syncToFileSystem() throws ChangelogException
  {
    exclusiveLock.lock();
    try
    {
      getHeadLogFile().syncToFileSystem();
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the first position.
   *
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public RepositionableCursor<K, V> getCursor() throws ChangelogException
  {
    LogCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      cursor.positionTo(null, null, null);
      registerCursor(cursor);
      return cursor;
    }
    catch (ChangelogException e)
    {
      StaticUtils.close(cursor);
      throw e;
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the position defined by the provided key.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
   *          {@code null}, cursor will point at the first record of the log.
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException
  {
    return getCursor(key, EQUAL_TO_KEY, ON_MATCHING_KEY);
  }
  /**
   * Returns a cursor that allows to retrieve the records from this log. The
   * starting position is defined by the provided key, cursor matching strategy
   * and cursor positioning strategy.
   *
   * @param key
   *          Key to use as a start position for the cursor. If key is
   *          {@code null}, cursor will point at the first record of the log.
   * @param matchingStrategy
   *          Cursor key matching strategy.
   * @param positionStrategy
   *          The cursor positioning strategy.
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    if (key == null)
    {
      return getCursor();
    }
    LogCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
      }
      cursor = new LogCursor<K, V>(this);
      final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy);
      // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy
      if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
      {
        registerCursor(cursor);
        return cursor;
      }
      else
      {
        StaticUtils.close(cursor);
        return new EmptyLogCursor<K, V>();
      }
    }
    catch (ChangelogException e)
    {
      StaticUtils.close(cursor);
      throw e;
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
   * Returns the oldest (first) record from this log.
   *
   * @return the oldest record, which may be {@code null} if there is no record
   *         in the log.
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  public Record<K, V> getOldestRecord() throws ChangelogException
  {
    return getOldestLogFile().getOldestRecord();
  }
  /**
   * Returns the newest (last) record from this log.
   *
   * @return the newest record, which may be {@code null}
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  public Record<K, V> getNewestRecord() throws ChangelogException
  {
    return getHeadLogFile().getNewestRecord();
  }
  /**
   * Returns the number of records in the log.
   *
   * @return the number of records
   * @throws ChangelogException
   *            If a problem occurs.
   */
  public long getNumberOfRecords() throws ChangelogException
  {
    long count = 0;
    for (final LogFile<K, V> logFile : logFiles.values())
    {
      count += logFile.getNumberOfRecords();
    }
    return count;
  }
  /**
   * Purge the log up to and excluding the provided key.
   *
   * @param purgeKey
   *            the key up to which purging must happen
   * @return the oldest non purged record, or {@code null}
   *         if no record was purged
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public Record<K,V> purgeUpTo(final K purgeKey) throws ChangelogException
  {
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return null;
      }
      final SortedMap<K, LogFile<K, V>> logFilesToPurge = logFiles.headMap(purgeKey);
      if (logFilesToPurge.isEmpty())
      {
        return null;
      }
      final List<String> undeletableFiles = new ArrayList<String>();
      final Iterator<Entry<K, LogFile<K, V>>> entriesToPurge = logFilesToPurge.entrySet().iterator();
      while (entriesToPurge.hasNext())
      {
        final LogFile<K, V> logFile = entriesToPurge.next().getValue();
        try
        {
          logFile.close();
          logFile.delete();
          entriesToPurge.remove();
        }
        catch (ChangelogException e)
        {
          // The deletion of log file on file system has failed
          undeletableFiles.add(logFile.getFile().getPath());
        }
      }
      if (!undeletableFiles.isEmpty())
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
                Utils.joinAsString(", ", undeletableFiles)));
      }
      return getOldestRecord();
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
   * Empties the log, discarding all records it contains.
   *
   * @throws ChangelogException
   *           If cursors are opened on this log, or if a problem occurs during
   *           clearing operation.
   */
  public void clear() throws ChangelogException
  {
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      if (!openCursors.isEmpty())
      {
        // Allow opened cursors at this point, but turn them into empty cursors.
        // This behavior is needed by the change number indexer thread.
        switchCursorsOpenedIntoEmptyCursors();
      }
      // delete all log files
      final List<String> undeletableFiles = new ArrayList<String>();
      for (LogFile<K, V> logFile : logFiles.values())
      {
        try
        {
          logFile.close();
          logFile.delete();
        }
        catch (ChangelogException e)
        {
          undeletableFiles.add(logFile.getFile().getPath());
        }
      }
      if (!undeletableFiles.isEmpty())
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
            Utils.joinAsString(", ", undeletableFiles)));
      }
      logFiles.clear();
      // recreate an empty head log file
      openHeadLogFile();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)));
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
   * Dump this log as a text files, intended for debugging purpose only.
   *
   * @param dumpDirectory
   *          Directory that will contains log files with text format
   *          and ".txt" extensions
   * @throws ChangelogException
   *           If an error occurs during dump
   */
  void dumpAsTextFile(File dumpDirectory) throws ChangelogException
  {
    for (LogFile<K, V> logFile : logFiles.values())
    {
      logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
    }
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    releaseLog(logPath);
  }
  /** Effectively close this log. */
  private void doClose()
  {
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      if (!openCursors.isEmpty())
      {
        logger.error(ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
      }
      StaticUtils.close(logFiles.values());
      isClosed = true;
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  private LogFile<K, V> getHeadLogFile()
  {
    return logFiles.lastEntry().getValue();
  }
  private LogFile<K, V> getOldestLogFile()
  {
    return logFiles.firstEntry().getValue();
  }
  /**
   * Rotate the head log file to a read-only log file, and open a new empty head
   * log file to write in.
   * <p>
   * All cursors opened on this log are temporarily disabled (closing underlying resources)
   * and then re-open with their previous state.
   */
  private void rotateHeadLogFile() throws ChangelogException
  {
    // Temporarily disable cursors opened on head, saving their state
    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead();
    final LogFile<K, V> headLogFile = getHeadLogFile();
    final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
    headLogFile.close();
    renameHeadLogFileTo(readOnlyLogFile);
    openHeadLogFile();
    openReadOnlyLogFile(readOnlyLogFile);
    // Re-enable cursors previously opened on head, with the saved state
    updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
  }
  private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
  {
    final File headLogFile = new File(logPath, HEAD_LOG_FILE_NAME);
    try
    {
      StaticUtils.renameFile(headLogFile, rotatedLogFile);
    }
    catch (IOException e)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e);
    }
  }
  /**
   * Returns the key bounds for the provided log file.
   *
   * @return the pair of (lowest key, highest key) that correspond to records
   *         stored in the corresponding log file.
   * @throws ChangelogException
   *            if an error occurs while retrieving the keys
   */
   private Pair<K, K> getKeyBounds(final LogFile<K, V> logFile) throws ChangelogException
   {
     try
     {
       final String name = logFile.getFile().getName();
       final String[] keys = name.substring(0, name.length() - Log.LOG_FILE_SUFFIX.length())
           .split(LOG_FILE_NAME_SEPARATOR);
       return Pair.of(recordParser.decodeKeyFromString(keys[0]), recordParser.decodeKeyFromString(keys[1]));
     }
     catch (Exception e)
     {
       throw new ChangelogException(
           ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
     }
   }
   /**
    * Returns the file name to use for the read-only version of the provided
    * log file.
    * <p>
    * The file name is based on the lowest and highest key in the log file.
    *
    * @return the name to use for the read-only version of the log file
    * @throws ChangelogException
    *            If an error occurs.
    */
  private String generateReadOnlyFileName(final LogFile<K,V> logFile) throws ChangelogException
  {
    final K lowestKey = logFile.getOldestRecord().getKey();
    final K highestKey = logFile.getNewestRecord().getKey();
    return recordParser.encodeKeyToString(lowestKey) + LOG_FILE_NAME_SEPARATOR
       + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
  }
  /** Update the cursors that were pointing to head after a rotation of the head log file. */
  private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors)
      throws ChangelogException
  {
    for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors)
    {
      final CursorState<K, V> cursorState = pair.getSecond();
      // Need to update the cursor only if it is pointing to the head log file
      if (isHeadLogFile(cursorState.logFile))
      {
        final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
        final LogFile<K, V> logFile = findLogFileFor(previousKey);
        final LogCursor<K, V> cursor = pair.getFirst();
        cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record));
      }
    }
  }
  private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
  {
    for (LogCursor<K, V> cursor : openCursors)
    {
      cursor.actAsEmptyCursor();
    }
    openCursors.clear();
  }
  /**
   * Disable the cursors opened on the head log file log, by closing their underlying cursor.
   * Returns the state of each cursor just before the close operation.
   *
   * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
   * @throws ChangelogException
   *           If an error occurs.
   */
  private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException
  {
    final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates =
        new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>();
    for (LogCursor<K, V> cursor : openCursors)
    {
      if (isHeadLogFile(cursor.currentLogFile))
      {
        openCursorsStates.add(Pair.of(cursor, cursor.getState()));
        cursor.closeUnderlyingCursor();
      }
    }
    return openCursorsStates;
  }
  private void openHeadLogFile() throws ChangelogException
  {
    final LogFile<K, V> head = LogFile.newAppendableLogFile(new File(logPath,  HEAD_LOG_FILE_NAME), recordParser);
    final Record<K,V> newestRecord = head.getNewestRecord();
    lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
    logFiles.put(recordParser.getMaxKey(), head);
  }
  private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
  {
    final LogFile<K, V> logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
    final Pair<K, K> bounds = getKeyBounds(logFile);
    logFiles.put(bounds.getSecond(), logFile);
  }
  private void registerCursor(final LogCursor<K, V> cursor)
  {
    openCursors.add(cursor);
  }
  private void unregisterCursor(final LogCursor<K, V> cursor)
  {
    openCursors.remove(cursor);
  }
  /**
   * Returns the log file that is just after the provided log file wrt the order
   * defined on keys, or {@code null} if the provided log file is the last one
   * (the head log file).
   */
  private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException
  {
    if (isHeadLogFile(currentLogFile))
    {
      return null;
    }
    final Pair<K, K> bounds = getKeyBounds(currentLogFile);
    return logFiles.higherEntry(bounds.getSecond()).getValue();
  }
  private boolean isHeadLogFile(final LogFile<K, V> logFile)
  {
    return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
  }
  /** Returns the log file that should contain the provided key. */
  private LogFile<K, V> findLogFileFor(final K key)
  {
    if (key == null || logFiles.lowerKey(key) == null)
    {
      return getOldestLogFile();
    }
    return logFiles.ceilingEntry(key).getValue();
  }
  /**
   * Represents a DB Cursor than can be repositioned on a given key.
   * <p>
   * Note that as a DBCursor, it provides a java.sql.ResultSet like API.
   */
  static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>>
  {
    /**
     * Position the cursor to the record corresponding to the provided key and
     * provided matching and positioning strategies.
     *
     * @param key
     *          Key to use as a start position for the cursor. If key is
     *          {@code null}, use the key of the first record instead.
     * @param matchStrategy
     *          The cursor key matching strategy.
     * @param positionStrategy
     *          The cursor positioning strategy.
     * @return {@code true} if cursor is successfully positioned, or
     *         {@code false} otherwise.
     * @throws ChangelogException
     *           If an error occurs when positioning cursor.
     */
    boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
        throws ChangelogException;
  }
  /**
   * Implements a cursor on the log.
   * <p>
   * The cursor uses the log shared lock to ensure reads are not done during a rotation.
   * <p>
   * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
   * method.
   */
  private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V>
  {
    private final Log<K, V> log;
    private LogFile<K, V> currentLogFile;
    private LogFileCursor<K, V> currentCursor;
    private boolean actAsEmptyCursor;
    /**
     * Creates a cursor on the provided log.
     *
     * @param log
     *           The log on which the cursor read records.
     * @throws ChangelogException
     *           If an error occurs when creating the cursor.
     */
    private LogCursor(final Log<K, V> log) throws ChangelogException
    {
      this.log = log;
      this.actAsEmptyCursor = false;
    }
    /** {@inheritDoc} */
    @Override
    public Record<K, V> getRecord()
    {
      return currentCursor != null ? currentCursor.getRecord() : null;
    }
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      if (actAsEmptyCursor)
      {
        return false;
      }
      log.sharedLock.lock();
      try
      {
        final boolean hasNext = currentCursor.next();
        if (hasNext)
        {
          return true;
        }
        final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile);
        if (logFile != null)
        {
          switchToLogFile(logFile);
          return currentCursor.next();
        }
        return false;
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      log.sharedLock.lock();
      try
      {
        StaticUtils.close(currentCursor);
        log.unregisterCursor(this);
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(
        final K key,
        final KeyMatchingStrategy matchStrategy,
        final PositionStrategy positionStrategy)
            throws ChangelogException
    {
      if (actAsEmptyCursor)
      {
        return false;
      }
      log.sharedLock.lock();
      try
      {
        final LogFile<K, V> logFile = log.findLogFileFor(key);
        if (logFile != currentLogFile)
        {
          switchToLogFile(logFile);
        }
        return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
      }
      finally
      {
        log.sharedLock.unlock();
      }
    }
    /** Returns the state of this cursor. */
    private CursorState<K, V> getState() throws ChangelogException
    {
      return !actAsEmptyCursor ?
          new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
    }
    private void closeUnderlyingCursor()
    {
      StaticUtils.close(currentCursor);
    }
    /** Reinitialize this cursor to the provided state. */
    private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException
    {
      if (!actAsEmptyCursor)
      {
        currentLogFile = cursorState.logFile;
        currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
      }
    }
    /** Turn this cursor into an empty cursor, with no actual resource used. */
    private void actAsEmptyCursor()
    {
      currentLogFile = null;
      currentCursor = null;
      actAsEmptyCursor = true;
    }
    /** Switch the cursor to the provided log file. */
    private void switchToLogFile(final LogFile<K, V> logFile) throws ChangelogException
    {
      StaticUtils.close(currentCursor);
      currentLogFile = logFile;
      currentCursor = currentLogFile.getCursor();
    }
    /** {@inheritDoc} */
    public String toString()
    {
      return actAsEmptyCursor ?
          String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
          String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
              log.logPath, currentLogFile.getFile().getName(), currentCursor);
    }
  }
  /** An empty cursor, that always return null records and false to {@code next()} method. */
  static final class EmptyLogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  {
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return null;
    }
    /** {@inheritDoc} */
    @Override
    public boolean next()
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      // nothing to do
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return "EmptyLogCursor";
    }
  }
  /**
   * Represents the state of a cursor.
   * <p>
   * The state is used to update a cursor when rotating the head log file : the
   * state of cursor on head log file must be reported to the new read-only log
   * file that is created when rotating.
   */
  private static class CursorState<K extends Comparable<K>, V>
  {
    /** The log file. */
    private final LogFile<K, V> logFile;
    /**
     * The position of the reader on the log file. It is the offset from the
     * beginning of the file, in bytes, at which the next read occurs.
     */
    private final long filePosition;
    /** The record the cursor is pointing to. */
    private final Record<K,V> record;
    private CursorState(final LogFile<K, V> logFile, final long filePosition, final Record<K, V> record)
    {
      this.logFile = logFile;
      this.filePosition = filePosition;
      this.record = record;
    }
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
New file
@@ -0,0 +1,607 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.util.Reject;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
/**
 * A log file, containing part of a {@code Log}. The log file may be:
 * <ul>
 * <li>write-enabled : allowing to append key-value records and read records
 * from cursors,</li>
 * <li>read-only : allowing to read records from cursors.</li>
 * </ul>
 * <p>
 * A log file is NOT intended to be used directly, but only has part of a
 * {@code Log}. In particular, there is no concurrency management and no checks
 * to ensure that log is not closed when performing any operation on it. Those
 * are managed at the {@code Log} level.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
final class LogFile<K extends Comparable<K>, V> implements Closeable
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /** The file containing the records. */
  private final File logfile;
  /** The pool to obtain a reader on the log. */
  private final LogReaderPool<K, V> readerPool;
  /**
   * The writer on the log file, which may be {@code null} if log file is not
   * write-enabled.
   */
  private final BlockLogWriter<K, V> writer;
  /** Indicates if log is enabled for write. */
  private final boolean isWriteEnabled;
  /**
   * Creates a new log file.
   *
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @param isWriteEnabled
   *          {@code true} if this changelog is write-enabled, {@code false}
   *          otherwise.
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
      throws ChangelogException
  {
    Reject.ifNull(logFilePath, parser);
    this.logfile = logFilePath;
    this.isWriteEnabled = isWriteEnabled;
    createLogFileIfNotExists();
    if (isWriteEnabled)
    {
      ensureLogFileIsValid(parser);
      writer = BlockLogWriter.newWriter(new LogWriter(logfile), parser);
    }
    else
    {
      writer = null;
    }
    readerPool = new LogReaderPool<K, V>(logfile, parser);
  }
  /**
   * Creates a read-only log file with the provided root path and record parser.
   *
   * @param <K>
   *            Type of the key of a record, which must be comparable.
   * @param <V>
   *            Type of the value of a record.
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @return a read-only log file
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
      final RecordParser<K, V> parser) throws ChangelogException
  {
    return new LogFile<K, V>(logFilePath, parser, false);
  }
  /**
   * Creates a write-enabled log file that appends records to the end of file,
   * with the provided root path and record parser.
   *
   * @param <K>
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @return a write-enabled log file
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
      final RecordParser<K, V> parser) throws ChangelogException
  {
    return new LogFile<K, V>(logFilePath, parser, true);
  }
  /**
   * Returns the file containing the records.
   *
   * @return the file
   */
  File getFile()
  {
    return logfile;
  }
  private void checkLogIsEnabledForWrite() throws ChangelogException
  {
    if (!isWriteEnabled)
    {
      throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
    }
  }
  private void createLogFileIfNotExists() throws ChangelogException
  {
    try
    {
      if (!logfile.exists())
      {
        logfile.createNewFile();
      }
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE.get(logfile.getPath()), e);
    }
  }
  /**
   * Ensure that log file is not corrupted, by checking it is valid and cleaning
   * the end of file if necessary, to remove a partially written record.
   * <p>
   * If log file is cleaned to remove a partially written record, then a message
   * is logged for information.
   *
   * @throws ChangelogException
   *           If an error occurs or if log file is corrupted and can't be
   *           cleaned
   */
  private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
  {
    BlockLogReader<K, V> reader = null;
    try
    {
      final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
      reader = BlockLogReader.newReader(logfile, readerWriter, parser) ;
      final long lastValidPosition = reader.checkLogIsValid();
      if (lastValidPosition != -1)
      {
          // truncate the file to point where last valid record has been read
          readerWriter.setLength(lastValidPosition);
          logger.error(INFO_CHANGELOG_LOG_FILE_RECOVERED.get(logfile.getPath()));
      }
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_RECOVER_LOG_FILE.get(
          logfile.getPath(),
          StaticUtils.stackTraceToSingleLineString(e)));
    }
    finally
    {
      StaticUtils.close(reader);
    }
  }
  /**
   * Add the provided record at the end of this log.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * {@code syncToFileSystem()} method.
   *
   * @param record
   *          The record to add.
   * @throws ChangelogException
   *           If the record can't be added to the log.
   */
  void append(final Record<K, V> record) throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    writer.write(record);
  }
  /**
   * Dump this log file as a text file, intended for debugging purpose only.
   *
   * @param dumpFile
   *          File that will contains log records using a human-readable format
   * @throws ChangelogException
   *           If an error occurs during dump
   */
  void dumpAsTextFile(File dumpFile) throws ChangelogException
  {
    DBCursor<Record<K, V>> cursor = getCursor();
    BufferedWriter textWriter = null;
    try
    {
      textWriter = new BufferedWriter(new FileWriter(dumpFile));
      while (cursor.getRecord() != null)
      {
        Record<K, V> record = cursor.getRecord();
        textWriter.write("key=" + record.getKey());
        textWriter.write(" | ");
        textWriter.write("value=" + record.getValue());
        textWriter.write('\n');
        cursor.next();
      }
    }
    catch (IOException e)
    {
      // No I18N needed, used for debugging purpose only
      throw new ChangelogException(
          LocalizableMessage.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile),
          e);
    }
    finally
    {
      StaticUtils.close(textWriter);
    }
  }
  /**
   * Synchronize all records added with the file system, ensuring that records
   * are effectively persisted.
   * <p>
   * After a successful call to this method, it is guaranteed that all records
   * added to the log are persisted to the file system.
   *
   * @throws ChangelogException
   *           If the synchronization fails.
   */
  void syncToFileSystem() throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    try
    {
      writer.sync();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
    }
  }
  /**
   * Returns a cursor that allows to retrieve the records from this log,
   * starting at the first position.
   *
   * @return a cursor on the log records, which is never {@code null}
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  LogFileCursor<K, V> getCursor() throws ChangelogException
  {
    return new LogFileCursor<K, V>(this);
  }
  /**
   * Returns a cursor initialised to the provided record and position in file.
   *
   * @param record
   *            The initial record this cursor points on
   * @param position
   *            The file position this cursor points on
   * @return the cursor
   * @throws ChangelogException
   *            If a problem occurs while creating the cursor.
   */
  LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
  {
    return new LogFileCursor<K, V>(this, record, position);
  }
  /**
   * Returns the oldest (first) record from this log.
   *
   * @return the oldest record, which may be {@code null} if there is no record
   *         in the log.
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  Record<K, V> getOldestRecord() throws ChangelogException
  {
    DBCursor<Record<K, V>> cursor = null;
    try
    {
      cursor = getCursor();
      return cursor.next() ? cursor.getRecord() : null;
    }
    finally
    {
      StaticUtils.close(cursor);
    }
  }
  /**
   * Returns the newest (last) record from this log.
   *
   * @return the newest record, which may be {@code null}
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  Record<K, V> getNewestRecord() throws ChangelogException
  {
    // TODO : need a more efficient way to retrieve it
    DBCursor<Record<K, V>> cursor = null;
    try
    {
      cursor = getCursor();
      Record<K, V> record = null;
      while (cursor.next())
      {
        record = cursor.getRecord();
      }
      return record;
    }
    finally
    {
      StaticUtils.close(cursor);
    }
  }
  /**
   * Returns the number of records in the log.
   *
   * @return the number of records
   * @throws ChangelogException
   *            If an error occurs.
   */
  long getNumberOfRecords() throws ChangelogException
  {
    // TODO  : need a more efficient way to retrieve it
    DBCursor<Record<K, V>> cursor = null;
    try
    {
      cursor = getCursor();
      long counter = 0L;
      while (cursor.next())
      {
        counter++;
      }
      return counter;
    }
    finally
    {
      StaticUtils.close(cursor);
    }
  }
  /** {@inheritDoc} */
  public void close()
  {
    if (isWriteEnabled)
    {
      try
      {
        syncToFileSystem();
      }
      catch (ChangelogException e)
      {
        logger.traceException(e);
      }
      writer.close();
    }
    readerPool.shutdown();
  }
  /**
   * Delete this log file (file is physically removed). Should be called only
   * when log file is closed.
   *
   * @throws ChangelogException
   *            If log file can't be deleted.
   */
  void delete() throws ChangelogException
  {
    final boolean isDeleted = logfile.delete();
    if (!isDeleted)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
    }
  }
  /**
   * Return the size of this log file in bytes.
   *
   * @return the size of log file
   */
  long getSizeInBytes()
  {
    return writer.getBytesWritten();
  }
  /** The path of this log file as a String. */
  private String getPath()
  {
    return logfile.getPath();
  }
  /**
   * Returns a reader for this log.
   * <p>
   * Assumes that calling methods ensure that log is not closed.
   */
  private BlockLogReader<K, V> getReader() throws ChangelogException
  {
    return readerPool.get();
  }
  /** Release the provided reader. */
  private void releaseReader(BlockLogReader<K, V> reader) {
    readerPool.release(reader);
  }
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    return logfile.hashCode();
  }
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object that)
  {
    if (this == that)
    {
      return true;
    }
    if (!(that instanceof LogFile))
    {
      return false;
    }
    final LogFile<?, ?> other = (LogFile<?, ?>) that;
    return logfile.equals(other.logfile);
  }
  /** Implements a repositionable cursor on the log file. */
  static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  {
    /** The underlying log on which entries are read. */
    private final LogFile<K, V> logFile;
    /** To read the records. */
    private final BlockLogReader<K, V> reader;
    /** The current available record, may be {@code null}. */
    private Record<K,V> currentRecord;
    /**
     * The initial record when starting from a given key. It must be
     * stored because it is read in advance.
     */
    private Record<K,V> initialRecord;
    /**
     * Creates a cursor on the provided log.
     *
     * @param logFile
     *           The log on which the cursor read records.
     * @throws ChangelogException
     *           If an error occurs when creating the cursor.
     */
    private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
    {
      this.logFile = logFile;
      this.reader = logFile.getReader();
    }
    /**
     * Creates a cursor on the provided log, initialised to the provided record and
     * pointing to the provided file position.
     * <p>
     * Note: there is no check to ensure that provided record and file position are
     * consistent. It is the responsability of the caller of this method.
     */
    private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
    {
      this.logFile = logFile;
      this.reader = logFile.getReader();
      this.currentRecord = record;
      reader.seekToPosition(filePosition);
    }
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      if (initialRecord != null)
      {
        // initial record is used only once
        currentRecord = initialRecord;
        initialRecord = null;
        return true;
      }
      currentRecord = reader.readRecord();
      return currentRecord != null;
    }
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return currentRecord;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
        throws ChangelogException {
      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
      final boolean found = result.getFirst();
      initialRecord = found ? result.getSecond() : null;
      return found;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      logFile.releaseReader(reader);
    }
    /**
     * Returns the file position this cursor is pointing at.
     *
     * @return the position of reader on the log file
     * @throws ChangelogException
     *          If an error occurs.
     */
    long getFilePosition() throws ChangelogException
    {
      return reader.getFilePosition();
    }
    /** {@inheritDoc} */
    public String toString()
    {
      return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
    }
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogReaderPool.java
New file
@@ -0,0 +1,118 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.io.RandomAccessFile;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
/**
 * A Pool of readers to a log file.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 */
// TODO : implement a real pool - reusing readers instead of opening-closing them each time
class LogReaderPool<K extends Comparable<K>, V>
{
  /** The file to read. */
  private final File file;
  private final RecordParser<K, V> parser;
  /**
   * Creates a pool of readers for provided file.
   *
   * @param file
   *          The file to read.
   * @param parser
   *          The parser to decode the records read.
   */
  LogReaderPool(File file, RecordParser<K, V> parser)
  {
    this.file = file;
    this.parser = parser;
  }
  /**
   * Returns a random access reader on the provided file.
   * <p>
   * The acquired reader must be released with the {@code release()}
   * method.
   *
   * @return a random access reader
   * @throws ChangelogException
   *            If the file can't be found or read.
   */
  BlockLogReader<K, V> get() throws ChangelogException
  {
    return getReader(file);
  }
  /**
   * Release the provided reader.
   * <p>
   * Once released, this reader must not be used any more.
   *
   * @param reader
   *          The random access reader to a file previously acquired with this
   *          pool.
   */
  void release(BlockLogReader<K, V> reader)
  {
    StaticUtils.close(reader);
  }
  /** Returns a random access file to read this log. */
  private BlockLogReader<K, V> getReader(File file) throws ChangelogException
  {
    try
    {
      return BlockLogReader.newReader(file, new RandomAccessFile(file, "r"), parser) ;
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_READER_ON_LOG_FILE.get(file.getPath()), e);
    }
  }
  /**
   * Shutdown this pool, releasing all files handles opened
   * on the file.
   */
  void shutdown()
  {
    // Nothing to do yet as no file handle is kept opened.
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/LogWriter.java
New file
@@ -0,0 +1,151 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import java.io.File;
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.SyncFailedException;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.loggers.MeteredStream;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
/**
 * A writer on a log file.
 */
class LogWriter extends OutputStream
{
  /** The file to write in. */
  private final File file;
  /** The stream to write data in the file, capable of counting bytes written. */
  private final MeteredStream stream;
  /** The file descriptor on the file. */
  private final FileDescriptor fileDescriptor;
  /**
   * Creates a writer on the provided file.
   *
   * @param file
   *          The file to write.
   * @throws ChangelogException
   *            If a problem occurs at creation.
   */
  public LogWriter(final File file) throws ChangelogException
  {
    this.file = file;
    try
    {
      FileOutputStream fos = new FileOutputStream(file, true);
      this.stream = new MeteredStream(fos, file.length());
      this.fileDescriptor = fos.getFD();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_OPEN_LOG_FILE.get(file.getPath()));
    }
  }
  /**
   * Returns the file used by this writer.
   *
   * @return the file
   */
  File getFile()
  {
    return file;
  }
  /** {@inheritDoc} */
  @Override
  public void write(int b) throws IOException
  {
    stream.write(b);
  }
  /** {@inheritDoc} */
  @Override
  public void write(byte[] b) throws IOException
  {
    stream.write(b);
  }
  /** {@inheritDoc} */
  @Override
  public void write(byte[] b, int off, int len) throws IOException
  {
    stream.write(b, off, len);
  }
  /**
   * Writes the provided byte string to the underlying output stream of this writer.
   *
   * @param bs
   *            The byte string to write.
   * @throws IOException
   *           if an I/O error occurs. In particular, an IOException may be
   *           thrown if the output stream has been closed.
   */
  public void write(ByteString bs) throws IOException
  {
    bs.copyTo(stream);
  }
  /**
   * Returns the number of bytes written in the underlying file.
   *
   * @return the number of bytes
   */
  public long getBytesWritten()
  {
    return stream.getBytesWritten();
  }
  /**
   * Synchronize all modifications to the file to the underlying device.
   *
   * @throws SyncFailedException
   *            If synchronization fails.
   */
  void sync() throws SyncFailedException {
    fileDescriptor.sync();
  }
  /** {@inheritDoc} */
  @Override
  public void close()
  {
    StaticUtils.close(stream);
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/Record.java
New file
@@ -0,0 +1,130 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
/**
 * Represents a record, which is a pair of key-value.
 *
 * @param <K>
 *         The type of a key.
 * @param <V>
 *         The type of a value.
 */
class Record<K, V>
{
  private final K key;
  private final V value;
  /**
   * Creates a record from provided key and value.
   *
   * @param key
   *          The key.
   * @param value
   *          The value.
   */
  private Record(final K key, final V value)
  {
    this.key = key;
    this.value = value;
  }
  /**
   * Create a record from provided key and value.
   *
   * @param <K>
   *          The type of the key.
   * @param <V>
   *          The type of the value.
   * @param key
   *          The key.
   * @param value
   *          The value.
   * @return a record
   */
  static <K, V> Record<K, V> from(final K key, final V value) {
    return new Record<K, V>(key, value);
  }
  /**
   * Returns the key of this record.
   *
   * @return the key
   */
  K getKey()
  {
    return key;
  }
  /**
   * Returns the value of this record.
   *
   * @return the value
   */
  V getValue()
  {
    return value;
  }
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    final int prime = 31;
    int result = 1;
    result = prime * result + ((key == null) ? 0 : key.hashCode());
    result = prime * result + ((value == null) ? 0 : value.hashCode());
    return result;
  }
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object that)
  {
    if (this == that)
    {
      return true;
    }
    if (!(that instanceof Record))
    {
      return false;
    }
    Record<?, ?> other = (Record<?, ?>) that;
    final boolean keyEquals = key == null ?  other.key == null : key.equals(other.key);
    if (!keyEquals)
    {
      return false;
    }
    return value == null ?  other.value == null : value.equals(other.value);
  }
  /** {@inheritDoc} */
  @Override
  public String toString() {
      return "Record [" + key + ":" + value + "]";
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/RecordParser.java
New file
@@ -0,0 +1,101 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.replication.server.changelog.file;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.replication.server.changelog.api.ChangelogException;
/**
 * Parser of a log record.
 * <p>
 * The parser allows to convert from a object to its binary representation
 * and to convert back from the binary representation to the object.
 *
 * @param <K>
 *          Type of the key of the record.
 * @param <V>
 *          Type of the value of the record.
 */
interface RecordParser<K, V>
{
  /**
   * Decode a record from the provided byte array.
   * <p>
   * The record is expected to have been encoded using the {@code writeRecord()}
   * method.
   *
   * @param data
   *          The raw data to read the record from.
   * @return the decoded record, or {@code null} if there is no more record to
   *         read, or only an incomplete record
   * @throws DecodingException
   *           If an error occurs while decoding the record.
   */
  Record<K, V> decodeRecord(ByteString data) throws DecodingException;
  /**
   * Encode the provided record to a byte array.
   * <p>
   * The returned array is intended to be stored as provided in the log file.
   *
   * @param record
   *          The record to encode.
   * @return the bytes array representing the (key,value) record
   */
  ByteString encodeRecord(Record<K, V> record);
  /**
   * Read the key from the provided string.
   *
   * @param key
   *          The string representation of key, suitable for use in a filename,
   *          as written by the {@code encodeKeyToString()} method.
   * @return the key
   * @throws ChangelogException
   *           If key can't be read from the string.
   */
  K decodeKeyFromString(String key) throws ChangelogException;
  /**
   * Returns the provided key as a string that is suitable to be used in a
   * filename.
   *
   * @param key
   *          The key of a record.
   * @return a string encoding the key, unambiguously decodable to the original
   *         key, and suitable for use in a filename. The string should contain
   *         only ASCII characters and no space.
   */
  String encodeKeyToString(K key);
  /**
   * Returns a key that is guaranted to be always higher than any other key.
   *
   * @return the highest possible key
   */
  K getMaxKey();
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/ReplicationEnvironment.java
New file
@@ -0,0 +1,860 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.file;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.UnsupportedEncodingException;
import java.io.Writer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
/**
 * Represents the replication environment, which allows to manage the lifecycle
 * of the replication changelog.
 * <p>
 * A changelog has a root directory, under which the following directories and files are
 * created :
 * <ul>
 * <li>A "changenumberindex" directory containing the log files for
 * ChangeNumberIndexDB</li>
 * <li>A "domains.state" file containing a mapping of each domain DN to an id. The
 * id is used to name the corresponding domain directory.</li>
 * <li>One directory per domain, named after "[id].domain" where [id] is the id
 * assigned to the domain, as specified in the "domains.state" file.</li>
 * </ul>
 * <p>
 * Each domain directory contains the following directories and files :
 * <ul>
 * <li>A "generation_[id].id" file, where [id] is the generation id</li>
 * <li>One directory per server id, named after "[id].server" where [id] is the
 * id of the server.</li>
 * </ul>
 * Each server id directory contains the following files :
 * <ul>
 * <li>The "head.log" file, which is the more recent log file where records are appended.</li>
 * <li>Zero to many read-only log files named after the lowest key
 * and highest key present in the log file (they all end with the ".log" suffix.</li>
 * <li>Optionally, a "offline.state" file that indicates that this particular server id
 *  of the domain is offline. This file contains the offline CSN, encoded as a String on a single line.</li>
 * </ul>
 * See {@code Log} class for details on the log files.
 *
 * <p>
 * Layout example with two domains "o=test1" and "o=test2", each having server
 * ids 22 and 33, with server id 33 for domain "o=test1" being offline :
 *
 * <pre>
 * +---changelog
 * |   \---domains.state  [contains mapping: 1 => "o=test1", 2 => "o=test2"]
 * |   \---changenumberindex
 * |      \--- head.log [contains last records written]
 * |      \--- 1_50.log [contains records with keys in interval [1, 50]]
 * |   \---1.domain
 * |       \---generation1.id
 * |       \---22.server
 * |           \---head.log [contains last records written]
 * |       \---33.server
 * |           \---head.log [contains last records written]
 *             \---offline.state
 * |   \---2.domain
 * |       \---generation1.id
 * |       \---22.server
 * |           \---head.log [contains last records written]
 * |       \---33.server
 * |           \---head.log [contains last records written]
 * </pre>
 */
class ReplicationEnvironment
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private static final long MAX_LOG_FILE_SIZE_IN_BYTES = 10*1024*1024;
  private static final int NO_GENERATION_ID = -1;
  private static final String CN_INDEX_DB_DIRNAME = "changenumberindex";
  private static final String DOMAINS_STATE_FILENAME = "domains.state";
  static final String REPLICA_OFFLINE_STATE_FILENAME = "offline.state";
  private static final String DOMAIN_STATE_SEPARATOR = ":";
  private static final String DOMAIN_SUFFIX = ".domain";
  private static final String SERVER_ID_SUFFIX = ".server";
  private static final String GENERATION_ID_FILE_PREFIX = "generation";
  private static final String GENERATION_ID_FILE_SUFFIX = ".id";
  private static final String UTF8_ENCODING = "UTF-8";
  private static final FileFilter DOMAIN_FILE_FILTER = new FileFilter()
  {
    @Override
    public boolean accept(File file)
    {
      return file.isDirectory() && file.getName().endsWith(DOMAIN_SUFFIX);
    }
  };
  private static final FileFilter SERVER_ID_FILE_FILTER = new FileFilter()
  {
    @Override
    public boolean accept(File file)
    {
      return file.isDirectory() && file.getName().endsWith(SERVER_ID_SUFFIX);
    }
  };
  private static final FileFilter GENERATION_ID_FILE_FILTER = new FileFilter()
  {
    @Override
    public boolean accept(File file)
    {
      return file.isFile()
          && file.getName().startsWith(GENERATION_ID_FILE_PREFIX)
          && file.getName().endsWith(GENERATION_ID_FILE_SUFFIX);
    }
  };
  /** Root path where the replication log is stored. */
  private final String replicationRootPath;
  /**
   * The current changelogState. This is in-memory version of what is inside the
   * on-disk changelogStateDB. It improves performances in case the
   * changelogState is read often.
   *
   * @GuardedBy("domainsLock")
   */
  private final ChangelogState changelogState;
  /** The list of logs that are in use. */
  private final List<Log<?, ?>> logs = new CopyOnWriteArrayList<Log<?, ?>>();
  /**
   * Maps each domain DN to a domain id that is used to name directory in file system.
   *
   * @GuardedBy("domainsLock")
   */
  private final Map<DN, String> domains = new HashMap<DN, String>();
  /**
   * Exclusive lock to synchronize:
   * <ul>
   * <li>the domains mapping</li>
   * <li>changes to the in-memory changelogState</li>
   * <li>changes to the on-disk state of a domain</li>
   */
  private final Object domainsLock = new Object();
  /** The underlying replication server. */
  private final ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
  /**
   * Creates the replication environment.
   *
   * @param rootPath
   *          Root path where replication log is stored.
   * @param replicationServer
   *          The underlying replication server.
   * @throws ChangelogException
   *           If an error occurs during initialization.
   */
  ReplicationEnvironment(final String rootPath,
      final ReplicationServer replicationServer) throws ChangelogException
  {
    this.replicationRootPath = rootPath;
    this.replicationServer = replicationServer;
    this.changelogState = readOnDiskChangelogState();
  }
  /**
   * Returns the state of the replication changelog.
   *
   * @return the {@link ChangelogState} read from the changelogState DB
   * @throws ChangelogException
   *           if a database problem occurs
   */
  ChangelogState readOnDiskChangelogState() throws ChangelogException
  {
    final ChangelogState state = new ChangelogState();
    final File changelogPath = new File(replicationRootPath);
    synchronized (domainsLock)
    {
      readDomainsStateFile();
      checkDomainDirectories(changelogPath);
      for (final Entry<DN, String> domainEntry : domains.entrySet())
      {
        readStateForDomain(domainEntry, state);
      }
    }
    return state;
  }
  /**
   * Returns the current state of the replication changelog.
   *
   * @return the current {@link ChangelogState}
   */
  ChangelogState getChangelogState()
  {
    return changelogState;
  }
  /**
   * Finds or creates the log used to store changes from the replication server
   * with the given serverId and the given baseDN.
   *
   * @param domainDN
   *          The DN that identifies the domain.
   * @param serverId
   *          The server id that identifies the server.
   * @param generationId
   *          The generationId associated to this domain.
   * @return the log.
   * @throws ChangelogException
   *           if an error occurs.
   */
  Log<CSN, UpdateMsg> getOrCreateReplicaDB(final DN domainDN, final int serverId, final long generationId)
      throws ChangelogException
  {
    if (logger.isTraceEnabled())
    {
      logger.trace("ReplicationEnvironment.getOrCreateReplicaDB(%s, %s, %s)", domainDN, serverId, generationId);
    }
    try
    {
      ensureRootDirectoryExists();
      String domainId = null;
      synchronized (domainsLock)
      {
        domainId = domains.get(domainDN);
        if (domainId == null)
        {
          domainId = createDomainId(domainDN);
        }
        final File serverIdPath = getServerIdPath(domainId, serverId);
        ensureServerIdDirectoryExists(serverIdPath);
        changelogState.addServerIdToDomain(serverId, domainDN);
        final File generationIdPath = getGenerationIdPath(domainId, generationId);
        ensureGenerationIdFileExists(generationIdPath);
        changelogState.setDomainGenerationId(domainDN, generationId);
        return openLog(serverIdPath, FileReplicaDB.RECORD_PARSER);
      }
    }
    catch (Exception e)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_CREATE_REPLICA_DB.get(domainDN.toString(), serverId, generationId), e);
    }
  }
  /**
   * Find or create the log to manage integer change number associated to
   * multidomain server state.
   * <p>
   * TODO: ECL how to manage compatibility of this db
   * with new domains added or removed ?
   *
   * @return the log.
   * @throws ChangelogException
   *           when a problem occurs.
   */
  Log<Long, ChangeNumberIndexRecord> getOrCreateCNIndexDB() throws ChangelogException
  {
    final File path = getCNIndexDBPath();
    try
    {
      return openLog(path, FileChangeNumberIndexDB.RECORD_PARSER);
    }
    catch (Exception e)
    {
      throw new ChangelogException(
          ERR_CHANGELOG_UNABLE_TO_CREATE_CN_INDEX_DB.get(replicationRootPath, path.getPath()), e);
    }
  }
  /**
   * Shutdown the environment.
   * <p>
   * The log DBs are not closed by this method. It assumes they are already
   * closed.
   */
  void shutdown()
  {
    if (isShuttingDown.compareAndSet(false, true))
    {
      logs.clear();
    }
  }
  /**
   * Clears the generated id associated to the provided domain DN from the state
   * Db.
   * <p>
   * If generation id can't be found, it is not considered as an error, the
   * method will just return.
   *
   * @param domainDN
   *          The domain DN for which the generationID must be cleared.
   * @throws ChangelogException
   *           If a problem occurs during clearing.
   */
  void clearGenerationId(final DN domainDN) throws ChangelogException
  {
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknown domain => no-op
      }
      final File idFile = retrieveGenerationIdFile(getDomainPath(domainId));
      if (idFile != null)
      {
        final boolean isDeleted = idFile.delete();
        if (!isDeleted)
        {
          throw new ChangelogException(
              ERR_CHANGELOG_UNABLE_TO_DELETE_GENERATION_ID_FILE.get(idFile.getPath(), domainDN.toString()));
        }
      }
      changelogState.setDomainGenerationId(domainDN, NO_GENERATION_ID);
    }
  }
  /**
   * Reset the generationId to the default value used when there is no
   * generation id.
   *
   * @param baseDN
   *          The baseDN for which the generationID must be reset.
   * @throws ChangelogException
   *           If a problem occurs during reset.
   */
  void resetGenerationId(final DN baseDN) throws ChangelogException
  {
    synchronized (domainsLock)
    {
      clearGenerationId(baseDN);
      final String domainId = domains.get(baseDN);
      if (domainId == null)
      {
        return; // unknown domain => no-op
      }
      final File generationIdPath = getGenerationIdPath(domainId, NO_GENERATION_ID);
      ensureGenerationIdFileExists(generationIdPath);
      changelogState.setDomainGenerationId(baseDN, NO_GENERATION_ID);
    }
  }
  /**
   * Notify that the replica corresponding to provided domain and provided CSN
   * is offline.
   *
   * @param domainDN
   *          the domain of the offline replica
   * @param offlineCSN
   *          the offline replica serverId and offline timestamp
   * @throws ChangelogException
   *           if a problem occurs
   */
  void notifyReplicaOffline(DN domainDN, CSN offlineCSN) throws ChangelogException
  {
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknown domain => no-op
      }
      final File serverIdPath = getServerIdPath(domainId, offlineCSN.getServerId());
      if (!serverIdPath.exists())
      {
        return; // no serverId anymore => no-op
      }
      final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
      Writer writer = null;
      try
      {
        // Overwrite file, only the last sent offline CSN is kept
        writer = newFileWriter(offlineFile);
        writer.write(offlineCSN.toString());
        changelogState.addOfflineReplica(domainDN, offlineCSN);
      }
      catch (IOException e)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_WRITE_REPLICA_OFFLINE_STATE_FILE.get(
            domainDN.toString(), offlineCSN.getServerId(), offlineFile.getPath(), offlineCSN.toString()), e);
      }
      finally
      {
        StaticUtils.close(writer);
      }
    }
  }
  /**
   * Notify that the replica corresponding to provided domain and server id
   * is online.
   *
   * @param domainDN
   *          the domain of the replica
   * @param serverId
   *          the replica serverId
   * @throws ChangelogException
   *           if a problem occurs
   */
  void notifyReplicaOnline(DN domainDN, int serverId) throws ChangelogException
  {
    synchronized (domainsLock)
    {
      final String domainId = domains.get(domainDN);
      if (domainId == null)
      {
        return; // unknown domain => no-op
      }
      final File offlineFile = new File(getServerIdPath(domainId, serverId), REPLICA_OFFLINE_STATE_FILENAME);
      if (offlineFile.exists())
      {
        final boolean isDeleted = offlineFile.delete();
        if (!isDeleted)
        {
          throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_REPLICA_OFFLINE_STATE_FILE.get(
              offlineFile.getPath(), domainDN.toString(), serverId));
        }
      }
      changelogState.removeOfflineReplica(domainDN, serverId);
    }
  }
  /** Reads the domain state file to find mapping between each domainDN and its associated domainId. */
  private void readDomainsStateFile() throws ChangelogException
  {
    final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
    if (domainsStateFile.exists())
    {
      BufferedReader reader = null;
      String line = null;
      try
      {
        reader = newFileReader(domainsStateFile);
        while ((line = reader.readLine()) != null)
        {
          final int separatorPos = line.indexOf(DOMAIN_STATE_SEPARATOR);
          final String domainId = line.substring(0, separatorPos);
          final DN domainDN = DN.valueOf(line.substring(separatorPos+1));
          domains.put(domainDN, domainId);
        }
      }
      catch(DirectoryException e)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_DN_FROM_DOMAIN_STATE_FILE.get(
            domainsStateFile.getPath(), line), e);
      }
      catch(Exception e)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_DOMAIN_STATE_FILE.get(
            domainsStateFile.getPath()), e);
      }
      finally {
        StaticUtils.close(reader);
      }
    }
  }
  /**
   * Checks that domain directories in file system are consistent with
   * information from domains mapping.
   */
  private void checkDomainDirectories(final File changelogPath) throws ChangelogException
  {
    final File[] dnDirectories = changelogPath.listFiles(DOMAIN_FILE_FILTER);
    if (dnDirectories != null)
    {
      final Set<String> domainIdsFromFileSystem = new HashSet<String>();
      for (final File dnDir : dnDirectories)
      {
        final String fileName = dnDir.getName();
        final String domainId = fileName.substring(0, fileName.length() - DOMAIN_SUFFIX.length());
        domainIdsFromFileSystem.add(domainId);
      }
      final Set<String> expectedDomainIds = new HashSet<String>(domains.values());
      if (!domainIdsFromFileSystem.equals(expectedDomainIds))
      {
        throw new ChangelogException(ERR_CHANGELOG_INCOHERENT_DOMAIN_STATE.get(domains.values().toString(),
            domainIdsFromFileSystem.toString()));
      }
    }
  }
  /**
   * Update the changelog state with the state corresponding to the provided
   * domain DN.
   */
  private void readStateForDomain(final Entry<DN, String> domainEntry, final ChangelogState state)
      throws ChangelogException
  {
    final File domainDirectory = getDomainPath(domainEntry.getValue());
    final DN domainDN = domainEntry.getKey();
    final String generationId = retrieveGenerationId(domainDirectory);
    if (generationId != null)
    {
      state.setDomainGenerationId(domainDN, toGenerationId(generationId));
    }
    final File[] serverIds = domainDirectory.listFiles(SERVER_ID_FILE_FILTER);
    if (serverIds == null)
    {
      throw new ChangelogException(ERR_CHANGELOG_READ_STATE_CANT_READ_DOMAIN_DIRECTORY.get(
          replicationRootPath, domainDirectory.getPath()));
    }
    for (final File serverId : serverIds)
    {
      readStateForServerId(domainDN, serverId, state);
    }
  }
  private void readStateForServerId(DN domainDN, File serverIdPath, ChangelogState state) throws ChangelogException
  {
    state.addServerIdToDomain(toServerId(serverIdPath.getName()), domainDN);
    final File offlineFile = new File(serverIdPath, REPLICA_OFFLINE_STATE_FILENAME);
    if (offlineFile.exists())
    {
      final CSN offlineCSN = readOfflineStateFile(offlineFile, domainDN);
      state.addOfflineReplica(domainDN, offlineCSN);
    }
  }
  private CSN readOfflineStateFile(final File offlineFile, DN domainDN) throws ChangelogException
  {
    BufferedReader reader = null;
    try
    {
      reader = newFileReader(offlineFile);
      String line = reader.readLine();
      if (line == null || reader.readLine() != null)
      {
        throw new ChangelogException(ERR_CHANGELOG_INVALID_REPLICA_OFFLINE_STATE_FILE.get(
            domainDN.toString(), offlineFile.getPath()));
      }
      return new CSN(line);
    }
    catch(IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_READ_REPLICA_OFFLINE_STATE_FILE.get(
          domainDN.toString(), offlineFile.getPath()), e);
    }
    finally {
      StaticUtils.close(reader);
    }
  }
  private String createDomainId(final DN domainDN) throws ChangelogException
  {
    final String nextDomainId = findNextDomainId();
    domains.put(domainDN, nextDomainId);
    final File domainsStateFile = new File(replicationRootPath, DOMAINS_STATE_FILENAME);
    Writer writer = null;
    try
    {
      writer = newFileWriter(domainsStateFile);
      for (final Entry<DN, String> entry : domains.entrySet())
      {
        writer.write(String.format("%s%s%s%n", entry.getValue(), DOMAIN_STATE_SEPARATOR, entry.getKey()));
      }
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_UPDATE_DOMAIN_STATE_FILE.get(nextDomainId,
          domainDN.toString(), domainsStateFile.getPath()), e);
    }
    finally
    {
      StaticUtils.close(writer);
    }
    return nextDomainId;
  }
  /** Find the next domain id to use. This is the lowest integer that is higher than all existing ids. */
  private String findNextDomainId()
  {
    int nextId = 1;
    for (final String domainId : domains.values())
    {
      final Integer id = Integer.valueOf(domainId);
      if (nextId <= id)
      {
        nextId = id + 1;
      }
    }
    return String.valueOf(nextId);
  }
  /** Open a log from the provided path and record parser. */
  private <K extends Comparable<K>, V> Log<K, V> openLog(final File serverIdPath, final RecordParser<K, V> parser)
      throws ChangelogException
  {
    checkShutDownBeforeOpening(serverIdPath);
    final Log<K, V> log = Log.openLog(serverIdPath, parser, MAX_LOG_FILE_SIZE_IN_BYTES);
    checkShutDownAfterOpening(serverIdPath, log);
    logs.add(log);
    return log;
  }
  private void checkShutDownAfterOpening(final File serverIdPath, final Log<?, ?> log) throws ChangelogException
  {
    if (isShuttingDown.get())
    {
      closeLog(log);
      throw new ChangelogException(WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(serverIdPath.getPath(),
          replicationServer.getServerId()));
    }
  }
  private void checkShutDownBeforeOpening(final File serverIdPath) throws ChangelogException
  {
    if (isShuttingDown.get())
    {
      throw new ChangelogException(
          WARN_CANNOT_OPEN_DATABASE_BECAUSE_SHUTDOWN_WAS_REQUESTED.get(
              serverIdPath.getPath(), replicationServer.getServerId()));
    }
  }
  /**
   * Retrieve the generation id from the provided directory.
   *
   * @return the generation id or {@code null} if the corresponding file can't
   *         be found
   */
  private String retrieveGenerationId(final File directory)
  {
    final File generationId = retrieveGenerationIdFile(directory);
    if (generationId != null)
    {
      String filename = generationId.getName();
      return filename.substring(GENERATION_ID_FILE_PREFIX.length(),
         filename.length() - GENERATION_ID_FILE_SUFFIX.length());
    }
    return null;
  }
  /**
   * Retrieve the file named after the generation id from the provided
   * directory.
   *
   * @return the generation id file or {@code null} if the corresponding file
   *         can't be found
   */
  private File retrieveGenerationIdFile(final File directory)
  {
    File[] generationIds = directory.listFiles(GENERATION_ID_FILE_FILTER);
    return (generationIds != null && generationIds.length > 0) ? generationIds[0] : null;
  }
  private File getDomainPath(final String domainId)
  {
    return new File(replicationRootPath, domainId + DOMAIN_SUFFIX);
  }
  /**
   * Return the path for the provided domain id and server id.
   * Package private to be usable in tests.
   *
   * @param domainId
   *            The id corresponding to a domain DN
   * @param serverId
   *            The server id to retrieve
   * @return the path
   */
  File getServerIdPath(final String domainId, final int serverId)
  {
    return new File(getDomainPath(domainId), String.valueOf(serverId) + SERVER_ID_SUFFIX);
  }
  private File getGenerationIdPath(final String domainId, final long generationId)
  {
    return new File(getDomainPath(domainId), GENERATION_ID_FILE_PREFIX + generationId + GENERATION_ID_FILE_SUFFIX);
  }
  private File getCNIndexDBPath()
  {
    return new File(replicationRootPath, CN_INDEX_DB_DIRNAME);
  }
  private void closeLog(final Log<?, ?> log)
  {
    logs.remove(log);
    log.close();
  }
  private void ensureRootDirectoryExists() throws ChangelogException
  {
    final File rootDir = new File(replicationRootPath);
    if (!rootDir.exists())
    {
      final boolean created = rootDir.mkdirs();
      if (!created)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(replicationRootPath));
      }
    }
  }
  private void ensureServerIdDirectoryExists(final File serverIdPath) throws ChangelogException
  {
    if (!serverIdPath.exists())
    {
      boolean created = false;
      try
      {
        created = serverIdPath.mkdirs();
      }
      catch (Exception e)
      {
        // nothing to do
      }
      if (!created)
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_CREATE_SERVER_ID_DIRECTORY.get(serverIdPath.getPath(), 0));
      }
    }
  }
  private void ensureGenerationIdFileExists(final File generationIdPath)
      throws ChangelogException
  {
    if (!generationIdPath.exists())
    {
      try
      {
        boolean isCreated = generationIdPath.createNewFile();
        if (!isCreated)
        {
          throw new ChangelogException(
              ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()));
        }
      }
      catch (IOException e)
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_CREATE_GENERATION_ID_FILE.get(generationIdPath.getPath()));
      }
    }
  }
  private void debug(String message)
  {
    // Replication server may be null when testing
    String monitorInstanceName = replicationServer != null ? replicationServer.getMonitorInstanceName() :
      "no monitor [test]";
    logger.trace("In %s, %s", monitorInstanceName, message);
  }
  private int toServerId(final String serverIdName) throws ChangelogException
  {
    try
    {
      String serverId = serverIdName.substring(0, serverIdName.length() - SERVER_ID_SUFFIX.length());
      return Integer.parseInt(serverId);
    }
    catch (NumberFormatException e)
    {
      // should never happen
      throw new ChangelogException(ERR_CHANGELOG_SERVER_ID_FILENAME_WRONG_FORMAT.get(serverIdName), e);
    }
  }
  private long toGenerationId(final String data) throws ChangelogException
  {
    try
    {
      return Long.parseLong(data);
    }
    catch (NumberFormatException e)
    {
      // should never happen
      throw new ChangelogException(ERR_CHANGELOG_GENERATION_ID_WRONG_FORMAT.get(data), e);
    }
  }
  /** Returns a buffered writer on the provided file. */
  private BufferedWriter newFileWriter(final File file) throws UnsupportedEncodingException, FileNotFoundException
  {
    return new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file), UTF8_ENCODING));
  }
  /** Returns a buffered reader on the provided file. */
  private BufferedReader newFileReader(final File file) throws UnsupportedEncodingException, FileNotFoundException
  {
    return new BufferedReader(new InputStreamReader(new FileInputStream(file), UTF8_ENCODING));
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -115,7 +115,7 @@
   * @param changelogState
   *          the changelog state used for initialization
   */
  ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
  public ChangeNumberIndexer(ChangelogDB changelogDB, ChangelogState changelogState)
  {
    this(changelogDB, changelogState, new ECLEnabledDomainPredicate());
  }