| | |
| | | import static org.opends.server.replication.common.ServerStatus.*; |
| | | import static org.opends.server.replication.common.StatusMachineEvent.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | * @return a non null {@link DBCursor} going from oldest to newest CSN |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see ReplicationDomainDB#getCursorFrom(DN, ServerState) |
| | | * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy) |
| | | */ |
| | | public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | { |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState); |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | |
| | | /** |
| | | * Represents a cursor key matching strategy, which allow to choose if only |
| | | * the exact key must be found or if any key equals or higher should match. |
| | | */ |
| | | public enum KeyMatchingStrategy { |
| | | /** matches only if the exact key is found. */ |
| | | EQUAL_TO_KEY, |
| | | /** matches if the key or a greater key is found. */ |
| | | GREATER_THAN_OR_EQUAL_TO_KEY |
| | | } |
| | | |
| | | /** |
| | | * Represents a cursor positioning strategy, which allow to choose if the start point |
| | | * corresponds to the record at the provided key or the record just after the provided |
| | | * key. |
| | | */ |
| | | public enum PositionStrategy { |
| | | /** start point is on the matching key. */ |
| | | ON_MATCHING_KEY, |
| | | /** start point is after the matching key. */ |
| | | AFTER_MATCHING_KEY |
| | | } |
| | | |
| | | /** |
| | | * Getter for the current record. |
| | | * |
| | | * @return The current record. |
| | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.types.DN; |
| | | |
| | |
| | | void removeDomain(DN baseDN) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the domains starting after the |
| | | * Generates a {@link DBCursor} across all the domains starting at or after the |
| | | * provided {@link MultiDomainServerState} for each domain. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | | * cursor. |
| | | * |
| | | * @param startAfterState |
| | | * @param startState |
| | | * Starting point for each domain cursor. If any {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN for each |
| | | * replicaDBs |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState) |
| | | * @see #getCursorFrom(DN, ServerState, PositionStrategy) |
| | | */ |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState) |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | |
| | | // serverId methods |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the replicaDBs for the specified |
| | | * replication domain starting after the provided {@link ServerState} for each |
| | | * replication domain starting at or after the provided {@link ServerState} for each |
| | | * replicaDBs. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param startAfterState |
| | | * @param startState |
| | | * Starting point for each ReplicaDB cursor. If any CSN for a |
| | | * replicaDB is null, then start from the oldest CSN for this |
| | | * replicaDB |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, int, CSN) |
| | | * @see #getCursorFrom(DN, int, CSN, PositionStrategy) |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState) |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | | * replication domain and serverId starting after the provided {@link CSN}. |
| | | * replication domain and serverId starting at or after the provided {@link CSN}. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | |
| | | * the replication domain baseDN of the replicaDB |
| | | * @param serverId |
| | | * the serverId of the replicaDB |
| | | * @param startAfterCSN |
| | | * @param startCSN |
| | | * Starting point for the ReplicaDB cursor. If the CSN is null, then |
| | | * start from the oldest CSN for this replicaDB |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.EOFException; |
| | |
| | | import java.io.IOException; |
| | | import java.io.RandomAccessFile; |
| | | |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Position the reader to the record corresponding to the provided key or to |
| | | * the nearest key (the lowest key higher than the provided key), and returns |
| | | * the last record read. |
| | | * Position the reader to the record corresponding to the provided key and |
| | | * matching and positioning strategies. Returns the last record read. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position. Key must not be {@code null}. |
| | | * @param findNextRecord |
| | | * If {@code true}, start position is the lowest key that is higher |
| | | * than the provided key, otherwise start position is the provided |
| | | * key. |
| | | * @param matchStrategy |
| | | * The key matching strategy. |
| | | * @param positionStrategy |
| | | * The positioning strategy. |
| | | * @return The pair (key_found, last_record_read). key_found is a boolean |
| | | * indicating if reader is successfully positioned to the key or the |
| | | * nearest key. last_record_read is the last record that was read. |
| | | * When key_found is equals to {@code false}, then last_record_read is |
| | | * always {@code null}. When key_found is equals to {@code true}, |
| | | * last_record_read can be valued or be {@code null} |
| | | * indicating if reader is successfully positioned. last_record_read |
| | | * is the last record that was read. When key_found is equals to |
| | | * {@code false}, then last_record_read is always {@code null}. When |
| | | * key_found is equals to {@code true}, last_record_read can be valued |
| | | * or be {@code null} |
| | | * @throws ChangelogException |
| | | * If an error occurs when seeking the key. |
| | | */ |
| | | public Pair<Boolean, Record<K,V>> seekToRecord(final K key, final boolean findNextRecord) throws ChangelogException |
| | | public Pair<Boolean, Record<K,V>> seekToRecord( |
| | | final K key, |
| | | final KeyMatchingStrategy matchStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | Reject.checkNotNull(key); |
| | | final long markerPosition = searchClosestBlockStartToKey(key); |
| | | if (markerPosition >= 0) |
| | | { |
| | | return positionToKeySequentially(markerPosition, key, findNextRecord); |
| | | return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy); |
| | | } |
| | | return Pair.of(false, null); |
| | | } |
| | |
| | | |
| | | /** |
| | | * Position to provided key, starting from provided block start position and |
| | | * reading sequentially until key is found. |
| | | * reading sequentially until key is found according to matching and |
| | | * positioning strategies. |
| | | * |
| | | * @param blockStartPosition |
| | | * Position of read pointer in the file, expected to be the start of |
| | | * a block where a record offset is written. |
| | | * @param key |
| | | * The key to find. |
| | | * @param findNextRecord |
| | | * If {@code true}, position at the end of this method is the lowest |
| | | * key that is higher than the provided key, otherwise position is |
| | | * the provided key. |
| | | * @param matchStrategy |
| | | * The key matching strategy. |
| | | * @param positionStrategy |
| | | * The positioning strategy. |
| | | * @return The pair ({@code true}, last record read) if reader is successfully |
| | | * positioned to the key or the nearest key (last record may be null |
| | | * if end of file is reached), ({@code false}, null) otherwise. |
| | | * positioned (last record may be null if end of file is reached), ( |
| | | * {@code false}, null) otherwise. |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | * If an error occurs. |
| | | */ |
| | | Pair<Boolean, Record<K,V>> positionToKeySequentially( |
| | | final long blockStartPosition, |
| | | final K key, |
| | | final boolean findNextRecord) |
| | | final KeyMatchingStrategy matchStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException { |
| | | Record<K,V> record = readRecord(blockStartPosition); |
| | | do { |
| | | if (record != null) |
| | | { |
| | | final int keysComparison = record.getKey().compareTo(key); |
| | | final boolean matches = findNextRecord ? keysComparison >= 0 : record.getKey().equals(key); |
| | | final boolean matches = (matchStrategy == EQUAL_TO_KEY && keysComparison == 0) |
| | | || (matchStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && keysComparison >= 0); |
| | | if (matches) |
| | | { |
| | | if (findNextRecord && keysComparison == 0) |
| | | if (positionStrategy == AFTER_MATCHING_KEY && keysComparison == 0) |
| | | { |
| | | // skip key in order to position on lowest higher key |
| | | // skip matching key |
| | | record = readRecord(); |
| | | } |
| | | return Pair.of(true, record); |
| | |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; |
| | | import org.opends.server.replication.server.changelog.je.DomainDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(); |
| | | |
| | | static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB = |
| | | new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null); |
| | | new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY); |
| | | |
| | | /** |
| | | * Creates a new changelog DB. |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this); |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | cursor.addDomain(baseDN, startAfterState.getServerState(baseDN)); |
| | | cursor.addDomain(baseDN, startState.getServerState(baseDN)); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState) |
| | | throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN); |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null; |
| | | final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null; |
| | | cursor.addReplicaDB(serverId, lastCSN); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this); |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | | throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |
| | |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the update messages from this DB, |
| | | * starting at the position defined by the smallest CSN that is strictly |
| | | * higher than the provided CSN. |
| | | * Returns a cursor that allows to retrieve the update messages from this DB. |
| | | * The starting position is defined by the provided CSN and cursor |
| | | * positioning strategy. |
| | | * |
| | | * @param startAfterCSN |
| | | * @param startCSN |
| | | * The position where the cursor must start. If null, start from the |
| | | * oldest CSN |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to choose if cursor must |
| | | * start from the provided CSN or just after the provided CSN. |
| | | * @return a new {@link DBCursor} to retreive update messages. |
| | | * @throws ChangelogException |
| | | * if a database problem happened |
| | | */ |
| | | DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN); |
| | | return new FileReplicaDBCursor(cursor, startAfterCSN); |
| | | RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startCSN, positionStrategy); |
| | | return new FileReplicaDBCursor(cursor, startCSN, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | |
| | | */ |
| | | class FileReplicaDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | /** The underlying cursor. */ |
| | | private final RepositionableCursor<CSN, UpdateMsg> cursor; |
| | | |
| | |
| | | /** The CSN to re-start with in case the cursor is exhausted. */ |
| | | private CSN lastNonNullCurrentCSN; |
| | | |
| | | private PositionStrategy positionStrategy; |
| | | |
| | | /** |
| | | * Creates the cursor from provided log cursor and start CSN. |
| | | * |
| | | * @param cursor |
| | | * The underlying log cursor to read log. |
| | | * @param startAfterCSN |
| | | * @param startCSN |
| | | * The CSN to use as a start point (excluded from cursor, the lowest |
| | | * CSN higher than this CSN is used as the real start point). |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to choose if cursor must |
| | | * start from the provided CSN or just after the provided CSN. |
| | | */ |
| | | FileReplicaDBCursor(RepositionableCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) { |
| | | FileReplicaDBCursor( |
| | | final RepositionableCursor<CSN, UpdateMsg> cursor, |
| | | final CSN startCSN, |
| | | final PositionStrategy positionStrategy) { |
| | | this.cursor = cursor; |
| | | this.lastNonNullCurrentCSN = startAfterCSN; |
| | | this.lastNonNullCurrentCSN = startCSN; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | if (cursor.next()) |
| | | { |
| | | nextRecord = cursor.getRecord(); |
| | | if (nextRecord.getKey().compareTo(lastNonNullCurrentCSN) > 0) |
| | | final int nextCSNCompare = nextRecord.getKey().compareTo(lastNonNullCurrentCSN); |
| | | if (nextCSNCompare > 0 || (nextCSNCompare == 0 && positionStrategy == ON_MATCHING_KEY)) |
| | | { |
| | | // start CSN is found, switch to position strategy that always find the next |
| | | lastNonNullCurrentCSN = nextRecord.getKey(); |
| | | positionStrategy = AFTER_MATCHING_KEY; |
| | | return true; |
| | | } |
| | | } |
| | | // either cursor is exhausted or we still have not reached the start CSN |
| | | return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned(); |
| | | } |
| | | |
| | | /** Re-initialize the cursor after the last non null CSN. */ |
| | | private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException |
| | | { |
| | | final boolean found = cursor.positionTo(lastNonNullCurrentCSN, true); |
| | | final boolean found = cursor.positionTo(lastNonNullCurrentCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | if (found && cursor.next()) |
| | | { |
| | | nextRecord = cursor.getRecord(); |
| | |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.Closeable; |
| | |
| | | import org.opends.server.loggers.ErrorLogger; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | |
| | | return new EmptyLogCursor<K, V>(); |
| | | } |
| | | cursor = new LogCursor<K, V>(this); |
| | | cursor.positionTo(null, false); |
| | | cursor.positionTo(null, null, null); |
| | | registerCursor(cursor); |
| | | return cursor; |
| | | } |
| | |
| | | */ |
| | | public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, false); |
| | | return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, null); |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the smallest key that is higher than |
| | | * the provided key. |
| | | * Returns a cursor that allows to retrieve the records from this log. |
| | | * The starting position is defined by the provided key and cursor |
| | | * positioning strategy. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, cursor will point at the first record of the log. |
| | | * @param positionStrategy |
| | | * The cursor positioning strategy. |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public RepositionableCursor<K, V> getNearestCursor(final K key) throws ChangelogException |
| | | public RepositionableCursor<K, V> getNearestCursor(final K key, PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | return getCursor(key, true); |
| | | return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | } |
| | | |
| | | /** Returns a cursor starting from a key, using the strategy corresponding to provided boolean. */ |
| | | private RepositionableCursor<K, V> getCursor(final K key, boolean findNearest) throws ChangelogException |
| | | /** |
| | | * Returns a cursor starting from a key, using the provided matching and |
| | | * position strategies for the cursor. |
| | | */ |
| | | private RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | if (key == null) |
| | | { |
| | |
| | | return new EmptyLogCursor<K, V>(); |
| | | } |
| | | cursor = new LogCursor<K, V>(this); |
| | | final boolean isFound = cursor.positionTo(key, findNearest); |
| | | // for nearest case, it is ok if the target is not found |
| | | if (isFound || findNearest) |
| | | final boolean isFound = cursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | // When not matching the exact key, it is ok if the target is not found |
| | | if (isFound || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY) |
| | | { |
| | | registerCursor(cursor); |
| | | return cursor; |
| | |
| | | static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>> |
| | | { |
| | | /** |
| | | * Position the cursor to the record corresponding to the provided key or to |
| | | * the nearest key (the lowest key higher than the provided key). |
| | | * Position the cursor to the record corresponding to the provided key and |
| | | * provided matching and positioning strategies. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, use the key of the first record instead. |
| | | * @param findNearest |
| | | * If {@code true}, start position is the lowest key that is higher |
| | | * than the provided key, otherwise start position is the provided |
| | | * key. |
| | | * @return {@code true} if cursor is successfully positionned to the key or |
| | | * the nearest key, {@code false} otherwise. |
| | | * @param matchStrategy |
| | | * The cursor key matching strategy. |
| | | * @param positionStrategy |
| | | * The cursor positioning strategy. |
| | | * @return {@code true} if cursor is successfully positioned, or |
| | | * {@code false} otherwise. |
| | | * @throws ChangelogException |
| | | * If an error occurs when positioning cursor. |
| | | */ |
| | | boolean positionTo(K key, boolean findNearest) throws ChangelogException; |
| | | boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | } |
| | | |
| | | /** |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException |
| | | public boolean positionTo( |
| | | final K key, |
| | | final KeyMatchingStrategy matchStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | if (actAsEmptyCursor) |
| | | { |
| | |
| | | { |
| | | switchToLogFile(logFile); |
| | | } |
| | | return (key == null) ? true : currentCursor.positionTo(key, findNearest); |
| | | return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy); |
| | | } |
| | | finally |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException |
| | | public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException |
| | | { |
| | | return false; |
| | | } |
| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | import java.io.BufferedWriter; |
| | | import java.io.Closeable; |
| | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | */ |
| | | LogFileCursor<K, V> getCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, false); |
| | | return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, PositionStrategy.ON_MATCHING_KEY); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, true); |
| | | return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | } |
| | | |
| | | /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */ |
| | | private LogFileCursor<K, V> getCursor(final K key, boolean findNearest) |
| | | throws ChangelogException |
| | | private LogFileCursor<K, V> getCursor( |
| | | final K key, |
| | | final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | if (key == null) |
| | | { |
| | |
| | | try |
| | | { |
| | | cursor = new LogFileCursor<K, V>(this); |
| | | cursor.positionTo(key, findNearest); |
| | | cursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | // if target is not found, cursor is positioned at end of stream |
| | | return cursor; |
| | | } |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionTo(final K key, boolean findNearest) throws ChangelogException { |
| | | final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest); |
| | | public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos) |
| | | throws ChangelogException { |
| | | final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos); |
| | | final boolean found = result.getFirst(); |
| | | initialRecord = found ? result.getSecond() : null; |
| | | return found; |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV); |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | |
| | | */ |
| | | private static final CSN NULL_CSN = new CSN(0, 0, 0); |
| | | |
| | | private final PositionStrategy positionStrategy; |
| | | |
| | | /** |
| | | * Builds a DomainDBCursor instance. |
| | | * |
| | |
| | | * the replication domain baseDN of this cursor |
| | | * @param domainDB |
| | | * the DB for the provided replication domain |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | */ |
| | | public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB) |
| | | public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.domainDB = domainDB; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | | /** |
| | |
| | | final Entry<Integer, CSN> pair = iter.next(); |
| | | final int serverId = pair.getKey(); |
| | | final CSN csn = pair.getValue(); |
| | | final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = |
| | | domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy); |
| | | addCursor(cursor, null); |
| | | iter.remove(); |
| | | } |
| | |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicReference; |
| | | |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this); |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | cursor.addDomain(baseDN, startAfterState.getServerState(baseDN)); |
| | | cursor.addDomain(baseDN, startState.getServerState(baseDN)); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState) |
| | | throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN); |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null; |
| | | final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null; |
| | | cursor.addReplicaDB(serverId, lastCSN); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this); |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | | throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN, |
| | | PositionStrategy positionStrategy) throws ChangelogException |
| | | |
| | | { |
| | | Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY" |
| | | + " is not supported for the JE implementation fo changelog"); |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN); |
| | | replicaDB.generateCursorFrom(startCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |
| | |
| | | private final ConcurrentSkipListSet<DN> removeDomains = |
| | | new ConcurrentSkipListSet<DN>(); |
| | | |
| | | private final PositionStrategy positionStrategy; |
| | | |
| | | /** |
| | | * Builds a MultiDomainDBCursor instance. |
| | | * |
| | | * @param domainDB |
| | | * the replication domain management DB |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | */ |
| | | public MultiDomainDBCursor(ReplicationDomainDB domainDB) |
| | | public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy) |
| | | { |
| | | this.domainDB = domainDB; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | | /** |
| | |
| | | final Entry<DN, ServerState> entry = iter.next(); |
| | | final DN baseDN = entry.getKey(); |
| | | final ServerState serverState = entry.getValue(); |
| | | final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState); |
| | | final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy); |
| | | addCursor(domainDBCursor, baseDN); |
| | | iter.remove(); |
| | | } |
| | |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.file.BlockLogReader.*; |
| | | |
| | | import java.io.File; |
| | |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | |
| | | try |
| | | { |
| | | reader = newReader(blockSize); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, findNearest); |
| | | KeyMatchingStrategy matchStrategy = |
| | | findNearest ? KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY : KeyMatchingStrategy.EQUAL_TO_KEY; |
| | | PositionStrategy posStrategy = |
| | | findNearest ? PositionStrategy.AFTER_MATCHING_KEY : PositionStrategy.ON_MATCHING_KEY; |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchStrategy, posStrategy); |
| | | |
| | | assertThat(result.getFirst()).isEqualTo(expectedFound); |
| | | assertThat(result.getSecond()).isEqualTo(expectedRecord); |
| | |
| | | for (Integer key : keysToSeek) |
| | | { |
| | | final long ts = System.nanoTime(); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, false); |
| | | Pair<Boolean, Record<Integer, Integer>> result = |
| | | reader.seekToRecord(key, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | final long te = System.nanoTime() - ts; |
| | | if (te < minTime) minTime = te; |
| | | if (te > maxTime) maxTime = te; |
| | |
| | | for (Integer val : keysToSeek) |
| | | { |
| | | long ts = System.nanoTime(); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.positionToKeySequentially(0, val, false); |
| | | Pair<Boolean, Record<Integer, Integer>> result = |
| | | reader.positionToKeySequentially(0, val, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | assertThat(result.getSecond()).isEqualTo(Record.from(val, val)); |
| | | long te = System.nanoTime() - ts; |
| | | if (te < minTime) minTime = te; |
| | |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | /** |
| | | * Test the FileReplicaDB class |
| | |
| | | } |
| | | waitChangesArePersisted(replicaDB, 4); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[0]); |
| | | cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY); |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[1]); |
| | | StaticUtils.close(cursor); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[3]); |
| | | cursor = replicaDB.generateCursorFrom(csns[3], AFTER_MATCHING_KEY); |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[4]); |
| | | StaticUtils.close(cursor); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[4]); |
| | | cursor = replicaDB.generateCursorFrom(csns[4], AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | assertNull(cursor.getRecord()); |
| | | } |
| | |
| | | |
| | | CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey]); |
| | | cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], PositionStrategy.AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | |
| | | int[] indicesToAdd = new int[] { 0, 1, 2, 4 }; |
| | |
| | | return; |
| | | } |
| | | |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]); |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY); |
| | | try |
| | | { |
| | | // Cursor points to a null record initially |
| | |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | | { |
| | | cursor = replicaDB.generateCursorFrom(csn); |
| | | cursor = replicaDB.generateCursorFrom(csn, AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | assertNull(cursor.getRecord()); |
| | | } |
| | |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.file.LogFileTest.*; |
| | | |
| | | import java.io.File; |
| | |
| | | DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null; |
| | | try { |
| | | // this key is the first key of the log file "key1_key2.log" |
| | | cursor1 = log.getNearestCursor("key001"); |
| | | cursor1 = log.getNearestCursor("key001", AFTER_MATCHING_KEY); |
| | | assertThatCursorCanBeFullyReadFromStart(cursor1, 2, 10); |
| | | |
| | | // this key is the last key of the log file "key3_key4.log" |
| | | cursor2 = log.getNearestCursor("key004"); |
| | | cursor2 = log.getNearestCursor("key004", AFTER_MATCHING_KEY); |
| | | assertThatCursorCanBeFullyReadFromStart(cursor2, 5, 10); |
| | | |
| | | cursor3 = log.getNearestCursor("key009"); |
| | | cursor3 = log.getNearestCursor("key009", AFTER_MATCHING_KEY); |
| | | assertThatCursorCanBeFullyReadFromStart(cursor3, 10, 10); |
| | | } |
| | | finally { |
| | |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getNearestCursor("key010"); |
| | | cursor = log.getNearestCursor("key010", AFTER_MATCHING_KEY); |
| | | |
| | | // lowest higher key does not exist |
| | | assertThatCursorIsExhausted(cursor); |
| | |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | // key is between key005 and key006 |
| | | cursor = log.getNearestCursor("key005000"); |
| | | cursor = log.getNearestCursor("key005000", AFTER_MATCHING_KEY); |
| | | |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 6, 10); |
| | | } |
| | |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getNearestCursor(null); |
| | | cursor = log.getNearestCursor(null, null); |
| | | |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10); |
| | | } |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | import org.testng.annotations.*; |
| | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.mockito.Matchers.*; |
| | | import static org.mockito.Mockito.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | /** |
| | | * Test for ChangeNumberIndexer class. All dependencies to the changelog DB |
| | |
| | | { |
| | | MockitoAnnotations.initMocks(this); |
| | | |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB); |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY); |
| | | initialState = new ChangelogState(); |
| | | initialCookie = new MultiDomainServerState(); |
| | | replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | |
| | | |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn( |
| | | multiDomainCursor); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(multiDomainCursor); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); |
| | | if (domainDBCursor == null) |
| | | { |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB); |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY); |
| | | domainDBCursors.put(baseDN, domainDBCursor); |
| | | |
| | | multiDomainCursor.addDomain(baseDN, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class))) |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(domainDBCursor); |
| | | } |
| | | domainDBCursor.addReplicaDB(serverId, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(replicaDBCursor); |
| | | } |
| | | |
| | |
| | | of(msg4, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleTwoElementsCursorsLongerExhaustion() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |