| | |
| | | |
| | | import java.io.BufferedWriter; |
| | | import java.io.Closeable; |
| | | import java.io.EOFException; |
| | | import java.io.File; |
| | | import java.io.FileWriter; |
| | | import java.io.IOException; |
| | | import java.io.RandomAccessFile; |
| | | |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * A log file, containing part of a {@code Log}. The log file may be: |
| | | * <ul> |
| | |
| | | /** The file containing the records. */ |
| | | private final File logfile; |
| | | |
| | | /** The parser of records, to convert bytes to record and record to bytes. */ |
| | | private final RecordParser<K, V> parser; |
| | | |
| | | /** The pool to obtain a reader on the log. */ |
| | | private LogReaderPool readerPool; |
| | | private final LogReaderPool<K, V> readerPool; |
| | | |
| | | /** |
| | | * The writer on the log file, which may be {@code null} if log file is not |
| | | * write-enabled |
| | | * write-enabled. |
| | | */ |
| | | private LogWriter writer; |
| | | private final BlockLogWriter<K, V> writer; |
| | | |
| | | /** Indicates if log is enabled for write. */ |
| | | private final boolean isWriteEnabled; |
| | |
| | | { |
| | | Reject.ifNull(logFilePath, parser); |
| | | this.logfile = logFilePath; |
| | | this.parser = parser; |
| | | this.isWriteEnabled = isWriteEnabled; |
| | | |
| | | initialize(); |
| | | createLogFileIfNotExists(); |
| | | writer = isWriteEnabled ? BlockLogWriter.newWriter(new LogWriter(logfile), parser) : null; |
| | | readerPool = new LogReaderPool<K, V>(logfile, parser); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | /** |
| | | * Initialize this log. |
| | | * <p> |
| | | * Create directories and file if necessary, and create a writer |
| | | * and pool of readers. |
| | | * <p> |
| | | * File creation is delegated to {@code createLogFileIfNotExists()}. The |
| | | * writer is created only when the log is write-enabled; the pool of |
| | | * readers is always created. |
| | | * |
| | | * @throws ChangelogException |
| | | * If an error occurs during initialization. |
| | | */ |
| | | private void initialize() throws ChangelogException |
| | | { |
| | | createLogFileIfNotExists(); |
| | | if (isWriteEnabled) /* no writer is created here for a log that is not write-enabled */ |
| | | { |
| | | writer = new LogWriter(logfile); |
| | | } |
| | | readerPool = new LogReaderPool(logfile); |
| | | } |
| | | |
| | | /** |
| | | * Returns the file containing the records. |
| | | * |
| | | * @return the file |
| | |
| | | void append(final Record<K, V> record) throws ChangelogException |
| | | { /* Appends the provided record to this log file; an IOException raised by the write is rethrown as a ChangelogException. */ |
| | | checkLogIsEnabledForWrite(); /* guard call made before any write is attempted */ |
| | | try |
| | | { |
| | | writer.write(encodeRecord(record)); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e); /* original exception kept as the cause */ |
| | | } |
| | | } |
| | | |
| | | private ByteString encodeRecord(final Record<K, V> record) |
| | | { /* Encodes the record with the parser and returns the payload length followed by the payload. */ |
| | | final ByteString data = parser.encodeRecord(record); |
| | | return new ByteStringBuilder() |
| | | .append(data.length()) /* length goes first; readEncodedRecord reads a 4-byte length before the payload */ |
| | | .append(data) |
| | | .toByteString(); |
| | | writer.write(record); /* NOTE(review): this statement follows the return above, so it cannot execute as listed; it looks like a line from another revision of append() - confirm */ |
| | | } |
| | | |
| | | /** |
| | |
| | | return logfile.getPath(); |
| | | } |
| | | |
| | | /** |
| | | * Read a record from the provided reader. |
| | | * <p> |
| | | * The raw bytes are obtained from {@code readEncodedRecord(reader)} and |
| | | * decoded with the parser. Returns {@code null} when no encoded record |
| | | * was returned. A {@code DecodingException} is rethrown as a |
| | | * {@code ChangelogException} that carries the log file path and keeps |
| | | * the original exception as its cause. |
| | | */ |
| | | private Record<K,V> readRecord(final RandomAccessFile reader) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | final ByteString recordData = readEncodedRecord(reader); |
| | | return recordData != null ? parser.decodeRecord(recordData) : null; |
| | | } |
| | | catch(DecodingException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException |
| | | { /* Reads one length-prefixed record from the reader: a 4-byte length, then that many payload bytes. Returns null on end of file. */ |
| | | try |
| | | { |
| | | final byte[] lengthData = new byte[4]; |
| | | reader.readFully(lengthData); |
| | | int recordLength = ByteString.wrap(lengthData).toInt(); /* NOTE(review): the length is used unvalidated; a negative value would make the allocation below throw NegativeArraySizeException - confirm whether a corrupt file can produce one */ |
| | | |
| | | final byte[] recordData = new byte[recordLength]; |
| | | reader.readFully(recordData); |
| | | return ByteString.wrap(recordData); |
| | | } |
| | | catch(EOFException e) |
| | | { |
| | | // end of stream: no record, or an incomplete record (readFully hit EOF before filling its buffer) |
| | | return null; |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e); /* any other I/O failure is reported with the log file path and the original cause */ |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Seek to given position on the provided reader. |
| | | * <p> |
| | | * An {@code IOException} raised by the seek is rethrown as a |
| | | * {@code ChangelogException} that reports the requested position and |
| | | * the log file path, with the original exception as its cause. |
| | | */ |
| | | private void seek(RandomAccessFile reader, long position) throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | reader.seek(position); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns a random access file to read this log. |
| | | * Returns a reader for this log. |
| | | * <p> |
| | | * The reader is obtained from the reader pool. |
| | | * <p> |
| | | * Assumes that calling methods ensure that log is not closed. |
| | | * <p> |
| | | * NOTE(review): two summary sentences and two signatures (one returning |
| | | * {@code RandomAccessFile}, one returning {@code BlockLogReader}) are |
| | | * listed for this method; they look like two revisions of the same |
| | | * declaration - confirm which one is current. |
| | | */ |
| | | private RandomAccessFile getReader() throws ChangelogException |
| | | private BlockLogReader<K, V> getReader() throws ChangelogException |
| | | { |
| | | return readerPool.get(); |
| | | } |
| | | |
| | | /** |
| | | * Release the provided reader by handing it back to the reader pool. |
| | | * <p> |
| | | * NOTE(review): two signatures are listed (a {@code RandomAccessFile} |
| | | * parameter and a {@code BlockLogReader} parameter); they look like two |
| | | * revisions of the same declaration - confirm which one is current. |
| | | */ |
| | | private void releaseReader(RandomAccessFile reader) { |
| | | private void releaseReader(BlockLogReader<K, V> reader) { |
| | | readerPool.release(reader); |
| | | } |
| | | |
| | | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public int hashCode() |
| | |
| | | private final LogFile<K, V> logFile; |
| | | |
| | | /** To read the records. */ |
| | | private final RandomAccessFile reader; |
| | | private final BlockLogReader<K, V> reader; |
| | | |
| | | /** The current available record, may be {@code null}. */ |
| | | private Record<K,V> currentRecord; |
| | |
| | | this.logFile = logFile; |
| | | this.reader = logFile.getReader(); |
| | | this.currentRecord = record; |
| | | logFile.seek(reader, filePosition); |
| | | reader.seekToPosition(filePosition); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Reads the next record into {@code currentRecord} and returns |
| | | * {@code true} when a record was obtained, i.e. when |
| | | * {@code currentRecord} is not {@code null} afterwards. |
| | | */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | currentRecord = logFile.readRecord(reader); |
| | | currentRecord = reader.readRecord(); /* NOTE(review): second consecutive assignment to currentRecord; this and the line above look like two revisions of the same read - confirm which one is current */ |
| | | return currentRecord != null; |
| | | } |
| | | |
| | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * NOTE(review): as listed, the body holds two implementations back to |
| | | * back. The first is a scan that starts at the current record and |
| | | * advances with {@code next()} until a record matches or no record is |
| | | * left. The second, placed after {@code return false;}, delegates to |
| | | * {@code reader.seekToRecord(key, findNearest)}. They look like two |
| | | * revisions of the same method - confirm which one is current. |
| | | */ |
| | | @Override |
| | | public boolean positionTo(final K key, boolean findNearest) throws ChangelogException { |
| | | do |
| | | { |
| | | if (currentRecord != null) |
| | | { |
| | | final boolean matches = findNearest ? |
| | | currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key); /* nearest mode accepts the first key not lower than the requested key; exact mode requires equality */ |
| | | if (matches) |
| | | { |
| | | if (findNearest && currentRecord.getKey().equals(key)) |
| | | { |
| | | // skip key in order to position on lowest higher key |
| | | next(); |
| | | } |
| | | return true; |
| | | } |
| | | } |
| | | next(); |
| | | } |
| | | while (currentRecord != null); |
| | | return false; |
| | | final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest); /* the pair holds a found flag first and a record second */ |
| | | final boolean found = result.getFirst(); |
| | | currentRecord = found ? result.getSecond() : null; /* the current record is cleared when nothing was found */ |
| | | return found; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | */ |
| | | long getFilePosition() throws ChangelogException |
| | | { /* Returns the current position of this cursor's reader in the log file. */ |
| | | try |
| | | { |
| | | return reader.getFilePointer(); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new ChangelogException( |
| | | ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e); /* failure is reported with the log file path and the original cause */ |
| | | } |
| | | return reader.getFilePosition(); /* NOTE(review): the try block returns and the catch block throws, so this statement cannot be reached as listed; it looks like the body of another revision - confirm */ |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |