OPENDJ-1444 CR-4537 Remove previous cookie from storage of ChangeNumberIndexDB
* Implement a new matching strategy for cursors : LESS_THAN_OR_EQUAL_TO_KEY
for both je and file-based implementations
* Replace the previous cookie with an ECLMultiDomainDBCursor created
with the medium consistency CSN as its starting point and the
LESS_THAN_OR_EQUAL_TO_KEY matching strategy,
in classes ChangeNumberIndexer and ChangelogBackend
* Remove storage of the previous cookie in the log for both je and file-based
implementations
| | |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.plugin.MultimasterReplication.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.LDIFWriter.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | new MultiDomainServerState(), ON_MATCHING_KEY, getExcludedBaseDNs()); |
| | | new MultiDomainServerState(), GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, getExcludedBaseDNs()); |
| | | try |
| | | { |
| | | baseEntryHasSubordinates = cursor.next(); |
| | |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | |
| | | boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie); |
| | |
| | | try |
| | | { |
| | | cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber); |
| | | MultiDomainServerState cookie = new MultiDomainServerState(); |
| | | final boolean continueSearch = |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor); |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie); |
| | | if (continueSearch) |
| | | { |
| | | entrySender.transitioningToPersistentSearchPhase(); |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor); |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie); |
| | | } |
| | | } |
| | | finally |
| | |
| | | |
| | | private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender, |
| | | final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, |
| | | AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor) throws ChangelogException, DirectoryException |
| | | AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor, MultiDomainServerState cookie) |
| | | throws ChangelogException, DirectoryException |
| | | { |
| | | boolean continueSearch = true; |
| | | while (continueSearch && cnIndexDBCursor.next()) |
| | |
| | | if (replicaUpdatesCursor.get() == null) |
| | | { |
| | | replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord)); |
| | | initializeCookieForChangeNumberMode(cookie, cnIndexRecord); |
| | | } |
| | | else |
| | | { |
| | | cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); |
| | | } |
| | | continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); |
| | | if (continueSearch) |
| | |
| | | final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get()); |
| | | if (updateMsg != null) |
| | | { |
| | | continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg); |
| | | continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie); |
| | | replicaUpdatesCursor.get().next(); |
| | | } |
| | | } |
| | |
| | | return continueSearch; |
| | | } |
| | | |
| | | /** |
| | | * Initializes the provided cookie from the provided change number index record. |
| | | * <p> |
| | | * The cookie is first updated with the baseDN and CSN of the record. A temporary |
| | | * multi-domain cursor is then opened from that cookie using the |
| | | * LESS_THAN_OR_EQUAL_TO_KEY matching strategy and the ON_MATCHING_KEY position |
| | | * strategy, wrapped in an ECLMultiDomainDBCursor filtered by domainPredicate and |
| | | * advanced once; the cookie reported by that cursor is finally merged into the |
| | | * provided cookie. The temporary cursor is closed before returning. |
| | | * |
| | | * @param cookie |
| | | * the cookie to initialize, updated in place |
| | | * @param cnIndexRecord |
| | | * the change number index record providing the baseDN and CSN |
| | | * @throws ChangelogException |
| | | * declared by this method; presumably raised by the cursor operations |
| | | * when a database problem happens (TODO confirm) |
| | | */ |
| | | private void initializeCookieForChangeNumberMode( |
| | | MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException |
| | | { |
| | | ECLMultiDomainDBCursor eclCursor = null; |
| | | try |
| | | { |
| | | cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); |
| | | MultiDomainDBCursor cursor = |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, |
| | | LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); /* NOTE(review): only eclCursor is closed below; if this constructor can throw, cursor would stay open - confirm */ |
| | | eclCursor.next(); /* NOTE(review): the boolean result of next() is ignored here - confirm toCookie() is meaningful when the cursor is empty */ |
| | | cookie.update(eclCursor.toCookie()); |
| | | } |
| | | finally |
| | | { |
| | | close(eclCursor); |
| | | } |
| | | } |
| | | |
| | | private MultiDomainDBCursor initializeReplicaUpdatesCursor( |
| | | final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException |
| | | { |
| | |
| | | // No need for ECLMultiDomainDBCursor in this case |
| | | // as updateMsg will be matched with cnIndexRecord |
| | | final MultiDomainDBCursor replicaUpdatesCursor = |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY); |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | replicaUpdatesCursor.next(); |
| | | return replicaUpdatesCursor; |
| | | } |
| | |
| | | /** |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | */ |
| | | private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) |
| | | throws DirectoryException |
| | | private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg, |
| | | MultiDomainServerState cookie) throws DirectoryException |
| | | { |
| | | final DN baseDN = cnIndexRecord.getBaseDN(); |
| | | final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie()); |
| | | cookie.update(baseDN, cnIndexRecord.getCSN()); |
| | | final String cookieString = cookie.toString(); |
| | | |
| | | sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber()); |
| | | final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg); |
| | | final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg); |
| | | return sendEntryIfMatches(searchOp, entry, null); |
| | | } |
| | | |
| | |
| | | import static org.opends.server.replication.common.ServerStatus.*; |
| | | import static org.opends.server.replication.common.StatusMachineEvent.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | |
| | | * @return a non null {@link DBCursor} going from oldest to newest CSN |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy) |
| | | * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy) |
| | | */ |
| | | public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | { |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY); |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | } |
| | | |
| | | /** |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | |
| | | |
| | | /** This is the key used to store this record. */ |
| | | private final long changeNumber; |
| | | /** This is used on startup to recover the medium consistency point. */ |
| | | private final String previousCookie; |
| | | /** The baseDN where the change happened. */ |
| | | private final DN baseDN; |
| | | /** The CSN of the change. */ |
| | |
| | | * |
| | | * @param changeNumber |
| | | * the change number |
| | | * @param previousCookie |
| | | * the previous cookie |
| | | * @param baseDN |
| | | * the baseDN |
| | | * @param csn |
| | | * the replication CSN field |
| | | */ |
| | | public ChangeNumberIndexRecord(long changeNumber, String previousCookie, |
| | | DN baseDN, CSN csn) |
| | | public ChangeNumberIndexRecord(long changeNumber, DN baseDN, CSN csn) |
| | | { |
| | | this.changeNumber = changeNumber; |
| | | this.previousCookie = previousCookie; |
| | | this.baseDN = baseDN; |
| | | this.csn = csn; |
| | | } |
| | | |
| | | /** |
| | | * Builds an instance of this class, with changeNumber equal to 0. |
| | | * |
| | | * @param previousCookie |
| | | * the previous cookie |
| | | * @param baseDN |
| | | * the baseDN |
| | | * @param csn |
| | | * the replication CSN field |
| | | * @see #ChangeNumberIndexRecord(long, String, DN, CSN) |
| | | * |
| | | * @see #ChangeNumberIndexRecord(long, DN, CSN) |
| | | */ |
| | | public ChangeNumberIndexRecord(String previousCookie, DN baseDN, CSN csn) |
| | | public ChangeNumberIndexRecord(DN baseDN, CSN csn) |
| | | { |
| | | this(0, previousCookie, baseDN, csn); |
| | | this(0, baseDN, csn); |
| | | } |
| | | |
| | | /** |
| | |
| | | return changeNumber; |
| | | } |
| | | |
| | | /** |
| | | * Get the previous cookie field. |
| | | * |
| | | * @return the previous cookie. |
| | | */ |
| | | public String getPreviousCookie() |
| | | { |
| | | return previousCookie; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return "changeNumber=" + changeNumber + " csn=" + csn + " baseDN=" + baseDN |
| | | + " previousCookie=" + previousCookie; |
| | | return "changeNumber=" + changeNumber + " csn=" + csn + " baseDN=" + baseDN; |
| | | } |
| | | } |
| | |
| | | * } |
| | | * </pre> |
| | | * |
| | | * A cursor can be initialised from a key, using a {@code KeyMatchingStrategy} and |
| | | * a {@code PositionStrategy}, to determine the exact starting position. |
| | | * <p> |
| | | * Let Kp be the highest key lower than K and Kn the lowest key higher |
| | | * than K, so that Kp &lt; K &lt; Kn. |
| | | * <ul> |
| | | * <li>When using EQUAL_TO_KEY on key K : |
| | | * <ul> |
| | | * <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log), |
| | | * otherwise it is empty</li> |
| | | * <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if K exists in log), |
| | | * otherwise it is empty</li> |
| | | * </ul> |
| | | * </li> |
| | | * <li>When using LESS_THAN_OR_EQUAL_TO_KEY on key K : |
| | | * <ul> |
| | | * <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log) |
| | | * or else Kp (if Kp exists in log), otherwise it is empty</li> |
| | | * <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if Kp or K exist in log), |
| | | * otherwise it is empty</li> |
| | | * </ul> |
| | | * </li> |
| | | * <li>When using GREATER_THAN_OR_EQUAL_TO_KEY on key K : |
| | | * <ul> |
| | | * <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log) |
| | | * or else Kn (if Kn exists in log), otherwise it is empty</li> |
| | | * <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if K or Kn exist in log), |
| | | * otherwise it is empty</li> |
| | | * </ul> |
| | | * </li> |
| | | * </ul> |
| | | * |
| | | * @param <T> |
| | | * type of the record being returned |
| | | * \@NotThreadSafe |
| | |
| | | |
| | | /** |
| | | * Represents a cursor key matching strategy, which allows choosing whether only |
| | | * the exact key must be found or if any key equals or higher should match. |
| | | * the exact key must be found or whether an equal or lower/higher key should also match. |
| | | */ |
| | | public enum KeyMatchingStrategy { |
| | | /** matches if the key or a lower key is found. */ |
| | | LESS_THAN_OR_EQUAL_TO_KEY, |
| | | /** matches only if the exact key is found. */ |
| | | EQUAL_TO_KEY, |
| | | /** matches if the key or a greater key is found. */ |
| | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.types.DN; |
| | |
| | | /** |
| | | * This interface allows to query or control the replication domain database(s) |
| | | * (composed of one or more ReplicaDBs) and query/update each ReplicaDB. |
| | | * <p> |
| | | * In particular, the {@code getCursorFrom()} methods allow obtaining a cursor at any level: |
| | | * <ul> |
| | | * <li>Across all the domains, provided a {@link MultiDomainServerState}</li> |
| | | * <li>Across all replicaDBs of a domain, provided a {@link ServerState}</li> |
| | | * <li>On one replica DB for a domain and serverId, provided a CSN</li> |
| | | * </ul> |
| | | * The cursor starting point is specified by providing a key, a {@link KeyMatchingStrategy} and |
| | | * a {@link PositionStrategy}. |
| | | */ |
| | | public interface ReplicationDomainDB |
| | | { |
| | |
| | | void removeDomain(DN baseDN) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the domains starting at or after the |
| | | * provided {@link MultiDomainServerState} for each domain. |
| | | * Generates a {@link DBCursor} across all the domains starting before, at or |
| | | * after the provided {@link MultiDomainServerState} for each domain, |
| | | * depending on the provided matching and positioning strategies. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | |
| | | * Starting point for each domain cursor. If any {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN for each |
| | | * replicaDBs |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * Cursor position strategy |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState, PositionStrategy) |
| | | * @see #getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy) |
| | | */ |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the domains starting at or after |
| | | * the provided {@link MultiDomainServerState} for each domain, excluding a |
| | | * provided set of domain DNs. |
| | | * Generates a {@link DBCursor} across all the domains starting before, at or |
| | | * after the provided {@link MultiDomainServerState} for each domain, |
| | | * excluding a provided set of domain DNs. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | |
| | | * Starting point for each domain cursor. If any {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN for each |
| | | * replicaDBs |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which exact |
| | | * position the cursor must start |
| | | * Cursor position strategy |
| | | * @param excludedDomainDns |
| | | * Every domain appearing in this set is excluded from the cursor |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState, PositionStrategy) |
| | | * @see #getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy) |
| | | */ |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy, |
| | | Set<DN> excludedDomainDns) throws ChangelogException; |
| | | |
| | | // serverId methods |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, Set<DN> excludedDomainDns) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the replicaDBs for the specified |
| | | * replication domain starting at or after the provided {@link ServerState} for each |
| | | * replicaDBs. |
| | | * replication domain starting before, at or after the provided |
| | | * {@link ServerState} for each replicaDB, depending on the provided matching |
| | | * and positioning strategies. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | |
| | | * Starting point for each ReplicaDB cursor. If any CSN for a |
| | | * replicaDB is null, then start from the oldest CSN for this |
| | | * replicaDB |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * Cursor position strategy |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, int, CSN, PositionStrategy) |
| | | * @see #getCursorFrom(DN, int, CSN, KeyMatchingStrategy, PositionStrategy) |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | | * replication domain and serverId starting at or after the provided {@link CSN}. |
| | | * replication domain and serverId starting before, at or after the provided |
| | | * {@link CSN}, depending on the provided matching and positioning strategies. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | |
| | | * @param startCSN |
| | | * Starting point for the ReplicaDB cursor. If the CSN is null, then |
| | | * start from the oldest CSN for this replicaDB |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * Cursor position strategy |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException; |
| | | |
| | | /** |
| | | * Unregisters the provided cursor from this replication domain. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Position to provided key, starting from provided block start position and |
| | | * reading sequentially until key is found according to matching and |
| | | * positioning strategies. |
| | | * Position before, at or after provided key, starting from provided block |
| | | * start position and reading sequentially until key is found according to |
| | | * matching and positioning strategies. |
| | | * |
| | | * @param blockStartPosition |
| | | * Position of read pointer in the file, expected to be the start of |
| | | * a block where a record offset is written. |
| | | * a block where a record offset is written |
| | | * @param key |
| | | * The key to find. |
| | | * The key to find |
| | | * @param matchStrategy |
| | | * The key matching strategy. |
| | | * The key matching strategy |
| | | * @param positionStrategy |
| | | * The positioning strategy. |
| | | * @return The pair ({@code true}, last record read) if reader is successfully |
| | | * positioned (last record may be null if end of file is reached), ( |
| | | * {@code false}, null) otherwise. |
| | | * The positioning strategy |
| | | * @return The pair ({@code true}, selected record) if reader is successfully |
| | | * positioned (selected record may be null if end of file is reached), |
| | | * ({@code false}, null) otherwise. |
| | | * @throws ChangelogException |
| | | * If an error occurs. |
| | | */ |
| | | Pair<Boolean, Record<K,V>> positionToKeySequentially( |
| | | final long blockStartPosition, |
| | | final K key, |
| | | final KeyMatchingStrategy matchStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException { |
| | | Pair<Boolean, Record<K,V>> positionToKeySequentially(final long blockStartPosition, final K key, |
| | | final KeyMatchingStrategy matchStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | Record<K,V> record = readRecord(blockStartPosition); |
| | | do { |
| | | if (record != null) |
| | | Record<K,V> previousRecord = null; |
| | | long previousPosition = blockStartPosition; |
| | | boolean matchingKeyIsLowerThanAnyRecord = true; |
| | | while (record != null) |
| | | { |
| | | final int keysComparison = record.getKey().compareTo(key); |
| | | final boolean matches = (matchStrategy == EQUAL_TO_KEY && keysComparison == 0) |
| | | || (matchStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && keysComparison >= 0); |
| | | if (matches) |
| | | if (keysComparison <= 0) |
| | | { |
| | | if (positionStrategy == AFTER_MATCHING_KEY && keysComparison == 0) |
| | | matchingKeyIsLowerThanAnyRecord = false; |
| | | } |
| | | if ((keysComparison == 0 && matchStrategy == EQUAL_TO_KEY) |
| | | || (keysComparison >= 0 && matchStrategy != EQUAL_TO_KEY)) |
| | | { |
| | | return getMatchingRecord(matchStrategy, positionStrategy, keysComparison, matchingKeyIsLowerThanAnyRecord, |
| | | record, previousRecord, previousPosition); |
| | | } |
| | | previousRecord = record; |
| | | previousPosition = getFilePosition(); |
| | | record = readRecord(); |
| | | } |
| | | |
| | | if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY) |
| | | { |
| | | return getRecordNoMatchForLessStrategy(positionStrategy, previousRecord, previousPosition); |
| | | } |
| | | return Pair.of(false, null); |
| | | } |
| | | |
| | | private Pair<Boolean,Record<K,V>> getMatchingRecord(KeyMatchingStrategy matchStrategy, |
| | | PositionStrategy positionStrategy, int keysComparison, boolean matchKeyIsLowerThanAnyRecord, |
| | | Record<K, V> currentRecord, Record<K, V> previousRecord, long previousPosition) |
| | | throws ChangelogException |
| | | { |
| | | Record<K, V> record = currentRecord; |
| | | /* |
| | | * Builds the (positioned?, selected record) pair once the sequential scan has |
| | | * stopped on currentRecord. keysComparison is the result of comparing the key |
| | | * of currentRecord with the searched key (0 means exact match, > 0 means the |
| | | * current record is higher than the searched key). previousRecord and |
| | | * previousPosition describe the record read just before currentRecord, if any. |
| | | * By default the current record is selected; the branches below handle the |
| | | * cases where another record, or no record at all, must be returned. |
| | | */ |
| | | if (positionStrategy == AFTER_MATCHING_KEY) |
| | | { |
| | | if (matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && matchKeyIsLowerThanAnyRecord) |
| | | { |
| | | return Pair.of(false, null); /* flag says the searched key is lower than any record: no key at or below it, so positioning fails */ |
| | | } |
| | | if (keysComparison == 0) |
| | | { |
| | | // exact match: skip the matching key by reading the record that follows it |
| | | record = readRecord(); |
| | | } |
| | | } |
| | | else if (positionStrategy == ON_MATCHING_KEY && matchStrategy == LESS_THAN_OR_EQUAL_TO_KEY && keysComparison > 0) |
| | | { |
| | | seekToPosition(previousPosition); /* current record is higher than the key: move the reader back to the previous position */ |
| | | return Pair.of(previousRecord != null, previousRecord); /* positioned only if a previous record exists */ |
| | | } |
| | | return Pair.of(true, record); |
| | | } |
| | | |
| | | private Pair<Boolean, Record<K, V>> getRecordNoMatchForLessStrategy( |
| | | final PositionStrategy positionStrategy, final Record<K, V> previousRecord, final long previousPosition) |
| | | throws ChangelogException |
| | | { |
| | | if (positionStrategy == ON_MATCHING_KEY) |
| | | { |
| | | seekToPosition(previousPosition); |
| | | return Pair.of(previousRecord != null, previousRecord); |
| | | } |
| | | record = readRecord(); |
| | | else |
| | | { |
| | | return Pair.of(true, null); |
| | | } |
| | | while (record != null); |
| | | return Pair.of(false, null); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | final long changeNumber = nextChangeNumber(); |
| | | final ChangeNumberIndexRecord newRecord = |
| | | new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), record.getBaseDN(), record.getCSN()); |
| | | new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN()); |
| | | log.append(Record.from(newRecord.getChangeNumber(), newRecord)); |
| | | newestChangeNumber = changeNumber; |
| | | |
| | |
| | | final ChangeNumberIndexRecord cnIndexRecord = record.getValue(); |
| | | return new ByteStringBuilder() |
| | | .append(record.getKey()) |
| | | .append(cnIndexRecord.getPreviousCookie()) |
| | | .append(STRING_SEPARATOR) |
| | | .append(cnIndexRecord.getBaseDN().toString()) |
| | | .append(STRING_SEPARATOR) |
| | | .append(cnIndexRecord.getCSN().toByteString()).toByteString(); |
| | |
| | | { |
| | | ByteSequenceReader reader = data.asReader(); |
| | | final long changeNumber = reader.getLong(); |
| | | String previousCookie = reader.getString(getNextStringLength(reader)); |
| | | reader.skip(1); |
| | | final DN baseDN = DN.decode( |
| | | reader.getString(getNextStringLength(reader))); |
| | | final DN baseDN = DN.decode(reader.getString(getNextStringLength(reader))); |
| | | reader.skip(1); |
| | | final CSN csn = CSN.valueOf(reader.getByteString(reader.remaining())); |
| | | |
| | | return Record.from(changeNumber, new ChangeNumberIndexRecord(changeNumber, previousCookie, baseDN, csn)); |
| | | return Record.from(changeNumber, new ChangeNumberIndexRecord(changeNumber, baseDN, csn)); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer; |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final Set<DN> excludedDomainDns = Collections.emptySet(); |
| | | return getCursorFrom(startState, positionStrategy, excludedDomainDns); |
| | | return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy, |
| | | final Set<DN> excludedDomainDns) |
| | | throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy); |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy); |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); |
| | |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.Attribute; |
| | |
| | | |
| | | /** |
| | | * Returns a cursor for retrieving the update messages from this DB. |
| | | * The starting position is defined by the provided CSN and cursor positioning |
| | | * strategy. |
| | | * The actual starting position is defined by the provided CSN, the key |
| | | * matching strategy and the positioning strategy. |
| | | * |
| | | * @param startCSN |
| | | * The position where the cursor must start. If null, start from the |
| | | * oldest CSN |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to choose if cursor must |
| | | * start from the provided CSN or just after the provided CSN. |
| | | * Cursor position strategy |
| | | * @return a new {@link DBCursor} to retrieve update messages. |
| | | * @throws ChangelogException |
| | | * if a database problem happened |
| | | */ |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startCSN, positionStrategy); |
| | | RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy); |
| | | return new FileReplicaDBCursor(cursor, startCSN, positionStrategy); |
| | | } |
| | | |
| | |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.io.Closeable; |
| | |
| | | */ |
| | | public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, null); |
| | | return getCursor(key, EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log. |
| | | * The starting position is defined by the provided key and cursor |
| | | * positioning strategy. |
| | | * Returns a cursor that allows to retrieve the records from this log. The |
| | | * starting position is defined by the provided key, cursor matching strategy |
| | | * and cursor positioning strategy. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, cursor will point at the first record of the log. |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy. |
| | | * @param positionStrategy |
| | | * The cursor positioning strategy. |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | public RepositionableCursor<K, V> getNearestCursor(final K key, PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor starting from a key, using the provided matching and |
| | | * position strategies for the cursor. |
| | | */ |
| | | private RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy, |
| | | public RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | if (key == null) |
| | |
| | | return new EmptyLogCursor<K, V>(); |
| | | } |
| | | cursor = new LogCursor<K, V>(this); |
| | | final boolean isFound = cursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | // When not matching the exact key, it is ok if the target is not found |
| | | if (isFound || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY) |
| | | final boolean isSuccessfullyPositioned = cursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | // Allow for cursor re-initialization after exhaustion in case of GREATER_THAN_OR_EQUAL_TO_KEY strategy |
| | | if (isSuccessfullyPositioned || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY) |
| | | { |
| | | registerCursor(cursor); |
| | | return cursor; |
| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | import java.io.BufferedWriter; |
| | | import java.io.Closeable; |
| | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the provided key. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, cursor will point at the first record of the log. |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | LogFileCursor<K, V> getCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, PositionStrategy.ON_MATCHING_KEY); |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor that allows to retrieve the records from this log, |
| | | * starting at the position defined by the smallest key that is higher than |
| | | * the provided key. |
| | | * |
| | | * @param key |
| | | * Key to use as a start position for the cursor. If key is |
| | | * {@code null}, cursor will point at the first record of the log. |
| | | * @return a cursor on the log records, which is never {@code null} |
| | | * @throws ChangelogException |
| | | * If the cursor can't be created. |
| | | */ |
| | | LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException |
| | | { |
| | | return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | } |
| | | |
| | | /** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */ |
| | | private LogFileCursor<K, V> getCursor( |
| | | final K key, |
| | | final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | if (key == null) |
| | | { |
| | | return getCursor(); |
| | | } |
| | | LogFileCursor<K, V> cursor = null; |
| | | try |
| | | { |
| | | cursor = new LogFileCursor<K, V>(this); |
| | | cursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | // if target is not found, cursor is positioned at end of stream |
| | | return cursor; |
| | | } |
| | | catch (ChangelogException e) { |
| | | StaticUtils.close(cursor); |
| | | throw e; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Returns a cursor initialised to the provided record and position in file. |
| | | * |
| | | * @param record |
| | |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * Thread responsible for inserting replicated changes into the ChangeNumber |
| | | * Index DB (CNIndexDB for short). Only changes older than the medium |
| | | * consistency point are inserted in the CNIndexDB. As a consequence this class |
| | | * is also responsible for maintaining the medium consistency point. |
| | | * Index DB (CNIndexDB for short). |
| | | * <p> |
| | | * Only changes older than the medium consistency point are inserted in the |
| | | * CNIndexDB. As a consequence this class is also responsible for maintaining |
| | | * the medium consistency point (indirectly through an |
| | | * {@code ECLMultiDomainDBCursor}). |
| | | */ |
| | | public class ChangeNumberIndexer extends DirectoryThread |
| | | { |
| | |
| | | /* |
| | | * The following MultiDomainServerState fields must be thread safe, because |
| | | * 1) initialization can happen while the replication server starts receiving |
| | | * updates 2) many updates can happen concurrently. |
| | | * updates |
| | | * 2) many updates can happen concurrently. |
| | | */ |
| | | /** |
| | | * Holds the cross domain medium consistency Replication Update Vector for the |
| | | * current replication server, also known as the previous cookie. |
| | | * <p> |
| | | * Stores the value of the cookie before the change currently processed is |
| | | * inserted in the DB. After insert, it is updated with the CSN of the change |
| | | * currently processed (thus becoming the "current" cookie just before the |
| | | * change is returned. |
| | | * <p> |
| | | * Note: This object is only updated by changes/updates. |
| | | * |
| | | * @see <a href= |
| | | * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names" |
| | | * >OpenDJ Domain Names - medium consistency RUV</a> |
| | | */ |
| | | private final MultiDomainServerState mediumConsistencyRUV = |
| | | new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Holds the last time each replica was seen alive, whether via updates or |
| | | * heartbeat notifications, or offline notifications. Data is held for each |
| | | * serverId cross domain. |
| | |
| | | * <p> |
| | | * Note: This object is updated by both heartbeats and changes/updates. |
| | | */ |
| | | private final MultiDomainServerState lastAliveCSNs = |
| | | new MultiDomainServerState(); |
| | | private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState(); |
| | | |
| | | /** Note: This object is updated by replica offline messages. */ |
| | | private final MultiDomainServerState replicasOffline = |
| | | new MultiDomainServerState(); |
| | | private final MultiDomainServerState replicasOffline = new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Cursor across all the replicaDBs for all the replication domains. It is |
| | |
| | | } |
| | | |
| | | /** |
| | | * Restores in memory data needed to build the CNIndexDB, including the medium |
| | | * consistency point. |
| | | * Restores in memory data needed to build the CNIndexDB. In particular, |
| | | * initializes the changes cursor to the medium consistency point. |
| | | */ |
| | | private void initialize() throws ChangelogException, DirectoryException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = |
| | | changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the mediumConsistencyRUV from DB |
| | | mediumConsistencyRUV.update( |
| | | new MultiDomainServerState(newestRecord.getPreviousCookie())); |
| | | // Do not update with the newestRecord CSN |
| | | // as it will be used for a sanity check later in the same method |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | |
| | | initializeLastAliveCSNs(domainDB); |
| | | initializeNextChangeCursor(domainDB); |
| | | initializeOfflineReplicas(); |
| | | |
| | | // this will not be used any more. Discard for garbage collection. |
| | | this.changelogState = null; |
| | | } |
| | | |
| | | // initialize the DB cursor and the last seen updates |
| | | // to ensure the medium consistency CSN can move forward |
| | | final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB(); |
| | | private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException |
| | | { |
| | | final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN(); |
| | | |
| | | MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = |
| | | domainDB.getCursorFrom(cookieWithNewestCSN, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | |
| | | nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint); |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | /** Returns a cookie initialised with the newest CSN for each replica. */ |
| | | private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState(); |
| | | if (newestRecord != null) |
| | | { |
| | | final CSN newestCsn = newestRecord.getCSN(); |
| | | for (DN baseDN : changelogState.getDomainToServerIds().keySet()) |
| | | { |
| | | cookieWithNewestCSN.update(baseDN, newestCsn); |
| | | } |
| | | } |
| | | return cookieWithNewestCSN; |
| | | } |
| | | |
| | | private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB) |
| | | { |
| | | for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet()) |
| | | { |
| | | final DN baseDN = entry.getKey(); |
| | |
| | | lastAliveCSNs.update(baseDN, latestKnownState); |
| | | } |
| | | } |
| | | |
| | | nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, |
| | | domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY)); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | | { |
| | | // restore the "previousCookie" state before shutdown |
| | | UpdateMsg record = nextChangeForInsertDBCursor.getRecord(); |
| | | if (record instanceof ReplicaOfflineMsg) |
| | | { |
| | | // ignore: replica offline messages are never stored in the CNIndexDB |
| | | nextChangeForInsertDBCursor.next(); |
| | | record = nextChangeForInsertDBCursor.getRecord(); |
| | | } |
| | | |
| | | // sanity check: ensure that when initializing the cursors at the previous |
| | | // cookie, the next change we find is the newest record in the CNIndexDB |
| | | if (!record.getCSN().equals(newestRecord.getCSN())) |
| | | private void initializeOfflineReplicas() |
| | | { |
| | | throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get( |
| | | newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI())); |
| | | } |
| | | // Now we can update the mediumConsistencyRUV |
| | | mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN()); |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas(); |
| | | for (DN baseDN : offlineReplicas) |
| | | { |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | // this will not be used any more. Discard for garbage collection. |
| | | this.changelogState = null; |
| | | } |
| | | |
| | | private CSN oldestPossibleCSN(int serverId) |
| | |
| | | |
| | | // OK, the oldest change is older than the medium consistency point |
| | | // let's publish it to the CNIndexDB. |
| | | final String previousCookie = mediumConsistencyRUV.toString(); |
| | | final long changeNumber = changelogDB.getChangeNumberIndexDB().addRecord( |
| | | new ChangeNumberIndexRecord(previousCookie, baseDN, csn)); |
| | | notifyEntryAddedToChangelog(baseDN, changeNumber, previousCookie, msg); |
| | | final long changeNumber = changelogDB.getChangeNumberIndexDB() |
| | | .addRecord(new ChangeNumberIndexRecord(baseDN, csn)); |
| | | MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie(); |
| | | notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | } |
| | | catch (InterruptedException ignored) |
| | |
| | | * the change number of the newly added entry. It will be greater |
| | | * than zero for entries added to the change number index and less |
| | | * than or equal to zero for entries added to any replica DB |
| | | * @param cookieString |
| | | * a string representing the cookie of the newly added entry. This is |
| | | * only meaningful for entries added to the change number index |
| | | * @param cookie |
| | | * the cookie of the newly added entry. This is only meaningful for |
| | | * entries added to the change number index |
| | | * @param msg |
| | | * the update message of the newly added entry |
| | | * @throws ChangelogException |
| | | * If a problem occurs while notifying of the newly added entry. |
| | | */ |
| | | protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber, |
| | | String cookieString, UpdateMsg msg) throws ChangelogException |
| | | MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException |
| | | { |
| | | ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookieString, msg); |
| | | ChangelogBackend.getInstance().notifyChangeNumberEntryAdded(baseDN, changeNumber, cookie.toString(), msg); |
| | | } |
| | | |
| | | /** |
| | |
| | | TRACER.debugError(msg.toString()); |
| | | } |
| | | |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, |
| | | final DN mcBaseDN) throws ChangelogException |
| | | private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException |
| | | { |
| | | // update, so it becomes the previous cookie for the next change |
| | | mediumConsistencyRUV.update(mcBaseDN, mcCSN); |
| | | |
| | | final int mcServerId = mcCSN.getServerId(); |
| | | final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId); |
| | | final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId); |
| | |
| | | * from the medium consistency RUV). |
| | | */ |
| | | lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN); |
| | | mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN); |
| | | } |
| | | } |
| | | |
| | |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * {@link DBCursor} implementation that iterates across a Collection of |
| | | * {@link DBCursor}s, advancing from the oldest to the newest change across all |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this cursor. |
| | | * |
| | | * @return a list of (Data, UpdateMsg) pairs representing the state of the |
| | | * cursor. In each pair, the data or the update message may be |
| | | * {@code null}, but at least one of them is non-null. |
| | | */ |
| | | public List<Pair<Data, UpdateMsg>> getSnapshot() |
| | | { |
| | | final List<Pair<Data, UpdateMsg>> snapshot = new ArrayList<Pair<Data, UpdateMsg>>(); |
| | | for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet()) |
| | | { |
| | | final UpdateMsg updateMsg = entry.getKey().getRecord(); |
| | | final Data data = entry.getValue(); |
| | | if (updateMsg != null || data != null) |
| | | { |
| | | snapshot.add(Pair.of(data, updateMsg)); |
| | | } |
| | | } |
| | | for (Data data : exhaustedCursors.values()) |
| | | { |
| | | if (data != null) |
| | | { |
| | | snapshot.add(Pair.of(data, (UpdateMsg) null)); |
| | | } |
| | | } |
| | | return snapshot; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | |
| | | private final DN baseDN; |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | private final ConcurrentSkipListMap<Integer, CSN> newReplicas = |
| | | new ConcurrentSkipListMap<Integer, CSN>(); |
| | | private final ConcurrentSkipListMap<Integer, CSN> newReplicas = new ConcurrentSkipListMap<Integer, CSN>(); |
| | | /** |
| | | * Replaces null CSNs in ConcurrentSkipListMap that does not support null values. |
| | | */ |
| | | private static final CSN NULL_CSN = new CSN(0, 0, 0); |
| | | |
| | | private final PositionStrategy positionStrategy; |
| | | private final KeyMatchingStrategy matchingStrategy; |
| | | |
| | | /** |
| | | * Builds a DomainDBCursor instance. |
| | |
| | | * the replication domain baseDN of this cursor |
| | | * @param domainDB |
| | | * the DB for the provided replication domain |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy, which indicates how the key is |
| | | * matched |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * Cursor position strategy, which indicates the exact position at |
| | | * which the cursor must start |
| | | */ |
| | | public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy) |
| | | public DomainDBCursor(final DN baseDN, final ReplicationDomainDB domainDB, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.domainDB = domainDB; |
| | | this.matchingStrategy = matchingStrategy; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | |
| | | * |
| | | * @param serverId |
| | | * the serverId of the replica |
| | | * @param startAfterCSN |
| | | * the CSN after which to start iterating |
| | | * @param startCSN |
| | | * the CSN to use as a starting point |
| | | */ |
| | | public void addReplicaDB(int serverId, CSN startAfterCSN) |
| | | public void addReplicaDB(int serverId, CSN startCSN) |
| | | { |
| | | // only keep the oldest CSN that will be the new cursor's starting point |
| | | newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN); |
| | | newReplicas.putIfAbsent(serverId, startCSN != null ? startCSN : NULL_CSN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | final CSN csn = pair.getValue(); |
| | | final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = |
| | | domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy); |
| | | domainDB.getCursorFrom(baseDN, serverId, startCSN, matchingStrategy, positionStrategy); |
| | | addCursor(cursor, null); |
| | | iter.remove(); |
| | | } |
| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | { |
| | | final long changeNumber = record.getChangeNumber(); |
| | | DatabaseEntry key = new ReplicationDraftCNKey(changeNumber); |
| | | DatabaseEntry data = new DraftCNData(changeNumber, |
| | | record.getPreviousCookie(), record.getBaseDN().toNormalizedString(), |
| | | record.getCSN()); |
| | | DatabaseEntry data = new DraftCNData(changeNumber, record.getBaseDN().toNormalizedString(), record.getCSN()); |
| | | |
| | | // Use a transaction so that we can override durability. |
| | | Transaction txn = null; |
| | |
| | | * |
| | | * |
| | | * Copyright 2009 Sun Microsystems, Inc. |
| | | * Portions Copyright 2010-2013 ForgeRock AS. |
| | | * Portions Copyright 2010-2014 ForgeRock AS. |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | { |
| | | private static final String FIELD_SEPARATOR = "!"; |
| | | |
| | | private static final String EMPTY_STRING_PREVIOUS_COOKIE = ""; |
| | | |
| | | private static final long serialVersionUID = 1L; |
| | | |
| | | private long changeNumber; |
| | |
| | | * |
| | | * @param changeNumber |
| | | * the change number |
| | | * @param previousCookie |
| | | * The previous cookie |
| | | * @param baseDN |
| | | * The baseDN (domain DN) |
| | | * @param csn |
| | | * The replication CSN |
| | | */ |
| | | public DraftCNData(long changeNumber, String previousCookie, String baseDN, |
| | | CSN csn) |
| | | public DraftCNData(long changeNumber, String baseDN, CSN csn) |
| | | { |
| | | this.changeNumber = changeNumber; |
| | | String record = |
| | | previousCookie + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn; |
| | | // Although the previous cookie is not used any more, we need |
| | | // to keep it in database for compatibility with previous versions |
| | | String record = EMPTY_STRING_PREVIOUS_COOKIE + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn; |
| | | setData(getBytes(record)); |
| | | } |
| | | |
| | |
| | | { |
| | | try |
| | | { |
| | | // Although the previous cookie is not used any more, we need |
| | | // to keep it in database for compatibility with previous versions |
| | | String stringData = new String(data, "UTF-8"); |
| | | String[] str = stringData.split(FIELD_SEPARATOR, 3); |
| | | // str[0] contains previous cookie and is ignored |
| | | final DN baseDN = DN.decode(str[1]); |
| | | final CSN csn = new CSN(str[2]); |
| | | return new ChangeNumberIndexRecord(changeNumber, str[0], baseDN, csn); |
| | | return new ChangeNumberIndexRecord(changeNumber, baseDN, csn); |
| | | } |
| | | catch (UnsupportedEncodingException e) |
| | | { |
| | |
| | | public ChangeNumberIndexRecord getRecord() throws ChangelogException |
| | | { |
| | | if (record == null) |
| | | { |
| | | record = decodeData(changeNumber, getData()); |
| | | } |
| | | return record; |
| | | } |
| | | |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.types.DN; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | | /** |
| | | * Multi domain DB cursor that only returns updates for the domains which have |
| | | * been enabled for the external changelog. |
| | |
| | | { |
| | | return getClass().getSimpleName() + " cursor=[" + cursor + ']'; |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this cursor. |
| | | * |
| | | * @return a list of (DN, UpdateMsg) pairs, containing all base DNs enabled |
| | | * for the external changelog. The update message may be {@code null}. |
| | | */ |
| | | List<Pair<DN, UpdateMsg>> getSnapshot() |
| | | { |
| | | final List<Pair<DN, UpdateMsg>> snapshot = cursor.getSnapshot(); |
| | | final List<Pair<DN, UpdateMsg>> eclSnapshot = new ArrayList<Pair<DN,UpdateMsg>>(); |
| | | for (Pair<DN, UpdateMsg> pair : snapshot) |
| | | { |
| | | DN baseDN = pair.getFirst(); |
| | | if (predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | eclSnapshot.add(pair); |
| | | } |
| | | } |
| | | return eclSnapshot; |
| | | } |
| | | |
| | | /** |
| | | * Returns the cookie corresponding to the state of this cursor. |
| | | * |
| | | * @return a valid cookie taking into account only the base DNs enabled for |
| | | * the external changelog |
| | | */ |
| | | public MultiDomainServerState toCookie() |
| | | { |
| | | List<Pair<DN, UpdateMsg>> snapshot = getSnapshot(); |
| | | MultiDomainServerState cookie = new MultiDomainServerState(); |
| | | for (Pair<DN, UpdateMsg> pair : snapshot) |
| | | { |
| | | // only put base DNs where a CSN is available in the cookie |
| | | if (pair.getSecond() != null) |
| | | { |
| | | cookie.update(pair.getFirst(), pair.getSecond().getCSN()); |
| | | } |
| | | } |
| | | return cookie; |
| | | } |
| | | } |
| | |
| | | { |
| | | long changeNumber = nextChangeNumber(); |
| | | final ChangeNumberIndexRecord newRecord = |
| | | new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(), |
| | | record.getBaseDN(), record.getCSN()); |
| | | new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN()); |
| | | db.addRecord(newRecord); |
| | | newestChangeNumber = changeNumber; |
| | | |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final Set<DN> excludedDomainDns = Collections.emptySet(); |
| | | return getCursorFrom(startState, positionStrategy, excludedDomainDns); |
| | | return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) throws ChangelogException |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy, |
| | | final Set<DN> excludedDomainDns) throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy); |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy); |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this); |
| | |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; |
| | | import org.opends.server.types.Attribute; |
| | |
| | | * @param startCSN |
| | | * The position where the cursor must start. If null, start from the |
| | | * oldest CSN |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * Cursor position strategy |
| | | * @return a new {@link DBCursor} that allows to browse the db managed by this |
| | | * ReplicaDB and starting at the position defined by a given CSN. |
| | | * @throws ChangelogException |
| | | * if a database problem happened |
| | | */ |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | return new JEReplicaDBCursor(db, startCSN, positionStrategy, this); |
| | | return new JEReplicaDBCursor(db, startCSN, matchingStrategy, positionStrategy, this); |
| | | } |
| | | |
| | | /** |
| | |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor; |
| | | |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | /** |
| | |
| | | class JEReplicaDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | private final ReplicationDB db; |
| | | private final PositionStrategy positionStrategy; |
| | | private PositionStrategy positionStrategy; |
| | | private KeyMatchingStrategy matchingStrategy; |
| | | private JEReplicaDB replicaDB; |
| | | private final CSN startCSN; |
| | | private CSN lastNonNullCurrentCSN; |
| | | private ReplServerDBCursor cursor; |
| | | private UpdateMsg currentChange; |
| | |
| | | * @param startCSN |
| | | * The CSN after which the cursor must start. If null, start from the |
| | | * oldest CSN |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * Cursor position strategy |
| | | * @param replicaDB |
| | | * The associated JEReplicaDB. |
| | | * @throws ChangelogException |
| | | * if a database problem happened. |
| | | */ |
| | | public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy, |
| | | JEReplicaDB replicaDB) throws ChangelogException |
| | | public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, JEReplicaDB replicaDB) throws ChangelogException |
| | | { |
| | | this.db = db; |
| | | this.matchingStrategy = matchingStrategy; |
| | | this.positionStrategy = positionStrategy; |
| | | this.replicaDB = replicaDB; |
| | | this.startCSN = startCSN; |
| | | this.lastNonNullCurrentCSN = startCSN; |
| | | } |
| | | |
| | |
| | | // if following code is called while the cursor is closed. |
| | | // It is better to let the deadlock happen to help quickly identify |
| | | // and fix such issues with unit tests. |
| | | cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy); |
| | | if (lastNonNullCurrentCSN != startCSN) |
| | | { |
| | | // re-initialize to further CSN, take care to use appropriate strategies |
| | | matchingStrategy = GREATER_THAN_OR_EQUAL_TO_KEY; |
| | | positionStrategy = AFTER_MATCHING_KEY; |
| | | } |
| | | cursor = db.openReadCursor(lastNonNullCurrentCSN, matchingStrategy, positionStrategy); |
| | | } |
| | | } |
| | | |
| | |
| | | private final ConcurrentSkipListMap<DN, ServerState> newDomains = |
| | | new ConcurrentSkipListMap<DN, ServerState>(); |
| | | |
| | | private final KeyMatchingStrategy matchingStrategy; |
| | | |
| | | private final PositionStrategy positionStrategy; |
| | | |
| | | /** |
| | |
| | | * |
| | | * @param domainDB |
| | | * the replication domain management DB |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * Cursor position strategy |
| | | */ |
| | | public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy) |
| | | public MultiDomainDBCursor(final ReplicationDomainDB domainDB, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | { |
| | | this.domainDB = domainDB; |
| | | this.matchingStrategy = matchingStrategy; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | |
| | | */ |
| | | public void addDomain(DN baseDN, ServerState startAfterState) |
| | | { |
| | | newDomains.put(baseDN, |
| | | startAfterState != null ? startAfterState : new ServerState()); |
| | | newDomains.put(baseDN, startAfterState != null ? startAfterState : new ServerState()); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | final Entry<DN, ServerState> entry = iter.next(); |
| | | final DN baseDN = entry.getKey(); |
| | | final ServerState serverState = entry.getValue(); |
| | | final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy); |
| | | final DBCursor<UpdateMsg> domainDBCursor = |
| | | domainDB.getCursorFrom(baseDN, serverState, matchingStrategy, positionStrategy); |
| | | addCursor(domainDBCursor, baseDN); |
| | | iter.remove(); |
| | | } |
| | |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start. If null, start from the |
| | | * oldest CSN |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * Cursor position strategy |
| | | * @return The ReplServerDBCursor. |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException |
| | | ReplServerDBCursor openReadCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | return new ReplServerDBCursor(startCSN, positionStrategy); |
| | | return new ReplServerDBCursor(startCSN, matchingStrategy, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | |
| | | return serverId + " " + baseDN.toNormalizedString(); |
| | | } |
| | | |
| | | /** |
| | | * Holds a cursor together with an indicator of whether the cursor should be considered as empty. |
| | | * <p> |
| | | * Instances are only created through {@link #createEmpty(Cursor)} and |
| | | * {@link #createNonEmpty(Cursor)}; the wrapped cursor is stored as-is and is never |
| | | * opened or closed by this class. |
| | | */ |
| | | private static class CursorWithEmptyIndicator |
| | | { |
| | | private Cursor cursor; |
| | | private boolean isEmpty; |
| | | |
| | | private CursorWithEmptyIndicator(Cursor localCursor, boolean isEmpty) |
| | | { |
| | | this.cursor = localCursor; |
| | | this.isEmpty = isEmpty; |
| | | } |
| | | |
| | | /** Wraps the provided cursor and flags it as empty (the cursor itself is left untouched). */ |
| | | static CursorWithEmptyIndicator createEmpty(Cursor cursor) |
| | | { |
| | | return new CursorWithEmptyIndicator(cursor, true); |
| | | } |
| | | |
| | | /** Wraps the provided cursor and flags it as non-empty (the cursor itself is left untouched). */ |
| | | static CursorWithEmptyIndicator createNonEmpty(Cursor cursor) |
| | | { |
| | | return new CursorWithEmptyIndicator(cursor, false); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This Class implements a cursor that can be used to browse a |
| | | * replicationServer database. |
| | |
| | | * <p> |
| | | * Will be set non null for a write cursor |
| | | */ |
| | | private final Cursor cursor; |
| | | private Cursor cursor; |
| | | private final DatabaseEntry key; |
| | | private final DatabaseEntry data; |
| | | /** \@Null for read cursors, \@NotNull for deleting cursors. */ |
| | |
| | | * |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start. |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy, which indicates how the key |
| | | * is matched |
| | | * @param positionStrategy |
| | | * indicates at which exact position the cursor must start |
| | | * @throws ChangelogException |
| | | * When the startCSN does not exist. |
| | | */ |
| | | private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException |
| | | private ReplServerDBCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | key = createReplicationKey(startCSN); |
| | | data = new DatabaseEntry(); |
| | |
| | | // unlock it when throwing an exception. |
| | | dbCloseLock.readLock().lock(); |
| | | |
| | | boolean cursorHeld = false; |
| | | Cursor localCursor = null; |
| | | CursorWithEmptyIndicator maybeEmptyCursor = null; |
| | | try |
| | | { |
| | | // If the DB has been closed then create empty cursor. |
| | |
| | | return; |
| | | } |
| | | |
| | | localCursor = db.openCursor(txn, null); |
| | | if (startCSN != null |
| | | && localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | maybeEmptyCursor = generateCursor(startCSN, matchingStrategy, positionStrategy); |
| | | if (maybeEmptyCursor.isEmpty) |
| | | { |
| | | // We could not move the cursor to the expected startCSN |
| | | if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS) |
| | | { |
| | | // We could not even move the cursor close to it |
| | | // => return empty cursor |
| | | isClosed = true; |
| | | cursor = null; |
| | | return; |
| | | } |
| | | |
| | | if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY) |
| | | { |
| | | // We can move close to the startCSN. |
| | | // Let's create a cursor from that point. |
| | | key.setData(null); |
| | | if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | localCursor.close(); |
| | | localCursor = db.openCursor(txn, null); |
| | | } |
| | | } |
| | | } |
| | | cursor = localCursor; |
| | | cursorHeld = cursor != null; |
| | | |
| | | cursor = maybeEmptyCursor.cursor; |
| | | if (key.getData() != null) |
| | | { |
| | | computeCurrentRecord(); |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (!cursorHeld) |
| | | if (maybeEmptyCursor != null && maybeEmptyCursor.isEmpty) |
| | | { |
| | | closeAndReleaseReadLock(localCursor); |
| | | closeAndReleaseReadLock(maybeEmptyCursor.cursor); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Opens a new cursor on the database and positions it for the provided start CSN and strategies. |
| | | * <p> |
| | | * The search calls operate on the {@code key} and {@code data} entries of this object |
| | | * (presumably {@code key} already encodes {@code startCSN} - confirm against the caller). |
| | | * Outcomes: |
| | | * <ul> |
| | | * <li>{@code startCSN} is null (no search is performed) or its key is found exactly: |
| | | * the cursor is returned flagged as non-empty;</li> |
| | | * <li>no exact key and {@code EQUAL_TO_KEY}: the cursor is returned flagged as empty;</li> |
| | | * <li>no exact key but the range search succeeds: with {@code GREATER_THAN_OR_EQUAL_TO_KEY} |
| | | * the cursor stays where the range search left it for {@code ON_MATCHING_KEY}, or is moved |
| | | * one record backward for {@code AFTER_MATCHING_KEY} (a fresh cursor is opened instead when |
| | | * there is no previous record); with {@code LESS_THAN_OR_EQUAL_TO_KEY} the cursor is moved |
| | | * one record backward, and flagged as empty when there is no previous record;</li> |
| | | * <li>no exact key and the range search fails: flagged as empty for |
| | | * {@code GREATER_THAN_OR_EQUAL_TO_KEY}; otherwise the cursor is moved to the last record, |
| | | * or replaced by a fresh cursor when {@code getLast} fails.</li> |
| | | * </ul> |
| | | * The cursor is never closed before being returned, even when flagged as empty, so the |
| | | * caller remains responsible for closing it. |
| | | * <p> |
| | | * NOTE(review): when the database holds no record, the {@code LESS_THAN_OR_EQUAL_TO_KEY} |
| | | * path returns a fresh cursor flagged as non-empty - confirm this is intended. |
| | | * |
| | | * @param startCSN |
| | | * The CSN from which the cursor must start; when null no key search is done |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy |
| | | * @return the opened cursor with its empty/non-empty indicator |
| | | */ |
| | | private CursorWithEmptyIndicator generateCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) |
| | | { |
| | | Cursor cursor = db.openCursor(txn, null); |
| | | boolean isCsnFound = startCSN == null || cursor.getSearchKey(key, data, LockMode.DEFAULT) == SUCCESS; |
| | | if (!isCsnFound) |
| | | { |
| | | if (matchingStrategy == EQUAL_TO_KEY) |
| | | { |
| | | return CursorWithEmptyIndicator.createEmpty(cursor); |
| | | } |
| | | |
| | | boolean isGreaterCsnFound = cursor.getSearchKeyRange(key, data, DEFAULT) == SUCCESS; |
| | | if (isGreaterCsnFound) |
| | | { |
| | | if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && positionStrategy == AFTER_MATCHING_KEY) |
| | | { |
| | | // Move one record backward so that the first call to next() points to this greater csn |
| | | key.setData(null); |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | // Edge case: we're at the beginning of the database, restart from a fresh cursor |
| | | cursor.close(); |
| | | cursor = db.openCursor(txn, null); |
| | | } |
| | | } |
| | | else if (matchingStrategy == LESS_THAN_OR_EQUAL_TO_KEY) |
| | | { |
| | | // Move one record backward to point to the lower csn |
| | | key.setData(null); |
| | | if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS) |
| | | { |
| | | // Edge case: we're at the beginning of the log, there is no lower csn |
| | | return CursorWithEmptyIndicator.createEmpty(cursor); |
| | | } |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY) |
| | | { |
| | | // There is no greater csn: nothing can match |
| | | return CursorWithEmptyIndicator.createEmpty(cursor); |
| | | } |
| | | // LESS_THAN_OR_EQUAL_TO_KEY case: the lower csn is the highest csn available |
| | | key.setData(null); |
| | | boolean isLastKeyFound = cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS; |
| | | if (!isLastKeyFound) |
| | | { |
| | | // Edge case: empty database, restart from a fresh cursor |
| | | cursor.close(); |
| | | cursor = db.openCursor(txn, null); |
| | | } |
| | | } |
| | | } |
| | | return CursorWithEmptyIndicator.createNonEmpty(cursor); |
| | | } |
| | | |
| | | private ReplServerDBCursor() throws ChangelogException |
| | | { |
| | | key = new DatabaseEntry(); |
| | |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; |
| | | import org.opends.server.replication.service.DSRSShutdownSync; |
| | |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.protocol.OperationContext.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.types.ResultCode.*; |
| | | import static org.opends.server.util.CollectionUtils.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | |
| | | { |
| | | final ReplicationDomainDB domainDB = replicationServer.getChangelogDB().getReplicationDomainDB(); |
| | | final DBCursor<UpdateMsg> cursor = |
| | | domainDB.getCursorFrom(baseDN, csn.getServerId(), null, PositionStrategy.ON_MATCHING_KEY); |
| | | domainDB.getCursorFrom(baseDN, csn.getServerId(), csn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | try { |
| | | assertTrue(cursor.next(), |
| | | "Expected to be to find at least one change in replicaDB(" + baseDN + " " + csn.getServerId() + ")"); |
| | |
| | | Object[][] recordsForSeek() |
| | | { |
| | | Object[][] data = new Object[][] { |
| | | // records, key, findNearest, expectedRecord, expectedFound |
| | | // records, key, key matching strategy, position strategy, expectedRecord, should be found ? |
| | | |
| | | // no record |
| | | { records(), 1, false, null, false }, |
| | | { records(), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | |
| | | // 1 record exact find |
| | | { records(1), 1, false, record(1), true }, |
| | | // 1 record |
| | | { records(1), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 1 record nearest find |
| | | { records(1), 1, true, null, true }, |
| | | { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true }, |
| | | { records(1), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 2 records |
| | | { records(1,2), 1, false, record(1), true }, |
| | | { records(1,2), 2, false, record(2), true }, |
| | | { records(1,2), 1, true, record(2), true }, |
| | | { records(1,2), 2, true, null, true }, |
| | | // 3 records equal matching |
| | | { records(1,2,3), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3), 2, EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 3, EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 4, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | |
| | | // 3 records exact find |
| | | { records(1,2,3), 0, false, null, false }, |
| | | { records(1,2,3), 1, false, record(1), true }, |
| | | { records(1,2,3), 2, false, record(2), true }, |
| | | { records(1,2,3), 3, false, record(3), true }, |
| | | { records(1,2,3), 4, false, null, false }, |
| | | { records(1,2,3), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3), 2, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 3, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 3 records nearest find |
| | | { records(1,2,3), 0, true, record(1), true }, |
| | | { records(1,2,3), 1, true, record(2), true }, |
| | | { records(1,2,3), 2, true, record(3), true }, |
| | | { records(1,2,3), 3, true, null, true }, |
| | | { records(1,2,3), 4, true, null, false }, |
| | | // 3 records less than or equal matching |
| | | { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true }, |
| | | |
| | | // 10 records exact find |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, false, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, false, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, false, record(5), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, false, record(10), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, false, null, false }, |
| | | { records(1,2,3), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 2, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 3, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3), 4, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 10 records nearest find |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, true, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, true, record(2), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, true, record(6), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, true, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, true, null, false }, |
| | | // 3 records greater or equal matching |
| | | { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(3), true }, |
| | | |
| | | { records(1,2,3), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3), 2, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(3), true }, |
| | | { records(1,2,3), 3, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3), 4, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | |
| | | // 10 records equal matching |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | |
| | | // 10 records less than or equal matching |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true }, |
| | | |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | |
| | | // 10 records greater or equal matching |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(5), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(7), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, record(10), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, null, false }, |
| | | |
| | | { records(1,2,3,4,5,6,7,8,9,10), 0, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(1), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 1, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(2), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 5, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(6), true }, |
| | | { records(1,2,3,4,5,7,8,9,10), 6, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, record(7), true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 10, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, true }, |
| | | { records(1,2,3,4,5,6,7,8,9,10), 11, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, null, false }, |
| | | |
| | | |
| | | }; |
| | | |
| | | // For each test case, do a test with various block sizes to ensure algorithm is not broken |
| | | // on a given size |
| | | int[] sizes = new int[] { 500, 100, 50, 30, 25, 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10 }; |
| | | Object[][] finalData = new Object[sizes.length*data.length][6]; |
| | | Object[][] finalData = new Object[sizes.length * data.length][7]; |
| | | for (int i = 0; i < data.length; i++) |
| | | { |
| | | for (int j = 0; j < sizes.length; j++) |
| | | { |
| | | Object[] a = data[i]; |
| | | // add the block size at beginning of each test case |
| | | finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4]}; |
| | | finalData[sizes.length*i+j] = new Object[] { sizes[j], a[0], a[1], a[2], a[3], a[4], a[5]}; |
| | | } |
| | | } |
| | | return finalData; |
| | | } |
| | | |
| | | @Test(dataProvider="recordsForSeek") |
| | | public void testSeek(int blockSize, List<Record<Integer, Integer>> records, int key, boolean findNearest, |
| | | Record<Integer, Integer> expectedRecord, boolean expectedFound) throws Exception |
| | | public void testSeekToRecord(int blockSize, List<Record<Integer, Integer>> records, int key, |
| | | KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, Record<Integer, Integer> expectedRecord, |
| | | boolean shouldBeFound) throws Exception |
| | | { |
| | | writeRecords(blockSize, records); |
| | | |
| | |
| | | try |
| | | { |
| | | reader = newReader(blockSize); |
| | | KeyMatchingStrategy matchStrategy = |
| | | findNearest ? KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY : KeyMatchingStrategy.EQUAL_TO_KEY; |
| | | PositionStrategy posStrategy = |
| | | findNearest ? PositionStrategy.AFTER_MATCHING_KEY : PositionStrategy.ON_MATCHING_KEY; |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchStrategy, posStrategy); |
| | | Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchingStrategy, positionStrategy); |
| | | |
| | | assertThat(result.getFirst()).isEqualTo(expectedFound); |
| | | assertThat(result.getFirst()).isEqualTo(shouldBeFound); |
| | | assertThat(result.getSecond()).isEqualTo(expectedRecord); |
| | | } |
| | | finally |
| | |
| | | } |
| | | } |
| | | |
| | | /** Write provided records. */ |
| | | /** Write provided records with the provided block size. */ |
| | | private void writeRecords(int blockSize, List<Record<Integer, Integer>> records) throws ChangelogException |
| | | { |
| | | BlockLogWriter<Integer, Integer> writer = null; |
| | |
| | | @SuppressWarnings("javadoc") |
| | | public class FileChangeNumberIndexDBTest extends ReplicationTestCase |
| | | { |
| | | private final MultiDomainServerState previousCookie = new MultiDomainServerState(); |
| | | private final List<String> cookies = new ArrayList<String>(); |
| | | |
| | | @BeforeMethod |
| | | public void clearCookie() |
| | | { |
| | | previousCookie.clear(); |
| | | cookies.clear(); |
| | | } |
| | | |
| | | @DataProvider(name = "messages") |
| | | Object[][] createMessages() throws Exception |
| | | { |
| | | CSN[] csns = generateCSNs(1, 0, 3); |
| | | DN dn1 = DN.decode("o=baseDN1"); |
| | | previousCookie.update(dn1, csns[0]); |
| | | return new Object[][] { |
| | | { new ChangeNumberIndexRecord(0L, previousCookie.toString(), DN.decode("o=baseDN1"), csns[1]) }, |
| | | { new ChangeNumberIndexRecord(999L, previousCookie.toString(), DN.decode("o=baseDN1"), csns[2]) }, |
| | | { new ChangeNumberIndexRecord(0L, dn1, csns[1]) }, |
| | | { new ChangeNumberIndexRecord(999L, dn1, csns[2]) }, |
| | | }; |
| | | } |
| | | |
| | |
| | | assertThat(record.getKey()).isEqualTo(msg.getChangeNumber()); |
| | | assertThat(record.getValue().getBaseDN()).isEqualTo(msg.getBaseDN()); |
| | | assertThat(record.getValue().getCSN()).isEqualTo(msg.getCSN()); |
| | | assertThat(record.getValue().getPreviousCookie()).isEqualTo(msg.getPreviousCookie()); |
| | | } |
| | | |
| | | @Test() |
| | |
| | | assertEquals(cnIndexDB.count(), 3, "Db count"); |
| | | assertFalse(cnIndexDB.isEmpty()); |
| | | |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0)); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1)); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2)); |
| | | |
| | | DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1); |
| | | assertCursorReadsInOrder(cursor, cn1, cn2, cn3); |
| | | |
| | |
| | | try |
| | | { |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0)); |
| | | assertEqualTo(cursor.getRecord(), csns[0], baseDN1); |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1)); |
| | | assertEqualTo(cursor.getRecord(), csns[1], baseDN2); |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2)); |
| | | assertEqualTo(cursor.getRecord(), csns[2], baseDN3); |
| | | assertFalse(cursor.next()); |
| | | } |
| | | finally |
| | |
| | | |
| | | private long addRecord(FileChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException |
| | | { |
| | | final String cookie = previousCookie.toString(); |
| | | cookies.add(cookie); |
| | | final long changeNumber = cnIndexDB.addRecord( |
| | | new ChangeNumberIndexRecord(cookie, baseDN, csn)); |
| | | previousCookie.update(baseDN, csn); |
| | | return changeNumber; |
| | | return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn)); |
| | | } |
| | | |
| | | private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie) |
| | | private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN) |
| | | { |
| | | assertEquals(record.getCSN(), csn); |
| | | assertEquals(record.getBaseDN(), baseDN); |
| | | assertEquals(record.getPreviousCookie(), cookie); |
| | | } |
| | | |
| | | private FileChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException |
| | |
| | | final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord(); |
| | | assertEquals(oldest.getChangeNumber(), newestChangeNumber); |
| | | assertEquals(oldest.getChangeNumber(), newest.getChangeNumber()); |
| | | assertEquals(oldest.getPreviousCookie(), newest.getPreviousCookie()); |
| | | assertEquals(oldest.getBaseDN(), newest.getBaseDN()); |
| | | assertEquals(oldest.getCSN(), newest.getCSN()); |
| | | } |
| | |
| | | return new ReplicationServer(cfg); |
| | | } |
| | | |
| | | private String getPreviousCookie(FileChangeNumberIndexDB cnIndexDB, |
| | | long changeNumber) throws Exception |
| | | { |
| | | DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(changeNumber); |
| | | try |
| | | { |
| | | cursor.next(); |
| | | return cursor.getRecord().getPreviousCookie(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | } |
| | | } |
| | | |
| | | private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor, |
| | | long... cns) throws ChangelogException |
| | | { |
| | |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.DN; |
| | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.testng.Assert.*; |
| | |
| | | |
| | | CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], AFTER_MATCHING_KEY); |
| | | cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], |
| | | GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | |
| | | int[] indicesToAdd = new int[] { 0, 1, 2, 4 }; |
| | |
| | | final PositionStrategy positionStrategy, final CSN expectedCSN) |
| | | throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | |
| | | private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | |
| | | private void assertFoundInOrder(FileReplicaDB replicaDB, |
| | | final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy); |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | { |
| | | assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); |
| | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor; |
| | | import org.opends.server.types.ByteSequenceReader; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.ByteStringBuilder; |
| | |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | | @Test(sequential=true) |
| | |
| | | } |
| | | |
| | | @BeforeMethod |
| | | /** Create a new log file with ten records starting from (key1, value1) until (key10, value10). */ |
| | | /** |
| | | * Creates a new log file with ten records, from (key01, value1) to (key10, value10), |
| | | * so the log contains the keys "key01", "key02", ..., "key10". |
| | | */ |
| | | public void initialize() throws Exception |
| | | { |
| | | if (TEST_LOG_FILE.exists()) |
| | |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenAnExistingKey() throws Exception |
| | | @DataProvider |
| | | Object[][] cursorPositionTo() |
| | | { |
| | | return new Object[][] { |
| | | // Columns: key to position to, key matching strategy, position strategy, whether the position is found, |
| | | // expected start index of the cursor (-1 if the cursor should be exhausted), expected end index of the cursor |
| | | |
| | | // equal to key |
| | | { "key00", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1}, |
| | | { "key02", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10}, |
| | | { "key05", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10}, |
| | | { "key050", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1}, |
| | | { "key07", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10}, |
| | | { "key10", EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10}, |
| | | { "key11", EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1}, |
| | | |
| | | { "key00", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1}, |
| | | { "key02", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10}, |
| | | { "key05", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1}, |
| | | { "key07", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10}, |
| | | { "key10", EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1}, |
| | | { "key11", EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1}, |
| | | |
| | | // less than or equal to key |
| | | |
| | | // key00 is a special case: the position is not found, but the cursor is positioned at the beginning, |
| | | // so it can be read from the first record to the last one |
| | | { "key00", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, 1, 10}, |
| | | { "key02", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10}, |
| | | { "key05", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10}, |
| | | { "key050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10}, |
| | | { "key07", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10}, |
| | | { "key10", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10}, |
| | | { "key11", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10}, |
| | | |
| | | // key00 is a special case: the position is not found, but the cursor is positioned at the beginning, |
| | | // so it can be read from record 2 to the last one |
| | | { "key00", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, 2, 10}, |
| | | { "key02", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10}, |
| | | { "key05", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key07", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10}, |
| | | { "key10", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1}, |
| | | { "key11", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1}, |
| | | |
| | | // greater than or equal to key |
| | | { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 1, 10}, |
| | | { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 2, 10}, |
| | | { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 5, 10}, |
| | | { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 6, 10}, |
| | | { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 7, 10}, |
| | | { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, true, 10, 10}, |
| | | { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, false, -1, -1}, |
| | | |
| | | { "key00", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 1, 10}, |
| | | { "key02", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 3, 10}, |
| | | { "key05", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 6, 10}, |
| | | { "key07", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, 8, 10}, |
| | | { "key10", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, true, -1, -1}, |
| | | { "key11", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, false, -1, -1}, |
| | | |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Tests cursor positioning for a given key, matching strategy and position strategy. |
| | | * The cursor is fully read from the expected start index to the expected end index, unless it is expected |
| | | * to be exhausted immediately. |
| | | */ |
| | | @Test(dataProvider="cursorPositionTo") |
| | | public void testCursorPositionTo(String key, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy, |
| | | boolean positionShouldBeFound, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | LogFileCursor<String, String> cursor = null; |
| | | try { |
| | | cursor = changelog.getCursor("key05"); |
| | | cursor = changelog.getCursor(); |
| | | boolean success = cursor.positionTo(key, matchingStrategy, positionStrategy); |
| | | |
| | | assertThatCursorCanBeFullyRead(cursor, 5, 10); |
| | | assertThat(success).isEqualTo(positionShouldBeFound); |
| | | if (cursorShouldStartAt >= 0) |
| | | { |
| | | assertThatCursorCanBeFullyRead(cursor, cursorShouldStartAt, cursorShouldEndAt); |
| | | } |
| | | else |
| | | { |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, changelog); |
| | |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenAnUnexistingKey() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getCursor("key"); |
| | | |
| | | assertThat(cursor).isNotNull(); |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | assertThat(cursor.next()).isFalse(); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, changelog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testCursorWhenGivenANullKey() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getCursor(null); |
| | | |
| | | assertThatCursorCanBeFullyRead(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, changelog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenAnExistingKey() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getNearestCursor("key01"); |
| | | |
| | | assertThatCursorCanBeFullyRead(cursor, 2, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, changelog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenAnUnexistingKey() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getNearestCursor("key00"); |
| | | |
| | | assertThatCursorCanBeFullyRead(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, changelog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenANullKey() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = changelog.getNearestCursor(null); |
| | | |
| | | assertThatCursorCanBeFullyRead(cursor, 1, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, changelog); |
| | | } |
| | | } |
| | | |
| | | @Test(expectedExceptions=ChangelogException.class) |
| | | public void testCursorWhenParserFailsToRead() throws Exception |
| | | { |
| | | FailingStringRecordParser parser = new FailingStringRecordParser(); |
| | | LogFile<String, String> changelog = getLogFile(parser); |
| | | parser.setFailToRead(true); |
| | | try { |
| | | changelog.getCursor("key"); |
| | | } |
| | | finally { |
| | | StaticUtils.close(changelog); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGetOldestRecord() throws Exception |
| | | { |
| | | LogFile<String, String> changelog = getLogFile(RECORD_PARSER); |
| | |
| | | logFile.append(Record.from(String.format("key%02d", 11), "value"+ 11)); |
| | | |
| | | // ensure log can be fully read including the new record |
| | | cursor = logFile.getCursor("key05"); |
| | | assertThatCursorCanBeFullyRead(cursor, 5, 11); |
| | | cursor = logFile.getCursor(); |
| | | assertThatCursorCanBeFullyRead(cursor, 1, 11); |
| | | } |
| | | finally |
| | | { |
| | |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.file.LogFileTest.*; |
| | | |
| | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.file.LogFileTest.FailingStringRecordParser; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.BeforeMethod; |
| | |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenAnExistingKey() throws Exception |
| | | @DataProvider |
| | | Object[][] cursorData() |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null; |
| | | try { |
| | | // this key is the first key of the log file "key1_key2.log" |
| | | cursor1 = log.getNearestCursor("key001", AFTER_MATCHING_KEY); |
| | | assertThatCursorCanBeFullyReadFromStart(cursor1, 2, 10); |
| | | return new Object[][] { |
| | | // The first 3 values are the input data: key to position to, key matching strategy, position strategy. |
| | | // The last 2 values are the expected output: |
| | | // first index of the cursor (-1 if the cursor should be exhausted), last index of the cursor. |
| | | { "key000", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { "key001", EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 }, |
| | | { "key004", EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 }, |
| | | { "key0050", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { "key009", EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 }, |
| | | { "key010", EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 }, |
| | | { "key011", EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | |
| | | // this key is the last key of the log file "key3_key4.log" |
| | | cursor2 = log.getNearestCursor("key004", AFTER_MATCHING_KEY); |
| | | assertThatCursorCanBeFullyReadFromStart(cursor2, 5, 10); |
| | | { "key000", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key001", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 }, |
| | | { "key004", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 }, |
| | | { "key0050", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key009", EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 }, |
| | | { "key010", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key011", EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | |
| | | cursor3 = log.getNearestCursor("key009", AFTER_MATCHING_KEY); |
| | | assertThatCursorCanBeFullyReadFromStart(cursor3, 10, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor1, cursor2, cursor3, log); |
| | | } |
| | | { "key000", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { "key001", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 }, |
| | | { "key004", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 }, |
| | | { "key005", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 }, |
| | | { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 5, 10 }, |
| | | { "key006", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 }, |
| | | { "key009", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 }, |
| | | { "key010", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 }, |
| | | { "key011", LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 }, |
| | | |
| | | { "key000", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key001", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 }, |
| | | { "key004", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 }, |
| | | { "key005", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 }, |
| | | { "key0050", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 }, |
| | | { "key006", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 7, 10 }, |
| | | { "key009", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 }, |
| | | { "key010", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key011", LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | |
| | | { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 }, |
| | | { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 10 }, |
| | | { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 4, 10 }, |
| | | { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 6, 10 }, |
| | | { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 9, 10 }, |
| | | { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 10, 10 }, |
| | | { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | |
| | | { "key000", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 10 }, |
| | | { "key001", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 10 }, |
| | | { "key004", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 5, 10 }, |
| | | { "key0050", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 6, 10 }, |
| | | { "key009", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 10, 10 }, |
| | | { "key010", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { "key011", GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | }; |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenAnExistingKey_KeyIsTheLastOne() throws Exception |
| | | @Test(dataProvider="cursorData") |
| | | public void testCursorWithStrategies(String key, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, int cursorShouldStartAt, int cursorShouldEndAt) throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getNearestCursor("key010", AFTER_MATCHING_KEY); |
| | | cursor = log.getCursor(key, matchingStrategy, positionStrategy); |
| | | |
| | | // lowest higher key does not exist |
| | | if (cursorShouldStartAt != -1) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, cursorShouldStartAt, cursorShouldEndAt); |
| | | } |
| | | else |
| | | { |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenAnUnexistingKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | // key is between key005 and key006 |
| | | cursor = log.getNearestCursor("key005000", AFTER_MATCHING_KEY); |
| | | |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 6, 10); |
| | | } |
| | | finally { |
| | | StaticUtils.close(cursor, log); |
| | |
| | | } |
| | | |
| | | @Test |
| | | public void testNearestCursorWhenGivenANullKey() throws Exception |
| | | public void testCursorMatchingAnyPositioningAnyWhenGivenANullKey() throws Exception |
| | | { |
| | | Log<String, String> log = openLog(LogFileTest.RECORD_PARSER); |
| | | DBCursor<Record<String, String>> cursor = null; |
| | | try { |
| | | cursor = log.getNearestCursor(null, null); |
| | | cursor = log.getCursor(null, null, null); |
| | | |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10); |
| | | } |
| | |
| | | /** |
| | | * Read the cursor until exhaustion, beginning at start of cursor. |
| | | */ |
| | | private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex, int endIndex) |
| | | throws Exception |
| | | private void assertThatCursorCanBeFullyReadFromStart(DBCursor<Record<String, String>> cursor, int fromIndex, |
| | | int endIndex) throws Exception |
| | | { |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | assertThatCursorCanBeFullyRead(cursor, fromIndex, endIndex); |
| | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.mockito.Matchers.*; |
| | | import static org.mockito.Mockito.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | /** |
| | |
| | | private Map<DN, ServerState> domainNewestCSNs; |
| | | private ECLEnabledDomainPredicate predicate; |
| | | private ChangeNumberIndexer cnIndexer; |
| | | private MultiDomainServerState initialCookie; |
| | | |
| | | @BeforeClass |
| | | public static void classSetup() throws Exception |
| | |
| | | { |
| | | MockitoAnnotations.initMocks(this); |
| | | |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY); |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | initialState = new ChangelogState(); |
| | | initialCookie = new MultiDomainServerState(); |
| | | replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | | domainDBCursors = new HashMap<DN, DomainDBCursor>(); |
| | | domainNewestCSNs = new HashMap<DN, ServerState>(); |
| | | |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(multiDomainCursor); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class), |
| | | eq(LESS_THAN_OR_EQUAL_TO_KEY), eq(AFTER_MATCHING_KEY))).thenReturn(multiDomainCursor); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | stopCNIndexer(); |
| | | } |
| | | |
| | | private static final String EMPTY_DB_NO_DS = "emptyDBNoDS"; |
| | | private static final String NO_DS = "noDS"; |
| | | |
| | | @Test |
| | | public void emptyDBNoDS() throws Exception |
| | | public void noDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneDS() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void oneDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg1); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBOneDS() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | addReplica(BASE_DN1, serverId1); |
| | | setCNIndexDBInitialRecords(msg1); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2); |
| | | publishUpdateMsg(msg2); |
| | | assertExternalChangelogContent(msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSs() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSs() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg1); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsDifferentDomains() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsDifferentDomains() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | * CompositeDBCursor currentRecord == Upd2.<li> |
| | | * </ol> |
| | | */ |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsDoesNotLoseChanges() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg1, msg2, msg3); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void nonEmptyDBTwoDSs() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1); |
| | | final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2); |
| | | addReplica(BASE_DN1, serverId1); |
| | | addReplica(BASE_DN1, serverId2); |
| | | setCNIndexDBInitialRecords(msg1, msg2); |
| | | startCNIndexer(); |
| | | assertExternalChangelogContent(); |
| | | |
| | | final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3); |
| | | final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4); |
| | | publishUpdateMsg(msg3, msg4); |
| | | assertExternalChangelogContent(msg3); |
| | | |
| | | final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5); |
| | | publishUpdateMsg(msg5); |
| | | assertExternalChangelogContent(msg3); |
| | | |
| | | final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6); |
| | | publishUpdateMsg(msg6); |
| | | assertExternalChangelogContent(msg3, msg4, msg5); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsOneSendsNoUpdatesForSomeTime() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg1Sid2, msg2Sid1); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void threeDSsOneIsNotECLEnabledDomain() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(ADMIN_DATA_DN, serverId1); |
| | |
| | | assertExternalChangelogContent(msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void oneInitialDSAnotherDSJoining() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void oneInitialDSAnotherDSJoining2() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsOneSendingHeartbeats() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg1, msg2); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneGoingOffline() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsOneGoingOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg1, msg2, msg4, msg5); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyOffline() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsOneInitiallyOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | * <li>RS starts</li> |
| | | * </ol> |
| | | */ |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsOneInitiallyWithChangesThenOffline() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | * <li>RS starts</li> |
| | | * </ol> |
| | | */ |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | assertExternalChangelogContent(msg2, msg3, msg4); |
| | | } |
| | | |
| | | @Test(dependsOnMethods = { EMPTY_DB_NO_DS }) |
| | | public void emptyDBTwoDSsOneKilled() throws Exception |
| | | @Test(dependsOnMethods = { NO_DS }) |
| | | public void twoDSsOneKilled() throws Exception |
| | | { |
| | | eclEnabledDomains = Arrays.asList(BASE_DN1); |
| | | addReplica(BASE_DN1, serverId1); |
| | |
| | | DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); |
| | | if (domainDBCursor == null) |
| | | { |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY); |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | domainDBCursors.put(baseDN, domainDBCursor); |
| | | |
| | | multiDomainCursor.addDomain(baseDN, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(domainDBCursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(LESS_THAN_OR_EQUAL_TO_KEY), |
| | | eq(AFTER_MATCHING_KEY))).thenReturn(domainDBCursor); |
| | | } |
| | | domainDBCursor.addReplicaDB(serverId, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(replicaDBCursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(LESS_THAN_OR_EQUAL_TO_KEY), |
| | | eq(AFTER_MATCHING_KEY))).thenReturn(replicaDBCursor); |
| | | } |
| | | |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn( |
| | |
// NOTE(review): two alternative "previousCookie" parameter lines are shown below (String and
// MultiDomainServerState); they look like the pre- and post-change revisions of one signature - confirm.
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber, |
| | | String previousCookie, UpdateMsg msg) throws ChangelogException |
| | | MultiDomainServerState previousCookie, UpdateMsg msg) throws ChangelogException |
| | | { |
| | | // Intentionally left empty: avoid problems with ChangelogBackend initialization |
| | | } |
| | |
| | | return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true); |
| | | } |
| | | |
// Test helper that seeds the stubbed CNIndexDB state from the given messages, in the order given:
// - every message advances initialCookie by its (baseDN, CSN);
// - the last message is additionally returned by cnIndexDB.getNewestRecord() and queued on the
//   replica cursor registered for its (baseDN, serverId).
| | | private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception |
| | | { |
| | | // Initialize the previous cookie that will be used to compare the records |
| | | // added to the CNIndexDB at the end of this test |
| | | for (int i = 0; i < msgs.length; i++) |
| | | { |
| | | ReplicatedUpdateMsg msg = msgs[i]; |
// true only for the last element of msgs
| | | if (i + 1 == msgs.length) |
| | | { |
| | | final ReplicatedUpdateMsg newestMsg = msg; |
| | | final DN baseDN = newestMsg.getBaseDN(); |
| | | final CSN csn = newestMsg.getCSN(); |
// the record is built before initialCookie is updated with this message (see the update below),
// so its cookie string only reflects the earlier messages
| | | when(cnIndexDB.getNewestRecord()).thenReturn( |
| | | new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn)); |
// NOTE(review): no null check on the lookup - presumably a cursor was registered for this replica beforehand
| | | final SequentialDBCursor cursor = |
| | | replicaDBCursors.get(Pair.of(baseDN, csn.getServerId())); |
| | | cursor.add(newestMsg); |
| | | } |
| | | initialCookie.update(msg.getBaseDN(), msg.getCSN()); |
| | | } |
| | | } |
| | | |
| | | private void publishUpdateMsg(ReplicatedUpdateMsg... msgs) throws Exception |
| | | { |
| | | for (ReplicatedUpdateMsg msg : msgs) |
| | |
| | | verify(cnIndexDB, atLeast(0)).addRecord(arg.capture()); |
| | | final List<ChangeNumberIndexRecord> allValues = arg.getAllValues(); |
| | | |
| | | // clone initial state to avoid modifying it |
| | | final MultiDomainServerState previousCookie = |
| | | new MultiDomainServerState(initialCookie.toString()); |
| | | // check it was not called more than expected |
| | | String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(expectedMsgs) + ">"; |
| | | assertThat(allValues).as(desc1).hasSize(expectedMsgs.length); |
| | |
| | | String desc2 = "actual was:<" + record + ">, but expected was:<" + expectedMsg + ">"; |
| | | assertThat(record.getBaseDN()).as(desc2).isEqualTo(expectedMsg.getBaseDN()); |
| | | assertThat(record.getCSN()).as(desc2).isEqualTo(expectedMsg.getCSN()); |
| | | assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString()); |
| | | previousCookie.update(expectedMsg.getBaseDN(), expectedMsg.getCSN()); |
| | | } |
| | | } |
| | | |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.mockito.Mockito.*; |
| | |
| | | { |
| | | TestCaseUtils.startFakeServer(); |
| | | MockitoAnnotations.initMocks(this); |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, ON_MATCHING_KEY); |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | eclCursor = new ECLMultiDomainDBCursor(predicate, multiDomainCursor); |
| | | } |
| | | |
| | |
// Registers a domain with multiDomainCursor: stubs domainDB.getCursorFrom(...) so that it hands back
// the given cursor for baseDN and a freshly created ServerState, then adds the domain with that same state.
// NOTE(review): the two when(...) lines differ only by the extra GREATER_THAN_OR_EQUAL_TO_KEY argument;
// they look like the pre- and post-change revisions of the same stubbing - confirm.
| | | private void addDomainCursorToCursor(DN baseDN, SequentialDBCursor cursor) throws ChangelogException |
| | | { |
| | | final ServerState state = new ServerState(); |
| | | when(domainDB.getCursorFrom(baseDN, state, ON_MATCHING_KEY)).thenReturn(cursor); |
| | | when(domainDB.getCursorFrom(baseDN, state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY)).thenReturn(cursor); |
| | | multiDomainCursor.addDomain(baseDN, state); |
| | | } |
| | | } |
| | |
| | | { |
| | | private final MultiDomainServerState previousCookie = |
| | | new MultiDomainServerState(); |
| | | private final List<String> cookies = new ArrayList<String>(); |
| | | |
| | | private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation; |
| | | |
| | |
| | | setReplicationDBImplementation(previousDBImpl); |
| | | } |
| | | |
// Runs before each test method (@BeforeMethod): empties previousCookie and the recorded cookies list.
| | | @BeforeMethod |
| | | public void clearCookie() |
| | | { |
| | | previousCookie.clear(); |
| | | cookies.clear(); |
| | | } |
| | | |
| | | /** |
| | | * This test makes basic operations of a JEChangeNumberIndexDB: |
| | | * <ol> |
| | |
| | | try |
| | | { |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0)); |
| | | assertEqualTo(cursor.getRecord(), csns[0], baseDN1); |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1)); |
| | | assertEqualTo(cursor.getRecord(), csns[1], baseDN2); |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2)); |
| | | assertEqualTo(cursor.getRecord(), csns[2], baseDN3); |
| | | assertFalse(cursor.next()); |
| | | } |
| | | finally |
| | |
| | | |
// Adds a record for (baseDN, csn) to the given CNIndexDB and returns the value of cnIndexDB.addRecord(...).
// NOTE(review): two variants of the body are interleaved below - one that remembers the running
// previousCookie in "cookies" and passes it to the record constructor, and a single-statement variant
// that builds the record from (baseDN, csn) only; presumably pre- and post-change revisions - confirm.
| | | private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException |
| | | { |
| | | final String cookie = previousCookie.toString(); |
| | | cookies.add(cookie); |
| | | final long changeNumber = cnIndexDB.addRecord( |
| | | new ChangeNumberIndexRecord(cookie, baseDN, csn)); |
| | | previousCookie.update(baseDN, csn); |
| | | return changeNumber; |
| | | return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn)); |
| | | } |
| | | |
// Asserts that the record carries the expected CSN and base DN.
// NOTE(review): two signatures are shown (with and without the "cookie" parameter) and the
// getPreviousCookie() assertion only fits the former; presumably pre- and post-change revisions - confirm.
| | | private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie) |
| | | private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN) |
| | | { |
| | | assertEquals(record.getCSN(), csn); |
| | | assertEquals(record.getBaseDN(), baseDN); |
| | | assertEquals(record.getPreviousCookie(), cookie); |
| | | } |
| | | |
| | | private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException |
| | |
| | | assertEquals(cnIndexDB.count(), 3, "Db count"); |
| | | assertFalse(cnIndexDB.isEmpty()); |
| | | |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0)); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1)); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2)); |
| | | |
| | | DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1); |
| | | assertCursorReadsInOrder(cursor, cn1, cn2, cn3); |
| | | |
| | |
| | | final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord(); |
| | | assertEquals(oldest.getChangeNumber(), newestChangeNumber); |
| | | assertEquals(oldest.getChangeNumber(), newest.getChangeNumber()); |
| | | assertEquals(oldest.getPreviousCookie(), newest.getPreviousCookie()); |
| | | assertEquals(oldest.getBaseDN(), newest.getBaseDN()); |
| | | assertEquals(oldest.getCSN(), newest.getCSN()); |
| | | } |
| | |
| | | return new ReplicationServer(cfg); |
| | | } |
| | | |
// Returns the previous cookie of the first record read from a cursor opened at changeNumber.
// The cursor is closed in the finally clause, whether or not reading succeeds.
| | | private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB, |
| | | long changeNumber) throws Exception |
| | | { |
| | | DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(changeNumber); |
| | | try |
| | | { |
// the boolean returned by next() is not checked; getRecord() is dereferenced directly
| | | cursor.next(); |
| | | return cursor.getRecord().getPreviousCookie(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | } |
| | | } |
| | | |
| | | private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor, |
| | | long... cns) throws ChangelogException |
| | | { |
| | |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.List; |
| | | |
| | | import org.assertj.core.api.SoftAssertions; |
| | | import org.opends.server.TestCaseUtils; |
| | |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.testng.Assert.*; |
| | |
| | | private DN TEST_ROOT_DN; |
| | | |
| | | private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation; |
| | | private ReplicationServer replicationServer; |
| | | private JEReplicaDB replicaDB; |
| | | |
| | | @BeforeClass |
| | | public void setDBImpl() |
| | |
| | | TEST_ROOT_DN = DN.decode(TEST_ROOT_DN_STRING); |
| | | } |
| | | |
// Supplies the rows consumed by the test declared with dataProvider="cursorData". Each row is
// { csns, startCsn, matchingStrategy, positionStrategy, startIndex, endIndex }.
// NOTE(review): the leading @Test/testGenerateCursorFrom header and the two null-initialised locals
// look like leftovers of the pre-change method in this diff view - confirm.
| | | @Test |
| | | public void testGenerateCursorFrom() throws Exception |
| | | @DataProvider |
| | | Object[][] cursorData() |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | JEReplicaDB replicaDB = null; |
| | | // create 7 csns |
| | | final CSN[] sevenCsns = generateCSNs(1, System.currentTimeMillis(), 7); |
| | | CSN beforeCsn = sevenCsns[0]; |
| | | CSN middleCsn = sevenCsns[3]; // will be between csns[1] and csns[2] |
| | | CSN afterCsn = sevenCsns[6]; |
| | | |
| | | // but use only 4 of them for update msg |
| | | // beforeCsn, middleCsn and afterCsn are not used |
| | | // in order to test cursor generation from a key not present in the log (before, in the middle, after) |
| | | final List<CSN> usedCsns = new ArrayList<CSN>(Arrays.asList(sevenCsns)); |
| | | usedCsns.remove(beforeCsn); |
| | | usedCsns.remove(middleCsn); |
| | | usedCsns.remove(afterCsn); |
// csns now holds sevenCsns[1], [2], [4] and [5], in that order
| | | final CSN[] csns = usedCsns.toArray(new CSN[4]); |
| | | |
// startIndex == -1: the test expects the generated cursor to be exhausted;
// otherwise it expects csns[startIndex] to csns[endIndex] to be read in order
| | | return new Object[][] { |
| | | // equal matching |
| | | { csns, beforeCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { csns, csns[0], EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 }, |
| | | { csns, csns[1], EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 }, |
| | | { csns, middleCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { csns, csns[2], EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 }, |
| | | { csns, csns[3], EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 }, |
| | | { csns, afterCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | |
| | | { csns, beforeCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { csns, csns[0], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 }, |
| | | { csns, csns[1], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 }, |
| | | { csns, middleCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { csns, csns[2], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 }, |
| | | { csns, csns[3], EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { csns, afterCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | |
| | | // less than or equal matching |
| | | { csns, beforeCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | { csns, csns[0], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 }, |
| | | { csns, csns[1], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 }, |
| | | { csns, middleCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 }, |
| | | { csns, csns[2], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 }, |
| | | { csns, csns[3], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 }, |
| | | { csns, afterCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 }, |
| | | |
| | | { csns, beforeCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { csns, csns[0], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 }, |
| | | { csns, csns[1], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 }, |
| | | { csns, middleCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 }, |
| | | { csns, csns[2], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 }, |
| | | { csns, csns[3], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { csns, afterCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | |
| | | // greater than or equal matching |
| | | { csns, beforeCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 }, |
| | | { csns, csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 }, |
| | | { csns, csns[1], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 }, |
| | | { csns, middleCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 }, |
| | | { csns, csns[2], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 }, |
| | | { csns, csns[3], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 }, |
| | | { csns, afterCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 }, |
| | | |
| | | { csns, beforeCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 0, 3 }, |
| | | { csns, csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 }, |
| | | { csns, csns[1], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 }, |
| | | { csns, middleCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 }, |
| | | { csns, csns[2], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 }, |
| | | { csns, csns[3], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { csns, afterCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 }, |
| | | { null, null, null, null, -1, -1 } // stop line: null csns tells the test to clean up the replication artefacts |
| | | }; |
| | | } |
| | | |
| | | /** |
| | | * Tests cursor generation for every supported combination of key matching strategy and position strategy. |
| | | * <p> |
| | | * Creating a replication server is costly, so it is created only once, on the first invocation, and |
| | | * cleaned up after the last one: the final data row (the "stop line", recognisable by its null csns) |
| | | * triggers the shutdown and removal of the replication artefacts. |
| | | * <p> |
| | | * A startIndex of -1 means the generated cursor is expected to be exhausted; otherwise the cursor must |
| | | * return csns[startIndex] to csns[endIndex] in order. |
| | | */ |
| | | @Test(dataProvider="cursorData") |
| | | public void testGenerateCursor(CSN[] csns, CSN startCsn, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, int startIndex, int endIndex) throws Exception |
| | | { |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | | { |
| | | if (replicationServer == null) |
| | | { |
| | | // initialize only once |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
// NOTE(review): the lines using generateCSNs(..., 5), csns2, assertNextCSN and assertNotFound below are
// interleaved with the data-driven lines and look like the pre-change body of this test - confirm.
| | | final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5); |
| | | final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns)); |
| | | csns2.remove(csns[3]); |
| | | |
| | | for (CSN csn : csns2) |
| | | for (CSN csn : csns) |
| | | { |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | } |
| | | |
| | | for (CSN csn : csns2) |
| | | { |
| | | assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn); |
| | | } |
| | | assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]); |
| | | |
| | | for (int i = 0; i < csns2.size() - 1; i++) |
| | | if (csns == null) |
| | | { |
| | | assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1)); |
| | | return; // stop line, time to clean replication artefacts |
| | | } |
| | | assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(startCsn, matchingStrategy, positionStrategy); |
| | | if (startIndex != -1) |
| | | { |
| | | assertThatCursorCanBeFullyReadFromStart(cursor, csns, startIndex, endIndex); |
| | | } |
| | | else |
| | | { |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | if (csns == null) |
| | | { |
| | | // stop line, stop and remove replication |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | void testTrim() throws Exception |
| | | public void testTrim() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | JEReplicaDB replicaDB = null; |
| | |
| | | } |
| | | } |
| | | |
// advanceCursorUpTo: calls next() once per index in [startIndex, endIndex], asserting each time that
// next() returns true and that the record read carries the CSN csns[i].
// NOTE(review): the assertNextCSN header and the generateCursorFrom/try lines interleaved below look like
// leftovers of the pre-change helper in this diff view - confirm.
| | | private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy, final CSN expectedCSN) |
| | | throws ChangelogException |
| | | private void advanceCursorUpTo(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, int endIndex) |
| | | throws Exception |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | try |
| | | for (int i = startIndex; i <= endIndex; i++) |
| | | { |
| | | assertThat(cursor.next()).as("next() value when i=" + i).isTrue(); |
| | | assertThat(cursor.getRecord().getCSN()).isEqualTo(csns[i]); |
| | | } |
| | | } |
| | | |
// Checks that the cursor has nothing left to read: next() returns false and getRecord() is null.
// NOTE(review): the isTrue()/expectedCSN lines use a name that this signature does not declare and look
// like pre-change leftovers; it is unclear from this view which of the "softly" lines survive - confirm.
| | | private void assertThatCursorIsExhausted(DBCursor<UpdateMsg> cursor) throws Exception |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isTrue(); |
| | | softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | |
// Reads csns[startIndex] to csns[endIndex] from the cursor through advanceCursorUpTo, then asserts that
// the cursor is exhausted.
// NOTE(review): close(cursor) as the first statement looks like a pre-change leftover of this diff view - confirm.
| | | private void assertThatCursorCanBeFullyRead(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, int endIndex) |
| | | throws Exception |
| | | { |
| | | close(cursor); |
| | | advanceCursorUpTo(cursor, csns, startIndex, endIndex); |
| | | assertThatCursorIsExhausted(cursor); |
| | | } |
| | | |
// Same check as assertThatCursorCanBeFullyRead, preceded by an assertion that getRecord() is null
// before the first call to next() made here.
| | | private void assertThatCursorCanBeFullyReadFromStart(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, |
| | | int endIndex) throws Exception |
| | | { |
| | | assertThat(cursor.getRecord()).isNull(); |
| | | assertThatCursorCanBeFullyRead(cursor, csns, startIndex, endIndex); |
| | | } |
| | | |
| | | private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | |
| | | * optimize the oldest and newest records in the replication changelog db. |
| | | */ |
| | | @Test(groups = { "opendj-256" }) |
| | | void testGetOldestNewestCSNs() throws Exception |
| | | public void testGetOldestNewestCSNs() throws Exception |
| | | { |
| | | // It's worth testing with 2 different setting for counterRecord |
| | | // - a counter record is put every 10 Update msg in the db - just a unit |
| | |
| | | private void assertFoundInOrder(JEReplicaDB replicaDB, |
| | | final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy); |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy); |
| | | try |
| | | { |
| | | assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); |