recordParser;
/**
* Indicates if this log is closed. When the log is closed, all methods return
* immediately with no effect.
*/
private boolean isClosed;
/**
* The log files contained in this log, ordered by key.
*
* The head log file is always present and is associated with the maximum
* possible key, given by the record parser.
*
* The read-only log files are associated with the highest key they contain.
*/
private final TreeMap> logFiles = new TreeMap>();
/**
* The last key appended to the log. In order to keep the ordering of the keys
* in the log, any attempt to append a record with a key lower or equal to
* this key is rejected (no error but an event is logged).
*/
private K lastAppendedKey;
/**
* The list of non-empty cursors opened on this log. Opened cursors may have
* to be updated when rotating the head log file.
*/
private final List> openCursors = new CopyOnWriteArrayList>();
/**
* A log file is rotated once it has exceeded this size limit. The log file can have
* a size much larger than this limit if the last record written has a huge size.
*
* TODO : to be replaced later by a (or a list of) configurable Rotation policy
* eg, List rotationPolicies = new ArrayList();
*/
private final long sizeLimitPerLogFileInBytes;
/**
* The exclusive lock used for writes and lifecycle operations on this log:
* initialize, clear, sync and close.
*/
private final Lock exclusiveLock;
/**
* The shared lock used for reads and cursor operations on this log.
*/
private final Lock sharedLock;
/**
* Open a log with the provided log path, record parser and maximum size per
* log file.
*
* If no log exists for the provided path, a new one is created.
*
* @param
* Type of the key of a record, which must be comparable.
* @param
* Type of the value of a record.
* @param logPath
* Path of the log.
* @param parser
* Parser for encoding/decoding of records.
* @param sizeLimitPerFileInBytes
* Limit in bytes before rotating the head log file of the log.
* @return a log
* @throws ChangelogException
* If a problem occurs during initialization.
*/
static synchronized , V> Log openLog(final File logPath,
final RecordParser parser, final long sizeLimitPerFileInBytes) throws ChangelogException
{
ensureNotNull(logPath, parser);
@SuppressWarnings("unchecked")
Log log = (Log) logsCache.get(logPath);
if (log == null)
{
log = new Log(logPath, parser, sizeLimitPerFileInBytes);
logsCache.put(logPath, log);
}
else
{
// TODO : check that parser and size limit are compatible with the existing one,
// and issue a warning if it is not the case
log.referenceCount++;
}
return log;
}
/**
* Release a reference to the log corresponding to provided path. The log is
* closed if this is the last reference.
*/
private static synchronized void releaseLog(final File logPath)
{
Log, ?> log = logsCache.get(logPath);
if (log == null)
{
// this should never happen
ErrorLogger.logError(
ERR_CHANGELOG_UNREFERENCED_LOG_WHILE_RELEASING.get(logPath.getPath()));
return;
}
if (log.referenceCount > 1)
{
log.referenceCount--;
}
else
{
log.doClose();
logsCache.remove(logPath);
}
}
/**
* Creates a new log.
*
* @param logPath
* The directory path of the log.
* @param parser
* Parser of records.
* @param sizeLimitPerFile
* Limit in bytes before rotating a log file.
* @throws ChangelogException
* If a problem occurs during initialization.
*/
private Log(final File logPath, final RecordParser parser, final long sizeLimitPerFile)
throws ChangelogException
{
this.logPath = logPath;
this.recordParser = parser;
this.sizeLimitPerLogFileInBytes = sizeLimitPerFile;
this.referenceCount = 1;
final ReadWriteLock lock = new ReentrantReadWriteLock(false);
this.exclusiveLock = lock.writeLock();
this.sharedLock = lock.readLock();
createOrOpenLogFiles();
}
/** Create or open log files used by this log. */
private void createOrOpenLogFiles() throws ChangelogException
{
exclusiveLock.lock();
try
{
createRootDirIfNotExists();
openHeadLogFile();
for (final File file : getReadOnlyLogFiles())
{
openReadOnlyLogFile(file);
}
isClosed = false;
}
catch (ChangelogException e)
{
// ensure all log files opened at this point are closed
close();
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_INITIALIZE_LOG.get(logPath.getPath()), e);
}
finally
{
exclusiveLock.unlock();
}
}
private File[] getReadOnlyLogFiles() throws ChangelogException
{
File[] files = logPath.listFiles(READ_ONLY_LOG_FILES_FILTER);
if (files == null)
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_RETRIEVE_READ_ONLY_LOG_FILES_LIST.get(logPath.getPath()));
}
return files;
}
private void createRootDirIfNotExists() throws ChangelogException
{
if (!logPath.exists())
{
if (!logPath.mkdirs())
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_CREATE_LOG_DIRECTORY.get(logPath.getPath()));
}
}
}
/**
* Returns the path of this log.
*
* @return the path of this log directory
*/
public File getPath()
{
return logPath;
}
/**
* Add the provided record at the end of this log.
*
* The record must have a key strictly higher than the key
* of the last record added. If it is not the case, the record is not
* appended and the method returns immediately.
*
* In order to ensure that record is written out of buffers and persisted
* to file system, it is necessary to explicitely call the
* {@code syncToFileSystem()} method.
*
* @param record
* The record to add.
* @throws ChangelogException
* If an error occurs while adding the record to the log.
*/
public void append(final Record record) throws ChangelogException
{
// If this exclusive lock happens to be a bottleneck :
// 1. use a shared lock for appending the record first
// 2. switch to an exclusive lock only if rotation is needed
// See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail
exclusiveLock.lock();
try
{
if (isClosed)
{
return;
}
if (recordIsBreakingKeyOrdering(record))
{
ErrorLogger.logError(Message.raw(Category.SYNC, Severity.NOTICE,
"Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record,
lastAppendedKey != null ? lastAppendedKey : "null"));
return;
}
LogFile headLogFile = getHeadLogFile();
if (headLogFile.getSizeInBytes() > sizeLimitPerLogFileInBytes)
{
ErrorLogger.logError(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes()));
rotateHeadLogFile();
headLogFile = getHeadLogFile();
}
headLogFile.append(record);
lastAppendedKey = record.getKey();
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Indicates if the provided record has a key that would break the key
* ordering in the log.
*/
private boolean recordIsBreakingKeyOrdering(final Record record)
{
return lastAppendedKey != null && record.getKey().compareTo(lastAppendedKey) <= 0;
}
/**
* Synchronize all records added with the file system, ensuring that records
* are effectively persisted.
*
* After a successful call to this method, it is guaranteed that all records
* added to the log are persisted to the file system.
*
* @throws ChangelogException
* If the synchronization fails.
*/
public void syncToFileSystem() throws ChangelogException
{
exclusiveLock.lock();
try
{
getHeadLogFile().syncToFileSystem();
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Returns a cursor that allows to retrieve the records from this log,
* starting at the first position.
*
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
* If the cursor can't be created.
*/
public RepositionableCursor getCursor() throws ChangelogException
{
LogCursor cursor = null;
sharedLock.lock();
try
{
if (isClosed)
{
return new EmptyLogCursor();
}
cursor = new LogCursor(this);
cursor.positionTo(null, null, null);
registerCursor(cursor);
return cursor;
}
catch (ChangelogException e)
{
StaticUtils.close(cursor);
throw e;
}
finally
{
sharedLock.unlock();
}
}
/**
* Returns a cursor that allows to retrieve the records from this log,
* starting at the position defined by the provided key.
*
* @param key
* Key to use as a start position for the cursor. If key is
* {@code null}, cursor will point at the first record of the log.
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
* If the cursor can't be created.
*/
public RepositionableCursor getCursor(final K key) throws ChangelogException
{
return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, null);
}
/**
* Returns a cursor that allows to retrieve the records from this log.
* The starting position is defined by the provided key and cursor
* positioning strategy.
*
* @param key
* Key to use as a start position for the cursor. If key is
* {@code null}, cursor will point at the first record of the log.
* @param positionStrategy
* The cursor positioning strategy.
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
* If the cursor can't be created.
*/
public RepositionableCursor getNearestCursor(final K key, PositionStrategy positionStrategy)
throws ChangelogException
{
return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
}
/**
* Returns a cursor starting from a key, using the provided matching and
* position strategies for the cursor.
*/
private RepositionableCursor getCursor(final K key, final KeyMatchingStrategy matchingStrategy,
final PositionStrategy positionStrategy) throws ChangelogException
{
if (key == null)
{
return getCursor();
}
LogCursor cursor = null;
sharedLock.lock();
try
{
if (isClosed)
{
return new EmptyLogCursor();
}
cursor = new LogCursor(this);
final boolean isFound = cursor.positionTo(key, matchingStrategy, positionStrategy);
// When not matching the exact key, it is ok if the target is not found
if (isFound || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
{
registerCursor(cursor);
return cursor;
}
else
{
StaticUtils.close(cursor);
return new EmptyLogCursor();
}
}
catch (ChangelogException e)
{
StaticUtils.close(cursor);
throw e;
}
finally
{
sharedLock.unlock();
}
}
/**
* Returns the oldest (first) record from this log.
*
* @return the oldest record, which may be {@code null} if there is no record
* in the log.
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
public Record getOldestRecord() throws ChangelogException
{
return getOldestLogFile().getOldestRecord();
}
/**
* Returns the newest (last) record from this log.
*
* @return the newest record, which may be {@code null}
* @throws ChangelogException
* If an error occurs while retrieving the record.
*/
public Record getNewestRecord() throws ChangelogException
{
return getHeadLogFile().getNewestRecord();
}
/**
* Returns the number of records in the log.
*
* @return the number of records
* @throws ChangelogException
* If a problem occurs.
*/
public long getNumberOfRecords() throws ChangelogException
{
long count = 0;
for (final LogFile logFile : logFiles.values())
{
count += logFile.getNumberOfRecords();
}
return count;
}
/**
* Purge the log up to and excluding the provided key.
*
* @param purgeKey
* the key up to which purging must happen
* @return the oldest non purged record, or {@code null}
* if no record was purged
* @throws ChangelogException
* if a database problem occurs.
*/
public Record purgeUpTo(final K purgeKey) throws ChangelogException
{
exclusiveLock.lock();
try
{
if (isClosed)
{
return null;
}
final SortedMap> logFilesToPurge = logFiles.headMap(purgeKey);
if (logFilesToPurge.isEmpty())
{
return null;
}
final List undeletableFiles = new ArrayList();
final Iterator>> entriesToPurge = logFilesToPurge.entrySet().iterator();
while (entriesToPurge.hasNext())
{
final LogFile logFile = entriesToPurge.next().getValue();
try
{
logFile.close();
logFile.delete();
entriesToPurge.remove();
}
catch (ChangelogException e)
{
// The deletion of log file on file system has failed
undeletableFiles.add(logFile.getFile().getPath());
}
}
if (!undeletableFiles.isEmpty())
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE_WHILE_PURGING.get(
StaticUtils.listToString(undeletableFiles, ", ")));
}
return getOldestRecord();
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Empties the log, discarding all records it contains.
*
* @throws ChangelogException
* If cursors are opened on this log, or if a problem occurs during
* clearing operation.
*/
public void clear() throws ChangelogException
{
exclusiveLock.lock();
try
{
if (isClosed)
{
return;
}
if (!openCursors.isEmpty())
{
// Allow opened cursors at this point, but turn them into empty cursors.
// This behavior is needed by the change number indexer thread.
switchCursorsOpenedIntoEmptyCursors();
}
// delete all log files
final List undeletableFiles = new ArrayList();
for (LogFile logFile : logFiles.values())
{
try
{
logFile.close();
logFile.delete();
}
catch (ChangelogException e)
{
undeletableFiles.add(logFile.getFile().getPath());
}
}
if (!undeletableFiles.isEmpty())
{
throw new ChangelogException(ERR_CHANGELOG_UNABLE_TO_DELETE_LOG_FILE.get(
StaticUtils.listToString(undeletableFiles, ", ")));
}
logFiles.clear();
// recreate an empty head log file
openHeadLogFile();
}
catch (Exception e)
{
throw new ChangelogException(ERR_ERROR_CLEARING_DB.get(logPath.getPath(), stackTraceToSingleLineString(e)));
}
finally
{
exclusiveLock.unlock();
}
}
/**
* Dump this log as a text files, intended for debugging purpose only.
*
* @param dumpDirectory
* Directory that will contains log files with text format
* and ".txt" extensions
* @throws ChangelogException
* If an error occurs during dump
*/
void dumpAsTextFile(File dumpDirectory) throws ChangelogException
{
for (LogFile logFile : logFiles.values())
{
logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt"));
}
}
/** {@inheritDoc} */
@Override
public void close()
{
releaseLog(logPath);
}
/** Effectively close this log. */
private void doClose()
{
exclusiveLock.lock();
try
{
if (isClosed)
{
return;
}
if (!openCursors.isEmpty())
{
ErrorLogger.logError(
ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLOSING_LOG.get(logPath.getPath(), openCursors.size()));
}
StaticUtils.close(logFiles.values());
isClosed = true;
}
finally
{
exclusiveLock.unlock();
}
}
private LogFile getHeadLogFile()
{
return logFiles.lastEntry().getValue();
}
private LogFile getOldestLogFile()
{
return logFiles.firstEntry().getValue();
}
/**
* Rotate the head log file to a read-only log file, and open a new empty head
* log file to write in.
*
* All cursors opened on this log are temporarily disabled (closing underlying resources)
* and then re-open with their previous state.
*/
private void rotateHeadLogFile() throws ChangelogException
{
// Temporarily disable cursors opened on head, saving their state
final List, CursorState>> cursorsOnHead = disableOpenedCursorsOnHead();
final LogFile headLogFile = getHeadLogFile();
final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile));
headLogFile.close();
renameHeadLogFileTo(readOnlyLogFile);
openHeadLogFile();
openReadOnlyLogFile(readOnlyLogFile);
// Re-enable cursors previously opened on head, with the saved state
updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead);
}
private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException
{
final File headLogFile = new File(logPath, HEAD_LOG_FILE_NAME);
try
{
StaticUtils.renameFile(headLogFile, rotatedLogFile);
}
catch (IOException e)
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_RENAME_HEAD_LOG_FILE.get(headLogFile.getPath(), rotatedLogFile.getPath()), e);
}
}
/**
* Returns the key bounds for the provided log file.
*
* @return the pair of (lowest key, highest key) that correspond to records
* stored in the corresponding log file.
* @throws ChangelogException
* if an error occurs while retrieving the keys
*/
private Pair getKeyBounds(final LogFile logFile) throws ChangelogException
{
try
{
final String name = logFile.getFile().getName();
final String[] keys = name.substring(0, name.length() - Log.LOG_FILE_SUFFIX.length())
.split(LOG_FILE_NAME_SEPARATOR);
return Pair.of(recordParser.decodeKeyFromString(keys[0]), recordParser.decodeKeyFromString(keys[1]));
}
catch (Exception e)
{
throw new ChangelogException(
ERR_CHANGELOG_UNABLE_TO_RETRIEVE_KEY_BOUNDS_FROM_FILE.get(logFile.getFile().getPath()), e);
}
}
/**
* Returns the file name to use for the read-only version of the provided
* log file.
*
* The file name is based on the lowest and highest key in the log file.
*
* @return the name to use for the read-only version of the log file
* @throws ChangelogException
* If an error occurs.
*/
private String generateReadOnlyFileName(final LogFile logFile) throws ChangelogException
{
final K lowestKey = logFile.getOldestRecord().getKey();
final K highestKey = logFile.getNewestRecord().getKey();
return recordParser.encodeKeyToString(lowestKey) + LOG_FILE_NAME_SEPARATOR
+ recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX;
}
/** Update the cursors that were pointing to head after a rotation of the head log file. */
private void updateOpenedCursorsOnHeadAfterRotation(List, CursorState>> cursors)
throws ChangelogException
{
for (Pair, CursorState> pair : cursors)
{
final CursorState cursorState = pair.getSecond();
// Need to update the cursor only if it is pointing to the head log file
if (isHeadLogFile(cursorState.logFile))
{
final K previousKey = logFiles.lowerKey(recordParser.getMaxKey());
final LogFile logFile = findLogFileFor(previousKey);
final LogCursor cursor = pair.getFirst();
cursor.reinitializeTo(new CursorState(logFile, cursorState.filePosition, cursorState.record));
}
}
}
private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException
{
for (LogCursor cursor : openCursors)
{
cursor.actAsEmptyCursor();
}
openCursors.clear();
}
/**
* Disable the cursors opened on the head log file log, by closing their underlying cursor.
* Returns the state of each cursor just before the close operation.
*
* @return the pairs (cursor, cursor state) for each cursor pointing to head log file.
* @throws ChangelogException
* If an error occurs.
*/
private List, CursorState>> disableOpenedCursorsOnHead() throws ChangelogException
{
final List, CursorState>> openCursorsStates =
new ArrayList, CursorState>>();
for (LogCursor cursor : openCursors)
{
if (isHeadLogFile(cursor.currentLogFile))
{
openCursorsStates.add(Pair.of(cursor, cursor.getState()));
cursor.closeUnderlyingCursor();
}
}
return openCursorsStates;
}
private void openHeadLogFile() throws ChangelogException
{
final LogFile head = LogFile.newAppendableLogFile(new File(logPath, HEAD_LOG_FILE_NAME), recordParser);
final Record newestRecord = head.getNewestRecord();
lastAppendedKey = newestRecord != null ? newestRecord.getKey() : null;
logFiles.put(recordParser.getMaxKey(), head);
}
private void openReadOnlyLogFile(final File logFilePath) throws ChangelogException
{
final LogFile logFile = LogFile.newReadOnlyLogFile(logFilePath, recordParser);
final Pair bounds = getKeyBounds(logFile);
logFiles.put(bounds.getSecond(), logFile);
}
private void registerCursor(final LogCursor cursor)
{
openCursors.add(cursor);
}
private void unregisterCursor(final LogCursor cursor)
{
openCursors.remove(cursor);
}
/**
* Returns the log file that is just after the provided log file wrt the order
* defined on keys, or {@code null} if the provided log file is the last one
* (the head log file).
*/
private LogFile getNextLogFile(final LogFile currentLogFile) throws ChangelogException
{
if (isHeadLogFile(currentLogFile))
{
return null;
}
final Pair bounds = getKeyBounds(currentLogFile);
return logFiles.higherEntry(bounds.getSecond()).getValue();
}
private boolean isHeadLogFile(final LogFile logFile)
{
return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME);
}
/** Returns the log file that should contain the provided key. */
private LogFile findLogFileFor(final K key)
{
if (key == null || logFiles.lowerKey(key) == null)
{
return getOldestLogFile();
}
return logFiles.ceilingEntry(key).getValue();
}
/**
* Represents a DB Cursor than can be repositioned on a given key.
*
* Note that as a DBCursor, it provides a java.sql.ResultSet like API.
*/
static interface RepositionableCursor, V> extends DBCursor>
{
/**
* Position the cursor to the record corresponding to the provided key and
* provided matching and positioning strategies.
*
* @param key
* Key to use as a start position for the cursor. If key is
* {@code null}, use the key of the first record instead.
* @param matchStrategy
* The cursor key matching strategy.
* @param positionStrategy
* The cursor positioning strategy.
* @return {@code true} if cursor is successfully positioned, or
* {@code false} otherwise.
* @throws ChangelogException
* If an error occurs when positioning cursor.
*/
boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
throws ChangelogException;
}
/**
* Implements a cursor on the log.
*
* The cursor uses the log shared lock to ensure reads are not done during a rotation.
*
* The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()}
* method.
*/
private static class LogCursor, V> implements RepositionableCursor
{
private final Log log;
private LogFile currentLogFile;
private LogFileCursor currentCursor;
private boolean actAsEmptyCursor;
/**
* Creates a cursor on the provided log.
*
* @param log
* The log on which the cursor read records.
* @throws ChangelogException
* If an error occurs when creating the cursor.
*/
private LogCursor(final Log log) throws ChangelogException
{
this.log = log;
this.actAsEmptyCursor = false;
}
/** {@inheritDoc} */
@Override
public Record getRecord()
{
return currentCursor != null ? currentCursor.getRecord() : null;
}
/** {@inheritDoc} */
@Override
public boolean next() throws ChangelogException
{
if (actAsEmptyCursor)
{
return false;
}
log.sharedLock.lock();
try
{
final boolean hasNext = currentCursor.next();
if (hasNext)
{
return true;
}
final LogFile logFile = log.getNextLogFile(currentLogFile);
if (logFile != null)
{
switchToLogFile(logFile);
return currentCursor.next();
}
return false;
}
finally
{
log.sharedLock.unlock();
}
}
/** {@inheritDoc} */
@Override
public void close()
{
log.sharedLock.lock();
try
{
StaticUtils.close(currentCursor);
log.unregisterCursor(this);
}
finally
{
log.sharedLock.unlock();
}
}
/** {@inheritDoc} */
@Override
public boolean positionTo(
final K key,
final KeyMatchingStrategy matchStrategy,
final PositionStrategy positionStrategy)
throws ChangelogException
{
if (actAsEmptyCursor)
{
return false;
}
log.sharedLock.lock();
try
{
final LogFile logFile = log.findLogFileFor(key);
if (logFile != currentLogFile)
{
switchToLogFile(logFile);
}
return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
}
finally
{
log.sharedLock.unlock();
}
}
/** Returns the state of this cursor. */
private CursorState getState() throws ChangelogException
{
return !actAsEmptyCursor ?
new CursorState(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null;
}
private void closeUnderlyingCursor()
{
StaticUtils.close(currentCursor);
}
/** Reinitialize this cursor to the provided state. */
private void reinitializeTo(final CursorState cursorState) throws ChangelogException
{
if (!actAsEmptyCursor)
{
currentLogFile = cursorState.logFile;
currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition);
}
}
/** Turn this cursor into an empty cursor, with no actual resource used. */
private void actAsEmptyCursor()
{
currentLogFile = null;
currentCursor = null;
actAsEmptyCursor = true;
}
/** Switch the cursor to the provided log file. */
private void switchToLogFile(final LogFile logFile) throws ChangelogException
{
StaticUtils.close(currentCursor);
currentLogFile = logFile;
currentCursor = currentLogFile.getCursor();
}
/** {@inheritDoc} */
public String toString()
{
return actAsEmptyCursor ?
String.format("Cursor on log : %s, acting as empty cursor", log.logPath) :
String.format("Cursor on log : %s, current log file: %s, current cursor: %s",
log.logPath, currentLogFile.getFile().getName(), currentCursor);
}
}
/** An empty cursor, that always return null records and false to {@code next()} method. */
static final class EmptyLogCursor, V> implements RepositionableCursor
{
/** {@inheritDoc} */
@Override
public Record getRecord()
{
return null;
}
/** {@inheritDoc} */
@Override
public boolean next()
{
return false;
}
/** {@inheritDoc} */
@Override
public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
{
return false;
}
/** {@inheritDoc} */
@Override
public void close()
{
// nothing to do
}
/** {@inheritDoc} */
@Override
public String toString()
{
return "EmptyLogCursor";
}
}
/**
* Represents the state of a cursor.
*
* The state is used to update a cursor when rotating the head log file : the
* state of cursor on head log file must be reported to the new read-only log
* file that is created when rotating.
*/
private static class CursorState, V>
{
/** The log file. */
private final LogFile logFile;
/**
* The position of the reader on the log file. It is the offset from the
* beginning of the file, in bytes, at which the next read occurs.
*/
private final long filePosition;
/** The record the cursor is pointing to. */
private final Record record;
private CursorState(final LogFile logFile, final long filePosition, final Record record)
{
this.logFile = logFile;
this.filePosition = filePosition;
this.record = record;
}
}
}