| | |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | import static com.sleepycat.je.OperationStatus.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start.If null, start from the |
| | | * oldest CSN |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * Cursor position strategy |
| | | * @return The ReplServerDBCursor. |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException |
| | | ReplServerDBCursor openReadCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | return new ReplServerDBCursor(startCSN, positionStrategy); |
| | | return new ReplServerDBCursor(startCSN, matchingStrategy, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | |
| | | return serverId + " " + baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | /** Hold a cursor and an indicator of wether the cursor should be considered as empty. */ |
| | | private static class CursorWithEmptyIndicator |
| | | { |
| | | private Cursor cursor; |
| | | private boolean isEmpty; |
| | | |
| | | private CursorWithEmptyIndicator(Cursor localCursor, boolean isEmpty) |
| | | { |
| | | this.cursor = localCursor; |
| | | this.isEmpty = isEmpty; |
| | | } |
| | | |
| | | /** Creates cursor considered as empty. */ |
| | | static CursorWithEmptyIndicator createEmpty(Cursor cursor) |
| | | { |
| | | return new CursorWithEmptyIndicator(cursor, true); |
| | | } |
| | | |
| | | /** Creates cursor considered as non-empty. */ |
| | | static CursorWithEmptyIndicator createNonEmpty(Cursor cursor) |
| | | { |
| | | return new CursorWithEmptyIndicator(cursor, false); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This Class implements a cursor that can be used to browse a |
| | | * replicationServer database. |
| | |
| | | * <p> |
| | | * Will be set non null for a write cursor |
| | | */ |
| | | private final Cursor cursor; |
| | | private Cursor cursor; |
| | | private final DatabaseEntry key; |
| | | private final DatabaseEntry data; |
| | | /** \@Null for read cursors, \@NotNull for deleting cursors. */ |
| | |
| | | * |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start. |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy, which allow to indicates how key |
| | | * is matched |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * @throws ChangelogException |
| | | * When the startCSN does not exist. |
| | | */ |
| | | private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException |
| | | private ReplServerDBCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | key = createReplicationKey(startCSN); |
| | | data = new DatabaseEntry(); |
| | |
| | | // unlock it when throwing an exception. |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | | boolean cursorHeld = false; |
| | | Cursor localCursor = null; |
| | | CursorWithEmptyIndicator maybeEmptyCursor = null; |
| | | try |
| | | { |
| | | // If the DB has been closed then create empty cursor. |
| | |
| | | return; |
| | | } |
| | | |
| | | localCursor = db.openCursor(txn, null); |
| | | if (startCSN != null |
| | | && localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | maybeEmptyCursor = generateCursor(startCSN, matchingStrategy, positionStrategy); |
| | | if (maybeEmptyCursor.isEmpty) |
| | | { |
| | | // We could not move the cursor to the expected startCSN |
| | | if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS) |
| | | { |
| | | // We could not even move the cursor close to it |
| | | // => return empty cursor |
| | | isClosed = true; |
| | | cursor = null; |
| | | return; |
| | | } |
| | | |
| | | if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY) |
| | | { |
| | | // We can move close to the startCSN. |
| | | // Let's create a cursor from that point. |
| | | key.setData(null); |
| | | if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | localCursor.close(); |
| | | localCursor = db.openCursor(txn, null); |
| | | } |
| | | } |
| | | isClosed = true; |
| | | cursor = null; |
| | | return; |
| | | } |
| | | cursor = localCursor; |
| | | cursorHeld = cursor != null; |
| | | |
| | | cursor = maybeEmptyCursor.cursor; |
| | | if (key.getData() != null) |
| | | { |
| | | computeCurrentRecord(); |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (!cursorHeld) |
| | | if (maybeEmptyCursor != null && maybeEmptyCursor.isEmpty) |
| | | { |
| | | closeAndReleaseReadLock(localCursor); |
| | | closeAndReleaseReadLock(maybeEmptyCursor.cursor); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** Generate a possibly empty cursor with the provided start CSN and strategies. */ |
| | | private CursorWithEmptyIndicator generateCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) |
| | | { |
| | | Cursor cursor = db.openCursor(txn, null); |
| | | boolean isCsnFound = startCSN == null || cursor.getSearchKey(key, data, LockMode.DEFAULT) == SUCCESS; |
| | | if (!isCsnFound) |
| | | { |
| | | if (matchingStrategy == EQUAL_TO_KEY) |
| | | { |
| | | return CursorWithEmptyIndicator.createEmpty(cursor); |
| | | } |
| | | |
| | | boolean isGreaterCsnFound = cursor.getSearchKeyRange(key, data, DEFAULT) == SUCCESS; |
| | | if (isGreaterCsnFound) |
| | | { |
| | | if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && positionStrategy == AFTER_MATCHING_KEY) |
| | | { |
| | | // Move backward so that the first call to next() points to this greater csn |
| | | key.setData(null); |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | // Edge case: we're at the beginning of the database |
| | | cursor.close(); |
| | | cursor = db.openCursor(txn, null); |
| | | } |
| | | } |
| | | else if (matchingStrategy == LESS_THAN_OR_EQUAL_TO_KEY) |
| | | { |
| | | // Move backward to point on the lower csn |
| | | key.setData(null); |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | // Edge case: we're at the beginning of the log, there is no lower csn |
| | | return CursorWithEmptyIndicator.createEmpty(cursor); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY) |
| | | { |
| | | // There is no greater csn |
| | | return CursorWithEmptyIndicator.createEmpty(cursor); |
| | | } |
| | | // LESS_THAN_OR_EQUAL_TO_KEY case : the lower csn is the highest csn available |
| | | key.setData(null); |
| | | boolean isLastKeyFound = cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS; |
| | | if (!isLastKeyFound) |
| | | { |
| | | // Edge case: empty database |
| | | cursor.close(); |
| | | cursor = db.openCursor(txn, null); |
| | | } |
| | | } |
| | | } |
| | | return CursorWithEmptyIndicator.createNonEmpty(cursor); |
| | | } |
| | | |
| | | private ReplServerDBCursor() throws ChangelogException |
| | | { |
| | | key = new DatabaseEntry(); |