recordParser;
/**
 * Indicates if this log is closed. Once closed, append, purge, clear and
 * close return immediately with no effect, and cursor creation returns an
 * empty cursor.
 */
private boolean isClosed;
/**
 * The log files contained in this log, ordered by key.
 * <p>
 * The head log file is always present and is associated with the maximum
 * possible key, given by the record parser.
 * <p>
 * The read-only log files are associated with the highest key they contain.
 */
private final TreeMap> logFiles = new TreeMap<>();
/**
 * The list of non-empty cursors opened on this log. Opened cursors may have
 * to be updated when rotating the head log file.
 */
private final List> openCursors = new CopyOnWriteArrayList<>();
/**
 * A log file can be rotated once it has exceeded this size limit. The log file can have
 * a size much larger than this limit if the last record written has a huge size.
 *
 * TODO : to be replaced later by a list of configurable Rotation policy
 * eg, List rotationPolicies = new ArrayList();
 */
private final long sizeLimitPerLogFileInBytes;
/** The time service used for timing. It is package private so it can be modified by test case. */
TimeService timeService = TimeService.SYSTEM;
/**
 * A log file can be rotated once it has exceeded a given time interval.
 * No time-based rotation happens if the value is zero or negative.
 */
private long rotationIntervalInMillis;
/** The last time a log file was rotated. */
private long lastRotationTime;
/**
 * The exclusive lock used for log rotation, purge and lifecycle operations on this log:
 * initialize, clear, sync and close.
 */
private final Lock exclusiveLock;
/** The shared lock used for write operations and accessing {@link #logFiles} map. */
private final Lock sharedLock;
/**
 * The replication environment used to create this log. The log is notifying it for any change
 * that must be persisted.
 */
private final ReplicationEnvironment replicationEnv;
/**
 * Opens the log located at the provided path, with the provided record parser
 * and rotation parameters.
 * <p>
 * Logs are shared through a cache keyed by path. The first call for a given
 * path creates the log (creating it on disk if needed); later calls return the
 * cached instance and increment its reference count, ignoring the parser and
 * rotation parameters they were given.
 *
 * @param <K>
 *          Type of the key of a record, which must be comparable.
 * @param <V>
 *          Type of the value of a record.
 * @param replicationEnv
 *          The replication environment used to create this log.
 * @param logPath
 *          Path of the log.
 * @param parser
 *          Parser for encoding/decoding of records.
 * @param rotationParameters
 *          Parameters for the log files rotation.
 * @return a log
 * @throws ChangelogException
 *           If a problem occurs during initialization.
 */
static synchronized , V> Log openLog(final ReplicationEnvironment replicationEnv,
    final File logPath, final RecordParser parser, final LogRotationParameters rotationParameters)
    throws ChangelogException
{
  Reject.ifNull(logPath, parser);
  @SuppressWarnings("unchecked")
  Log cachedLog = (Log) logsCache.get(logPath);
  if (cachedLog != null)
  {
    // TODO : check that parser and size limit are compatible with the existing one,
    // and issue a warning if it is not the case
    cachedLog.referenceCount++;
    return cachedLog;
  }

  // First reference to this path: build the log and publish it in the cache
  final Log createdLog = new Log<>(replicationEnv, logPath, parser, rotationParameters);
  logsCache.put(logPath, createdLog);
  return createdLog;
}
/**
 * Returns an empty cursor, i.e. a new {@code EmptyCursor} instance.
 *
 * @param <K> the type of keys.
 * @param <V> the type of values.
 * @return an empty cursor
 */
static , V> RepositionableCursor getEmptyCursor() {
return new Log.EmptyCursor<>();
}
/** Immutable value holder for the parameters driving log files rotation. */
static class LogRotationParameters {
  private final long sizeLimitPerFileInBytes;
  private final long rotationInterval;
  private final long lastRotationTime;

  /**
   * Creates rotation parameters.
   *
   * @param sizeLimitPerFileInBytes
   *          Size limit before rotating a log file.
   * @param rotationInterval
   *          Time interval before rotating a log file.
   * @param lastRotationTime
   *          Last time a log file was rotated.
   */
  LogRotationParameters(long sizeLimitPerFileInBytes, long rotationInterval, long lastRotationTime)
  {
    this.sizeLimitPerFileInBytes = sizeLimitPerFileInBytes;
    this.rotationInterval = rotationInterval;
    this.lastRotationTime = lastRotationTime;
  }

  /** Renders the simple class name followed by the three parameters as {@code name=value} pairs. */
  @Override
  public String toString()
  {
    return new StringBuilder(getClass().getSimpleName())
        .append("(")
        .append("sizeLimitPerFileInBytes=").append(sizeLimitPerFileInBytes)
        .append(", rotationInterval=").append(rotationInterval)
        .append(", lastRotationTime=").append(lastRotationTime)
        .append(")")
        .toString();
  }
}
/**
 * Set the time interval for rotation of log file.
 * <p>
 * The new value is read by the rotation check on subsequent appends; a value
 * of zero or less disables time-based rotation.
 *
 * @param rotationIntervalInMillis
 *          time interval before rotation of log file, in milliseconds
 */
void setRotationInterval(long rotationIntervalInMillis)
{
this.rotationIntervalInMillis = rotationIntervalInMillis;
}
/**
 * Releases one reference to the log registered in the cache for the provided
 * path. When the released reference is the last one, the log is effectively
 * closed and removed from the cache.
 *
 * @param logPath
 *          Path of the log to release.
 */
private static synchronized void releaseLog(final File logPath)
{
  Log, ?> cachedLog = logsCache.get(logPath);
  if (cachedLog == null)
  {
    // this should never happen
    logger.error(ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
    return;
  }

  if (cachedLog.referenceCount <= 1)
  {
    // Last reference: close for real and forget the log
    cachedLog.doClose();
    logsCache.remove(logPath);
    return;
  }

  cachedLog.referenceCount--;
}
/**
 * Creates a new log holding a single reference, then creates or opens its
 * log files.
 *
 * @param replicationEnv
 *          The replication environment used to create this log.
 * @param logPath
 *          The directory path of the log.
 * @param parser
 *          Parser of records.
 * @param rotationParams
 *          Parameters for log-file rotation.
 *
 * @throws ChangelogException
 *           If a problem occurs during initialization.
 */
private Log(final ReplicationEnvironment replicationEnv, final File logPath, final RecordParser parser,
    final LogRotationParameters rotationParams) throws ChangelogException
{
  // Both locks are the two sides of one non-fair read/write lock
  final ReadWriteLock readWriteLock = new ReentrantReadWriteLock(false);
  this.sharedLock = readWriteLock.readLock();
  this.exclusiveLock = readWriteLock.writeLock();

  this.replicationEnv = replicationEnv;
  this.logPath = logPath;
  this.recordParser = parser;
  this.referenceCount = 1;

  this.sizeLimitPerLogFileInBytes = rotationParams.sizeLimitPerFileInBytes;
  this.rotationIntervalInMillis = rotationParams.rotationInterval;
  this.lastRotationTime = rotationParams.lastRotationTime;

  createOrOpenLogFiles();
}
/**
 * Creates or opens the log files used by this log: the root directory is
 * created if needed, then the head log file and every read-only log file
 * found in the directory are opened.
 * <p>
 * If any step fails, the log files opened so far are closed before the
 * failure is reported.
 *
 * @throws ChangelogException
 *           If the log cannot be initialized.
 */
private void createOrOpenLogFiles() throws ChangelogException
{
  exclusiveLock.lock();
  try
  {
    createRootDirIfNotExists();
    openHeadLogFile();
    for (final File file : getReadOnlyLogFiles())
    {
      openReadOnlyLogFile(file);
    }
    isClosed = false;
  }
  catch (ChangelogException e)
  {
    // Ensure all log files opened at this point are closed.
    // doClose() is called directly rather than close(): close() goes through
    // releaseLog(), which looks the log up in the cache. While the log is being
    // constructed it is not in the cache yet, so releaseLog() would only log an
    // error and leave the opened files unclosed. The exclusive lock is the write
    // side of a reentrant lock, so re-acquiring it in doClose() is safe.
    doClose();
    throw new ChangelogException(
        ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
  }
  finally
  {
    exclusiveLock.unlock();
  }
}
/**
 * Lists the read-only log files present in the log directory.
 *
 * @return the files accepted by the read-only log files filter
 * @throws ChangelogException
 *           If the directory listing cannot be obtained.
 */
private File[] getReadOnlyLogFiles() throws ChangelogException
{
  final File[] readOnlyFiles = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
  if (readOnlyFiles != null)
  {
    return readOnlyFiles;
  }
  // listFiles() returns null when the listing could not be performed
  throw new ChangelogException(
      ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
}
/**
 * Makes sure the log directory exists, creating it (and missing parents) if needed.
 *
 * @throws ChangelogException
 *           If the directory does not exist and cannot be created.
 */
private void createRootDirIfNotExists() throws ChangelogException
{
  final boolean directoryAvailable = logPath.exists() || logPath.mkdirs();
  if (!directoryAvailable)
  {
    throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
  }
}
/**
 * Returns the path of this log.
 *
 * @return the path of this log directory, as provided when the log was created
 */
public File getPath()
{
return logPath;
}
/**
 * Add the provided record at the end of this log.
 * <p>
 * The record must have a key strictly higher than the key
 * of the last record added. If it is not the case, the record is not
 * appended.
 * <p>
 * In order to ensure that record is written out of buffers and persisted
 * to file system, it is necessary to explicitly call the
 * {@code syncToFileSystem()} method.
 * <p>
 * The append is first attempted under the shared lock. Only when the head log
 * file must be rotated is the exclusive lock taken, and the rotation condition
 * is then evaluated again because the state may have changed between the two
 * locks. Nothing is done if the log is closed.
 *
 * @param record
 *          The record to add.
 * @throws ChangelogException
 *           If an error occurs while adding the record to the log.
 */
public void append(final Record record) throws ChangelogException
{
// Fast-path - assume that no rotation is needed and use shared lock.
sharedLock.lock();
try
{
if (isClosed)
{
return;
}
LogFile headLogFile = getHeadLogFile();
if (!mustRotate(headLogFile))
{
headLogFile.append(record);
return;
}
}
finally
{
sharedLock.unlock();
}
// Slow-path - rotation is needed so use exclusive lock.
// The shared lock has been released above: the closed flag and the rotation
// condition are therefore checked again once the exclusive lock is held.
exclusiveLock.lock();
try
{
if (isClosed)
{
return;
}
LogFile headLogFile = getHeadLogFile();
if (headLogFile.appendWouldBreakKeyOrdering(record))
{
// abort rotation: the record would be rejected anyway, so do not rotate for it
return;
}
if (mustRotate(headLogFile))
{
logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
rotateHeadLogFile();
// the rotation has installed a new head log file
headLogFile = getHeadLogFile();
}
headLogFile.append(record);
}
finally
{
exclusiveLock.unlock();
}
}
/**
 * Indicates if the provided head log file must be rotated.
 * <p>
 * An empty file is never rotated. Otherwise rotation is required when the
 * file size exceeds the size limit or, if time-based rotation is enabled
 * (strictly positive interval), when the time elapsed since the last
 * rotation exceeds the rotation interval.
 *
 * @param headLogFile
 *          The head log file to check.
 * @return {@code true} if the head log file must be rotated
 */
private boolean mustRotate(LogFile headLogFile)
{
  // never rotate an empty file
  if (headLogFile.getNewestRecord() == null)
  {
    return false;
  }

  // rotate because file size exceeded threshold
  if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
  {
    logger.trace("Rotate log %s due to size: %s", logPath.getPath(), headLogFile.getSizeInBytes());
    return true;
  }

  // time-based rotation is disabled
  if (rotationIntervalInMillis <= 0)
  {
    return false;
  }

  // rotate if time limit is reached
  final long elapsedMillis = timeService.since(lastRotationTime);
  if (elapsedMillis <= rotationIntervalInMillis)
  {
    return false;
  }
  logger.trace("Rotate log %s due to time: time elapsed %s, rotation interval: %s",
      logPath.getPath(), elapsedMillis, rotationIntervalInMillis);
  return true;
}
/**
 * Synchronize all records added with the file system, ensuring that records
 * are effectively persisted.
 * <p>
 * After a successful call to this method, it is guaranteed that all records
 * added to the log are persisted to the file system.
 * <p>
 * The synchronization is delegated to the head log file while holding the
 * exclusive lock.
 *
 * @throws ChangelogException
 *           If the synchronization fails.
 */
public void syncToFileSystem() throws ChangelogException
{
exclusiveLock.lock();
try
{
// NOTE(review): unlike append/clear/purge, there is no isClosed check here - confirm
// that syncing the head log file of a closed log is harmless.
getHeadLogFile().syncToFileSystem();
}
finally
{
exclusiveLock.unlock();
}
}
/**
 * Returns a cursor that allows to retrieve the records from this log,
 * starting at the first position.
 * <p>
 * An empty cursor is returned when the log is closed. Otherwise the new cursor
 * is registered in the list of open cursors of this log.
 *
 * @return a cursor on the log records, which is never {@code null}
 * @throws ChangelogException
 *           If the cursor can't be created.
 */
public RepositionableCursor getCursor() throws ChangelogException
{
  AbortableLogCursor newCursor = null;
  sharedLock.lock();
  try
  {
    if (!isClosed)
    {
      newCursor = new AbortableLogCursor<>(this, new InternalLogCursor(this));
      // a null key positions the cursor on the first record
      newCursor.positionTo(null, null, null);
      registerCursor(newCursor);
      return newCursor;
    }
    return new EmptyCursor<>();
  }
  catch (ChangelogException failure)
  {
    // do not leak a cursor that could not be positioned
    StaticUtils.close(newCursor);
    throw failure;
  }
  finally
  {
    sharedLock.unlock();
  }
}
/**
 * Returns a cursor that allows to retrieve the records from this log,
 * starting at the position defined by the provided key.
 * <p>
 * The key must match exactly ({@code EQUAL_TO_KEY}) and the cursor is
 * positioned on the matching key ({@code ON_MATCHING_KEY}).
 *
 * @param key
 *          Key to use as a start position for the cursor. If key is
 *          {@code null}, cursor will point at the first record of the log.
 * @return a cursor on the log records, which is never {@code null}
 * @throws ChangelogException
 *           If the cursor can't be created.
 */
public RepositionableCursor getCursor(final K key) throws ChangelogException
{
return getCursor(key, EQUAL_TO_KEY, ON_MATCHING_KEY);
}
/**
 * Returns a cursor that allows to retrieve the records from this log. The
 * starting position is defined by the provided key, cursor matching strategy
 * and cursor positioning strategy.
 * <p>
 * An empty cursor is returned when the log is closed, or when the key cannot
 * be found with the {@code EQUAL_TO_KEY} matching strategy.
 *
 * @param key
 *          Key to use as a start position for the cursor. If key is
 *          {@code null}, cursor will point at the first record of the log.
 * @param matchingStrategy
 *          Cursor key matching strategy.
 * @param positionStrategy
 *          The cursor positioning strategy.
 * @return a cursor on the log records, which is never {@code null}
 * @throws ChangelogException
 *           If the cursor can't be created.
 */
public RepositionableCursor getCursor(final K key, final KeyMatchingStrategy matchingStrategy,
    final PositionStrategy positionStrategy) throws ChangelogException
{
  if (key == null)
  {
    return getCursor();
  }

  AbortableLogCursor positionedCursor = null;
  sharedLock.lock();
  try
  {
    if (isClosed)
    {
      return new EmptyCursor<>();
    }
    positionedCursor = new AbortableLogCursor<>(this, new InternalLogCursor(this));
    final boolean positioned = positionedCursor.positionTo(key, matchingStrategy, positionStrategy);

    // Only an exact-match request that failed is rejected. This allows for cursor
    // re-initialization after exhaustion in case of LESS_THAN_OR_EQUAL_TO_KEY and
    // GREATER_THAN_OR_EQUAL_TO_KEY strategies
    if (!positioned && matchingStrategy == EQUAL_TO_KEY)
    {
      StaticUtils.close(positionedCursor);
      return new EmptyCursor<>();
    }

    registerCursor(positionedCursor);
    return positionedCursor;
  }
  catch (ChangelogException failure)
  {
    // do not leak a cursor that could not be positioned
    StaticUtils.close(positionedCursor);
    throw failure;
  }
  finally
  {
    sharedLock.unlock();
  }
}
/**
 * Returns the oldest (first) record from this log, i.e. the oldest record of
 * the log file associated with the lowest key. The shared lock is held while
 * reading.
 *
 * @return the oldest record, which may be {@code null} if there is no record
 *         in the log.
 * @throws ChangelogException
 *           If an error occurs while retrieving the record.
 */
public Record getOldestRecord() throws ChangelogException
{
sharedLock.lock();
try
{
return getOldestLogFile().getOldestRecord();
}
finally
{
sharedLock.unlock();
}
}
/**
 * Returns the newest (last) record from this log, i.e. the newest record of
 * the head log file. The shared lock is held while reading.
 *
 * @return the newest record, which may be {@code null}
 * @throws ChangelogException
 *           If an error occurs while retrieving the record.
 */
public Record getNewestRecord() throws ChangelogException
{
sharedLock.lock();
try
{
// Only the head log file is consulted.
// NOTE(review): presumably returns null when the head is empty even if
// read-only log files hold records - confirm against LogFile.
return getHeadLogFile().getNewestRecord();
}
finally
{
sharedLock.unlock();
}
}
/**
 * Returns the number of records in the log, computed as the sum of the number
 * of records of each log file.
 *
 * @return the number of records
 * @throws ChangelogException
 *           If a problem occurs.
 */
public long getNumberOfRecords() throws ChangelogException
{
  sharedLock.lock();
  try
  {
    long total = 0;
    for (final LogFile file : logFiles.values())
    {
      total += file.getNumberOfRecords();
    }
    return total;
  }
  finally
  {
    sharedLock.unlock();
  }
}
/**
 * Purge the log up to and excluding the provided key.
 * <p>
 * Purging works on whole log files: every log file associated with a key
 * strictly lower than the purge key is closed, deleted and removed from this
 * log, after aborting the cursors that are accessing it. Nothing is done if
 * the log is closed.
 *
 * @param purgeKey
 *          the key up to which purging must happen
 * @return the oldest non purged record, or {@code null}
 *         if no record was purged
 * @throws ChangelogException
 *           if a database problem occurs, in particular if at least one log
 *           file could not be deleted.
 */
public Record purgeUpTo(final K purgeKey) throws ChangelogException
{
exclusiveLock.lock();
try
{
if (isClosed)
{
return null;
}
// View on the log files whose key is strictly lower than the purge key:
// removing from the view removes from the logFiles map
final SortedMap> logFilesToPurge = logFiles.headMap(purgeKey);
if (logFilesToPurge.isEmpty())
{
return null;
}
logger.trace("About to purge log files older than purgeKey %s: %s", purgeKey, logFilesToPurge);
final List undeletableFiles = new ArrayList<>();
final Iterator> entriesToPurge = logFilesToPurge.values().iterator();
while (entriesToPurge.hasNext())
{
final LogFile logFile = entriesToPurge.next();
try
{
abortCursorsOpenOnLogFile(logFile);
logFile.close();
logFile.delete();
// only forget the log file once it is actually deleted
entriesToPurge.remove();
}
catch (ChangelogException e)
{
// The deletion of log file on file system has failed:
// remember the file and keep purging the other ones
undeletableFiles.add(logFile.getFile().getPath());
}
}
if (!undeletableFiles.isEmpty())
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
Utils.joinAsString(", ", undeletableFiles)));
}
return getOldestRecord();
}
finally
{
exclusiveLock.unlock();
}
}
/**
 * Aborts every registered cursor that is currently accessing the provided log file.
 *
 * @param logFile
 *          The log file whose cursors must be aborted.
 */
@GuardedBy("exclusiveLock")
private void abortCursorsOpenOnLogFile(LogFile logFile)
{
  for (AbortableLogCursor openCursor : openCursors)
  {
    if (!openCursor.isAccessingLogFile(logFile))
    {
      // cursor reads another log file: leave it untouched
      continue;
    }
    openCursor.abort();
  }
}
/**
 * Empties the log, discarding all records it contains.
 * <p>
 * All cursors open on the log are aborted, every log file is closed and
 * deleted, and a new empty head log file is created. Nothing is done if the
 * log is closed.
 *
 * @throws ChangelogException
 *           If a log file cannot be deleted, or if any other problem occurs
 *           during the clearing operation. The original failure is available
 *           as the cause of the exception.
 */
public void clear() throws ChangelogException
{
  exclusiveLock.lock();
  try
  {
    if (isClosed)
    {
      return;
    }
    if (!openCursors.isEmpty())
    {
      // All open cursors are aborted, which means the change number indexer thread
      // should manage AbortedChangelogCursorException specifically to avoid being
      // stopped
      abortAllOpenCursors();
    }

    // delete all log files, collecting the ones that resist deletion
    final List undeletableFiles = new ArrayList<>();
    for (LogFile logFile : logFiles.values())
    {
      try
      {
        logFile.close();
        logFile.delete();
      }
      catch (ChangelogException e)
      {
        undeletableFiles.add(logFile.getFile().getPath());
      }
    }
    if (!undeletableFiles.isEmpty())
    {
      throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
          Utils.joinAsString(", ", undeletableFiles)));
    }
    logFiles.clear();

    // recreate an empty head log file
    openHeadLogFile();
  }
  catch (Exception e)
  {
    // Keep the original exception as the cause so that its type and stack trace
    // are not reduced to the flattened text embedded in the message
    throw new ChangelogException(
        ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)), e);
  }
  finally
  {
    exclusiveLock.unlock();
  }
}
/**
 * Dumps each log file of this log as a text file, intended for debugging
 * purpose only. The text file of a log file is named after the log file, with
 * an additional ".txt" extension.
 *
 * @param dumpDirectory
 *          Directory that will contain the text files
 * @throws ChangelogException
 *           If an error occurs during dump
 */
void dumpAsTextFile(File dumpDirectory) throws ChangelogException
{
  sharedLock.lock();
  try
  {
    for (LogFile logFile : logFiles.values())
    {
      final String textFileName = logFile.getFile().getName() + ".txt";
      logFile.dumpAsTextFile(new File(dumpDirectory, textFileName));
    }
  }
  finally
  {
    sharedLock.unlock();
  }
}
/**
 * Releases one reference to this log. The log is effectively closed only when
 * the last reference is released.
 */
@Override
public void close()
{
releaseLog(logPath);
}
/**
 * Find the highest key that corresponds to a record that is the oldest (or
 * first) of a log file and where value mapped from the record is
 * lower or equals to provided limit value.
 * <p>
 * Log files that contain no record (for instance an empty head log file) are
 * ignored.
 * <p>
 * Example: given a log with 3 log files, with Record and Mapper mapping a
 * string to its long value
 * <ul>
 * <li>1_10.log where oldest record is (key=1, value="50")</li>
 * <li>11_20.log where oldest record is (key=11, value="150")</li>
 * <li>head.log where oldest record is (key=25, value="250")</li>
 * </ul>
 * Then
 * <ul>
 * <li>findBoundaryKeyFromRecord(mapper, 20) => null</li>
 * <li>findBoundaryKeyFromRecord(mapper, 50) => 1</li>
 * <li>findBoundaryKeyFromRecord(mapper, 100) => 1</li>
 * <li>findBoundaryKeyFromRecord(mapper, 150) => 11</li>
 * <li>findBoundaryKeyFromRecord(mapper, 200) => 11</li>
 * <li>findBoundaryKeyFromRecord(mapper, 250) => 25</li>
 * <li>findBoundaryKeyFromRecord(mapper, 300) => 25</li>
 * </ul>
 *
 * @param <V2>
 *          Type of the value extracted from the record
 * @param mapper
 *          The mapper to extract a value from a record. It is expected that
 *          extracted values are ordered according to an order consistent with
 *          this log ordering, i.e. for two records, if key(R1) > key(R2) then
 *          extractedValue(R1) > extractedValue(R2).
 * @param limitValue
 *          The limit value to search for
 * @return the key or {@code null} if no key corresponds
 * @throws ChangelogException
 *           If a problem occurs
 */
> K findBoundaryKeyFromRecord(Record.Mapper mapper, V2 limitValue)
    throws ChangelogException
{
  sharedLock.lock();
  try
  {
    K key = null;
    for (LogFile logFile : logFiles.values())
    {
      final Record record = logFile.getOldestRecord();
      if (record == null)
      {
        // A log file without any record (e.g. the head log file right after the
        // log has been cleared) has no oldest record to compare with the limit
        continue;
      }
      final V2 oldestValue = mapper.map(record.getValue());
      if (oldestValue.compareTo(limitValue) > 0)
      {
        // this log file starts after the limit: the previous boundary is the answer
        return key;
      }
      key = record.getKey();
    }
    return key;
  }
  finally
  {
    sharedLock.unlock();
  }
}
/**
 * Effectively closes this log: closes all its log files and marks the log as
 * closed. Cursors still open at that point are only reported with an error
 * message. Does nothing if the log is already closed.
 */
private void doClose()
{
  exclusiveLock.lock();
  try
  {
    if (!isClosed)
    {
      final boolean hasOpenCursors = !openCursors.isEmpty();
      if (hasOpenCursors)
      {
        logger.error(ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
      }
      StaticUtils.close(logFiles.values());
      isClosed = true;
    }
  }
  finally
  {
    exclusiveLock.unlock();
  }
}
/**
 * Returns the head log file, i.e. the log file associated with the highest
 * key of the {@code logFiles} map.
 */
private LogFile getHeadLogFile()
{
return logFiles.lastEntry().getValue();
}
/**
 * Returns the oldest log file, i.e. the log file associated with the lowest
 * key of the {@code logFiles} map.
 */
private LogFile getOldestLogFile()
{
return logFiles.firstEntry().getValue();
}
/**
 * Rotate the head log file to a read-only log file, and open a new empty head
 * log file to write in.
 * <p>
 * All cursors opened on the head log file are temporarily disabled (closing
 * underlying resources) and then re-opened with their previous state.
 * <p>
 * The replication environment is notified of the rotation and the last
 * rotation time is updated.
 *
 * @throws ChangelogException
 *           If an error occurs during the rotation.
 */
@GuardedBy("exclusiveLock")
private void rotateHeadLogFile() throws ChangelogException
{
// Temporarily disable cursors opened on head, saving their state
final List, CursorState>> cursorsOnHead = disableOpenedCursorsOnHead();
final LogFile headLogFile = getHeadLogFile();
// The read-only name is computed from the keys of the head before it is closed
final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
headLogFile.close();
renameHeadLogFileTo(readOnlyLogFile);
// Replaces the entry of the old head (maximum key) with the new empty head
openHeadLogFile();
openReadOnlyLogFile(readOnlyLogFile);
// Re-enable cursors previously opened on head, with the saved state
updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
// Notify even if time-based rotation is not enabled, as it could be enabled at any time
replicationEnv.notifyLogFileRotation(this);
lastRotationTime = timeService.now();
}
/**
 * Renames the head log file of this log to the provided file.
 *
 * @param rotatedLogFile
 *          The target file of the rename operation.
 * @throws ChangelogException
 *           If the rename operation fails.
 */
private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
{
  final File currentHeadFile = new File(logPath, HEAD_LOG_FILE_NAME);
  try
  {
    StaticUtils.renameFile(currentHeadFile, rotatedLogFile);
  }
  catch (IOException renameFailure)
  {
    throw new ChangelogException(
        ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(currentHeadFile.getPath(), rotatedLogFile.getPath()),
        renameFailure);
  }
}
/**
 * Returns the key bounds for the provided log file.
 * <p>
 * The bounds are decoded from the name of the file: the log file suffix is
 * removed, and the remainder is split on the file name separator into the
 * lowest and the highest key.
 *
 * @param logFile
 *          The log file to retrieve the bounds from.
 * @return the pair of (lowest key, highest key) that correspond to records
 *         stored in the corresponding log file.
 * @throws ChangelogException
 *           if an error occurs while retrieving the keys
 */
private Pair getKeyBounds(final LogFile logFile) throws ChangelogException
{
  try
  {
    final String fileName = logFile.getFile().getName();
    final int suffixStart = fileName.length() - Log.LOG_FILE_SUFFIX.length();
    final String[] encodedKeys = fileName.substring(0, suffixStart).split(LOG_FILE_NAME_SEPARATOR);
    return Pair.of(
        recordParser.decodeKeyFromString(encodedKeys[0]),
        recordParser.decodeKeyFromString(encodedKeys[1]));
  }
  catch (Exception e)
  {
    // any failure (unexpected name, undecodable key, ...) is reported the same way
    throw new ChangelogException(
        ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
  }
}
/**
 * Returns the file name to use for the read-only version of the provided
 * log file.
 * <p>
 * The name is made of the encoded lowest key, the file name separator, the
 * encoded highest key and the log file suffix.
 *
 * @param logFile
 *          The log file to generate a name for.
 * @return the name to use for the read-only version of the log file
 * @throws ChangelogException
 *           If an error occurs.
 */
private String generateReadOnlyFileName(final LogFile logFile) throws ChangelogException
{
  final K oldestKey = logFile.getOldestRecord().getKey();
  final K newestKey = logFile.getNewestRecord().getKey();

  final StringBuilder fileName = new StringBuilder();
  fileName.append(recordParser.encodeKeyToString(oldestKey));
  fileName.append(LOG_FILE_NAME_SEPARATOR);
  fileName.append(recordParser.encodeKeyToString(newestKey));
  fileName.append(LOG_FILE_SUFFIX);
  return fileName.toString();
}
/**
 * Update the cursors that were pointing to head after a rotation of the head log file.
 * <p>
 * Each cursor whose saved state is valid and refers to the head log file is
 * re-initialized on the log file associated with the highest key below the
 * maximum key, keeping the saved file position and record.
 *
 * @param cursors
 *          The pairs (cursor, saved state) collected before the rotation.
 * @throws ChangelogException
 *           If a cursor cannot be re-initialized.
 */
@GuardedBy("exclusiveLock")
private void updateOpenedCursorsOnHeadAfterRotation(List, CursorState>> cursors)
throws ChangelogException
{
for (Pair, CursorState> pair : cursors)
{
final CursorState cursorState = pair.getSecond();
// Need to update the cursor only if it is pointing to the head log file
if (cursorState.isValid() && isHeadLogFile(cursorState.logFile))
{
// Highest key strictly below the key of the head log file.
// NOTE(review): presumably the key of the read-only file just created by the rotation - confirm
final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
final LogFile logFile = findLogFileFor(previousKey, KeyMatchingStrategy.EQUAL_TO_KEY);
final AbortableLogCursor cursor = pair.getFirst();
cursor.reinitializeTo(new CursorState(logFile, cursorState.filePosition, cursorState.record));
}
}
}
/** Aborts every cursor registered in the list of open cursors of this log. */
@GuardedBy("exclusiveLock")
private void abortAllOpenCursors() throws ChangelogException
{
for (AbortableLogCursor cursor : openCursors)
{
cursor.abort();
}
}
/**
 * Disable the cursors opened on the head log file, by closing their underlying cursor.
 * Returns the state of each cursor just before the close operation.
 *
 * @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
 * @throws ChangelogException
 *           If an error occurs.
 */
@GuardedBy("exclusiveLock")
private List, CursorState>> disableOpenedCursorsOnHead()
throws ChangelogException
{
final List, CursorState>> openCursorsStates = new ArrayList<>();
final LogFile headLogFile = getHeadLogFile();
for (AbortableLogCursor cursor : openCursors)
{
if (cursor.isAccessingLogFile(headLogFile))
{
// the state must be captured before the underlying cursor is closed
openCursorsStates.add(Pair.of(cursor, cursor.getState()));
cursor.closeUnderlyingCursor();
}
}
return openCursorsStates;
}
/**
 * Opens the head log file in append mode and associates it with the maximum
 * key given by the record parser, replacing any log file previously
 * associated with that key.
 *
 * @throws ChangelogException
 *           If the head log file cannot be opened.
 */
private void openHeadLogFile() throws ChangelogException
{
final LogFile head = LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser);
logFiles.put(recordParser.getMaxKey(), head);
}
/**
 * Opens the provided file as a read-only log file and associates it with the
 * highest key decoded from its file name.
 *
 * @param logFilePath
 *          The read-only log file to open.
 * @throws ChangelogException
 *           If the log file cannot be opened or its key bounds cannot be retrieved.
 */
private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
{
final LogFile logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
final Pair bounds = getKeyBounds(logFile);
logFiles.put(bounds.getSecond(), logFile);
}
/** Adds the provided cursor to the list of cursors opened on this log. */
private void registerCursor(final AbortableLogCursor cursor)
{
openCursors.add(cursor);
}
/** Removes the provided cursor from the list of cursors opened on this log. */
private void unregisterCursor(final LogCursor cursor)
{
openCursors.remove(cursor);
}
/**
 * Returns the log file that is just after the provided log file wrt the order
 * defined on keys, or {@code null} if the provided log file is the last one
 * (the head log file).
 *
 * @param currentLogFile
 *          The log file to start from.
 * @return the next log file, or {@code null} if there is none
 * @throws ChangelogException
 *           If the key bounds of the provided log file cannot be retrieved.
 */
private LogFile getNextLogFile(final LogFile currentLogFile) throws ChangelogException
{
  sharedLock.lock();
  try
  {
    if (!isHeadLogFile(currentLogFile))
    {
      // the next log file is the one associated with the next higher key
      final Pair keyBounds = getKeyBounds(currentLogFile);
      return logFiles.higherEntry(keyBounds.getSecond()).getValue();
    }
    return null;
  }
  finally
  {
    sharedLock.unlock();
  }
}
/** Indicates if the provided log file is the head log file, based on the name of its file. */
private boolean isHeadLogFile(final LogFile logFile)
{
return Log.HEAD_LOG_FILE_NAME.equals(logFile.getFile().getName());
}
/**
 * Returns the log file in which the provided key should be searched.
 * <p>
 * The oldest log file is returned when the key is {@code null} or when no log
 * file is associated with a key strictly lower than the provided key.
 * Otherwise the log file associated with the lowest key greater than or equal
 * to the provided key is returned, except for the
 * {@code LESS_THAN_OR_EQUAL_TO_KEY} special case handled below.
 *
 * @param key
 *          The key to search for, which may be {@code null}.
 * @param keyMatchingStrategy
 *          The key matching strategy.
 * @return the log file to use for the provided key
 * @throws ChangelogException
 *           If the oldest record of the candidate log file cannot be read.
 */
@GuardedBy("sharedLock")
private LogFile findLogFileFor(final K key, KeyMatchingStrategy keyMatchingStrategy) throws ChangelogException
{
if (key == null || logFiles.lowerKey(key) == null)
{
return getOldestLogFile();
}
final LogFile candidate = logFiles.ceilingEntry(key).getValue();
// NOTE(review): getOldestRecord() looks like it can return null for an empty log file,
// which would fail here with a NullPointerException - confirm.
if (KeyMatchingStrategy.LESS_THAN_OR_EQUAL_TO_KEY.equals(keyMatchingStrategy)
&& candidate.getOldestRecord().getKey().compareTo(key) > 0)
{
// This handles the special case where the first key of the candidate is actually greater than the expected one.
// We have to return the previous logfile in order to match the LESS_THAN_OR_EQUAL_TO_KEY matching strategy.
// A lower key exists at this point (checked above), so floorEntry() cannot return null.
return logFiles.floorEntry(key).getValue();
}
return candidate;
}
/**
 * Represents a DB Cursor that can be repositioned on a given key.
 * <p>
 * Note that as a DBCursor, it provides a java.sql.ResultSet like API.
 */
static interface RepositionableCursor, V> extends DBCursor>
{
/**
 * Position the cursor to the record corresponding to the provided key and
 * provided matching and positioning strategies.
 *
 * @param key
 *          Key to use as a start position for the cursor. If key is
 *          {@code null}, use the key of the first record instead.
 * @param matchStrategy
 *          The cursor key matching strategy.
 * @param positionStrategy
 *          The cursor positioning strategy.
 * @return {@code true} if cursor is successfully positioned, or
 *         {@code false} otherwise.
 * @throws ChangelogException
 *           If an error occurs when positioning cursor.
 */
boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
throws ChangelogException;
}
/**
 * Represents an internal view of a cursor on the log, with extended operations.
 * <p>
 * This is an abstract class rather than an interface to allow reduced visibility of the methods.
 */
private static abstract class LogCursor, V> implements RepositionableCursor
{
/** Closes the underlying cursor. */
abstract void closeUnderlyingCursor();
/**
 * Returns the state of this cursor.
 *
 * @throws ChangelogException
 *           If the state cannot be retrieved.
 */
abstract CursorState getState() throws ChangelogException;
/**
 * Reinitialize this cursor to the provided state.
 *
 * @throws ChangelogException
 *           If the cursor cannot be reinitialized.
 */
abstract void reinitializeTo(final CursorState cursorState) throws ChangelogException;
/** Returns true if cursor is pointing on provided log file. */
abstract boolean isAccessingLogFile(LogFile logFile);
}
/**
 * Implements an internal cursor on the log.
 * <p>
 * The cursor reads the records of one log file at a time, and switches to the
 * next log file of the log when the current one is exhausted.
 * <p>
 * This cursor is intended to be used only inside an {@link AbortableLogCursor},
 * because it is relying on AbortableLogCursor for locking.
 */
private static class InternalLogCursor, V> extends LogCursor
{
  /** The log on which this cursor reads records. */
  private final Log log;
  /** The log file currently read, {@code null} until the cursor is positioned. */
  private LogFile currentLogFile;
  /** The cursor on the current log file, {@code null} until the cursor is positioned. */
  private LogFileCursor currentCursor;

  /**
   * Creates a cursor on the provided log. The cursor is not positioned on any
   * log file until {@code positionTo()} or {@code reinitializeTo()} is called.
   *
   * @param log
   *          The log on which the cursor read records.
   * @throws ChangelogException
   *           If an error occurs when creating the cursor.
   */
  private InternalLogCursor(final Log log) throws ChangelogException
  {
    this.log = log;
  }

  /** Returns the current record, or {@code null} if the cursor is not positioned yet. */
  @Override
  public Record getRecord()
  {
    return currentCursor != null ? currentCursor.getRecord() : null;
  }

  /**
   * Moves to the next record, switching to the next log file when the current
   * log file has no more record.
   */
  @Override
  public boolean next() throws ChangelogException
  {
    // Lock is needed here to ensure that log rotation is performed atomically.
    // This ensures that currentCursor will not be aborted concurrently.
    log.sharedLock.lock();
    try
    {
      final boolean hasNext = currentCursor.next();
      if (hasNext)
      {
        return true;
      }
      final LogFile logFile = log.getNextLogFile(currentLogFile);
      if (logFile != null)
      {
        switchToLogFile(logFile);
        return currentCursor.next();
      }
      return false;
    }
    finally
    {
      log.sharedLock.unlock();
    }
  }

  @Override
  public void close()
  {
    StaticUtils.close(currentCursor);
  }

  /**
   * Positions the cursor on the log file selected for the provided key, then
   * on the key itself. With a {@code null} key, the cursor is only moved to
   * the selected log file and {@code true} is returned.
   */
  @Override
  public boolean positionTo(
      final K key,
      final KeyMatchingStrategy matchStrategy,
      final PositionStrategy positionStrategy)
          throws ChangelogException
  {
    // Lock is needed here to ensure that log rotation is performed atomically.
    // This ensures that currentLogFile will not be closed concurrently.
    log.sharedLock.lock();
    try
    {
      final LogFile logFile = log.findLogFileFor(key, matchStrategy);
      if (logFile != currentLogFile)
      {
        switchToLogFile(logFile);
      }
      return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
    }
    finally
    {
      log.sharedLock.unlock();
    }
  }

  /** Returns the current log file, file position and record of this cursor. */
  @Override
  CursorState getState() throws ChangelogException
  {
    // Lock is needed here to ensure that log rotation is performed atomically.
    // This ensures that currentCursor will not be aborted concurrently.
    log.sharedLock.lock();
    try
    {
      return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord());
    }
    finally
    {
      log.sharedLock.unlock();
    }
  }

  @Override
  void closeUnderlyingCursor()
  {
    StaticUtils.close(currentCursor);
  }

  /** Reinitialize this cursor to the provided state. */
  @Override
  void reinitializeTo(final CursorState cursorState) throws ChangelogException
  {
    currentLogFile = cursorState.logFile;
    currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
  }

  @Override
  boolean isAccessingLogFile(LogFile logFile)
  {
    return currentLogFile != null && currentLogFile.equals(logFile);
  }

  /** Switch the cursor to the provided log file. */
  private void switchToLogFile(final LogFile logFile) throws ChangelogException
  {
    StaticUtils.close(currentCursor);
    currentLogFile = logFile;
    currentCursor = currentLogFile.getCursor();
  }

  /**
   * Returns a description of this cursor. The log file name is reported as
   * {@code null} while the cursor is not positioned on any log file.
   */
  @Override
  public String toString()
  {
    // currentLogFile is null until the first positioning: never fail in toString()
    final String currentFileName = currentLogFile != null ? currentLogFile.getFile().getName() : null;
    return String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
        log.logPath, currentFileName, currentCursor);
  }
}
/**
 * An empty cursor, that always returns null records and false to the {@link #next()} method.
 * <p>
 * This class holds no state.
 */
private static final class EmptyCursor, V> implements RepositionableCursor
{
/** Always returns {@code null}. */
@Override
public Record getRecord()
{
return null;
}
/** Always returns {@code false}. */
@Override
public boolean next()
{
return false;
}
/** Never positions the cursor: always returns {@code false}. */
@Override
public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
{
return false;
}
@Override
public void close()
{
// nothing to do
}
@Override
public String toString()
{
return getClass().getSimpleName();
}
}
/**
 * An aborted cursor, that throws AbortedChangelogCursorException on methods that can
 * throw a ChangelogException. Among the other methods, {@link #getRecord()} throws an
 * {@link IllegalStateException}, {@code isAccessingLogFile()} returns {@code false} and
 * the close operations do nothing.
 * <p>
 * This cursor is intended to be used inside an AbortableLogCursor which manages locking.
 */
private static final class AbortedLogCursor, V> extends LogCursor
{
/** Records the path of the log the aborted cursor was positioned on. */
private final File logPath;
AbortedLogCursor(File logPath)
{
this.logPath = logPath;
}
/** Always fails: an aborted cursor has no current record. */
@Override
public Record getRecord()
{
throw new IllegalStateException("this cursor is aborted");
}
@Override
public boolean next() throws ChangelogException
{
throw abortedCursorException();
}
/** Builds the exception reported by every operation that is refused on an aborted cursor. */
private AbortedChangelogCursorException abortedCursorException()
{
return new AbortedChangelogCursorException(ERR_CHANGELOG_CURSOR_ABORTED.get(logPath));
}
@Override
public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
{
throw abortedCursorException();
}
@Override
public void close()
{
// nothing to do
}
@Override
CursorState getState() throws ChangelogException
{
throw abortedCursorException();
}
@Override
void closeUnderlyingCursor()
{
// nothing to do
}
@Override
void reinitializeTo(CursorState cursorState) throws ChangelogException
{
throw abortedCursorException();
}
/** An aborted cursor does not access any log file. */
@Override
boolean isAccessingLogFile(LogFile logFile)
{
return false;
}
@Override
public String toString()
{
return getClass().getSimpleName();
}
}
/**
 * A cursor on the log that can be aborted.
 * <p>
 * The cursor uses the log sharedLock to ensure no read can occur during a
 * rotation, a clear or a purge.
 * <p>
 * Note that only public methods use the sharedLock. The other methods are intended to be used only
 * internally in the Log class when the log exclusiveLock is on.
 * <p>
 * The cursor can be aborted by calling the {@link #abort()} method. The abort is
 * applied lazily: the delegate is replaced by an aborted cursor on the next call
 * to {@link #next()}.
 */
private static class AbortableLogCursor, V> extends LogCursor
{
/** The log on which this cursor is created. */
private final Log log;
/** The actual cursor on which methods are delegated. */
private LogCursor delegate;
/** Indicates if the cursor must be aborted. */
private boolean mustAbort;
private AbortableLogCursor(Log log, LogCursor delegate)
{
this.log = log;
this.delegate = delegate;
}
@Override
public Record getRecord()
{
return delegate.getRecord();
}
@Override
public boolean next() throws ChangelogException
{
// This lock is needed to ensure that abort() is atomic.
log.sharedLock.lock();
try
{
if (mustAbort)
{
// Apply the pending abort: from now on every call is answered by an aborted cursor
delegate.close();
delegate = new AbortedLogCursor<>(log.getPath());
mustAbort = false;
}
return delegate.next();
}
finally
{
log.sharedLock.unlock();
}
}
@Override
public void close()
{
// Lock is needed here to ensure that log cursor cannot be closed while still referenced in the cursor list.
// Removing the cursor before the close is not enough due to the CopyOnWrite nature of the cursor list.
log.sharedLock.lock();
try
{
delegate.close();
log.unregisterCursor(this);
}
finally
{
log.sharedLock.unlock();
}
}
@Override
public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
throws ChangelogException
{
// NOTE(review): a pending abort (mustAbort) is not checked here, only in next() - confirm this is intended.
return delegate.positionTo(key, matchStrategy, positionStrategy);
}
/**
 * Aborts this cursor. Once aborted, a cursor throws an
 * AbortedChangelogCursorException if it is used.
 * <p>
 * This method only flags the cursor; the abort takes effect on the next call to {@link #next()}.
 */
@GuardedBy("exclusiveLock")
void abort()
{
mustAbort = true;
}
@GuardedBy("exclusiveLock")
@Override
CursorState getState() throws ChangelogException
{
return delegate.getState();
}
@GuardedBy("exclusiveLock")
@Override
void closeUnderlyingCursor()
{
delegate.closeUnderlyingCursor();
}
@GuardedBy("exclusiveLock")
@Override
void reinitializeTo(final CursorState cursorState) throws ChangelogException
{
delegate.reinitializeTo(cursorState);
}
@GuardedBy("exclusiveLock")
@Override
boolean isAccessingLogFile(LogFile logFile)
{
return delegate.isAccessingLogFile(logFile);
}
@Override
public String toString()
{
return delegate.toString();
}
}
/**
 * Represents the state of a cursor.
 * <p>
 * The state is used to update a cursor when rotating the head log file : the
 * state of cursor on head log file must be reported to the new read-only log
 * file that is created when rotating.
 */
private static class CursorState, V>
{
/** The log file. */
private final LogFile logFile;
/**
 * The position of the reader on the log file. It is the offset from the
 * beginning of the file, in bytes, at which the next read occurs.
 */
private final long filePosition;
/** The record the cursor is pointing to. */
private final Record record;
/** Indicates if this state was created with actual values. */
private final boolean isValid;
/** Creates a non-valid state: no log file, no record and a zero file position. */
private CursorState() {
logFile = null;
filePosition = 0;
record = null;
isValid = false;
}
/** Creates a valid state. */
private CursorState(final LogFile logFile, final long filePosition, final Record record)
{
this.logFile = logFile;
this.filePosition = filePosition;
this.record = record;
isValid = true;
}
/**
 * Indicates if this state is valid, i.e. if it was created with a log file,
 * a file position and a record.
 *
 * @return {@code true} iff this state is valid
 */
public boolean isValid()
{
return isValid;
}
}
}