/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2014 ForgeRock AS. */ package org.opends.server.replication.server.changelog.file; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import java.io.BufferedWriter; import java.io.Closeable; import java.io.EOFException; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.io.RandomAccessFile; import org.forgerock.util.Reject; import org.opends.messages.Message; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; import org.opends.server.types.ByteString; import org.opends.server.types.ByteStringBuilder; import org.opends.server.types.DebugLogLevel; import org.opends.server.util.StaticUtils; /** * A log file, containing part of a {@code Log}. The log file may be: * *

* A log file is NOT intended to be used directly, but only has part of a * {@code Log}. In particular, there is no concurrency management and no checks * to ensure that log is not closed when performing any operation on it. Those * are managed at the {@code Log} level. * * @param * Type of the key of a record, which must be comparable. * @param * Type of the value of a record. */ final class LogFile, V> implements Closeable { private static final DebugTracer TRACER = getTracer(); /** The file containing the records. */ private final File logfile; /** The parser of records, to convert bytes to record and record to bytes. */ private final RecordParser parser; /** The pool to obtain a reader on the log. */ private LogReaderPool readerPool; /** * The writer on the log file, which may be {@code null} if log file is not * write-enabled */ private LogWriter writer; /** Indicates if log is enabled for write. */ private final boolean isWriteEnabled; /** * Creates a new log file. * * @param logFilePath * Path of the log file. * @param parser * Parser of records. * @param isWriteEnabled * {@code true} if this changelog is write-enabled, {@code false} * otherwise. * @throws ChangelogException * If a problem occurs during initialization. */ private LogFile(final File logFilePath, final RecordParser parser, boolean isWriteEnabled) throws ChangelogException { Reject.ifNull(logFilePath, parser); this.logfile = logFilePath; this.parser = parser; this.isWriteEnabled = isWriteEnabled; initialize(); } /** * Creates a read-only log file with the provided root path and record parser. * * @param * Type of the key of a record, which must be comparable. * @param * Type of the value of a record. * @param logFilePath * Path of the log file. * @param parser * Parser of records. * @return a read-only log file * @throws ChangelogException * If a problem occurs during initialization. */ static , V> LogFile newReadOnlyLogFile(final File logFilePath, final RecordParser parser) throws ChangelogException { return new LogFile(logFilePath, parser, false); } /** * Creates a write-enabled log file that appends records to the end of file, * with the provided root path and record parser. * * @param * Type of the key of a record, which must be comparable. * @param * Type of the value of a record. * @param logFilePath * Path of the log file. * @param parser * Parser of records. * @return a write-enabled log file * @throws ChangelogException * If a problem occurs during initialization. */ static , V> LogFile newAppendableLogFile(final File logFilePath, final RecordParser parser) throws ChangelogException { return new LogFile(logFilePath, parser, true); } /** * Initialize this log. *

* Create directories and file if necessary, and create a writer * and pool of readers. * * @throws ChangelogException * If an errors occurs during initialization. */ private void initialize() throws ChangelogException { createLogFileIfNotExists(); if (isWriteEnabled) { writer = new LogWriter(logfile); } readerPool = new LogReaderPool(logfile); } /** * Returns the file containing the records. * * @return the file */ File getFile() { return logfile; } private void checkLogIsEnabledForWrite() throws ChangelogException { if (!isWriteEnabled) { throw new ChangelogException(WARN_CHANGELOG_NOT_ENABLED_FOR_WRITE.get(logfile.getPath())); } } private void createLogFileIfNotExists() throws ChangelogException { try { if (!logfile.exists()) { logfile.createNewFile(); } } catch (IOException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_FILE.get(logfile.getPath()), e); } } /** * Add the provided record at the end of this log. *

* In order to ensure that record is written out of buffers and persisted * to file system, it is necessary to explicitely call the * {@code syncToFileSystem()} method. * * @param record * The record to add. * @throws ChangelogException * If the record can't be added to the log. */ void append(final Record record) throws ChangelogException { checkLogIsEnabledForWrite(); try { writer.write(encodeRecord(record)); } catch (IOException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_ADD_RECORD.get(record.toString(), getPath()), e); } } private ByteString encodeRecord(final Record record) { final ByteString data = parser.encodeRecord(record); return new ByteStringBuilder() .append(data.length()) .append(data) .toByteString(); } /** * Dump this log file as a text file, intended for debugging purpose only. * * @param dumpFile * File that will contains log records using a human-readable format * @throws ChangelogException * If an error occurs during dump */ void dumpAsTextFile(File dumpFile) throws ChangelogException { DBCursor> cursor = getCursor(); BufferedWriter textWriter = null; try { textWriter = new BufferedWriter(new FileWriter(dumpFile)); while (cursor.getRecord() != null) { Record record = cursor.getRecord(); textWriter.write("key=" + record.getKey()); textWriter.write(" | "); textWriter.write("value=" + record.getValue()); textWriter.write('\n'); cursor.next(); } } catch (IOException e) { // No I18N needed, used for debugging purpose only throw new ChangelogException( Message.raw("Error when dumping content of log '%s' in target file : '%s'", getPath(), dumpFile), e); } finally { StaticUtils.close(textWriter); } } /** * Synchronize all records added with the file system, ensuring that records * are effectively persisted. *

* After a successful call to this method, it is guaranteed that all records * added to the log are persisted to the file system. * * @throws ChangelogException * If the synchronization fails. */ void syncToFileSystem() throws ChangelogException { checkLogIsEnabledForWrite(); try { writer.sync(); } catch (Exception e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SYNC.get(getPath()), e); } } /** * Returns a cursor that allows to retrieve the records from this log, * starting at the first position. *

* The returned cursor initially points to record corresponding to the first * key, that is {@code cursor.getRecord()} is equals to the record * corresponding to the first key before any call to {@code cursor.next()} * method. * * @return a cursor on the log records, which is never {@code null} * @throws ChangelogException * If the cursor can't be created. */ LogFileCursor getCursor() throws ChangelogException { return new LogFileCursor(this); } /** * Returns a cursor that allows to retrieve the records from this log, * starting at the position defined by the provided key. *

* The returned cursor initially points to record corresponding to the key, * that is {@code cursor.getRecord()} is equals to the record corresponding to * the key before any call to {@code cursor.next()} method. * * @param key * Key to use as a start position for the cursor. If key is * {@code null}, cursor will point at the first record of the log. * @return a cursor on the log records, which is never {@code null} * @throws ChangelogException * If the cursor can't be created. */ LogFileCursor getCursor(final K key) throws ChangelogException { return getCursor(key, false); } /** * Returns a cursor that allows to retrieve the records from this log, * starting at the position defined by the smallest key that is higher than * the provided key. *

* The returned cursor initially points to record corresponding to the key * found, that is {@code cursor.getRecord()} is equals to the record * corresponding to the key found before any call to {@code cursor.next()} * method. * * @param key * Key to use as a start position for the cursor. If key is * {@code null}, cursor will point at the first record of the log. * @return a cursor on the log records, which is never {@code null} * @throws ChangelogException * If the cursor can't be created. */ LogFileCursor getNearestCursor(final K key) throws ChangelogException { return getCursor(key, true); } /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */ private LogFileCursor getCursor(final K key, boolean findNearest) throws ChangelogException { if (key == null) { return getCursor(); } LogFileCursor cursor = null; try { cursor = new LogFileCursor(this); cursor.positionTo(key, findNearest); // if target is not found, cursor is positioned at end of stream return cursor; } catch (ChangelogException e) { StaticUtils.close(cursor); throw e; } } /** * Returns a cursor initialised to the provided record and position in file. * * @param record * The initial record this cursor points on * @param position * The file position this cursor points on * @return the cursor * @throws ChangelogException * If a problem occurs while creating the cursor. */ LogFileCursor getCursorInitialisedTo(Record record, long position) throws ChangelogException { return new LogFileCursor(this, record, position); } /** * Returns the oldest (first) record from this log. * * @return the oldest record, which may be {@code null} if there is no record * in the log. * @throws ChangelogException * If an error occurs while retrieving the record. */ Record getOldestRecord() throws ChangelogException { DBCursor> cursor = null; try { cursor = getCursor(); return cursor.getRecord(); } finally { StaticUtils.close(cursor); } } /** * Returns the newest (last) record from this log. * * @return the newest record, which may be {@code null} * @throws ChangelogException * If an error occurs while retrieving the record. */ Record getNewestRecord() throws ChangelogException { // TODO : need a more efficient way to retrieve it DBCursor> cursor = null; try { cursor = getCursor(); Record record = cursor.getRecord(); while (cursor.next()) { record = cursor.getRecord(); } return record; } finally { StaticUtils.close(cursor); } } /** * Returns the number of records in the log. * * @return the number of records * @throws ChangelogException * If an error occurs. */ long getNumberOfRecords() throws ChangelogException { // TODO : need a more efficient way to retrieve it DBCursor> cursor = null; try { cursor = getCursor(); Record record = cursor.getRecord(); if (record == null) { return 0L; } long counter = 1L; while (cursor.next()) { record = cursor.getRecord(); counter++; } return counter; } finally { StaticUtils.close(cursor); } } /** {@inheritDoc} */ public void close() { if (isWriteEnabled) { try { syncToFileSystem(); } catch (ChangelogException e) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } writer.close(); } readerPool.shutdown(); } /** * Delete this log file (file is physically removed). Should be called only * when log file is closed. * * @throws ChangelogException * If log file can't be deleted. */ void delete() throws ChangelogException { final boolean isDeleted = logfile.delete(); if (!isDeleted) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(getPath())); } } /** * Return the size of this log file in bytes. * * @return the size of log file */ long getSizeInBytes() { return writer.getBytesWritten(); } /** The path of this log file as a String. */ private String getPath() { return logfile.getPath(); } /** Read a record from the provided reader. */ private Record readRecord(final RandomAccessFile reader) throws ChangelogException { try { final ByteString recordData = readEncodedRecord(reader); return recordData != null ? parser.decodeRecord(recordData) : null; } catch(DecodingException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e); } } private ByteString readEncodedRecord(final RandomAccessFile reader) throws ChangelogException { try { final byte[] lengthData = new byte[4]; reader.readFully(lengthData); int recordLength = ByteString.wrap(lengthData).toInt(); final byte[] recordData = new byte[recordLength]; reader.readFully(recordData); return ByteString.wrap(recordData); } catch(EOFException e) { // end of stream, no record or uncomplete record return null; } catch (IOException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DECODE_RECORD.get(logfile.getPath()), e); } } /** Seek to given position on the provided reader. */ private void seek(RandomAccessFile reader, long position) throws ChangelogException { try { reader.seek(position); } catch (IOException e) { throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_SEEK.get(position, logfile.getPath()), e); } } /** * Returns a random access file to read this log. *

* Assumes that calling methods ensure that log is not closed. */ private RandomAccessFile getReader() throws ChangelogException { return readerPool.get(); } /** Release the provided reader. */ private void releaseReader(RandomAccessFile reader) { readerPool.release(reader); } /** {@inheritDoc} */ @Override public int hashCode() { return logfile.hashCode(); } /** {@inheritDoc} */ @Override public boolean equals(Object that) { if (this == that) { return true; } if (!(that instanceof LogFile)) { return false; } final LogFile other = (LogFile) that; return logfile.equals(other.logfile); } /** * Implements a repositionable cursor on the log file. *

* The cursor initially points to a record, that is {@code cursor.getRecord()} * is equals to the first record available from the cursor before any call to * {@code cursor.next()} method. */ static final class LogFileCursor, V> implements RepositionableCursor { /** The underlying log on which entries are read. */ private final LogFile logFile; /** To read the records. */ private final RandomAccessFile reader; /** The current available record, may be {@code null}. */ private Record currentRecord; /** * Creates a cursor on the provided log. * * @param logFile * The log on which the cursor read records. * @throws ChangelogException * If an error occurs when creating the cursor. */ private LogFileCursor(final LogFile logFile) throws ChangelogException { this.logFile = logFile; this.reader = logFile.getReader(); try { // position to the first record. next(); } catch (ChangelogException e) { close(); throw e; } } /** * Creates a cursor on the provided log, initialised to the provided record and * pointing to the provided file position. *

* Note: there is no check to ensure that provided record and file position are * consistent. It is the responsability of the caller of this method. */ private LogFileCursor(LogFile logFile, Record record, long filePosition) throws ChangelogException { this.logFile = logFile; this.reader = logFile.getReader(); this.currentRecord = record; logFile.seek(reader, filePosition); } /** {@inheritDoc} */ @Override public boolean next() throws ChangelogException { currentRecord = logFile.readRecord(reader); return currentRecord != null; } /** {@inheritDoc} */ @Override public Record getRecord() { return currentRecord; } /** {@inheritDoc} */ @Override public boolean positionTo(final K key, boolean findNearest) throws ChangelogException { do { if (currentRecord != null) { final boolean matches = findNearest ? currentRecord.getKey().compareTo(key) >= 0 : currentRecord.getKey().equals(key); if (matches) { if (findNearest && currentRecord.getKey().equals(key)) { // skip key in order to position on lowest higher key next(); } return true; } } next(); } while (currentRecord != null); return false; } /** {@inheritDoc} */ @Override public void close() { logFile.releaseReader(reader); } /** * Returns the file position this cursor is pointing at. * * @return the position of reader on the log file * @throws ChangelogException * If an error occurs. */ long getFilePosition() throws ChangelogException { try { return reader.getFilePointer(); } catch (IOException e) { throw new ChangelogException( ERR_CHANGELOG_UNABLE_TO_GET_CURSOR_READER_POSITION_LOG_FILE.get(logFile.getPath()), e); } } /** {@inheritDoc} */ public String toString() { return String.format("Cursor on log file: %s, current record: %s", logFile.logfile, currentRecord); } } }