| | |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; |
| | | |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | /** |
| | | * Berkeley DB JE implementation of {@link DBCursor}. |
| | | * |
| | | * \@NotThreadSafe |
| | | */ |
| | | public class JEReplicaDBCursor implements DBCursor<UpdateMsg> |
| | | class JEReplicaDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | private UpdateMsg currentChange; |
| | | private ReplServerDBCursor cursor; |
| | | private final ReplicationDB db; |
| | | private final PositionStrategy positionStrategy; |
| | | private JEReplicaDB replicaDB; |
| | | private ReplicationDB db; |
| | | private CSN lastNonNullCurrentCSN; |
| | | private ReplServerDBCursor cursor; |
| | | private UpdateMsg currentChange; |
| | | |
| | | /** |
| | | * Creates a new {@link JEReplicaDBCursor}. All created cursor must be |
| | |
| | | * |
| | | * @param db |
| | | * The db where the cursor must be created. |
| | | * @param startAfterCSN |
| | | * @param startCSN |
| | | * The CSN after which the cursor must start.If null, start from the |
| | | * oldest CSN |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * @param replicaDB |
| | | * The associated JEReplicaDB. |
| | | * @throws ChangelogException |
| | | * if a database problem happened. |
| | | * if a database problem happened. |
| | | */ |
| | | public JEReplicaDBCursor(ReplicationDB db, CSN startAfterCSN, |
| | | public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy, |
| | | JEReplicaDB replicaDB) throws ChangelogException |
| | | { |
| | | this.db = db; |
| | | this.positionStrategy = positionStrategy; |
| | | this.replicaDB = replicaDB; |
| | | this.lastNonNullCurrentCSN = startAfterCSN; |
| | | this.lastNonNullCurrentCSN = startCSN; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | final ReplServerDBCursor localCursor = cursor; |
| | | currentChange = localCursor != null ? localCursor.next() : null; |
| | | |
| | | if (currentChange == null) |
| | | { |
| | | synchronized (this) |
| | |
| | | // if following code is called while the cursor is closed. |
| | | // It is better to let the deadlock happen to help quickly identifying |
| | | // and fixing such issue with unit tests. |
| | | cursor = db.openReadCursor(lastNonNullCurrentCSN); |
| | | currentChange = cursor.next(); |
| | | cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy); |
| | | } |
| | | } |
| | | |
| | | // For ON_MATCHING_KEY, do not call next() if the cursor has just been initialized. |
| | | if (positionStrategy == ON_MATCHING_KEY && currentChange != null |
| | | || positionStrategy == AFTER_MATCHING_KEY) |
| | | { |
| | | cursor.next(); |
| | | } |
| | | currentChange = cursor.getRecord(); |
| | | |
| | | if (currentChange != null) |
| | | { |
| | | lastNonNullCurrentCSN = currentChange.getCSN(); |
| | |
| | | synchronized (this) |
| | | { |
| | | closeCursor(); |
| | | this.replicaDB = null; |
| | | replicaDB = null; |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | cursor.close(); |
| | | cursor = null; |
| | | currentChange = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Called by the Gc when the object is garbage collected. Release the internal |
| | | * Called by the GC when the object is garbage collected. Release the internal |
| | | * cursor in case the cursor was badly used and {@link #close()} was never |
| | | * called. |
| | | */ |
| | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " currentChange=" + currentChange |
| | | return getClass().getSimpleName() |
| | | + " positionStrategy=" + positionStrategy |
| | | + " currentChange=" + currentChange |
| | | + " replicaDB=" + replicaDB; |
| | | } |
| | | } |