Forward port of checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend
to support cn=changelog
CR-4053
Preparatory work for the changelog backend: add a new behavior to cursors, so that a
cursor can start at a given key instead of starting just after it.
In order to avoid introducing another boolean in the methods, I created two enums to
define the behavior of a cursor.
* DBCursor class : add two enums KeyMatchingStrategy and PositionStrategy
** The new behavior corresponds to PositionStrategy.ON_MATCHING_KEY, which allows
positioning the cursor on the record with the given key (whereas AFTER_MATCHING_KEY
positions it just after the record with the given key).
* ReplicationDomainDB : add PositionStrategy argument to all methods that return a cursor
* ReplicationServerDomain : getCursorFrom(DN, ServerState) method calls underlying method
with PositionStrategy.AFTER_MATCHING_KEY
* DomainDBCursor, MultiDomainDBCursor : add PositionStrategy argument to the constructor,
pass the strategy to underlying cursors
* ChangeNumberIndexer : use AFTER_MATCHING_KEY strategy when retrieving the
cursor (no behavior change)
* JEChangelogDB : add PositionStrategy argument to all methods that return a cursor,
but the getCursorFrom(DN, int, CSN, PositionStrategy) method does NOT implement the
new ON_MATCHING_KEY strategy.
* Update test classes to match the new method signatures; no new tests were added for
the new behavior (ON_MATCHING_KEY) - to be done later
| | |
| | | import static org.opends.server.replication.common.ServerStatus.*; |
| | | import static org.opends.server.replication.common.StatusMachineEvent.*; |
| | | import static org.opends.server.replication.protocol.ProtocolVersion.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | |
| | | * @return a non null {@link DBCursor} going from oldest to newest CSN |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see ReplicationDomainDB#getCursorFrom(DN, ServerState) |
| | | * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy) |
| | | */ |
| | | public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState) |
| | | throws ChangelogException |
| | | { |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState); |
| | | return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY); |
| | | } |
| | | |
| | | /** |
| | |
| | | { |
| | | |
| | | /** |
| | | * Represents a cursor key matching strategy, which allows choosing whether only |
| | | * the exact key must be found or whether any key equal to or higher than it should match. |
| | | */ |
| | | public enum KeyMatchingStrategy { |
| | | /** Matches only if the exact key is found. */ |
| | | EQUAL_TO_KEY, |
| | | /** Matches if the key or a greater key is found. */ |
| | | GREATER_THAN_OR_EQUAL_TO_KEY |
| | | } |
| | | |
| | | /** |
| | | * Represents a cursor positioning strategy, which allows choosing whether the start point |
| | | * corresponds to the record at the provided key or to the record just after the provided |
| | | * key. |
| | | */ |
| | | public enum PositionStrategy { |
| | | /** Start point is on the matching key. */ |
| | | ON_MATCHING_KEY, |
| | | /** Start point is after the matching key. */ |
| | | AFTER_MATCHING_KEY |
| | | } |
| | | |
| | | /** |
| | | * Getter for the current record. |
| | | * |
| | | * @return The current record. |
| | |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.types.DN; |
| | | |
| | |
| | | void removeDomain(DN baseDN) throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the domains starting after the |
| | | * Generates a {@link DBCursor} across all the domains starting at or after the |
| | | * provided {@link MultiDomainServerState} for each domain. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | | * cursor. |
| | | * |
| | | * @param startAfterState |
| | | * @param startState |
| | | * Starting point for each domain cursor. If any {@link ServerState} |
| | | * for a domain is null, then start from the oldest CSN for each |
| | | * replicaDBs |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which indicates the exact |
| | | * position at which the cursor must start |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, ServerState) |
| | | * @see #getCursorFrom(DN, ServerState, PositionStrategy) |
| | | */ |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState) |
| | | public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | |
| | | // serverId methods |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} across all the replicaDBs for the specified |
| | | * replication domain starting after the provided {@link ServerState} for each |
| | | * replication domain starting at or after the provided {@link ServerState} for each |
| | | * replicaDBs. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | |
| | | * |
| | | * @param baseDN |
| | | * the replication domain baseDN |
| | | * @param startAfterState |
| | | * @param startState |
| | | * Starting point for each ReplicaDB cursor. If any CSN for a |
| | | * replicaDB is null, then start from the oldest CSN for this |
| | | * replicaDB |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | * @see #getCursorFrom(DN, int, CSN) |
| | | * @see #getCursorFrom(DN, int, CSN, PositionStrategy) |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState) |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | | * Generates a {@link DBCursor} for one replicaDB for the specified |
| | | * replication domain and serverId starting after the provided {@link CSN}. |
| | | * replication domain and serverId starting at or after the provided {@link CSN}. |
| | | * <p> |
| | | * When the cursor is not used anymore, client code MUST call the |
| | | * {@link DBCursor#close()} method to free the resources and locks used by the |
| | |
| | | * the replication domain baseDN of the replicaDB |
| | | * @param serverId |
| | | * the serverId of the replicaDB |
| | | * @param startAfterCSN |
| | | * @param startCSN |
| | | * Starting point for the ReplicaDB cursor. If the CSN is null, then |
| | | * start from the oldest CSN for this replicaDB |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | * @return a non null {@link DBCursor} |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN) |
| | | DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy) |
| | | throws ChangelogException; |
| | | |
| | | /** |
| | |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV); |
| | | nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY); |
| | | nextChangeForInsertDBCursor.next(); |
| | | |
| | | if (newestRecord != null) |
| | |
| | | */ |
| | | private static final CSN NULL_CSN = new CSN(0, 0, 0); |
| | | |
| | | private final PositionStrategy positionStrategy; |
| | | |
| | | /** |
| | | * Builds a DomainDBCursor instance. |
| | | * |
| | |
| | | * the replication domain baseDN of this cursor |
| | | * @param domainDB |
| | | * the DB for the provided replication domain |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allow to indicates at which |
| | | * exact position the cursor must start |
| | | */ |
| | | public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB) |
| | | public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy) |
| | | { |
| | | this.baseDN = baseDN; |
| | | this.domainDB = domainDB; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | | /** |
| | |
| | | final Entry<Integer, CSN> pair = iter.next(); |
| | | final int serverId = pair.getKey(); |
| | | final CSN csn = pair.getValue(); |
| | | final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN); |
| | | final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null; |
| | | final DBCursor<UpdateMsg> cursor = |
| | | domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy); |
| | | addCursor(cursor, null); |
| | | iter.remove(); |
| | | } |
| | |
| | | import org.forgerock.i18n.LocalizableMessageBuilder; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.util.Reject; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.replication.common.CSN; |
| | |
| | | import org.opends.server.replication.server.ChangelogState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException |
| | | public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this); |
| | | final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); |
| | | registeredMultiDomainCursors.add(cursor); |
| | | for (DN baseDN : domainToReplicaDBs.keySet()) |
| | | { |
| | | cursor.addDomain(baseDN, startAfterState.getServerState(baseDN)); |
| | | cursor.addDomain(baseDN, startState.getServerState(baseDN)); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState) |
| | | throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN); |
| | | final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy); |
| | | for (int serverId : getDomainMap(baseDN).keySet()) |
| | | { |
| | | // get the last already sent CSN from that server to get a cursor |
| | | final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null; |
| | | final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null; |
| | | cursor.addReplicaDB(serverId, lastCSN); |
| | | } |
| | | return cursor; |
| | | } |
| | | |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN) |
| | | private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy) |
| | | { |
| | | synchronized (registeredDomainCursors) |
| | | { |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this); |
| | | final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy); |
| | | List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN); |
| | | if (cursors == null) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) |
| | | throws ChangelogException |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN, |
| | | PositionStrategy positionStrategy) throws ChangelogException |
| | | |
| | | { |
| | | Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY" |
| | | + " is not supported for the JE implementation fo changelog"); |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final DBCursor<UpdateMsg> cursor = |
| | | replicaDB.generateCursorFrom(startAfterCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN); |
| | | replicaDB.generateCursorFrom(startCSN); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | // TODO JNR if (offlineCSN != null) ?? |
| | | // What about replicas that suddenly become offline? |
| | | return new ReplicaOfflineCursor(cursor, offlineCSN); |
| | |
| | | private final ConcurrentSkipListSet<DN> removeDomains = |
| | | new ConcurrentSkipListSet<DN>(); |
| | | |
| | | private final PositionStrategy positionStrategy; |
| | | |
| | | /** |
| | | * Builds a MultiDomainDBCursor instance. |
| | | * |
| | | * @param domainDB |
| | | * the replication domain management DB |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which indicates the exact |
| | | * position at which the cursor must start |
| | | */ |
| | | public MultiDomainDBCursor(ReplicationDomainDB domainDB) |
| | | public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy) |
| | | { |
| | | this.domainDB = domainDB; |
| | | this.positionStrategy = positionStrategy; |
| | | } |
| | | |
| | | /** |
| | |
| | | final Entry<DN, ServerState> entry = iter.next(); |
| | | final DN baseDN = entry.getKey(); |
| | | final ServerState serverState = entry.getValue(); |
| | | final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState); |
| | | final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy); |
| | | addCursor(domainDBCursor, baseDN); |
| | | iter.remove(); |
| | | } |
| | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.mockito.Matchers.*; |
| | | import static org.mockito.Mockito.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | |
| | | /** |
| | | * Test for ChangeNumberIndexer class. All dependencies to the changelog DB |
| | |
| | | { |
| | | MockitoAnnotations.initMocks(this); |
| | | |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB); |
| | | multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY); |
| | | initialState = new ChangelogState(); |
| | | initialCookie = new MultiDomainServerState(); |
| | | replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>(); |
| | |
| | | |
| | | when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB); |
| | | when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn( |
| | | multiDomainCursor); |
| | | when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(multiDomainCursor); |
| | | } |
| | | |
| | | @AfterMethod |
| | |
| | | DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN); |
| | | if (domainDBCursor == null) |
| | | { |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB); |
| | | domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY); |
| | | domainDBCursors.put(baseDN, domainDBCursor); |
| | | |
| | | multiDomainCursor.addDomain(baseDN, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class))) |
| | | when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(domainDBCursor); |
| | | } |
| | | domainDBCursor.addReplicaDB(serverId, null); |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class))) |
| | | when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY))) |
| | | .thenReturn(replicaDBCursor); |
| | | } |
| | | |
| | |
| | | of(msg4, baseDN1)); |
| | | } |
| | | |
| | | @Test |
| | | public void recycleTwoElementsCursorsLongerExhaustion() throws Exception |
| | | { |
| | | final CompositeDBCursor<String> compCursor = newCompositeDBCursor( |