mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
12.08.2014 3c931b0f1ba72ce655f1fe03295aff77b4bfcf38
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -30,11 +30,9 @@
import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.EOFException;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.forgerock.util.Reject;
import org.opends.messages.Message;
@@ -42,11 +40,11 @@
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import com.forgerock.opendj.util.Pair;
/**
 * A log file, containing part of a {@code Log}. The log file may be:
 * <ul>
@@ -72,17 +70,14 @@
  /** The file containing the records. */
  private final File logfile;
  /** The parser of records, to convert bytes to record and record to bytes. */
  private final RecordParser<K, V> parser;
  /** The pool to obtain a reader on the log. */
  private LogReaderPool readerPool;
  private final LogReaderPool<K, V> readerPool;
  /**
   * The writer on the log file, which may be {@code null} if log file is not
   * write-enabled
   * write-enabled.
   */
  private LogWriter writer;
  private final BlockLogWriter<K, V> writer;
  /** Indicates if log is enabled for write. */
  private final boolean isWriteEnabled;
@@ -105,10 +100,11 @@
  {
    Reject.ifNull(logFilePath, parser);
    this.logfile = logFilePath;
    this.parser = parser;
    this.isWriteEnabled = isWriteEnabled;
    initialize();
    createLogFileIfNotExists();
    writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null;
    readerPool = new LogReaderPool<K, V>(logfile, parser);
  }
  /**
@@ -155,25 +151,6 @@
  }
  /**
   * Initialize this log.
   * <p>
   * Create directories and file if necessary, and create a writer
   * and pool of readers.
   *
   * @throws ChangelogException
   *            If an errors occurs during initialization.
   */
  private void initialize() throws ChangelogException
  {
    createLogFileIfNotExists();
    if (isWriteEnabled)
    {
      writer = new LogWriter(logfile);
    }
    readerPool = new LogReaderPool(logfile);
  }
  /**
   * Returns the file containing the records.
   *
   * @return the file
@@ -221,23 +198,7 @@
  void append(final Record<K, V> record) throws ChangelogException
  {
    checkLogIsEnabledForWrite();
    try
    {
      writer.write(encodeRecord(record));
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e);
    }
  }
  private ByteString encodeRecord(final Record<K, V> record)
  {
    final ByteString data = parser.encodeRecord(record);
    return new ByteStringBuilder()
      .append(data.length())
      .append(data)
      .toByteString();
    writer.write(record);
  }
  /**
@@ -530,73 +491,21 @@
    return logfile.getPath();
  }
  /** Read a record from the provided reader. */
  private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException
  {
    try
    {
      final ByteString recordData = readEncodedRecord(reader);
      return recordData != null ? parser.decodeRecord(recordData) : null;
    }
    catch(DecodingException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
    }
  }
  private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException
  {
    try
    {
      final byte[] lengthData = new byte[4];
      reader.readFully(lengthData);
      int recordLength = ByteString.wrap(lengthData).toInt();
      final byte[] recordData = new byte[recordLength];
      reader.readFully(recordData);
      return ByteString.wrap(recordData);
    }
    catch(EOFException e)
    {
      // end of stream, no record or uncomplete record
      return null;
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e);
    }
  }
  /** Seek to given position on the provided reader. */
  private void seek(RandomAccessFile reader, long position) throws ChangelogException
  {
    try
    {
      reader.seek(position);
    }
    catch (IOException e)
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e);
    }
  }
  /**
   * Returns a random access file to read this log.
   * Returns a reader for this log.
   * <p>
   * Assumes that calling methods ensure that log is not closed.
   */
  private RandomAccessFile getReader() throws ChangelogException
  private BlockLogReader<K, V> getReader() throws ChangelogException
  {
    return readerPool.get();
  }
  /** Release the provided reader. */
  private void releaseReader(RandomAccessFile reader) {
  private void releaseReader(BlockLogReader<K, V> reader) {
    readerPool.release(reader);
  }
  /** {@inheritDoc} */
  @Override
  public int hashCode()
@@ -633,7 +542,7 @@
    private final LogFile<K, V> logFile;
    /** To read the records. */
    private final RandomAccessFile reader;
    private final BlockLogReader<K, V> reader;
    /** The current available record, may be {@code null}. */
    private Record<K,V> currentRecord;
@@ -674,14 +583,14 @@
      this.logFile = logFile;
      this.reader = logFile.getReader();
      this.currentRecord = record;
      logFile.seek(reader, filePosition);
      reader.seekToPosition(filePosition);
    }
    /** {@inheritDoc} */
    @Override
    public boolean next() throws ChangelogException
    {
      currentRecord = logFile.readRecord(reader);
      currentRecord = reader.readRecord();
      return currentRecord != null;
    }
@@ -695,26 +604,10 @@
    /** {@inheritDoc} */
    @Override
    public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
      do
      {
        if (currentRecord != null)
        {
          final boolean matches = findNearest ?
              currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key);
          if (matches)
          {
            if (findNearest && currentRecord.getKey().equals(key))
            {
              // skip key in order to position on lowest higher key
              next();
            }
            return true;
          }
        }
        next();
      }
      while (currentRecord != null);
      return false;
      final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
      final boolean found = result.getFirst();
      currentRecord = found ? result.getSecond() : null;
      return found;
    }
    /** {@inheritDoc} */
@@ -733,15 +626,7 @@
     */
    long getFilePosition() throws ChangelogException
    {
      try
      {
        return reader.getFilePointer();
      }
      catch (IOException e)
      {
        throw new ChangelogException(
            ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e);
      }
      return reader.getFilePosition();
    }
    /** {@inheritDoc} */