OPENDJ-2141 (CR-7337) Cannot find entry in cn=changelog when searching with changelogCookie filter
Joint effort with Nicolas Capponi. Follow up of preparatory refactoring in r12534.
The problem lies in ChangelogCookie.initializeCookieForChangeNumberMode(). Here, the cookie is correctly initialized for the starting change number index record, but incorrectly for all the other replicas. The effect is that other replicas are incorrectly cursoring from the beginning of the replicaDB rather than the change immediately preceding the change number index record.
The fix is to make all replicaDBs cursor from the change immediately preceding the change number index record.
ChangelogBackend.java:
Added updateCookieToMediumConsistencyPoint() method to initialize a cookie from a ECLMultiDomainDBCursor.
ECLMultiDomainDBCursor.java:
Removed badly implemented methods getSnapshot(), toCookie() which returned the wrong result.
CompositeDBCursor.java
Removed badly implemented method getSnapshot() which returned the wrong result.
In cursors comparator, ensured more consistent ordering when the CSNs are equal.
ChangeNumberIndexer.java:
Added cookie field to replace the use of ECLMultiDomainDBCursor.toCookie().
Reimplemented initializeNextChangeCursor(), by calling ChangelogBackend.updateUpTo() + removed getCookieInitializedWithNewestCSN().
FileChangelogDB.java, JEChangelogDB.java:
In getCursorFrom(baseDN, serverId, startCSN, options), catered for the case where the provided startCSN does not come from this serverId.
FileReplicaDB.java, JEReplicaDB.java:
In generateCursorFrom(), catered for the case where the provided startCSN does not come from this serverId.
| | |
| | | continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); |
| | | if (continueSearch) |
| | | { |
| | | final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get()); |
| | | final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN()); |
| | | if (updateMsg != null) |
| | | { |
| | | continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie); |
| | |
| | | private void initializeCookieForChangeNumberMode( |
| | | MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException |
| | | { |
| | | ECLMultiDomainDBCursor eclCursor = null; |
| | | try |
| | | // Initialize the multi domain cursor only from the change number index record. |
| | | // The cookie is always empty at this stage. |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN()); |
| | | MultiDomainServerState unused = new MultiDomainServerState(); |
| | | MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options); |
| | | try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor)) |
| | | { |
| | | cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | MultiDomainDBCursor cursor = |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, options); |
| | | eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | eclCursor.next(); |
| | | cookie.update(eclCursor.toCookie()); |
| | | updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord); |
| | | } |
| | | finally |
| | | } |
| | | |
| | | /** |
| | | * Rebuilds the changelogcookie starting at the newest change number index record. |
| | | * <p> |
| | | * It updates the provided cookie with the changes from the provided ECL cursor, |
| | | * up to (and including) the provided change number index record. |
| | | * <p> |
| | | * Therefore, after calling this method, the cursor is positioned |
| | | * to the change immediately following the provided change number index record. |
| | | * |
| | | * @param cookie the cookie to update |
| | | * @param cursor the cursor where to read changes from |
| | | * @param cnIndexRecord the change number index record to go right after |
| | | * @throws ChangelogException if any problem occurs |
| | | */ |
| | | public static void updateCookieToMediumConsistencyPoint( |
| | | MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord) |
| | | throws ChangelogException |
| | | { |
| | | if (cnIndexRecord == null) |
| | | { |
| | | close(eclCursor); |
| | | return; |
| | | } |
| | | |
| | | while (cursor.next()) |
| | | { |
| | | UpdateMsg updateMsg = cursor.getRecord(); |
| | | if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0) |
| | | { |
| | | break; |
| | | } |
| | | cookie.update(cursor.getData(), updateMsg.getCSN()); |
| | | } |
| | | } |
| | | |
| | |
| | | * If inconsistency is detected between the available update |
| | | * messages and the provided cnIndexRecord |
| | | */ |
| | | private UpdateMsg findReplicaUpdateMessage( |
| | | final ChangeNumberIndexRecord cnIndexRecord, |
| | | final MultiDomainDBCursor replicaUpdatesCursor) |
| | | throws DirectoryException, ChangelogException |
| | | private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn) |
| | | throws ChangelogException, DirectoryException |
| | | { |
| | | while (true) |
| | | { |
| | | final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); |
| | | final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN()); |
| | | final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN()); |
| | | if (compareIndexWithUpdateMsg < 0) { |
| | | // Either update message has been purged or baseDN has been removed from changelogDB, |
| | | // ignore current index record and go to the next one |
| | |
| | | * for the supplied domain baseDNs. If a supplied domain is |
| | | * {@link DN#NULL_DN}, then all domains will be cleared. |
| | | */ |
| | | private final ConcurrentSkipListSet<DN> domainsToClear = |
| | | new ConcurrentSkipListSet<DN>(); |
| | | private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>(); |
| | | private final ChangelogDB changelogDB; |
| | | /** Only used for initialization, and then discarded. */ |
| | | private ChangelogState changelogState; |
| | |
| | | * @NonNull |
| | | */ |
| | | private ECLMultiDomainDBCursor nextChangeForInsertDBCursor; |
| | | private MultiDomainServerState cookie = new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | |
| | | private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException |
| | | { |
| | | final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN(); |
| | | |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = |
| | | domainDB.getCursorFrom(cookieWithNewestCSN, options); |
| | | // Initialize the multi domain cursor only from the change number index record. |
| | | // The cookie is always empty at this stage. |
| | | final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null; |
| | | final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn); |
| | | final MultiDomainServerState unused = new MultiDomainServerState(); |
| | | MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options); |
| | | |
| | | nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint); |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | /** Returns a cookie initialised with the newest CSN for each replica. */ |
| | | private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState(); |
| | | if (newestRecord != null) |
| | | { |
| | | final CSN newestCsn = newestRecord.getCSN(); |
| | | for (DN baseDN : changelogState.getDomainToServerIds().keySet()) |
| | | { |
| | | cookieWithNewestCSN.update(baseDN, newestCsn); |
| | | } |
| | | } |
| | | return cookieWithNewestCSN; |
| | | ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord); |
| | | } |
| | | |
| | | private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB) |
| | |
| | | // let's publish it to the CNIndexDB. |
| | | final long changeNumber = changelogDB.getChangeNumberIndexDB() |
| | | .addRecord(new ChangeNumberIndexRecord(baseDN, csn)); |
| | | MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie(); |
| | | if (!cookie.update(baseDN, csn)) |
| | | { |
| | | throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn |
| | | + ") would have updated the cookie=" + cookie + ", but it did not"); |
| | | } |
| | | notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.util.*; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.TreeMap; |
| | | |
| | | import org.forgerock.util.Pair; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | |
| | | */ |
| | | abstract class CompositeDBCursor<T> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private static final byte UNINITIALIZED = 0; |
| | | private static final byte READY = 1; |
| | | private static final byte CLOSED = 2; |
| | | |
| | | /** |
| | | * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or |
| | | * {@link #CLOSED} |
| | | */ |
| | | /** The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED} */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** |
| | |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | | * might be recycled at some point when they start returning changes again. |
| | | */ |
| | | private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors = |
| | | new HashMap<DBCursor<UpdateMsg>, T>(); |
| | | private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors = new HashMap<>(); |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | |
| | | * thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private final TreeMap<DBCursor<UpdateMsg>, T> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, T>( |
| | | private final TreeMap<DBCursor<UpdateMsg>, T> cursors = new TreeMap<>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | @Override |
| | |
| | | { |
| | | final CSN csn1 = o1.getRecord().getCSN(); |
| | | final CSN csn2 = o2.getRecord().getCSN(); |
| | | return CSN.compare(csn1, csn2); |
| | | int cmpCsn = CSN.compare(csn1, csn2); |
| | | if (cmpCsn == 0 |
| | | && o1 instanceof CompositeDBCursor |
| | | && o2 instanceof CompositeDBCursor) |
| | | { |
| | | // Ensures a consistent order when the CSNs are equal (rare in practice) |
| | | T data1 = ((CompositeDBCursor<T>) o1).getData(); |
| | | T data2 = ((CompositeDBCursor<T>) o1).getData(); |
| | | if (data1 instanceof Comparable && data2 instanceof Comparable) |
| | | { |
| | | return ((Comparable<T>) data1).compareTo(data2); |
| | | } |
| | | } |
| | | return cmpCsn; |
| | | } |
| | | }); |
| | | |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this cursor. |
| | | * |
| | | * @return a list of (Data, UpdateMsg) pairs representing the state of the |
| | | * cursor. In each pair, the data or the update message may be |
| | | * {@code null}, but at least one of them is non-null. |
| | | */ |
| | | public List<Pair<T, UpdateMsg>> getSnapshot() |
| | | { |
| | | final List<Pair<T, UpdateMsg>> snapshot = new ArrayList<Pair<T, UpdateMsg>>(); |
| | | for (Entry<DBCursor<UpdateMsg>, T> entry : cursors.entrySet()) |
| | | { |
| | | final UpdateMsg updateMsg = entry.getKey().getRecord(); |
| | | final T data = entry.getValue(); |
| | | if (updateMsg != null || data != null) |
| | | { |
| | | snapshot.add(Pair.of(data, updateMsg)); |
| | | } |
| | | } |
| | | for (T data : exhaustedCursors.values()) |
| | | { |
| | | if (data != null) |
| | | { |
| | | snapshot.add(Pair.of(data, (UpdateMsg) null)); |
| | | } |
| | | } |
| | | return snapshot; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | |
| | | exhaustedCursors.clear(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " openCursors=" + cursors |
| | | + " exhaustedCursors=" + exhaustedCursors; |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | import org.forgerock.util.Pair; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | |
| | | */ |
| | | public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private final ECLEnabledDomainPredicate predicate; |
| | | private final MultiDomainDBCursor cursor; |
| | | |
| | |
| | | cursor.removeDomain(baseDN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | |
| | | return hasNext; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | cursor.close(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " cursor=[" + cursor + ']'; |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this cursor. |
| | | * |
| | | * @return a list of (DN, UpdateMsg) pairs, containing all base DNs enabled |
| | | * for the external changelog. The update message may be {@code null}. |
| | | */ |
| | | List<Pair<DN, UpdateMsg>> getSnapshot() |
| | | { |
| | | final List<Pair<DN, UpdateMsg>> snapshot = cursor.getSnapshot(); |
| | | final List<Pair<DN, UpdateMsg>> eclSnapshot = new ArrayList<Pair<DN,UpdateMsg>>(); |
| | | for (Pair<DN, UpdateMsg> pair : snapshot) |
| | | { |
| | | DN baseDN = pair.getFirst(); |
| | | if (predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | eclSnapshot.add(pair); |
| | | } |
| | | } |
| | | return eclSnapshot; |
| | | } |
| | | |
| | | /** |
| | | * Returns the cookie corresponding to the state of this cursor. |
| | | * |
| | | * @return a valid cookie taking into account only the base DNs enabled for |
| | | * the external changelog |
| | | */ |
| | | public MultiDomainServerState toCookie() |
| | | { |
| | | List<Pair<DN, UpdateMsg>> snapshot = getSnapshot(); |
| | | MultiDomainServerState cookie = new MultiDomainServerState(); |
| | | for (Pair<DN, UpdateMsg> pair : snapshot) |
| | | { |
| | | // only put base DNs where a CSN is available in the cookie |
| | | if (pair.getSecond() != null) |
| | | { |
| | | cookie.update(pair.getFirst(), pair.getSecond().getCSN()); |
| | | } |
| | | } |
| | | return cookie; |
| | | } |
| | | } |
| | |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN(); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | | |
| | |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy); |
| | | return new FileReplicaDBCursor(cursor, startCSN, positionStrategy); |
| | | CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null; |
| | | return new FileReplicaDBCursor(cursor, actualStartCSN, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ReplicaDB. |
| | | */ |
| | | /** Shutdown this ReplicaDB. */ |
| | | void shutdown() |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | CursorOptions options) throws ChangelogException |
| | |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN(); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | | |
| | |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | return new JEReplicaDBCursor(db, startCSN, matchingStrategy, positionStrategy, this); |
| | | CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null; |
| | | return new JEReplicaDBCursor(db, actualStartCSN, matchingStrategy, positionStrategy, this); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ReplicaDB. |
| | | */ |
| | | /** Shutdown this ReplicaDB. */ |
| | | void shutdown() |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |