| | |
| | | } |
| | | if (!openCursors.isEmpty()) |
| | | { |
| | | // TODO: throwing an exception makes the replication/totalupdate.txt robot functional test fail because |
| | | // there is one open cursor when clearing. |
| | | // Switch to logging until this issue is solved |
| | | // throw new ChangelogException(Message.raw("Can't clean log '%s' because there are %d cursor(s) opened on it", |
| | | // logPath.getPath(), openCursors.size())); |
| | | ErrorLogger.logError( |
| | | ERR_CHANGELOG_CURSOR_OPENED_WHILE_CLEANING_LOG.get(logPath.getPath(), openCursors.size())); |
| | | // Allow opened cursors at this point, but turn them into empty cursors. |
| | | // This behavior is needed by the change number indexer thread. |
| | | switchCursorsOpenedIntoEmptyCursors(); |
| | | } |
| | | |
| | | // delete all log files |
| | |
| | | return logFiles.firstEntry().getValue(); |
| | | } |
| | | |
| | | /** Rotate the head log file to a read-only log file, and open a new empty head log file to write in. */ |
| | | /** |
| | | * Rotate the head log file to a read-only log file, and open a new empty head |
| | | * log file to write in. |
| | | * <p> |
| | | * All cursors opened on this log are temporarily disabled (closing underlying resources) |
| | | * and then re-open with their previous state. |
| | | */ |
| | | private void rotateHeadLogFile() throws ChangelogException |
| | | { |
| | | // Temporarily disable cursors opened on head, saving their state |
| | | final List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursorsOnHead = disableOpenedCursorsOnHead(); |
| | | |
| | | final LogFile<K, V> headLogFile = getHeadLogFile(); |
| | | final File readOnlyLogFile = new File(logPath, generateReadOnlyFileName(headLogFile)); |
| | | headLogFile.close(); |
| | |
| | | |
| | | openHeadLogFile(); |
| | | openReadOnlyLogFile(readOnlyLogFile); |
| | | updateCursorsOpenedOnHead(); |
| | | |
| | | // Re-enable cursors previously opened on head, with the saved state |
| | | updateOpenedCursorsOnHeadAfterRotation(cursorsOnHead); |
| | | } |
| | | |
| | | private void renameHeadLogFileTo(final File rotatedLogFile) throws ChangelogException |
| | |
| | | + recordParser.encodeKeyToString(highestKey) + LOG_FILE_SUFFIX; |
| | | } |
| | | |
| | | /** Update the open cursors after a rotation of the head log file. */ |
| | | private void updateCursorsOpenedOnHead() throws ChangelogException |
| | | /** Update the cursors that were pointing to head after a rotation of the head log file. */ |
| | | private void updateOpenedCursorsOnHeadAfterRotation(List<Pair<LogCursor<K, V>, CursorState<K, V>>> cursors) |
| | | throws ChangelogException |
| | | { |
| | | for (LogCursor<K, V> cursor : openCursors) |
| | | for (Pair<LogCursor<K, V>, CursorState<K, V>> pair : cursors) |
| | | { |
| | | final CursorState<K, V> state = cursor.getState(); |
| | | final CursorState<K, V> cursorState = pair.getSecond(); |
| | | |
| | | // Need to update the cursor only if it is pointing to the head log file |
| | | if (isHeadLogFile(state.logFile)) |
| | | if (isHeadLogFile(cursorState.logFile)) |
| | | { |
| | | updateOpenCursor(cursor, state); |
| | | final K previousKey = logFiles.lowerKey(recordParser.getMaxKey()); |
| | | final LogFile<K, V> logFile = findLogFileFor(previousKey); |
| | | final LogCursor<K, V> cursor = pair.getFirst(); |
| | | cursor.reinitializeTo(new CursorState<K, V>(logFile, cursorState.filePosition, cursorState.record)); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Update the provided open cursor with the provided state. |
| | | * <p> |
| | | * The cursor must report the previous state on the head log file to the same |
| | | * state (position in file, current record) in the read-only log file just created. |
| | | */ |
| | | private void updateOpenCursor(final LogCursor<K,V> cursor, final CursorState<K, V> state) throws ChangelogException |
| | | private void switchCursorsOpenedIntoEmptyCursors() throws ChangelogException |
| | | { |
| | | final K previousKey = logFiles.lowerKey(recordParser.getMaxKey()); |
| | | final LogFile<K, V> logFile = findLogFileFor(previousKey); |
| | | cursor.reinitializeTo(new CursorState<K, V>(logFile, state.filePosition, state.record)); |
| | | for (LogCursor<K, V> cursor : openCursors) |
| | | { |
| | | cursor.actAsEmptyCursor(); |
| | | } |
| | | openCursors.clear(); |
| | | } |
| | | |
| | | /** |
| | | * Disable the cursors opened on the head log file log, by closing their underlying cursor. |
| | | * Returns the state of each cursor just before the close operation. |
| | | * |
| | | * @return the pairs (cursor, cursor state) for each cursor pointing to head log file. |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | private List<Pair<LogCursor<K, V>, CursorState<K, V>>> disableOpenedCursorsOnHead() throws ChangelogException |
| | | { |
| | | final List<Pair<LogCursor<K, V>, CursorState<K, V>>> openCursorsStates = |
| | | new ArrayList<Pair<LogCursor<K, V>, CursorState<K, V>>>(); |
| | | for (LogCursor<K, V> cursor : openCursors) |
| | | { |
| | | if (isHeadLogFile(cursor.currentLogFile)) |
| | | { |
| | | openCursorsStates.add(Pair.of(cursor, cursor.getState())); |
| | | cursor.closeUnderlyingCursor(); |
| | | } |
| | | } |
| | | return openCursorsStates; |
| | | } |
| | | |
| | | private void openHeadLogFile() throws ChangelogException |
| | |
| | | * {@code cursor.next()} method. |
| | | * <p> |
| | | * The cursor uses the log shared lock to ensure reads are not done during a rotation. |
| | | * <p> |
| | | * The cursor can be switched into an empty cursor by calling the {@code actAsEmptyCursor()} |
| | | * method. |
| | | */ |
| | | private static class LogCursor<K extends Comparable<K>, V> implements RepositionableCursor<K, V> |
| | | { |
| | |
| | | |
| | | private LogFile<K, V> currentLogFile; |
| | | private LogFileCursor<K, V> currentCursor; |
| | | private boolean actAsEmptyCursor; |
| | | |
| | | /** |
| | | * Creates a cursor on the provided log. |
| | |
private LogCursor(final Log<K, V> log) throws ChangelogException
{
  // A freshly created cursor is a regular cursor, not an empty one.
  this.actAsEmptyCursor = false;
  this.log = log;
}
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Record<K, V> getRecord() |
| | | { |
| | | return currentCursor.getRecord(); |
| | | return currentCursor != null ? currentCursor.getRecord() : null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | if (actAsEmptyCursor) |
| | | { |
| | | return false; |
| | | } |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | |
| | | @Override |
| | | public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException |
| | | { |
| | | if (actAsEmptyCursor) |
| | | { |
| | | return false; |
| | | } |
| | | log.sharedLock.lock(); |
| | | try |
| | | { |
| | |
| | | /** Returns the state of this cursor. */ |
| | | private CursorState<K, V> getState() throws ChangelogException |
| | | { |
| | | return new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()); |
| | | return !actAsEmptyCursor ? |
| | | new CursorState<K, V>(currentLogFile, currentCursor.getFilePosition(), currentCursor.getRecord()) : null; |
| | | } |
| | | |
| | | private void closeUnderlyingCursor() |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | } |
| | | |
| | | /** Reinitialize this cursor to the provided state. */ |
| | | private void reinitializeTo(final CursorState<K, V> cursorState) throws ChangelogException |
| | | { |
| | | StaticUtils.close(currentCursor); |
| | | currentLogFile = cursorState.logFile; |
| | | currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition); |
| | | if (!actAsEmptyCursor) |
| | | { |
| | | currentLogFile = cursorState.logFile; |
| | | currentCursor = currentLogFile.getCursorInitialisedTo(cursorState.record, cursorState.filePosition); |
| | | } |
| | | } |
| | | |
| | | /** Turn this cursor into an empty cursor, with no actual resource used. */ |
| | | private void actAsEmptyCursor() |
| | | { |
| | | currentLogFile = null; |
| | | currentCursor = null; |
| | | actAsEmptyCursor = true; |
| | | } |
| | | |
| | | /** Switch the cursor to the provided log file. */ |
| | |
| | | /** {@inheritDoc} */ |
| | | public String toString() |
| | | { |
| | | return String.format("Cursor on log : %s, current log file: %s, current cursor: %s", |
| | | log.logPath, currentLogFile.getFile().getName(), currentCursor); |
| | | return actAsEmptyCursor ? |
| | | String.format("Cursor on log : %s, acting as empty cursor", log.logPath) : |
| | | String.format("Cursor on log : %s, current log file: %s, current cursor: %s", |
| | | log.logPath, currentLogFile.getFile().getName(), currentCursor); |
| | | } |
| | | } |
| | | |