| | |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.plugin.MultimasterReplication.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.LDIFWriter.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | new MultiDomainServerState(), ON_MATCHING_KEY, getExcludedBaseDNs()); |
| | | new MultiDomainServerState(), GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, getExcludedBaseDNs()); |
| | | try |
| | | { |
| | | baseEntryHasSubordinates = cursor.next(); |
| | |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | cookie, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | |
| | | boolean continueSearch = sendCookieEntriesFromCursor(entrySender, replicaUpdatesCursor, cookie); |
| | |
| | | try |
| | | { |
| | | cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber); |
| | | MultiDomainServerState cookie = new MultiDomainServerState(); |
| | | final boolean continueSearch = |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor); |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie); |
| | | if (continueSearch) |
| | | { |
| | | entrySender.transitioningToPersistentSearchPhase(); |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor); |
| | | sendChangeNumberEntriesFromCursors(entrySender, params, cnIndexDBCursor, replicaUpdatesCursor, cookie); |
| | | } |
| | | } |
| | | finally |
| | |
| | | |
| | | private boolean sendChangeNumberEntriesFromCursors(final ChangeNumberEntrySender entrySender, |
| | | final SearchParams params, DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor, |
| | | AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor) throws ChangelogException, DirectoryException |
| | | AtomicReference<MultiDomainDBCursor> replicaUpdatesCursor, MultiDomainServerState cookie) |
| | | throws ChangelogException, DirectoryException |
| | | { |
| | | boolean continueSearch = true; |
| | | while (continueSearch && cnIndexDBCursor.next()) |
| | |
| | | if (replicaUpdatesCursor.get() == null) |
| | | { |
| | | replicaUpdatesCursor.set(initializeReplicaUpdatesCursor(cnIndexRecord)); |
| | | initializeCookieForChangeNumberMode(cookie, cnIndexRecord); |
| | | } |
| | | else |
| | | { |
| | | cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); |
| | | } |
| | | continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); |
| | | if (continueSearch) |
| | |
| | | final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor.get()); |
| | | if (updateMsg != null) |
| | | { |
| | | continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg); |
| | | continueSearch = entrySender.initialSearchSendEntry(cnIndexRecord, updateMsg, cookie); |
| | | replicaUpdatesCursor.get().next(); |
| | | } |
| | | } |
| | |
| | | return continueSearch; |
| | | } |
| | | |
| | | /** |
| | | * Initialize the provided cookie from the provided change number index record. |
| | | * <p> |
| | | * The cookie is first updated with the record's base DN and CSN. A multi-domain cursor is then |
| | | * opened from that cookie using {@code LESS_THAN_OR_EQUAL_TO_KEY} / {@code ON_MATCHING_KEY}, |
| | | * wrapped in an {@code ECLMultiDomainDBCursor} filtered by {@code domainPredicate}, advanced |
| | | * once, and the cursor's own cookie ({@code toCookie()}) is merged back into the provided cookie. |
| | | * The temporary cursor is always closed before returning. |
| | | * <p> |
| | | * NOTE(review): the result of {@code eclCursor.next()} is not checked here; confirm that |
| | | * {@code toCookie()} is meaningful when the cursor has no record to position on. |
| | | * |
| | | * @param cookie |
| | | *          the cookie to initialize; it is modified in place |
| | | * @param cnIndexRecord |
| | | *          the change number index record providing the base DN and CSN used as starting point |
| | | * @throws ChangelogException |
| | | *           presumably raised when the changelog DB cannot be read (declared by this method; |
| | | *           the exact source is not visible here) |
| | | */ |
| | | private void initializeCookieForChangeNumberMode( |
| | | MultiDomainServerState cookie, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException |
| | | { |
| | | ECLMultiDomainDBCursor eclCursor = null; |
| | | try |
| | | { |
| | | cookie.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); |
| | | MultiDomainDBCursor cursor = |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(cookie, |
| | | LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | eclCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | eclCursor.next(); |
| | | cookie.update(eclCursor.toCookie()); |
| | | } |
| | | finally |
| | | { |
| | | close(eclCursor); |
| | | } |
| | | } |
| | | |
| | | private MultiDomainDBCursor initializeReplicaUpdatesCursor( |
| | | final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException |
| | | { |
| | |
| | | // No need for ECLMultiDomainDBCursor in this case |
| | | // as updateMsg will be matched with cnIndexRecord |
| | | final MultiDomainDBCursor replicaUpdatesCursor = |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY); |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY); |
| | | replicaUpdatesCursor.next(); |
| | | return replicaUpdatesCursor; |
| | | } |
| | |
| | | /** |
| | | * Builds a changelog entry from the provided update message and sends it to the search |
| | | * operation if it matches. |
| | | * <p> |
| | | * The change number of the record is first reported through |
| | | * {@code sendEntryData.initialSearchSendsEntry(...)}. The entry is then created from the |
| | | * record's base DN and change number, the string form of the cookie, and the update message, |
| | | * and handed to {@code sendEntryIfMatches(searchOp, entry, null)}. |
| | | * |
| | | * @param cnIndexRecord |
| | | *          the change number index record providing the base DN and change number of the entry |
| | | * @param updateMsg |
| | | *          the update message the entry is created from |
| | | * @param cookie |
| | | *          the cookie whose string representation is stored in the created entry |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | * @throws DirectoryException |
| | | *           presumably propagated from entry creation or sending (declared by this method; |
| | | *           the exact source is not visible here) |
| | | */ |
| | | private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) |
| | | throws DirectoryException |
| | | private boolean initialSearchSendEntry(ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg, |
| | | MultiDomainServerState cookie) throws DirectoryException |
| | | { |
| | | final DN baseDN = cnIndexRecord.getBaseDN(); |
| | | final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie()); |
| | | cookie.update(baseDN, cnIndexRecord.getCSN()); |
| | | final String cookieString = cookie.toString(); |
| | | |
| | | sendEntryData.initialSearchSendsEntry(cnIndexRecord.getChangeNumber()); |
| | | final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg); |
| | | final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg); |
| | | return sendEntryIfMatches(searchOp, entry, null); |
| | | } |
| | | |