recordParser;
/**
* Indicates if this log is closed. When the log is closed, all methods return
* immediately with no effect.
*/
private boolean isClosed;
/**
* The log files contained in this log, ordered by key.
*
* The head log file is always present and is associated with the maximum
* possible key, given by the record parser.
*
* The read-only log files are associated with the highest key they contain.
*/
private final TreeMap> logFiles = new TreeMap>();
/**
* The list of non-empty cursors opened on this log. Opened cursors may have
* to be updated when rotating the head log file.
*/
private final List> openCursors = new CopyOnWriteArrayList>();
/**
* A log file is rotated once it has exceeded this size limit. The log file can have
* a size much larger than this limit if the last record written has a huge size.
*
* TODO : to be replaced later by a (or a list of) configurable Rotation policy
* eg, List rotationPolicies = new ArrayList();
*/
private final long sizeLimitPerLogFileInBytes;
/**
* The exclusive lock used for writes and lifecycle operations on this log:
* initialize, clear, sync and close.
*/
private final Lock exclusiveLock;
/**
* The shared lock used for reads and cursor operations on this log.
*/
private final Lock sharedLock;
/**
* Open a log with the provided log path, record parser and maximum size per
* log file.
*
* If no log exists for the provided path, a new one is created.
*
* @param
* Type of the key of a record, which must be comparable.
* @param
* Type of the value of a record.
* @param logPath
* Path of the log.
* @param parser
* Parser for encoding/decoding of records.
* @param sizeLimitPerFileInBytes
* Limit in bytes before rotating the head log file of the log.
* @return a log
* @throws ChangelogException
* If a problem occurs during initialization.
*/
static synchronized , V> Log openLog(final File logPath,
final RecordParser parser, final long sizeLimitPerFileInBytes) throws ChangelogException
{
Reject.ifNull(logPath, parser);
@SuppressWarnings("unchecked")
Log log = (Log) logsCache.get(logPath);
if (log == null)
{
log = new Log(logPath, parser, sizeLimitPerFileInBytes);
logsCache.put(logPath, log);
}
else
{
// TODO : check that parser and size limit are compatible with the existing one,
// and issue a warning if it is not the case
log.referenceCount++;
}
return log;
}
/**
* Release a reference to the log corresponding to provided path. The log is
* closed if this is the last reference.
*/
private static synchronized void releaseLog(final File logPath)
{
Log, ?> log = logsCache.get(logPath);
if (log == null)
{
// this should never happen
ErrorLogger.logError(
ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
return;
}
if (log.referenceCount > 1)
{
log.referenceCount--;
}
else
{
log.doClose();
logsCache.remove(logPath);
}
}
/**
* Creates a new log.
*
* @param logPath
* The directory path of the log.
* @param parser
* Parser of records.
* @param sizeLimitPerFile
* Limit in bytes before rotating a log file.
* @throws ChangelogException
* If a problem occurs during initialization.
*/
private Log(final File logPath, final RecordParser parser, final long sizeLimitPerFile)
throws ChangelogException
{
this.logPath = logPath;
this.recordParser = parser;
this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
this.referenceCount = 1;
final ReadWriteLock lock = new ReentrantReadWriteLock(false);
this.exclusiveLock = lock.writeLock();
this.sharedLock = lock.readLock();
createOrOpenLogFiles();
}
/** Create or open log files used by this log. */
private void createOrOpenLogFiles() throws ChangelogException
{
exclusiveLock.lock();
try
{
createRootDirIfNotExists();
openHeadLogFile();
for (final File file : getReadOnlyLogFiles())
{
openReadOnlyLogFile(file);
}
isClosed = false;
}
catch (ChangelogException e)
{
// ensure all log files opened at this point are closed
close();
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
}
finally
{
exclusiveLock.unlock();
}
}
private File[] getReadOnlyLogFiles() throws ChangelogException
{
File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
if (files == null)
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
}
return files;
}
private void createRootDirIfNotExists() throws ChangelogException
{
if (!logPath.exists())
{
if (!logPath.mkdirs())
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
}
}
}
/**
* Returns the path of this log.
*
* @return the path of this log directory
*/
public File getPath()
{
return logPath;
}
/**
* Add the provided record at the end of this log.
*
* In order to ensure that record is written out of buffers and persisted
* to file system, it is necessary to explicitely call the
* {@code syncToFileSystem()} method.
*
* @param record
* The record to add.
* @throws ChangelogException
* If the record can't be added to the log.
*/
public void append(final Record record) throws ChangelogException
{
// If this exclusive lock happens to be a bottleneck :
// 1. use a shared lock for appending the record first
// 2. switch to an exclusive lock only if rotation is needed
// See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
exclusiveLock.lock();
try
{
if (isClosed)
{
return;
}
LogFile headLogFile = getHeadLogFile();
if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
{
ErrorLogger.logError(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
rotateHeadLogFile();
headLogFile = getHeadLogFile();
}
headLogFile.append(record);
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Synchronize all records added with the file system, ensuring that records
* are effectively persisted.
*
* After a successful call to this method, it is guaranteed that all records
* added to the log are persisted to the file system.
*
* @throws ChangelogException
* If the synchronization fails.
*/
public void syncToFileSystem() throws ChangelogException
{
exclusiveLock.lock();
try
{
getHeadLogFile().syncToFileSystem();
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Returns a cursor that allows to retrieve the records from this log,
* starting at the first position.
*
* The returned cursor initially points to record corresponding to the first
* key, that is {@code cursor.getRecord()} is equals to the record
* corresponding to the first key before any call to {@code cursor.next()}
* method.
*
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
* If the cursor can't be created.
*/
public RepositionableCursor getCursor() throws ChangelogException
{
LogCursor cursor = null;
sharedLock.lock();
try
{
if (isClosed)
{
return new EmptyLogCursor();
}
cursor = new LogCursor(this);
cursor.positionTo(null, false);
registerCursor(cursor);
return cursor;
}
catch (ChangelogException e)
{
StaticUtils.close(cursor);
throw e;
}
finally
{
sharedLock.unlock();
}
}
/**
* Returns a cursor that allows to retrieve the records from this log,
* starting at the position defined by the provided key.
*
* The returned cursor initially points to record corresponding to the key,
* that is {@code cursor.getRecord()} is equals to the record corresponding to
* the key before any call to {@code cursor.next()} method.
*
* @param key
* Key to use as a start position for the cursor. If key is
* {@code null}, cursor will point at the first record of the log.
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
* If the cursor can't be created.
*/
public RepositionableCursor getCursor(final K key) throws ChangelogException
{
return getCursor(key, false);
}
/**
* Returns a cursor that allows to retrieve the records from this log,
* starting at the position defined by the smallest key that is higher than
* the provided key.
*
* The returned cursor initially points to record corresponding to the key
* found, that is {@code cursor.getRecord()} is equals to the record
* corresponding to the key found before any call to {@code cursor.next()}
* method.
*
* @param key
* Key to use as a start position for the cursor. If key is
* {@code null}, cursor will point at the first record of the log.
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
* If the cursor can't be created.
*/
public RepositionableCursor getNearestCursor(final K key) throws ChangelogException
{
return getCursor(key, true);
}
/** Returns a cursor starting from a key, using the strategy corresponding to provided boolean. */
private RepositionableCursor getCursor(final K key, boolean findNearest) throws ChangelogException
{
if (key == null)
{
return getCursor();
}
LogCursor cursor = null;
sharedLock.lock();
try
{
if (isClosed)
{
return new EmptyLogCursor();
}
cursor = new LogCursor(this);
final boolean isFound = cursor.positionTo(key, findNearest);
// for nearest case, it is ok if the target is not found
if (isFound || findNearest)
{
registerCursor(cursor);
return cursor;
}
else
{
StaticUtils.close(cursor);
return new EmptyLogCursor();
}
}
catch (ChangelogException e)
{
StaticUtils.close(cursor);
throw e;
}
finally
{
sharedLock.unlock();
}
}
/**
* Returns the oldest (first) record from this log.
*
* @return the oldest record, which may be {@code null} if there is no record
* in the log.
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
public Record getOldestRecord() throws ChangelogException
{
return getOldestLogFile().getOldestRecord();
}
/**
* Returns the newest (last) record from this log.
*
* @return the newest record, which may be {@code null}
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
public Record getNewestRecord() throws ChangelogException
{
return getHeadLogFile().getNewestRecord();
}
/**
* Returns the number of records in the log.
*
* @return the number of records
* @throws ChangelogException
* If a problem occurs.
*/
public long getNumberOfRecords() throws ChangelogException
{
long count = 0;
for (final LogFile logFile : logFiles.values())
{
count += logFile.getNumberOfRecords();
}
return count;
}
/**
* Purge the log up to and excluding the provided key.
*
* @param purgeKey
* the key up to which purging must happen
* @return the oldest non purged record, or {@code null}
* if no record was purged
* @throws ChangelogException
* if a database problem occurs.
*/
public Record purgeUpTo(final K purgeKey) throws ChangelogException
{
exclusiveLock.lock();
try
{
if (isClosed)
{
return null;
}
final SortedMap> logFilesToPurge = logFiles.headMap(purgeKey);
if (logFilesToPurge.isEmpty())
{
return null;
}
final List undeletableFiles = new ArrayList();
final Iterator>> entriesToPurge = logFilesToPurge.entrySet().iterator();
while (entriesToPurge.hasNext())
{
final LogFile logFile = entriesToPurge.next().getValue();
try
{
logFile.close();
logFile.delete();
entriesToPurge.remove();
}
catch (ChangelogException e)
{
// The deletion of log file on file system has failed
undeletableFiles.add(logFile.getFile().getPath());
}
}
if (!undeletableFiles.isEmpty())
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
StaticUtils.listToString(undeletableFiles, ", ")));
}
return getOldestRecord();
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Empties the log, discarding all records it contains.
*
* @throws ChangelogException
* If cursors are opened on this log, or if a problem occurs during
* clearing operation.
*/
public void clear() throws ChangelogException
{
exclusiveLock.lock();
try
{
if (isClosed)
{
return;
}
if (!openCursors.isEmpty())
{
// TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because
// there is one open cursor when clearing.
// Switch to logging until this issue is solved
// throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it",
// logPath.getPath(), openCursors.size()));
ErrorLogger.logError(
ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size()));
}
// delete all log files
final List undeletableFiles = new ArrayList();
for (LogFile logFile : logFiles.values())
{
try
{
logFile.close();
logFile.delete();
}
catch (ChangelogException e)
{
undeletableFiles.add(logFile.getFile().getPath());
}
}
if (!undeletableFiles.isEmpty())
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
StaticUtils.listToString(undeletableFiles, ", ")));
}
logFiles.clear();
// recreate an empty head log file
openHeadLogFile();
}
catch (Exception e)
{
throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)));
}
finally
{
exclusiveLock.unlock();
}
}
/** {@inheritDoc} */
@Override
public void close()
{
releaseLog(logPath);
}
/** Effectively close this log. */
private void doClose()
{
exclusiveLock.lock();
try
{
if (isClosed)
{
return;
}
if (!openCursors.isEmpty())
{
ErrorLogger.logError(
ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
}
StaticUtils.close(logFiles.values());
isClosed = true;
}
finally
{
exclusiveLock.unlock();
}
}
private LogFile getHeadLogFile()
{
return logFiles.lastEntry().getValue();
}
private LogFile getOldestLogFile()
{
return logFiles.firstEntry().getValue();
}
/** Rotate the head log file to a read-only log file, and open a new empty head log file to write in. */
private void rotateHeadLogFile() throws ChangelogException
{
final LogFile headLogFile = getHeadLogFile();
final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
headLogFile.close();
renameHeadLogFileTo(readOnlyLogFile);
openHeadLogFile();
openReadOnlyLogFile(readOnlyLogFile);
updateCursorsOpenedOnHead();
}
private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
{
final File headLogFile = new File(logPath, HEAD_LOG_FILE_NAME);
try
{
StaticUtils.renameFile(headLogFile, rotatedLogFile);
}
catch (IOException e)
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(HEAD_LOG_FILE_NAME, rotatedLogFile.getPath()), e);
}
}
/**
* Returns the key bounds for the provided log file.
*
* @return the pair of (lowest key, highest key) that correspond to records
* stored in the corresponding log file.
* @throws ChangelogException
* if an error occurs while retrieving the keys
*/
private Pair getKeyBounds(final LogFile logFile) throws ChangelogException
{
try
{
final String name = logFile.getFile().getName();
final String[] keys = name.substring(0, name.length() - Log.LOG_FILE_SUFFIX.length())
.split(LOG_FILE_NAME_SEPARATOR);
return Pair.of(recordParser.decodeKeyFromString(keys[0]), recordParser.decodeKeyFromString(keys[1]));
}
catch (Exception e)
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
}
}
/**
* Returns the file name to use for the read-only version of the provided
* log file.
*
* The file name is based on the lowest and highest key in the log file.
*
* @return the name to use for the read-only version of the log file
* @throws ChangelogException
* If an error occurs.
*/
private String generateReadOnlyFileName(final LogFile logFile) throws ChangelogException
{
final K lowestKey = logFile.getOldestRecord().getKey();
final K highestKey = logFile.getNewestRecord().getKey();
return recordParser.encodeKeyToString(lowestKey) + LOG_FILE_NAME_SEPARATOR
+ recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
}
/** Update the open cursors after a rotation of the head log file. */
private void updateCursorsOpenedOnHead() throws ChangelogException
{
for (LogCursor cursor : openCursors)
{
final CursorState state = cursor.getState();
// Need to update the cursor only if it is pointing to the head log file
if (isHeadLogFile(state.logFile))
{
updateOpenCursor(cursor, state);
}
}
}
/**
* Update the provided open cursor with the provided state.
*
* The cursor must report the previous state on the head log file to the same
* state (position in file, current record) in the read-only log file just created.
*/
private void updateOpenCursor(final LogCursor cursor, final CursorState state) throws ChangelogException
{
final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
final LogFile logFile = findLogFileFor(previousKey);
cursor.reinitializeTo(new CursorState(logFile, state.filePosition, state.record));
}
private void openHeadLogFile() throws ChangelogException
{
logFiles.put(recordParser.getMaxKey(),
LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser));
}
private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
{
final LogFile logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
final Pair bounds = getKeyBounds(logFile);
logFiles.put(bounds.getSecond(), logFile);
}
private void registerCursor(final LogCursor cursor)
{
openCursors.add(cursor);
}
private void unregisterCursor(final LogCursor cursor)
{
openCursors.remove(cursor);
}
/**
* Returns the log file that is just after the provided log file wrt the order
* defined on keys, or {@code null} if the provided log file is the last one
* (the head log file).
*/
private LogFile getNextLogFile(final LogFile currentLogFile) throws ChangelogException
{
if (isHeadLogFile(currentLogFile))
{
return null;
}
final Pair bounds = getKeyBounds(currentLogFile);
return logFiles.higherEntry(bounds.getSecond()).getValue();
}
private boolean isHeadLogFile(final LogFile logFile)
{
return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
}
/** Returns the log file that should contain the provided key. */
private LogFile findLogFileFor(final K key)
{
if (key == null || logFiles.lowerKey(key) == null)
{
return getOldestLogFile();
}
return logFiles.ceilingEntry(key).getValue();
}
/**
* Represents a cursor than can be repositioned on a given key.
*/
static interface RepositionableCursor,V> extends DBCursor>
{
/**
* Position the cursor to the record corresponding to the provided key or to
* the nearest key (the lowest key higher than the provided key).
*
* @param key
* Key to use as a start position for the cursor. If key is
* {@code null}, use the key of the first record instead.
* @param findNearest
* If {@code true}, start position is the lowest key that is higher
* than the provided key, otherwise start position is the provided
* key.
* @return {@code true} if cursor is successfully positionned to the key or
* the nearest key, {@code false} otherwise.
* @throws ChangelogException
* If an error occurs when positioning cursor.
*/
boolean positionTo(K key, boolean findNearest) throws ChangelogException;
}
/**
* Implements a cursor on the log.
*
* The cursor initially points to a record, that is {@code cursor.getRecord()}
* is equals to the first record available from the cursor before any call to
* {@code cursor.next()} method.
*
* The cursor uses the log shared lock to ensure reads are not done during a rotation.
*/
private static class LogCursor, V> implements RepositionableCursor
{
private final Log log;
private LogFile currentLogFile;
private LogFileCursor currentCursor;
/**
* Creates a cursor on the provided log.
*
* @param log
* The log on which the cursor read records.
* @throws ChangelogException
* If an error occurs when creating the cursor.
*/
private LogCursor(final Log log) throws ChangelogException
{
this.log = log;
}
/** {@inheritDoc} */
@Override
public Record getRecord()
{
return currentCursor.getRecord();
}
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
log.sharedLock.lock();
try
{
final boolean hasNext = currentCursor.next();
if (hasNext)
{
return true;
}
final LogFile logFile = log.getNextLogFile(currentLogFile);
if (logFile != null)
{
switchToLogFile(logFile);
return true;
}
return false;
}
finally
{
log.sharedLock.unlock();
}
}
/** {@inheritDoc} */
@Override
public void close()
{
log.sharedLock.lock();
try
{
StaticUtils.close(currentCursor);
log.unregisterCursor(this);
}
finally
{
log.sharedLock.unlock();
}
}
/** {@inheritDoc} */
@Override
public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
{
log.sharedLock.lock();
try
{
final LogFile logFile = log.findLogFileFor(key);
if (logFile != currentLogFile)
{
switchToLogFile(logFile);
}
if (key != null)
{
boolean isFound = currentCursor.positionTo(key, findNearest);
if (isFound && getRecord() == null)
{
// The key to position to may be in the next file, force the switch
isFound = next();
}
return isFound;
}
return true;
}
finally
{
log.sharedLock.unlock();
}
}
/** Returns the state of this cursor. */
private CursorState getState() throws ChangelogException
{
return new CursorState(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
}
/** Reinitialize this cursor to the provided state. */
private void reinitializeTo(final CursorState cursorState) throws ChangelogException
{
StaticUtils.close(currentCursor);
currentLogFile = cursorState.logFile;
currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
}
/** Switch the cursor to the provided log file. */
private void switchToLogFile(final LogFile logFile) throws ChangelogException
{
StaticUtils.close(currentCursor);
currentLogFile = logFile;
currentCursor = currentLogFile.getCursor();
}
}
/** An empty cursor, that always return null records and false to {@code next()} method. */
static final class EmptyLogCursor, V> implements RepositionableCursor
{
/** {@inheritDoc} */
@Override
public Record getRecord()
{
return null;
}
/** {@inheritDoc} */
@Override
public boolean next()
{
return false;
}
/** {@inheritDoc} */
@Override
public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
{
return false;
}
/** {@inheritDoc} */
@Override
public void close()
{
// nothing to do
}
/** {@inheritDoc} */
@Override
public String toString()
{
return "EmptyLogCursor";
}
}
/**
* Represents the state of a cursor.
*
* The state is used to update a cursor when rotating the head log file : the
* state of cursor on head log file must be reported to the new read-only log
* file that is created when rotating.
*/
private static class CursorState, V>
{
/** The log file. */
private final LogFile logFile;
/**
* The position of the reader on the log file. It is the offset from the
* beginning of the file, in bytes, at which the next read occurs.
*/
private final long filePosition;
/** The record the cursor is pointing to. */
private final Record record;
private CursorState(final LogFile logFile, final long filePosition, final Record record)
{
this.logFile = logFile;
this.filePosition = filePosition;
this.record = record;
}
}
}