| | |
| | | continueSearch = entrySender.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); |
| | | if (continueSearch) |
| | | { |
| | | final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get()); |
| | | final UpdateMsg updateMsg = findReplicaUpdateMessage(replicaUpdatesCursor.get(), cnIndexRecord.getCSN()); |
| | | if (updateMsg != null) |
| | | { |
| | | continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie); |
| | |
| | | private void initializeCookieForChangeNumberMode( |
| | | MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException |
| | | { |
| | | ECLMultiDomainDBCursor eclCursor = null; |
| | | try |
| | | // Initialize the multi domain cursor only from the change number index record. |
| | | // The cookie is always empty at this stage. |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, cnIndexRecord.getCSN()); |
| | | MultiDomainServerState unused = new MultiDomainServerState(); |
| | | MultiDomainDBCursor cursor = getChangelogDB().getReplicationDomainDB().getCursorFrom(unused, options); |
| | | try (ECLMultiDomainDBCursor eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor)) |
| | | { |
| | | cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | MultiDomainDBCursor cursor = |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, options); |
| | | eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | eclCursor.next(); |
| | | cookie.update(eclCursor.toCookie()); |
| | | updateCookieToMediumConsistencyPoint(cookie, eclCursor, cnIndexRecord); |
| | | } |
| | | finally |
| | | } |
| | | |
| | | /** |
| | | * Rebuilds the changelogcookie starting at the newest change number index record. |
| | | * <p> |
| | | * It updates the provided cookie with the changes from the provided ECL cursor, |
| | | * up to (and including) the provided change number index record. |
| | | * <p> |
| | | * Therefore, after calling this method, the cursor is positioned |
| | | * to the change immediately following the provided change number index record. |
| | | * |
| | | * @param cookie the cookie to update |
| | | * @param cursor the cursor where to read changes from |
| | | * @param cnIndexRecord the change number index record to go right after |
| | | * @throws ChangelogException if any problem occurs |
| | | */ |
| | | public static void updateCookieToMediumConsistencyPoint( |
| | | MultiDomainServerState cookie, ECLMultiDomainDBCursor cursor, ChangeNumberIndexRecord cnIndexRecord) |
| | | throws ChangelogException |
| | | { |
| | | if (cnIndexRecord == null) |
| | | { |
| | | close(eclCursor); |
| | | return; |
| | | } |
| | | |
| | | while (cursor.next()) |
| | | { |
| | | UpdateMsg updateMsg = cursor.getRecord(); |
| | | if (updateMsg.getCSN().compareTo(cnIndexRecord.getCSN()) > 0) |
| | | { |
| | | break; |
| | | } |
| | | cookie.update(cursor.getData(), updateMsg.getCSN()); |
| | | } |
| | | } |
| | | |
| | |
| | | * If inconsistency is detected between the available update |
| | | * messages and the provided cnIndexRecord |
| | | */ |
| | | private UpdateMsg findReplicaUpdateMessage( |
| | | final ChangeNumberIndexRecord cnIndexRecord, |
| | | final MultiDomainDBCursor replicaUpdatesCursor) |
| | | throws DirectoryException, ChangelogException |
| | | private UpdateMsg findReplicaUpdateMessage(final MultiDomainDBCursor replicaUpdatesCursor, CSN csn) |
| | | throws ChangelogException, DirectoryException |
| | | { |
| | | while (true) |
| | | { |
| | | final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); |
| | | final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN()); |
| | | final int compareIndexWithUpdateMsg = csn.compareTo(updateMsg.getCSN()); |
| | | if (compareIndexWithUpdateMsg < 0) { |
| | | // Either update message has been purged or baseDN has been removed from changelogDB, |
| | | // ignore current index record and go to the next one |
| | |
| | | * for the supplied domain baseDNs. If a supplied domain is |
| | | * {@link DN#NULL_DN}, then all domains will be cleared. |
| | | */ |
| | | private final ConcurrentSkipListSet<DN> domainsToClear = |
| | | new ConcurrentSkipListSet<DN>(); |
| | | private final ConcurrentSkipListSet<DN> domainsToClear = new ConcurrentSkipListSet<>(); |
| | | private final ChangelogDB changelogDB; |
| | | /** Only used for initialization, and then discarded. */ |
| | | private ChangelogState changelogState; |
| | |
| | | * @NonNull |
| | | */ |
| | | private ECLMultiDomainDBCursor nextChangeForInsertDBCursor; |
| | | private MultiDomainServerState cookie = new MultiDomainServerState(); |
| | | |
| | | /** |
| | | * Builds a ChangeNumberIndexer object. |
| | |
| | | |
| | | private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException |
| | | { |
| | | final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN(); |
| | | |
| | | CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY); |
| | | MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = |
| | | domainDB.getCursorFrom(cookieWithNewestCSN, options); |
| | | // Initialize the multi domain cursor only from the change number index record. |
| | | // The cookie is always empty at this stage. |
| | | final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | final CSN newestCsn = newestRecord != null ? newestRecord.getCSN() : null; |
| | | final CursorOptions options = new CursorOptions(LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, newestCsn); |
| | | final MultiDomainServerState unused = new MultiDomainServerState(); |
| | | MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint = domainDB.getCursorFrom(unused, options); |
| | | |
| | | nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint); |
| | | nextChangeForInsertDBCursor.next(); |
| | | } |
| | | |
| | | /** Returns a cookie initialised with the newest CSN for each replica. */ |
| | | private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord(); |
| | | final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState(); |
| | | if (newestRecord != null) |
| | | { |
| | | final CSN newestCsn = newestRecord.getCSN(); |
| | | for (DN baseDN : changelogState.getDomainToServerIds().keySet()) |
| | | { |
| | | cookieWithNewestCSN.update(baseDN, newestCsn); |
| | | } |
| | | } |
| | | return cookieWithNewestCSN; |
| | | ChangelogBackend.updateCookieToMediumConsistencyPoint(cookie, nextChangeForInsertDBCursor, newestRecord); |
| | | } |
| | | |
| | | private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB) |
| | |
| | | // let's publish it to the CNIndexDB. |
| | | final long changeNumber = changelogDB.getChangeNumberIndexDB() |
| | | .addRecord(new ChangeNumberIndexRecord(baseDN, csn)); |
| | | MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie(); |
| | | if (!cookie.update(baseDN, csn)) |
| | | { |
| | | throw new IllegalStateException("It was expected that change (baseDN=" + baseDN + ", csn=" + csn |
| | | + ") would have updated the cookie=" + cookie + ", but it did not"); |
| | | } |
| | | notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg); |
| | | moveForwardMediumConsistencyPoint(csn, baseDN); |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.util.*; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.Map; |
| | | import java.util.Map.Entry; |
| | | import java.util.TreeMap; |
| | | |
| | | import org.forgerock.util.Pair; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | |
| | | */ |
| | | abstract class CompositeDBCursor<T> implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private static final byte UNINITIALIZED = 0; |
| | | private static final byte READY = 1; |
| | | private static final byte CLOSED = 2; |
| | | |
| | | /** |
| | | * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or |
| | | * {@link #CLOSED} |
| | | */ |
| | | /** The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED} */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** |
| | |
| | | * last time {@link DBCursor#next()} was called on them. Exhausted cursors |
| | | * might be recycled at some point when they start returning changes again. |
| | | */ |
| | | private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors = |
| | | new HashMap<DBCursor<UpdateMsg>, T>(); |
| | | private final Map<DBCursor<UpdateMsg>, T> exhaustedCursors = new HashMap<>(); |
| | | /** |
| | | * The cursors are sorted based on the current change of each cursor to |
| | | * consider the next change across all available cursors. |
| | |
| | | * thrown about |
| | | * "Non-transactional Cursors may not be used in multiple threads;". |
| | | */ |
| | | private final TreeMap<DBCursor<UpdateMsg>, T> cursors = |
| | | new TreeMap<DBCursor<UpdateMsg>, T>( |
| | | private final TreeMap<DBCursor<UpdateMsg>, T> cursors = new TreeMap<>( |
| | | new Comparator<DBCursor<UpdateMsg>>() |
| | | { |
| | | @Override |
| | |
| | | { |
| | | final CSN csn1 = o1.getRecord().getCSN(); |
| | | final CSN csn2 = o2.getRecord().getCSN(); |
| | | return CSN.compare(csn1, csn2); |
| | | int cmpCsn = CSN.compare(csn1, csn2); |
| | | if (cmpCsn == 0 |
| | | && o1 instanceof CompositeDBCursor |
| | | && o2 instanceof CompositeDBCursor) |
| | | { |
| | | // Ensures a consistent order when the CSNs are equal (rare in practice) |
| | | T data1 = ((CompositeDBCursor<T>) o1).getData(); |
| | | T data2 = ((CompositeDBCursor<T>) o1).getData(); |
| | | if (data1 instanceof Comparable && data2 instanceof Comparable) |
| | | { |
| | | return ((Comparable<T>) data1).compareTo(data2); |
| | | } |
| | | } |
| | | return cmpCsn; |
| | | } |
| | | }); |
| | | |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this cursor. |
| | | * |
| | | * @return a list of (Data, UpdateMsg) pairs representing the state of the |
| | | * cursor. In each pair, the data or the update message may be |
| | | * {@code null}, but at least one of them is non-null. |
| | | */ |
| | | public List<Pair<T, UpdateMsg>> getSnapshot() |
| | | { |
| | | final List<Pair<T, UpdateMsg>> snapshot = new ArrayList<Pair<T, UpdateMsg>>(); |
| | | for (Entry<DBCursor<UpdateMsg>, T> entry : cursors.entrySet()) |
| | | { |
| | | final UpdateMsg updateMsg = entry.getKey().getRecord(); |
| | | final T data = entry.getValue(); |
| | | if (updateMsg != null || data != null) |
| | | { |
| | | snapshot.add(Pair.of(data, updateMsg)); |
| | | } |
| | | } |
| | | for (T data : exhaustedCursors.values()) |
| | | { |
| | | if (data != null) |
| | | { |
| | | snapshot.add(Pair.of(data, (UpdateMsg) null)); |
| | | } |
| | | } |
| | | return snapshot; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | |
| | | exhaustedCursors.clear(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " openCursors=" + cursors |
| | | + " exhaustedCursors=" + exhaustedCursors; |
| | | } |
| | | |
| | | } |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.file; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | import org.forgerock.util.Pair; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | |
| | | */ |
| | | public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg> |
| | | { |
| | | |
| | | private final ECLEnabledDomainPredicate predicate; |
| | | private final MultiDomainDBCursor cursor; |
| | | |
| | |
| | | cursor.removeDomain(baseDN); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean next() throws ChangelogException |
| | | { |
| | |
| | | return hasNext; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void close() |
| | | { |
| | | cursor.close(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | | return getClass().getSimpleName() + " cursor=[" + cursor + ']'; |
| | | } |
| | | |
| | | /** |
| | | * Returns a snapshot of this cursor. |
| | | * |
| | | * @return a list of (DN, UpdateMsg) pairs, containing all base DNs enabled |
| | | * for the external changelog. The update message may be {@code null}. |
| | | */ |
| | | List<Pair<DN, UpdateMsg>> getSnapshot() |
| | | { |
| | | final List<Pair<DN, UpdateMsg>> snapshot = cursor.getSnapshot(); |
| | | final List<Pair<DN, UpdateMsg>> eclSnapshot = new ArrayList<Pair<DN,UpdateMsg>>(); |
| | | for (Pair<DN, UpdateMsg> pair : snapshot) |
| | | { |
| | | DN baseDN = pair.getFirst(); |
| | | if (predicate.isECLEnabledDomain(baseDN)) |
| | | { |
| | | eclSnapshot.add(pair); |
| | | } |
| | | } |
| | | return eclSnapshot; |
| | | } |
| | | |
| | | /** |
| | | * Returns the cookie corresponding to the state of this cursor. |
| | | * |
| | | * @return a valid cookie taking into account only the base DNs enabled for |
| | | * the external changelog |
| | | */ |
| | | public MultiDomainServerState toCookie() |
| | | { |
| | | List<Pair<DN, UpdateMsg>> snapshot = getSnapshot(); |
| | | MultiDomainServerState cookie = new MultiDomainServerState(); |
| | | for (Pair<DN, UpdateMsg> pair : snapshot) |
| | | { |
| | | // only put base DNs where a CSN is available in the cookie |
| | | if (pair.getSecond() != null) |
| | | { |
| | | cookie.update(pair.getFirst(), pair.getSecond().getCSN()); |
| | | } |
| | | } |
| | | return cookie; |
| | | } |
| | | } |
| | |
| | | final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN(); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | | |
| | |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | RepositionableCursor<CSN, UpdateMsg> cursor = log.getCursor(startCSN, matchingStrategy, positionStrategy); |
| | | return new FileReplicaDBCursor(cursor, startCSN, positionStrategy); |
| | | CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null; |
| | | return new FileReplicaDBCursor(cursor, actualStartCSN, positionStrategy); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ReplicaDB. |
| | | */ |
| | | /** Shutdown this ReplicaDB. */ |
| | | void shutdown() |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | |
| | | return null; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN, |
| | | CursorOptions options) throws ChangelogException |
| | |
| | | final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId); |
| | | if (replicaDB != null) |
| | | { |
| | | final CSN actualStartCSN = startCSN != null ? startCSN : options.getDefaultCSN(); |
| | | final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom( |
| | | startCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN); |
| | | actualStartCSN, options.getKeyMatchingStrategy(), options.getPositionStrategy()); |
| | | final CSN offlineCSN = getOfflineCSN(baseDN, serverId, actualStartCSN); |
| | | final ReplicaId replicaId = ReplicaId.of(baseDN, serverId); |
| | | final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaId, this); |
| | | |
| | |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | return new JEReplicaDBCursor(db, startCSN, matchingStrategy, positionStrategy, this); |
| | | CSN actualStartCSN = (startCSN != null && startCSN.getServerId() == serverId) ? startCSN : null; |
| | | return new JEReplicaDBCursor(db, actualStartCSN, matchingStrategy, positionStrategy, this); |
| | | } |
| | | |
| | | /** |
| | | * Shutdown this ReplicaDB. |
| | | */ |
| | | /** Shutdown this ReplicaDB. */ |
| | | void shutdown() |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |