| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; |
| | |
| | | try |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | new MultiDomainServerState(), GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, getExcludedBaseDNs()); |
| | | new MultiDomainServerState(), options, getExcludedBaseDNs()); |
| | | try |
| | | { |
| | | baseEntryHasSubordinates = cursor.next(); |
| | |
| | | try |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | entrySender.cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, entrySender.excludedBaseDNs); |
| | | entrySender.cookie, options, entrySender.excludedBaseDNs); |
| | | replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | |
| | | if (sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor)) |
| | |
| | | try |
| | | { |
| | | cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | MultiDomainDBCursor cursor = |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, |
| | | LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, options); |
| | | eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | eclCursor.next(); |
| | | cookie.update(eclCursor.toCookie()); |
| | |
| | | |
| | | // No need for ECLMultiDomainDBCursor in this case |
| | | // as updateMsg will be matched with cnIndexRecord |
| | | CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | final MultiDomainDBCursor replicaUpdatesCursor = |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(state, options); |
| | | replicaUpdatesCursor.next(); |
| | | return replicaUpdatesCursor; |
| | | } |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.Attribute; |
| | | import org.opends.server.types.Attributes; |
| | |
| | | * @return a non null {@link DBCursor} going from oldest to newest CSN |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy) |
| | | * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, CursorOptions) |
| | | */ |
| | | public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | { |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState, options); |
| | | } |
| | | |
| | | /** |
| | |
| | | package org.opends.server.replication.server.changelog.api; |
| | | |
| | | import java.io.Closeable; |
| | | import java.util.Objects; |
| | | |
| | | import org.opends.server.replication.common.CSN; |
| | | |
| | | /** |
| | | * Generic cursor interface into the changelog database. Once it is not used |
| | |
| | | AFTER_MATCHING_KEY |
| | | } |
| | | |
| | | /** Options to create a cursor. */ |
| | | public static final class CursorOptions |
| | | { |
| | | private final KeyMatchingStrategy keyMatchingStrategy; |
| | | private final PositionStrategy positionStrategy; |
| | | private final CSN defaultCSN; |
| | | |
| | | /** |
| | | * Creates options with provided strategies. |
| | | * |
| | | * @param keyMatchingStrategy |
| | | * The key matching strategy |
| | | * @param positionStrategy |
| | | * The position strategy |
| | | */ |
| | | public CursorOptions(KeyMatchingStrategy keyMatchingStrategy, PositionStrategy positionStrategy) |
| | | { |
| | | this(keyMatchingStrategy, positionStrategy, null); |
| | | } |
| | | |
| | | /** |
| | | * Creates options with provided strategies and default CSN. |
| | | * |
| | | * @param keyMatchingStrategy |
| | | * The key matching strategy |
| | | * @param positionStrategy |
| | | * The position strategy |
| | | * @param defaultCSN |
| | | * When creating a replica DB Cursor, this is the default CSN to |
| | | * use for replicas which do not have an associated CSN |
| | | */ |
| | | public CursorOptions(KeyMatchingStrategy keyMatchingStrategy, PositionStrategy positionStrategy, CSN defaultCSN) |
| | | { |
| | | this.keyMatchingStrategy = keyMatchingStrategy; |
| | | this.positionStrategy = positionStrategy; |
| | | this.defaultCSN = defaultCSN; |
| | | } |
| | | |
| | | /** |
| | | * Returns the key matching strategy. |
| | | * |
| | | * @return the key matching strategy |
| | | */ |
| | | public KeyMatchingStrategy getKeyMatchingStrategy() |
| | | { |
| | | return keyMatchingStrategy; |
| | | } |
| | | |
| | | /** |
| | | * Returns the position strategy. |
| | | * |
| | | * @return the position strategy |
| | | */ |
| | | public PositionStrategy getPositionStrategy() |
| | | { |
| | | return positionStrategy; |
| | | } |
| | | |
| | | /** |
| | | * Returns the default CSN. |
| | | * |
| | | * @return the default CSN |
| | | */ |
| | | public CSN getDefaultCSN() |
| | | { |
| | | return defaultCSN; |
| | | } |
| | | |
| | | @Override |
| | | public boolean equals(Object obj) |
| | | { |
| | | if (this == obj) { |
| | | return true; |
| | | } |
| | | if (obj instanceof CursorOptions) { |
| | | CursorOptions other = (CursorOptions) obj; |
| | | return keyMatchingStrategy == other.keyMatchingStrategy |
| | | && positionStrategy == other.positionStrategy |
| | | && Objects.equals(defaultCSN, other.defaultCSN); |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | @Override |
| | | public int hashCode() |
| | | { |
| | | final int prime = 31; |
| | | int result = 1; |
| | | result = prime * result + ((keyMatchingStrategy == null) ? 0 : keyMatchingStrategy.hashCode()); |
| | | result = prime * result + ((positionStrategy == null) ? 0 : positionStrategy.hashCode()); |
| | | result = prime * result + ((defaultCSN == null) ? 0 : defaultCSN.hashCode()); |
| | | return result; |
| | | } |
| | | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() |
| | | + " [keyMatchingStrategy=" + keyMatchingStrategy |
| | | + ", positionStrategy=" + positionStrategy |
| | | + ", defaultCSN=" + defaultCSN + "]"; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Getter for the current record. |
| | | * |
| | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor; |
| | | import org.opends.server.types.DN; |
| | | |
| | |
| | | * Starting point for each domain cursor. If any {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN for each |
| | | * replicaDBs |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy |
| | | * @param options The cursor options |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy) |
| | | * @see #getCursorFrom(DN, ServerState, CursorOptions) |
| | | */ |
| | | MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException; |
| | | MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, CursorOptions options) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the domains starting before, at or |
| | |
| | | * Starting point for each domain cursor. If any {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN for each |
| | | * replicaDBs |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy |
| | | * @param options The cursor options |
| | | * @param excludedDomainDns |
| | | * Every domain appearing in this set is excluded from the cursor |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy) |
| | | * @see #getCursorFrom(DN, ServerState, CursorOptions) |
| | | */ |
| | | MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy, Set<DN> excludedDomainDns) throws ChangelogException; |
| | | MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, CursorOptions options, Set<DN> excludedDomainDns) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the replicaDBs for the specified |
| | |
| | | * Starting point for each ReplicaDB cursor. If any CSN for a |
| | | * replicaDB is null, then start from the oldest CSN for this |
| | | * replicaDB |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy |
| | | * @param options The cursor options |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, int, CSN, KeyMatchingStrategy, PositionStrategy) |
| | | * @see #getCursorFrom(DN, int, CSN, CursorOptions) |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException; |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, CursorOptions options) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | | * replication domain and serverId starting beofre, at or after the provided |
| | | * replication domain and serverId starting before, at or after the provided |
| | | * {@link CSN}, depending on the provided matching and positioning strategies. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | |
| | | * @param startCSN |
| | | * Starting point for the ReplicaDB cursor. If the CSN is null, then |
| | | * start from the oldest CSN for this replicaDB |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy |
| | | * @param positionStrategy |
| | | * Cursor position strategy |
| | | * @param options The cursor options |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, KeyMatchingStrategy matchingStrategy, |
| | | PositionStrategy positionStrategy) throws ChangelogException; |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, CursorOptions options) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Unregisters the provided cursor from this replication domain. |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.types.DN; |
| | | |
| | |
| | | { |
| | | final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN(); |
| | | |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = |
| | | domainDB.getCursorFrom(cookieWithNewestCSN, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | domainDB.getCursorFrom(cookieWithNewestCSN, options); |
| | | |
| | | nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint); |
| | | nextChangeForInsertDBCursor.next(); |
| | |
| | | */ |
| | | public class DomainDBCursor extends CompositeDBCursor<Void> |
| | | { |
| | | /** Replaces null CSNs in ConcurrentSkipListMap that does not support null values. */ |
| | | private static final CSN NULL_CSN = new CSN(0, 0, 0); |
| | | |
| | | private final DN baseDN; |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | private final ConcurrentSkipListMap<Integer, CSN> newReplicas = new ConcurrentSkipListMap<Integer, CSN>(); |
| | | /** |
| | | * Replaces null CSNs in ConcurrentSkipListMap that does not support null values. |
| | | */ |
| | | private static final CSN NULL_CSN = new CSN(0, 0, 0); |
| | | |
| | | private final PositionStrategy positionStrategy; |
| | | private final KeyMatchingStrategy matchingStrategy; |
| | | private final ConcurrentSkipListMap<Integer, CSN> newReplicas = new ConcurrentSkipListMap<>(); |
| | | private final CursorOptions options; |
| | | |
| | | /** |
| | | * Builds a DomainDBCursor instance. |
| | |
| | | * the replication domain baseDN of this cursor |
| | | * @param domainDB |
| | | * the DB for the provided replication domain |
| | | * @param matchingStrategy |
| | | * Cursor key matching strategy, which allow to indicates how key is |
| | | * matched |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which exact |
| | | * position the cursor must start |
| | | * @param options The cursor options |
| | | */ |
public DomainDBCursor(final DN baseDN, final ReplicationDomainDB domainDB, CursorOptions options)
{
  this.baseDN = baseDN;
  this.domainDB = domainDB;
  // The options are stored as a whole; the separate matching and position
  // strategy parameters, and the assignments that read them, are gone.
  this.options = options;
}
| | | |
| | | /** |
| | |
| | | final int serverId = pair.getKey(); |
| | | final CSN csn = pair.getValue(); |
| | | final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = |
| | | domainDB.getCursorFrom(baseDN, serverId, startCSN, matchingStrategy, positionStrategy); |
| | | final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startCSN, options); |
| | | addCursor(cursor, null); |
| | | iter.remove(); |
| | | } |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options) |
| | | throws ChangelogException |
| | | { |
| | | final Set<DN> excludedDomainDns = Collections.emptySet(); |
| | | return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns); |
| | | return getCursorFrom(startState, options, excludedDomainDns); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy, |
| | | final Set<DN> excludedDomainDns) throws ChangelogException |
| | | CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy); |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options) |
| | | throws ChangelogException |
| | | { |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy); |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, options); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final CursorOptions options) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options); |
| | | putCursor(registeredDomainCursors, baseDN, cursor); |
| | | return cursor; |
| | | } |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | CursorOptions options) throws ChangelogException |
| | | { |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | |
| | | public class MultiDomainDBCursor extends CompositeDBCursor<DN> |
| | | { |
| | | private final ReplicationDomainDB domainDB; |
| | | |
| | | private final ConcurrentSkipListMap<DN, ServerState> newDomains = |
| | | new ConcurrentSkipListMap<DN, ServerState>(); |
| | | |
| | | private final KeyMatchingStrategy matchingStrategy; |
| | | |
| | | private final PositionStrategy positionStrategy; |
| | | private final ConcurrentSkipListMap<DN, ServerState> newDomains = new ConcurrentSkipListMap<>(); |
| | | private final CursorOptions options; |
| | | |
/**
 * Builds a MultiDomainDBCursor instance.
 *
 * @param domainDB
 *          the replication domain management DB
 * @param options
 *          The cursor options, kept and passed on whenever a cursor is
 *          requested from {@code domainDB} for a domain
 */
public MultiDomainDBCursor(final ReplicationDomainDB domainDB, CursorOptions options)
{
  this.domainDB = domainDB;
  // Both strategies now travel inside the options object; the former
  // per-strategy parameters and assignments have been dropped.
  this.options = options;
}
| | | |
| | | /** |
| | |
| | | final Entry<DN, ServerState> entry = iter.next(); |
| | | final DN baseDN = entry.getKey(); |
| | | final ServerState serverState = entry.getValue(); |
| | | final DBCursor<UpdateMsg> domainDBCursor = |
| | | domainDB.getCursorFrom(baseDN, serverState, matchingStrategy, positionStrategy); |
| | | final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, options); |
| | | addCursor(domainDBCursor, baseDN); |
| | | iter.remove(); |
| | | } |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, CursorOptions options) |
| | | throws ChangelogException |
| | | { |
| | | final Set<DN> excludedDomainDns = Collections.emptySet(); |
| | | return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns); |
| | | return getCursorFrom(startState, options, excludedDomainDns); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy, |
| | | final Set<DN> excludedDomainDns) throws ChangelogException |
| | | CursorOptions options, final Set<DN> excludedDomainDns) throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy); |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, options); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, CursorOptions options) |
| | | throws ChangelogException |
| | | { |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy); |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, options); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, CursorOptions options) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy); |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, options); |
| | | putCursor(registeredDomainCursors, baseDN, cursor); |
| | | return cursor; |
| | | } |
| | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException |
| | | CursorOptions options) throws ChangelogException |
| | | { |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; |
| | | import org.opends.server.replication.service.DSRSShutdownSync; |
| | |
| | | private void isOldestCSNForReplica(DN baseDN, CSN csn) throws Exception |
| | | { |
| | | final ReplicationDomainDB domainDB = replicationServer.getChangelogDB().getReplicationDomainDB(); |
| | | final DBCursor<UpdateMsg> cursor = |
| | | domainDB.getCursorFrom(baseDN, csn.getServerId(), csn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | CursorOptions options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, csn.getServerId(), csn, options); |
| | | try { |
| | | assertTrue(cursor.next(), |
| | | "Expected to be to find at least one change in replicaDB(" + baseDN + " " + csn.getServerId() + ")"); |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.api.ReplicaId; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.file.ChangeNumberIndexer; |
| | |
| | | { |
| | | MockitoAnnotations.initMocks(this); |
| | | |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, options); |
| | | initialState = new ChangelogState(); |
| | | replicaDBCursors = new HashMap<ReplicaId, SequentialDBCursor>(); |
| | | domainDBCursors = new HashMap<DN, DomainDBCursor>(); |
| | |
| | | |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class), |
| | | eq(LESS_THAN_OR_EQUAL_TO_KEY), eq(AFTER_MATCHING_KEY))).thenReturn(multiDomainCursor); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(options))).thenReturn(multiDomainCursor); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | |
| | | if (predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); |
| | | if (domainDBCursor == null) |
| | | { |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB, options); |
| | | domainDBCursors.put(baseDN, domainDBCursor); |
| | | |
| | | multiDomainCursor.addDomain(baseDN, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(LESS_THAN_OR_EQUAL_TO_KEY), |
| | | eq(AFTER_MATCHING_KEY))).thenReturn(domainDBCursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(options))).thenReturn(domainDBCursor); |
| | | } |
| | | domainDBCursor.addReplicaDB(serverId, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(LESS_THAN_OR_EQUAL_TO_KEY), |
| | | eq(AFTER_MATCHING_KEY))).thenReturn(replicaDBCursor); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(options))).thenReturn(replicaDBCursor); |
| | | } |
| | | |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn( |
| | | getDomainNewestCSNs(baseDN)); |
| | | when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(getDomainNewestCSNs(baseDN)); |
| | | initialState.addServerIdToDomain(serverId, baseDN); |
| | | } |
| | | |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.CursorOptions; |
| | | import org.opends.server.replication.server.changelog.file.ECLEnabledDomainPredicate; |
| | | import org.opends.server.replication.server.changelog.file.ECLMultiDomainDBCursor; |
| | | import org.opends.server.replication.server.changelog.file.MultiDomainDBCursor; |
| | |
| | | |
| | | @Mock |
| | | private ReplicationDomainDB domainDB; |
| | | private CursorOptions options; |
| | | private MultiDomainDBCursor multiDomainCursor; |
| | | private ECLMultiDomainDBCursor eclCursor; |
| | | private final Set<DN> eclEnabledDomains = new HashSet<DN>(); |
| | |
| | | } |
| | | }; |
| | | |
| | | |
| | | @BeforeMethod |
| | | public void setup() throws Exception |
| | | { |
| | | TestCaseUtils.startFakeServer(); |
| | | MockitoAnnotations.initMocks(this); |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | options = new CursorOptions(GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, options); |
| | | eclCursor = new ECLMultiDomainDBCursor(predicate, multiDomainCursor); |
| | | } |
| | | |
| | |
| | | private void addDomainCursorToCursor(DN baseDN, SequentialDBCursor cursor) throws ChangelogException |
| | | { |
| | | final ServerState state = new ServerState(); |
| | | when(domainDB.getCursorFrom(baseDN, state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY)).thenReturn(cursor); |
| | | when(domainDB.getCursorFrom(baseDN, state, options)).thenReturn(cursor); |
| | | multiDomainCursor.addDomain(baseDN, state); |
| | | } |
| | | } |