| | |
| | | |
| | | import net.jcip.annotations.GuardedBy; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.util.Pair; |
| | | import org.forgerock.util.Reject; |
| | |
| | | private final TreeMap<K, LogFile<K, V>> logFiles = new TreeMap<>(); |
| | | |
| | | /** |
| | | * The last key appended to the log. In order to keep the ordering of the keys |
| | | * in the log, any attempt to append a record with a key lower or equal to |
| | | * this key is rejected (no error but an event is logged). |
| | | */ |
| | | private K lastAppendedKey; |
| | | |
| | | /** |
| | | * The list of non-empty cursors opened on this log. Opened cursors may have |
| | | * to be updated when rotating the head log file. |
| | | */ |
| | |
| | | * <p> |
| | | * The record must have a key strictly higher than the key |
| | | * of the last record added. If it is not the case, the record is not |
| | | * appended and the method returns immediately. |
| | | * <p> |
| | | * In order to ensure that record is written out of buffers and persisted |
| | | * to file system, it is necessary to explicitly call the |
| | | * {@code syncToFileSystem()} method. |
| | | * <p> |
| | | * This method is not thread-safe. |
| | | * |
| | | * @param record |
| | | * The record to add. |
| | |
| | | */ |
| | | public void append(final Record<K, V> record) throws ChangelogException |
| | | { |
| | | // This check is ok outside of any locking because only the append thread updates lastAppendedKey. |
| | | if (recordIsBreakingKeyOrdering(record)) |
| | | { |
| | | logger.info(LocalizableMessage.raw( |
| | | "Rejecting append to log '%s' for record: [%s], last key appended: [%s]", logPath.getPath(), record, |
| | | lastAppendedKey != null ? lastAppendedKey : "null")); |
| | | return; |
| | | } |
| | | |
| | | // Fast-path - assume that no rotation is needed and use shared lock. |
| | | sharedLock.lock(); |
| | | try |
| | |
| | | if (!mustRotate(headLogFile)) |
| | | { |
| | | headLogFile.append(record); |
| | | lastAppendedKey = record.getKey(); |
| | | return; |
| | | } |
| | | } |
| | |
| | | return; |
| | | } |
| | | LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | if (headLogFile.appendWouldBreakKeyOrdering(record)) |
| | | { |
| | | // abort rotation |
| | | return; |
| | | } |
| | | if (mustRotate(headLogFile)) |
| | | { |
| | | logger.trace(INFO_CHANGELOG_LOG_FILE_ROTATION.get(logPath.getPath(), headLogFile.getSizeInBytes())); |
| | |
| | | headLogFile = getHeadLogFile(); |
| | | } |
| | | headLogFile.append(record); |
| | | lastAppendedKey = record.getKey(); |
| | | } |
| | | finally |
| | | { |
| | |
| | | |
| | | private boolean mustRotate(LogFile<K, V> headLogFile) |
| | | { |
| | | if (lastAppendedKey == null) |
| | | if (headLogFile.getNewestRecord() == null) |
| | | { |
| | | // never rotate an empty file |
| | | return false; |
| | |
| | | return false; |
| | | } |
| | | |
| | | /** |
| | | * Tells whether appending the provided record would violate the key ordering of the log. |
| | | * <p> |
| | | * The ordering is violated when {@code lastAppendedKey} is set and the key of the record |
| | | * compares lower than or equal to it. |
| | | * |
| | | * @param record |
| | | * The record whose key is checked against {@code lastAppendedKey}. |
| | | * @return {@code true} if {@code lastAppendedKey} is not {@code null} and the record key is |
| | | * not strictly higher than it, {@code false} otherwise. |
| | | */ |
| | | private boolean recordIsBreakingKeyOrdering(final Record<K, V> record) |
| | | { |
| | | if (lastAppendedKey == null) |
| | | { |
| | | // No last key is known, so there is nothing to compare against. |
| | | return false; |
| | | } |
| | | return record.getKey().compareTo(lastAppendedKey) <= 0; |
| | | } |
| | | |
| | | /** |
| | | * Synchronize all records added with the file system, ensuring that records |
| | | * are effectively persisted. |
| | |
| | | /** |
| | | * Opens the head log file in appendable mode and registers it in {@code logFiles}. |
| | | * <p> |
| | | * {@code lastAppendedKey} is reset from the newest record found in the head log file, |
| | | * or to {@code null} when the file reports no newest record. |
| | | * |
| | | * @throws ChangelogException |
| | | * Declared by this method; presumably raised when the head log file cannot be |
| | | * opened or read (to be confirmed against {@code LogFile}). |
| | | */ |
| | | private void openHeadLogFile() throws ChangelogException |
| | | { |
| | | final File headFile = new File(logPath, HEAD_LOG_FILE_NAME); |
| | | final LogFile<K, V> headLogFile = LogFile.newAppendableLogFile(headFile, recordParser); |
| | | |
| | | // Restore the last appended key from what the head log file already contains. |
| | | final Record<K, V> latestRecord = headLogFile.getNewestRecord(); |
| | | if (latestRecord == null) |
| | | { |
| | | lastAppendedKey = null; |
| | | } |
| | | else |
| | | { |
| | | lastAppendedKey = latestRecord.getKey(); |
| | | } |
| | | |
| | | // The head log file is registered under the max key provided by the record parser. |
| | | logFiles.put(recordParser.getMaxKey(), headLogFile); |
| | | } |
| | | |