mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Yannick Lecaillez
01.45.2015 8fdf1768757fba933e7ce63ac6381eacec41f0c6
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -33,6 +33,9 @@
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -52,9 +55,7 @@
 * </ul>
 * <p>
 * A log file is NOT intended to be used directly, but only has part of a
 * {@link Log}. In particular, there is no concurrency management and no checks
 * to ensure that log is not closed when performing any operation on it. Those
 * are managed at the {@code Log} level.
 * {@link Log}.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
@@ -80,6 +81,12 @@
  /** Indicates if log is enabled for write. */
  private final boolean isWriteEnabled;
  /** Lock used to ensure write atomicity. */
  private final Lock exclusiveLock;
  /** Lock used to ensure that log file is in a consistent state when reading it. */
  private final Lock sharedLock;
  private Record<K, V> newestRecord;
  /**
@@ -113,6 +120,10 @@
      writer = null;
    }
    readerPool = new LogReaderPool<>(logfile, parser);
    final ReadWriteLock rwLock = new ReentrantReadWriteLock();
    exclusiveLock = rwLock.writeLock();
    sharedLock = rwLock.readLock();
  }
  /**
@@ -204,8 +215,8 @@
   */
  private void ensureLogFileIsValid(final RecordParser<K, V> parser) throws ChangelogException
  {
    try(final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
        final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
    try (final RandomAccessFile readerWriter = new RandomAccessFile(logfile, "rws");
         final BlockLogReader<K, V> reader = BlockLogReader.newReader(logfile, readerWriter, parser))
    {
      final long lastValidPosition = reader.checkLogIsValid();
      if (lastValidPosition != -1)
@@ -238,8 +249,16 @@
  void append(final Record<K, V> record) throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    writer.write(record);
    newestRecord = record;
    exclusiveLock.lock();
    try
    {
      writer.write(record);
      newestRecord = record;
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
  /**
@@ -252,8 +271,8 @@
   */
  void dumpAsTextFile(File dumpFile) throws ChangelogException
  {
    try(final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
        final DBCursor<Record<K, V>> cursor = getCursor())
    try (final BufferedWriter textWriter = new BufferedWriter(new FileWriter(dumpFile));
         final DBCursor<Record<K, V>> cursor = getCursor())
    {
      while (cursor.getRecord() != null)
      {
@@ -287,6 +306,7 @@
  void syncToFileSystem() throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    sharedLock.lock();
    try
    {
      writer.sync();
@@ -295,6 +315,10 @@
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e);
    }
    finally
    {
      sharedLock.unlock();
    }
  }
  /**
@@ -355,7 +379,15 @@
    {
      try (BlockLogReader<K, V> reader = getReader())
      {
        newestRecord = reader.getNewestRecord();
        sharedLock.lock();
        try
        {
          newestRecord = reader.getNewestRecord();
        }
        finally
        {
          sharedLock.unlock();
        }
      }
      catch (IOException ioe)
      {
@@ -375,7 +407,7 @@
  long getNumberOfRecords() throws ChangelogException
  {
    // TODO  : need a more efficient way to retrieve it
    try(final DBCursor<Record<K, V>> cursor = getCursor())
    try (final DBCursor<Record<K, V>> cursor = getCursor())
    {
      long counter = 0L;
      while (cursor.next())
@@ -414,9 +446,18 @@
   */
  void delete() throws ChangelogException
  {
    if (!logfile.delete())
    exclusiveLock.lock();
    try
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
      final boolean isDeleted = logfile.delete();
      if (!isDeleted)
      {
        throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath()));
      }
    }
    finally
    {
      exclusiveLock.unlock();
    }
  }
@@ -517,14 +558,22 @@
     * pointing to the provided file position.
     * <p>
     * Note: there is no check to ensure that provided record and file position are
     * consistent. It is the responsability of the caller of this method.
     * consistent. It is the responsibility of the caller of this method.
     */
    private LogFileCursor(LogFile<K, V> logFile, Record<K, V> record, long filePosition) throws ChangelogException
    {
      this.logFile = logFile;
      this.reader = logFile.getReader();
      this.currentRecord = record;
      reader.seekToPosition(filePosition);
      logFile.sharedLock.lock();
      try
      {
        reader.seekToPosition(filePosition);
      }
      finally
      {
        logFile.sharedLock.unlock();
      }
    }
    /** {@inheritDoc} */
@@ -538,7 +587,15 @@
        initialRecord = null;
        return true;
      }
      currentRecord = reader.readRecord();
      logFile.sharedLock.lock();
      try
      {
        currentRecord = reader.readRecord();
      }
      finally
      {
        logFile.sharedLock.unlock();
      }
      return currentRecord != null;
    }
@@ -553,7 +610,16 @@
    @Override
    public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
        throws ChangelogException {
      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
      final Pair<Boolean, Record<K, V>> result;
      logFile.sharedLock.lock();
      try
      {
        result = reader.seekToRecord(key, match, pos);
      }
      finally
      {
        logFile.sharedLock.unlock();
      }
      final boolean found = result.getFirst();
      initialRecord = found ? result.getSecond() : null;
      return found;
@@ -575,7 +641,15 @@
     */
    long getFilePosition() throws ChangelogException
    {
      return reader.getFilePosition();
      logFile.sharedLock.lock();
      try
      {
        return reader.getFilePosition();
      }
      finally
      {
        logFile.sharedLock.unlock();
      }
    }
    /** {@inheritDoc} */