mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
21.56.2014 84cecd3711ddbbd60132cdd80957e387f23cf63e
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -25,8 +25,8 @@
 */
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.BufferedWriter;
import java.io.Closeable;
@@ -35,24 +35,30 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.util.Reject;
import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import static org.opends.messages.ReplicationMessages.*;
/**
 * A file-based log that allow to append key-value records and
 * read them using a {@code DBCursor}.
 * A log file, containing part of a {@code Log}. The log file may be:
 * <ul>
 * <li>write-enabled : allowing to append key-value records and read records
 * from cursors,</li>
 * <li>read-only : allowing to read records from cursors.</li>
 * </ul>
 * <p>
 * A log file is NOT intended to be used directly, but only has part of a
 * {@code Log}. In particular, there is no concurrency management and no checks
 * to ensure that log is not closed when performing any operation on it. Those
 * are managed at the {@code Log} level.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
@@ -61,16 +67,9 @@
 */
final class LogFile<K extends Comparable<K>, V> implements Closeable
{
  private static final DebugTracer TRACER = getTracer();
  // Non private for use in tests
  static final String LOG_FILE_NAME = "current.log";
  /** The path of directory that contains the log file. */
  private final File rootPath;
  /** The log file containing the records. */
  /** The file containing the records. */
  private final File logfile;
  /** The parser of records, to convert bytes to record and record to bytes. */
@@ -79,30 +78,20 @@
  /** The pool to obtain a reader on the log. */
  private LogReaderPool readerPool;
  /** The writer on the log, which may be {@code null} if log is not write-enabled */
  /**
   * The writer on the log file, which may be {@code null} if log file is not
   * write-enabled
   */
  private LogWriter writer;
  /** Indicates if log is enabled for write. */
  private final boolean isWriteEnabled;
  /** Indicates if the log is closed. */
  private volatile boolean isClosed;
  /** The exclusive lock used for wide changes on this log file : init, clear, sync and close. */
  private final Lock exclusiveLock;
  /**
   * The shared lock used for read, write and flush operations on this log file.
   * Write and flush operations can be shared because they're synchronized in the underlying writer.
   * Reads can be done safely when writing because partially written records are handled.
   */
  private final Lock sharedLock;
  /**
   * Creates a new log file.
   *
   * @param rootPath
   *          Path of root directory that contains the log file.
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @param isWriteEnabled
@@ -111,17 +100,13 @@
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  private LogFile(final File rootPath, final RecordParser<K, V> parser, boolean isWriteEnabled)
  private LogFile(final File logFilePath, final RecordParser<K, V> parser, boolean isWriteEnabled)
      throws ChangelogException
  {
    this.rootPath = rootPath;
    Reject.ifNull(logFilePath, parser);
    this.logfile = logFilePath;
    this.parser = parser;
    this.isWriteEnabled = isWriteEnabled;
    this.logfile = new File(rootPath, LOG_FILE_NAME);
    final ReadWriteLock lock = new ReentrantReadWriteLock(false);
    this.exclusiveLock = lock.writeLock();
    this.sharedLock = lock.readLock();
    initialize();
  }
@@ -133,18 +118,18 @@
   *            Type of the key of a record, which must be comparable.
   * @param <V>
   *            Type of the value of a record.
   * @param rootPath
   *          Path of root directory that contains the log file.
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @return a read-only log file
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  public static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File rootPath,
  static <K extends Comparable<K>, V> LogFile<K, V> newReadOnlyLogFile(final File logFilePath,
      final RecordParser<K, V> parser) throws ChangelogException
  {
    return new LogFile<K, V>(rootPath, parser, false);
    return new LogFile<K, V>(logFilePath, parser, false);
  }
  /**
@@ -155,18 +140,18 @@
   *          Type of the key of a record, which must be comparable.
   * @param <V>
   *          Type of the value of a record.
   * @param rootPath
   *          Path of root directory that contains the log file.
   * @param logFilePath
   *          Path of the log file.
   * @param parser
   *          Parser of records.
   * @return a write-enabled log file
   * @throws ChangelogException
   *            If a problem occurs during initialization.
   */
  public static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File rootPath,
  static <K extends Comparable<K>, V> LogFile<K, V> newAppendableLogFile(final File logFilePath,
      final RecordParser<K, V> parser) throws ChangelogException
  {
    return new LogFile<K, V>(rootPath, parser, true);
    return new LogFile<K, V>(logFilePath, parser, true);
  }
  /**
@@ -180,89 +165,29 @@
   */
  private void initialize() throws ChangelogException
  {
    exclusiveLock.lock();
    try
    createLogFileIfNotExists();
    if (isWriteEnabled)
    {
      createRootDirIfNotExists();
      createLogFileIfNotExists();
      isClosed = false;
      if (isWriteEnabled)
      {
        writer = LogWriter.acquireWriter(logfile);
      }
      readerPool = new LogReaderPool(logfile);
      writer = new LogWriter(logfile);
    }
    finally
    {
      exclusiveLock.unlock();
    }
    readerPool = new LogReaderPool(logfile);
  }
  /**
   * Returns the name of this log.
   * Returns the file containing the records.
   *
   * @return the name, which corresponds to the directory containing the log
   * @return the file
   */
  public String getName()
  File getFile()
  {
    return logfile.getParent().toString();
  }
  /**
   * Empties the log, discarding all records it contains.
   * <p>
   * This method should not be called with open cursors or
   * when multiple instances of the log are opened.
   *
   * @throws ChangelogException
   *            If a problem occurs.
   */
  public void clear() throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    exclusiveLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      close();
      final boolean isDeleted = logfile.delete();
      if (!isDeleted)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(logfile.getPath()));
      }
      initialize();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(getName(), stackTraceToSingleLineString(e)));
    }
    finally
    {
      exclusiveLock.unlock();
    }
    return logfile;
  }
  private void checkLogIsEnabledForWrite() throws ChangelogException
  {
    if (!isWriteEnabled)
    {
      throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(rootPath.getPath()));
    }
  }
  private void createRootDirIfNotExists() throws ChangelogException
  {
    if (!rootPath.exists())
    {
      final boolean created = rootPath.mkdirs();
      if (!created)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(rootPath.getPath()));
      }
      throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath()));
    }
  }
@@ -282,25 +207,6 @@
  }
  /**
   * Add a record at the end of this log from the provided key and value.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
   * to file system, it is necessary to explicitely call the
   * {@code syncToFileSystem()} method.
   *
   * @param key
   *          The key of the record.
   * @param value
   *          The value of the record.
   * @throws ChangelogException
   *           If the record can't be added to the log.
   */
  public void addRecord(final K key, final V value) throws ChangelogException
  {
    addRecord(Record.from(key, value));
  }
  /**
   * Add the provided record at the end of this log.
   * <p>
   * In order to ensure that record is written out of buffers and persisted
@@ -312,33 +218,22 @@
   * @throws ChangelogException
   *           If the record can't be added to the log.
   */
  public void addRecord(final Record<K, V> record) throws ChangelogException
  void append(final Record<K, V> record) throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      writer.write(encodeRecord(record));
      writer.flush();
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getName()), e);
    }
    finally
    {
      sharedLock.unlock();
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
    }
  }
  private ByteString encodeRecord(final Record<K, V> record)
  {
    final ByteString data = parser.encodeRecord(record.getKey(), record.getValue());
    final ByteString data = parser.encodeRecord(record);
    return new ByteStringBuilder()
      .append(data.length())
      .append(data)
@@ -346,14 +241,14 @@
  }
  /**
   * Dump this log as text file, intended for debugging purpose only.
   * Dump this log file as a text file, intended for debugging purpose only.
   *
   * @param dumpFile
   *          File that will contains log records using a human-readable format
   * @throws ChangelogException
   *           If an error occurs during dump
   */
  public void dumpAsTextFile(File dumpFile) throws ChangelogException
  void dumpAsTextFile(File dumpFile) throws ChangelogException
  {
    DBCursor<Record<K, V>> cursor = getCursor();
    BufferedWriter textWriter = null;
@@ -364,7 +259,7 @@
      {
        Record<K, V> record = cursor.getRecord();
        textWriter.write("key=" + record.getKey());
        textWriter.write(" -- ");
        textWriter.write(" | ");
        textWriter.write("value=" + record.getValue());
        textWriter.write('\n');
        cursor.next();
@@ -374,7 +269,7 @@
    {
      // No I18N needed, used for debugging purpose only
      throw new ChangelogException(
          Message.raw("Error when dumping content of log '%s' in target file : '%s'", getName(), dumpFile), e);
          Message.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile), e);
    }
    finally
    {
@@ -392,20 +287,16 @@
   * @throws ChangelogException
   *           If the synchronization fails.
   */
  public void syncToFileSystem() throws ChangelogException
  void syncToFileSystem() throws ChangelogException
  {
    exclusiveLock.lock();
    checkLogIsEnabledForWrite();
    try
    {
      writer.sync();
    }
    catch (Exception e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getName()), e);
    }
    finally
    {
      exclusiveLock.unlock();
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
    }
  }
@@ -422,21 +313,9 @@
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public LogCursor<K, V> getCursor() throws ChangelogException
  LogFileCursor<K, V> getCursor() throws ChangelogException
  {
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
      }
      return new LogFileCursor<K, V>(this);
    }
    finally
    {
      sharedLock.unlock();
    }
    return new LogFileCursor<K, V>(this);
  }
  /**
@@ -454,7 +333,7 @@
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public LogCursor<K, V> getCursor(final K key) throws ChangelogException
  LogFileCursor<K, V> getCursor(final K key) throws ChangelogException
  {
    return getCursor(key, false);
  }
@@ -476,13 +355,13 @@
   * @throws ChangelogException
   *           If the cursor can't be created.
   */
  public LogCursor<K, V> getNearestCursor(final K key) throws ChangelogException
  LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException
  {
    return getCursor(key, true);
  }
  /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */
  private LogCursor<K, V> getCursor(final K key, boolean findNearest)
  private LogFileCursor<K, V> getCursor(final K key, boolean findNearest)
      throws ChangelogException
  {
    if (key == null)
@@ -490,13 +369,8 @@
      return getCursor();
    }
    LogFileCursor<K, V> cursor = null;
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return new EmptyLogCursor<K, V>();
      }
      cursor = new LogFileCursor<K, V>(this);
      cursor.positionTo(key, findNearest);
      // if target is not found, cursor is positioned at end of stream
@@ -506,10 +380,22 @@
      StaticUtils.close(cursor);
      throw e;
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
   * Returns a cursor initialised to the provided record and position in file.
   *
   * @param record
   *            The initial record this cursor points on
   * @param position
   *            The file position this cursor points on
   * @return the cursor
   * @throws ChangelogException
   *            If a problem occurs while creating the cursor.
   */
  LogFileCursor<K, V> getCursorInitialisedTo(Record<K,V> record, long position) throws ChangelogException
  {
    return new LogFileCursor<K, V>(this, record, position);
  }
  /**
@@ -520,7 +406,7 @@
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  public Record<K, V> getOldestRecord() throws ChangelogException
  Record<K, V> getOldestRecord() throws ChangelogException
  {
    DBCursor<Record<K, V>> cursor = null;
    try
@@ -541,7 +427,7 @@
   * @throws ChangelogException
   *           If an error occurs while retrieving the record.
   */
  public Record<K, V> getNewestRecord() throws ChangelogException
  Record<K, V> getNewestRecord() throws ChangelogException
  {
    // TODO : need a more efficient way to retrieve it
    DBCursor<Record<K, V>> cursor = null;
@@ -597,45 +483,58 @@
  /** {@inheritDoc} */
  public void close()
  {
    exclusiveLock.lock();
    try
    if (isWriteEnabled)
    {
      if (isClosed)
      try
      {
        return;
        syncToFileSystem();
      }
      catch (ChangelogException e)
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      writer.close();
    }
    readerPool.shutdown();
  }
      if (isWriteEnabled)
      {
        try
        {
          syncToFileSystem();
        }
        catch (ChangelogException e)
        {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
        writer.close();
      }
      readerPool.shutdown();
      isClosed = true;
    }
    finally
  /**
   * Delete this log file (file is physically removed). Should be called only
   * when log file is closed.
   *
   * @throws ChangelogException
   *            If log file can't be deleted.
   */
  void delete() throws ChangelogException
  {
    final boolean isDeleted = logfile.delete();
    if (!isDeleted)
    {
      exclusiveLock.unlock();
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
    }
  }
  /**
   * Return the size of this log file in bytes.
   *
   * @return the size of log file
   */
  long getSizeInBytes()
  {
    return writer.getBytesWritten();
  }
  /** The path of this log file as a String. */
  private String getPath()
  {
    return logfile.getPath();
  }
  /** Read a record from the provided reader. */
  private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
  {
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return null;
      }
      final ByteString recordData = readEncodedRecord(reader);
      return recordData != null ? parser.decodeRecord(recordData) : null;
    }
@@ -643,10 +542,6 @@
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
@@ -672,26 +567,17 @@
    }
  }
  /** Seek to provided position on the provided reader. */
  /** Seek to given position on the provided reader. */
  private void seek(RandomAccessFile reader, long position) throws ChangelogException
  {
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      reader.seek(position);
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
@@ -706,66 +592,42 @@
  /** Release the provided reader. */
  private void releaseReader(RandomAccessFile reader) {
    sharedLock.lock();
    try
    {
      if (isClosed)
      {
        return;
      }
      readerPool.release(reader);
    }
    finally
    {
      sharedLock.unlock();
    }
    readerPool.release(reader);
  }
  /**
   * A cursor on the log.
   */
  static interface LogCursor<K extends Comparable<K>,V> extends DBCursor<Record<K, V>>
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    /**
     * Position the cursor to the record corresponding to the provided key or to
     * the nearest key (the lowest key higher than the provided key).
     * <p>
     * The record is only searched forward. To search backward, it is first
     * necessary to call the {@code rewind()} method to start from beginning of
     * log file.
     *
     * @param key
     *          Key to use as a start position for the cursor. If key is
     *          {@code null}, use the key of the first record instead.
     * @param findNearest
     *          If {@code true}, start position is the lowest key that is higher
     *          than the provided key, otherwise start position is the provided
     *          key.
     * @return {@code true} if cursor is successfully positionned to the key or
     *         the the nearest key, {@code false} otherwise.
     * @throws ChangelogException
     *           If an error occurs when positioning cursor.
     */
    boolean positionTo(K key, boolean findNearest) throws ChangelogException;
    return logfile.hashCode();
  }
    /**
     * Rewind the cursor, positioning it to the beginning of the log file,
     * pointing to no record initially.
     *
     * @throws ChangelogException
     *          If an error occurs when rewinding to zero.
     */
    void rewind() throws ChangelogException;
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object that)
  {
    if (this == that)
    {
      return true;
    }
    if (!(that instanceof LogFile))
    {
      return false;
    }
    final LogFile<?, ?> other = (LogFile<?, ?>) that;
    return logfile.equals(other.logfile);
  }
  /**
   * Implements a cursor on the log.
   * Implements a repositionable cursor on the log file.
   * <p>
   * The cursor initially points to a record, that is {@code cursor.getRecord()}
   * is equals to the first record available from the cursor before any call to
   * {@code cursor.next()} method.
   */
  private static final class LogFileCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
  static final class LogFileCursor<K extends Comparable<K>, V> implements RepositionableCursor<K,V>
  {
    /** The underlying log on which entries are read. */
    private final LogFile<K, V> logFile;
@@ -784,7 +646,7 @@
     * @throws ChangelogException
     *           If an error occurs when creating the cursor.
     */
    LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
    private LogFileCursor(final LogFile<K, V> logFile) throws ChangelogException
    {
      this.logFile = logFile;
      this.reader = logFile.getReader();
@@ -800,17 +662,19 @@
      }
    }
    /** {@inheritDoc} */
    public String toString()
    /**
     * Creates a cursor on the provided log, initialised to the provided record and
     * pointing to the provided file position.
     * <p>
     * Note: there is no check to ensure that provided record and file position are
     * consistent. It is the responsability of the caller of this method.
     */
    private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
    {
      return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
    }
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return currentRecord;
      this.logFile = logFile;
      this.reader = logFile.getReader();
      this.currentRecord = record;
      logFile.seek(reader, filePosition);
    }
    /** {@inheritDoc} */
@@ -823,6 +687,13 @@
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    {
      return currentRecord;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
      do
      {
@@ -848,64 +719,35 @@
    /** {@inheritDoc} */
    @Override
    public void rewind() throws ChangelogException
    {
      logFile.seek(reader, 0);
      currentRecord = null;
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      logFile.releaseReader(reader);
    }
  }
  /** An empty cursor, that always return null records and false to {@code next()} method. */
  static final class EmptyLogCursor<K extends Comparable<K>, V> implements LogCursor<K,V>
  {
    /** {@inheritDoc} */
    @Override
    public Record<K,V> getRecord()
    /**
     * Returns the file position this cursor is pointing at.
     *
     * @return the position of reader on the log file
     * @throws ChangelogException
     *          If an error occurs.
     */
    long getFilePosition() throws ChangelogException
    {
      return null;
      try
      {
        return reader.getFilePointer();
      }
      catch (IOException e)
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
      }
    }
    /** {@inheritDoc} */
    @Override
    public boolean next()
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
    {
      return false;
    }
    /** {@inheritDoc} */
    @Override
    public void rewind() throws ChangelogException
    {
      // nothing to do
    }
    /** {@inheritDoc} */
    @Override
    public void close()
    {
      // nothing to do
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      return "EmptyLogCursor";
      return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord);
    }
  }
}