| | |
| | | private long lastRotationTime; |
| | | |
| | | /** |
| | | * The exclusive lock used for writes and lifecycle operations on this log: |
| | | * The exclusive lock used for log rotation and lifecycle operations on this log: |
| | | * initialize, clear, sync and close. |
| | | */ |
| | | private final Lock exclusiveLock; |
| | | |
| | | /** |
| | | * The shared lock used for reads and cursor operations on this log. |
| | | */ |
| | | /** The shared lock used for write operations and accessing {@link #logFiles} map. */ |
| | | private final Lock sharedLock; |
| | | |
| | | /** |
| | |
| | | * appended and the method returns immediately. |
| | | * <p> |
| | | * In order to ensure that record is written out of buffers and persisted |
| | | * to file system, it is necessary to explicitely call the |
| | | * to file system, it is necessary to explicitly call the |
| | | * {@code syncToFileSystem()} method. |
| | | * <p> |
| | | * This method is not thread-safe. |
| | | * |
| | | * @param record |
| | | * The record to add. |
| | |
| | | */ |
| | | public void append(final Record<K, V> record) throws ChangelogException |
| | | { |
| | | // If this exclusive lock happens to be a bottleneck : |
| | | // 1. use a shared lock for appending the record first |
| | | // 2. switch to an exclusive lock only if rotation is needed |
| | | // See http://sources.forgerock.org/cru/CR-3548#c27521 for full detail |
| | | exclusiveLock.lock(); |
| | | // This check is ok outside of any locking because only the append thread updates lastAppendedKey. |
| | | if (recordIsBreakingKeyOrdering(record)) |
| | | { |
| | | logger.info(LocalizableMessage.raw( |
| | | "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record, |
| | | lastAppendedKey != null ? lastAppendedKey : "null")); |
| | | return; |
| | | } |
| | | |
| | | // Fast-path - assume that no rotation is needed and use shared lock. |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | if (recordIsBreakingKeyOrdering(record)) |
| | | LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | if (!mustRotate(headLogFile)) |
| | | { |
| | | logger.info(LocalizableMessage.raw( |
| | | "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record, |
| | | lastAppendedKey != null ? lastAppendedKey : "null")); |
| | | headLogFile.append(record); |
| | | lastAppendedKey = record.getKey(); |
| | | return; |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | |
| | | // Slow-path - rotation is needed so use exclusive lock. |
| | | exclusiveLock.lock(); |
| | | try |
| | | { |
| | | if (isClosed) |
| | | { |
| | | return; |
| | | } |
| | | LogFile<K, V> headLogFile = getHeadLogFile(); |
| | |
| | | */ |
| | | void dumpAsTextFile(File dumpDirectory) throws ChangelogException |
| | | { |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt")); |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | { |
| | | logFile.dumpAsTextFile(new File(dumpDirectory, logFile.getFile().getName() + ".txt")); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | K key = null; |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | { |
| | | final Record<K, V> record = logFile.getOldestRecord(); |
| | | final V2 oldestValue = mapper.map(record.getValue()); |
| | | if (oldestValue.compareTo(limitValue) > 0) |
| | | K key = null; |
| | | for (LogFile<K, V> logFile : logFiles.values()) |
| | | { |
| | | return key; |
| | | final Record<K, V> record = logFile.getOldestRecord(); |
| | | final V2 oldestValue = mapper.map(record.getValue()); |
| | | if (oldestValue.compareTo(limitValue) > 0) |
| | | { |
| | | return key; |
| | | } |
| | | key = record.getKey(); |
| | | } |
| | | key = record.getKey(); |
| | | return key; |
| | | } |
| | | return key; |
| | | } |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | |
| | | * <p> |
| | | * All cursors opened on this log are temporarily disabled (closing underlying resources) |
| | | * and then re-opened with their previous state. |
| | | * @GuardedBy("exclusiveLock") |
| | | */ |
| | | private void rotateHeadLogFile() throws ChangelogException |
| | | { |
| | |
| | | + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX; |
| | | } |
| | | |
| | | /** Update the cursors that were pointing to head after a rotation of the head log file. */ |
| | | /** |
| | | * Update the cursors that were pointing to head after a rotation of the head log file. |
| | | * @GuardedBy("exclusiveLock") |
| | | */ |
| | | private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<AbortableLogCursor<K, V>, CursorState<K, V>>> cursors) |
| | | throws ChangelogException |
| | | { |
| | |
| | | if (cursorState.isValid() && isHeadLogFile(cursorState.logFile)) |
| | | { |
| | | final K previousKey = logFiles.lowerKey(recordParser.getMaxKey()); |
| | | final LogFile<K, V> logFile = findLogFileFor(previousKey); |
| | | final LogFile<K, V> logFile = findLogFileFor(previousKey, KeyMatchingStrategy.EQUAL_TO_KEY); |
| | | final AbortableLogCursor<K, V> cursor = pair.getFirst(); |
| | | cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record)); |
| | | } |
| | |
| | | /** |
| | | * Disable the cursors opened on the head log file, by closing their underlying cursor. |
| | | * Returns the state of each cursor just before the close operation. |
| | | * @GuardedBy("exclusiveLock") |
| | | * |
| | | * @return the pairs (cursor, cursor state) for each cursor pointing to head log file. |
| | | * @throws ChangelogException |
| | |
| | | */ |
| | | private LogFile<K, V> getNextLogFile(final LogFile<K, V> currentLogFile) throws ChangelogException |
| | | { |
| | | if (isHeadLogFile(currentLogFile)) |
| | | sharedLock.lock(); |
| | | try |
| | | { |
| | | return null; |
| | | if (isHeadLogFile(currentLogFile)) |
| | | { |
| | | return null; |
| | | } |
| | | final Pair<K, K> bounds = getKeyBounds(currentLogFile); |
| | | return logFiles.higherEntry(bounds.getSecond()).getValue(); |
| | | } |
| | | final Pair<K, K> bounds = getKeyBounds(currentLogFile); |
| | | return logFiles.higherEntry(bounds.getSecond()).getValue(); |
| | | finally |
| | | { |
| | | sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | private boolean isHeadLogFile(final LogFile<K, V> logFile) |
| | |
| | | return logFile.getFile().getName().equals(Log.HEAD_LOG_FILE_NAME); |
| | | } |
| | | |
| | | /** Returns the log file that should contain the provided key. */ |
| | | private LogFile<K, V> findLogFileFor(final K key) |
| | | /** @GuardedBy("sharedLock") */ |
| | | private LogFile<K, V> findLogFileFor(final K key, KeyMatchingStrategy keyMatchingStrategy) throws ChangelogException |
| | | { |
| | | if (key == null || logFiles.lowerKey(key) == null) |
| | | { |
| | | return getOldestLogFile(); |
| | | } |
| | | return logFiles.ceilingEntry(key).getValue(); |
| | | |
| | | final LogFile<K, V> candidate = logFiles.ceilingEntry(key).getValue(); |
| | | if (KeyMatchingStrategy.LESS_THAN_OR_EQUAL_TO_KEY.equals(keyMatchingStrategy) |
| | | && candidate.getOldestRecord().getKey().compareTo(key) > 0) |
| | | { |
| | | // This handles the special case where the first key of the candidate is actually greater than the expected one. |
| | | // We have to return the previous logfile in order to match the LESS_THAN_OR_EQUAL_TO_KEY matching strategy. |
| | | return logFiles.floorEntry(key).getValue(); |
| | | } |
| | | return candidate; |
| | | } |
| | | |
| | | /** |
| | |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | final boolean hasNext = currentCursor.next(); |
| | | if (hasNext) |
| | | // Lock is needed here to ensure that log rotation is performed atomically. |
| | | // This ensures that currentCursor will not be aborted concurrently. |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | return true; |
| | | final boolean hasNext = currentCursor.next(); |
| | | if (hasNext) |
| | | { |
| | | return true; |
| | | } |
| | | final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile); |
| | | if (logFile != null) |
| | | { |
| | | switchToLogFile(logFile); |
| | | return currentCursor.next(); |
| | | } |
| | | return false; |
| | | } |
| | | final LogFile<K, V> logFile = log.getNextLogFile(currentLogFile); |
| | | if (logFile != null) |
| | | finally |
| | | { |
| | | switchToLogFile(logFile); |
| | | return currentCursor.next(); |
| | | log.sharedLock.unlock(); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | @Override |
| | |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | final LogFile<K, V> logFile = log.findLogFileFor(key); |
| | | if (logFile != currentLogFile) |
| | | // Lock is needed here to ensure that log rotation is performed atomically. |
| | | // This ensures that currentLogFile will not be closed concurrently. |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | switchToLogFile(logFile); |
| | | final LogFile<K, V> logFile = log.findLogFileFor(key, matchStrategy); |
| | | if (logFile != currentLogFile) |
| | | { |
| | | switchToLogFile(logFile); |
| | | } |
| | | return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy); |
| | | } |
| | | return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy); |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | /** Returns the state of this cursor. */ |
| | | @Override |
| | | CursorState<K, V> getState() throws ChangelogException |
| | | { |
| | | return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()); |
| | | // Lock is needed here to ensure that log rotation is performed atomically. |
| | | // This ensures that currentCursor will not be aborted concurrently. |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | return new CursorState<>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | |
| | | @Override |
| | | public Record<K, V> getRecord() |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | return delegate.getRecord(); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | return delegate.getRecord(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | public synchronized boolean next() throws ChangelogException |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | if (mustAbort) |
| | | { |
| | | if (mustAbort) |
| | | { |
| | | delegate.close(); |
| | | delegate = new AbortedLogCursor<>(log.getPath()); |
| | | mustAbort = false; |
| | | } |
| | | return delegate.next(); |
| | | delegate.close(); |
| | | delegate = new AbortedLogCursor<>(log.getPath()); |
| | | mustAbort = false; |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | return delegate.next(); |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | delegate.close(); |
| | | log.unregisterCursor(this); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | delegate.close(); |
| | | log.unregisterCursor(this); |
| | | } |
| | | |
| | | @Override |
| | | public boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | | return delegate.positionTo(key, matchStrategy, positionStrategy); |
| | | } |
| | | finally |
| | | { |
| | | log.sharedLock.unlock(); |
| | | } |
| | | return delegate.positionTo(key, matchStrategy, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | void abort() |
| | | synchronized void abort() |
| | | { |
| | | mustAbort = true; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | /** @GuardedBy("exclusiveLock") */ |
| | | @Override |
| | | CursorState<K, V> getState() throws ChangelogException |
| | | { |
| | | return delegate.getState(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | /** @GuardedBy("exclusiveLock") */ |
| | | @Override |
| | | void closeUnderlyingCursor() |
| | | { |
| | | delegate.closeUnderlyingCursor(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | /** @GuardedBy("exclusiveLock") */ |
| | | @Override |
| | | void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException |
| | | { |
| | | delegate.reinitializeTo(cursorState); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This method is called only when log.exclusiveLock has been acquired. |
| | | */ |
| | | /** @GuardedBy("exclusiveLock") */ |
| | | @Override |
| | | boolean isAccessingLogFile(LogFile<K, V> logFile) |
| | | { |