| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | |
| | | */ |
| | | class FileReplicaDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | /** The underlying cursor. */ |
| | | private final RepositionableCursor<CSN, UpdateMsg> cursor; |
| | | |
| | |
| | | /** The CSN to re-start with in case the cursor is exhausted. */ |
| | | private CSN lastNonNullCurrentCSN; |
| | | |
| | | private PositionStrategy positionStrategy; |
| | | |
/**
 * Creates the cursor from the provided log cursor, start CSN and position
 * strategy.
 *
 * @param cursor
 *          The underlying log cursor to read the log.
 * @param startCSN
 *          The CSN to use as a start point. It is stored as the initial
 *          reference CSN against which records are compared and to which
 *          the underlying cursor is re-positioned.
 * @param positionStrategy
 *          Cursor position strategy, which allows choosing whether the
 *          cursor must start from the provided CSN or just after the
 *          provided CSN.
 */
FileReplicaDBCursor(
    final RepositionableCursor<CSN, UpdateMsg> cursor,
    final CSN startCSN,
    final PositionStrategy positionStrategy)
{
  this.cursor = cursor;
  // Until a record has been returned, the start CSN plays the role of the
  // "last non null CSN".
  this.lastNonNullCurrentCSN = startCSN;
  this.positionStrategy = positionStrategy;
}
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | if (cursor.next()) |
| | | { |
| | | nextRecord = cursor.getRecord(); |
| | | if (nextRecord.getKey().compareTo(lastNonNullCurrentCSN) > 0) |
| | | final int nextCSNCompare = nextRecord.getKey().compareTo(lastNonNullCurrentCSN); |
| | | if (nextCSNCompare > 0 || (nextCSNCompare == 0 && positionStrategy == ON_MATCHING_KEY)) |
| | | { |
| | | // start CSN is found, switch to the position strategy that always finds the next record |
| | | lastNonNullCurrentCSN = nextRecord.getKey(); |
| | | positionStrategy = AFTER_MATCHING_KEY; |
| | | return true; |
| | | } |
| | | } |
| | | // either cursor is exhausted or we still have not reached the start CSN |
| | | return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned(); |
| | | } |
| | | |
| | | /** Re-initialize the cursor after the last non null CSN. */ |
| | | private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException |
| | | { |
| | | final boolean found = cursor.positionTo(lastNonNullCurrentCSN, true); |
| | | final boolean found = cursor.positionTo(lastNonNullCurrentCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | if (found && cursor.next()) |
| | | { |
| | | nextRecord = cursor.getRecord(); |