| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.config.ConfigConstants.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; |
| | | import static org.opends.server.loggers.debug.DebugLogger.getTracer; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.LDIFWriter.*; |
| | |
| | | |
| | | import java.text.SimpleDateFormat; |
| | | import java.util.*; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | |
| | | import org.opends.messages.Category; |
| | | import org.opends.messages.Message; |
| | |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.plugin.MultimasterReplication; |
| | | import org.opends.server.replication.protocol.AddMsg; |
| | | import org.opends.server.replication.protocol.DeleteMsg; |
| | |
| | | import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | |
| | | import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor; |
| | | import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; |
| | | import org.opends.server.types.*; |
| | | import org.opends.server.util.ServerConstants; |
| | | import org.opends.server.util.StaticUtils; |
| | | |
| | | /** |
| | | * A backend that provides access to the changelog, i.e. the "cn=changelog" |
| | | * suffix. It is a read-only backend that is created by a |
| | | * {@code ReplicationServer} and is not configurable. |
| | | * <p> |
| | |
| | | * request. The cookie provided in the control is used to retrieve entries from |
| | | * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with |
| | | * the entries.</li> |
| | | * <li>Draft compatibility mode: when no "ECL Cookie Exchange Control" is provided |
| | | * with the request. The entries are retrieved using the ChangeNumberIndexDB (or |
| | | * DraftDB, hence the name) and their attributes are set with the information |
| | | * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set |
| | | * from the content of ChangeNumberIndexDB.</li> |
| | |
| | | private static final AttributeType MODIFIERS_NAME_TYPE = |
| | | DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true); |
| | | |
| | | /** The DN for the base changelog entry. */ |
| | | private DN baseChangelogDN; |
| | | /** The base DN for the external change log. */ |
| | | public static final DN CHANGELOG_BASE_DN; |
| | | |
| | | static |
| | | { |
| | | try |
| | | { |
| | | CHANGELOG_BASE_DN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | throw new RuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | /** The set of base DNs for this backend. */ |
| | | private DN[] baseDNs; |
| | |
| | | private final ECLEnabledDomainPredicate domainPredicate; |
| | | |
| | | /** |
| | | * Creates a new backend with the provided replication server. |
| | | * |
| | | * @param replicationServer |
| | | * The replication server on which the changes are read. |
| | |
| | | setPrivateBackend(true); |
| | | } |
| | | |
| | | private ChangelogDB getChangelogDB() |
| | | { |
| | | return replicationServer.getChangelogDB(); |
| | | } |
| | | |
| | | /** |
| | | * Looks up the backend that the directory server reports for |
| | | * {@link #CHANGELOG_BASE_DN} and returns it as a {@code ChangelogBackend}. |
| | | * |
| | | * @return the ChangelogBackend configured for "cn=changelog" in this directory server |
| | | * @deprecated instead inject the required object where needed |
| | | */ |
| | | @Deprecated |
| | | public static ChangelogBackend getInstance() |
| | | { |
| | |   final Object registeredBackend = DirectoryServer.getBackend(CHANGELOG_BASE_DN); |
| | |   return (ChangelogBackend) registeredBackend; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void configureBackend(final Configuration config) throws ConfigException |
| | |
| | | /** |
| | | * Registers the changelog suffix with the directory server. |
| | | * <p> |
| | | * The suffix is registered exactly once, using {@link #CHANGELOG_BASE_DN}. |
| | | * |
| | | * @throws InitializationException |
| | | *           If the base DN cannot be registered with the directory server. |
| | | */ |
| | | @Override |
| | | public void initializeBackend() throws InitializationException |
| | | { |
| | |   // The static constant holds the already decoded changelog root DN. |
| | |   // Keep the instance field populated because other methods of this backend still read it. |
| | |   baseChangelogDN = CHANGELOG_BASE_DN; |
| | |   baseDNs = new DN[] { CHANGELOG_BASE_DN }; |
| | | |
| | |   try |
| | |   { |
| | |     DirectoryServer.registerBaseDN(CHANGELOG_BASE_DN, this, true); |
| | |   } |
| | |   catch (final DirectoryException e) |
| | |   { |
| | |     throw new InitializationException( |
| | |         ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(CHANGELOG_BASE_DN.toString(), getExceptionMessage(e)), e); |
| | |   } |
| | | } |
| | | |
| | |
| | | @Override |
| | | public void finalizeBackend() |
| | | { |
| | | super.finalizeBackend(); |
| | | |
| | | try |
| | | { |
| | | DirectoryServer.deregisterBaseDN(baseChangelogDN); |
| | | DirectoryServer.deregisterBaseDN(CHANGELOG_BASE_DN); |
| | | } |
| | | catch (final DirectoryException e) |
| | | { |
| | |
| | | /** |
| | | * Returns the base DN served by this backend. |
| | | * |
| | | * @return the changelog base DN; the static constant is used so that the |
| | | *         result does not depend on {@code initializeBackend()} having run |
| | | */ |
| | | @Override |
| | | public DN getBaseDN() |
| | | { |
| | |   return CHANGELOG_BASE_DN; |
| | | } |
| | | |
| | | @Override |
| | |
| | | { |
| | | return SearchScope.WHOLE_SUBTREE; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * This implementation stores nothing: both arguments are ignored and |
| | | * {@code null} is always returned. |
| | | */ |
| | | @Override |
| | | public Object setAttachment(String name, Object value) |
| | | { |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException |
| | | { |
| | | // Compute the num subordinates only for the base DN |
| | | if (entryDN == null || !baseChangelogDN.equals(entryDN)) |
| | | if (entryDN == null || !CHANGELOG_BASE_DN.equals(entryDN)) |
| | | { |
| | | return -1; |
| | | } |
| | |
| | | return 1; |
| | | } |
| | | // Search with cookie mode to count all update messages |
| | | final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT); |
| | | SearchParams params = new SearchParams("0", excludedDomains); |
| | | final SearchParams params = new SearchParams(getExcludedDomains()); |
| | | params.requestType = REQUEST_TYPE_FROM_COOKIE; |
| | | params.multiDomainServerState = new MultiDomainServerState(); |
| | | params.cookie = new MultiDomainServerState(); |
| | | NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation(); |
| | | try |
| | | { |
| | |
| | | catch (ChangelogException e) |
| | | { |
| | | throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get( |
| | | baseChangelogDN.toString(), stackTraceToSingleLineString(e))); |
| | | CHANGELOG_BASE_DN.toString(), stackTraceToSingleLineString(e))); |
| | | } |
| | | return searchOp.numSubordinates; |
| | | } |
| | | |
| | | /** |
| | | * Builds the set of domain DNs that searches on this backend must ignore: the |
| | | * ECL disabled domains reported by {@code MultimasterReplication}, plus the |
| | | * changelog root itself. |
| | | * <p> |
| | | * NOTE(review): the changelog root is added directly to the set handed back by |
| | | * {@code getECLDisabledDomains()}; confirm that call returns a fresh, mutable set. |
| | | */ |
| | | private Set<String> getExcludedDomains() |
| | | { |
| | |   final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); |
| | |   excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT); |
| | |   return excludedDomains; |
| | | } |
| | | |
| | | /** |
| | | * Notifies persistent searches of this backend that a new entry was added to it. |
| | | * <p> |
| | | * Only the persistent searches of the matching kind are notified: cookie based |
| | | * searches when {@code changeNumber <= 0}, change number based searches otherwise. |
| | | * Nothing is done when there is no such search, or when the update message is |
| | | * not an {@code LDAPUpdateMsg}. |
| | | * <p> |
| | | * Note: This method is called in a multi-threaded context. |
| | | * |
| | | * @param baseDN |
| | | *          the baseDN of the newly added entry. |
| | | * @param changeNumber |
| | | *          the change number of the newly added entry. It will be greater |
| | | *          than zero for entries added to the change number index and less |
| | | *          than or equal to zero for entries added to any replica DB |
| | | * @param cookieString |
| | | *          a string representing the cookie of the newly added entry. |
| | | *          This is only meaningful for entries added to the change number index |
| | | * @param updateMsg |
| | | *          the update message of the newly added entry |
| | | * @throws ChangelogException |
| | | *           If a problem occurs while notifying of the newly added entry. |
| | | */ |
| | | public void notifyEntryAdded(DN baseDN, long changeNumber, String cookieString, UpdateMsg updateMsg) |
| | |     throws ChangelogException |
| | | { |
| | |   final boolean isCookieEntry = changeNumber <= 0; |
| | |   final List<SearchOperation> pSearchOps = getPersistentSearches(isCookieEntry); |
| | |   if (pSearchOps.isEmpty() || !(updateMsg instanceof LDAPUpdateMsg)) |
| | |   { |
| | |     return; |
| | |   } |
| | | |
| | |   try |
| | |   { |
| | |     // Change number searches all receive the same entry, so it is built once here. |
| | |     // Cookie based searches build one entry per search below (each search owns its cookie), |
| | |     // hence no shared entry is created for them. |
| | |     final Entry sharedEntry = isCookieEntry |
| | |         ? null |
| | |         : createEntryFromMsg(baseDN, changeNumber, cookieString, updateMsg); |
| | |     for (SearchOperation pSearchOp : pSearchOps) |
| | |     { |
| | |       final EntrySender entrySender = (EntrySender) |
| | |           pSearchOp.getAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL); |
| | | |
| | |       // when returning changesOnly, the first incoming update must return |
| | |       // the base entry before any other changes, |
| | |       // so force sending now, when protected by the synchronized block |
| | |       if (isCookieEntry) |
| | |       { // cookie based search |
| | |         final String cookieStr; |
| | |         synchronized (entrySender) |
| | |         { // forbid concurrent updates to the cookie |
| | |           entrySender.cookie.update(baseDN, updateMsg.getCSN()); |
| | |           cookieStr = entrySender.cookie.toString(); |
| | | |
| | |           entrySender.sendBaseChangelogEntry(true); |
| | |         } |
| | |         final Entry cookieEntry = createEntryFromMsg(baseDN, changeNumber, cookieStr, updateMsg); |
| | |         // FIXME JNR build the entry once and only replace its cookie attribute: |
| | |         // entry.replaceAttribute(Attributes.create("changelogcookie", cookieStr)); |
| | |         entrySender.sendEntryIfMatches(cookieEntry, cookieStr); |
| | |       } |
| | |       else |
| | |       { // draft changeNumber search |
| | |         if (!entrySender.hasReturnedBaseEntry.get()) |
| | |         { |
| | |           synchronized (entrySender) |
| | |           { |
| | |             entrySender.sendBaseChangelogEntry(true); |
| | |           } |
| | |         } |
| | |         entrySender.sendEntryIfMatches(sharedEntry, null); |
| | |       } |
| | |     } |
| | |   } |
| | |   catch (DirectoryException e) |
| | |   { |
| | |     throw new ChangelogException(e.getMessageObject(), e); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Collects the search operations of the registered persistent searches that are |
| | | * of the requested kind. |
| | | * |
| | | * @param wantCookieBasedSearch |
| | | *          {@code true} to select cookie based searches, {@code false} to |
| | | *          select the other ones |
| | | * @return a new list holding the selected search operations, possibly empty |
| | | */ |
| | | private List<SearchOperation> getPersistentSearches(boolean wantCookieBasedSearch) |
| | | { |
| | |   final List<SearchOperation> selectedOps = new ArrayList<SearchOperation>(); |
| | |   for (PersistentSearch registered : getPersistentSearches()) |
| | |   { |
| | |     final SearchOperation candidate = registered.getSearchOperation(); |
| | |     if (wantCookieBasedSearch != isCookieBased(candidate)) |
| | |     { |
| | |       // wrong kind of search: leave it out |
| | |       continue; |
| | |     } |
| | |     selectedOps.add(candidate); |
| | |   } |
| | |   return selectedOps; |
| | | } |
| | | |
| | | private boolean isCookieBased(final SearchOperation searchOp) |
| | | { |
| | | for (Control c : searchOp.getRequestControls()) |
| | | { |
| | | if (OID_ECL_COOKIE_EXCHANGE_CONTROL.equals(c.getOID())) |
| | | { |
| | | return true; |
| | | } |
| | | } |
| | | return false; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void addEntry(Entry entry, AddOperation addOperation) |
| | |
| | | |
| | | private SearchParams buildSearchParameters(final SearchOperation searchOperation) throws DirectoryException |
| | | { |
| | | final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); |
| | | excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT); |
| | | final SearchParams params = new SearchParams(searchOperation.toString(), excludedDomains); |
| | | final SearchParams params = new SearchParams(getExcludedDomains()); |
| | | final ExternalChangelogRequestControl eclRequestControl = |
| | | searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER); |
| | | if (eclRequestControl == null) |
| | |
| | | else |
| | | { |
| | | params.requestType = REQUEST_TYPE_FROM_COOKIE; |
| | | params.multiDomainServerState = eclRequestControl.getCookie(); |
| | | params.cookie = eclRequestControl.getCookie(); |
| | | } |
| | | return params; |
| | | } |
| | |
| | | { |
| | | try |
| | | { |
| | | return numSubordinates(baseChangelogDN, true) + 1; |
| | | return numSubordinates(CHANGELOG_BASE_DN, true) + 1; |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | |
| | | static class SearchParams |
| | | { |
| | | private ECLRequestType requestType; |
| | | private final String operationId; |
| | | private final Set<String> excludedBaseDNs; |
| | | private long lowestChangeNumber = -1; |
| | | private long highestChangeNumber = -1; |
| | | private CSN csn = new CSN(0, 0, 0); |
| | | private MultiDomainServerState multiDomainServerState; |
| | | private MultiDomainServerState cookie; |
| | | |
| | | /** |
| | | * Creates search parameters. |
| | | */ |
| | | SearchParams() |
| | | { |
| | | operationId = ""; |
| | | excludedBaseDNs = Collections.emptySet(); |
| | | this.excludedBaseDNs = Collections.emptySet(); |
| | | } |
| | | |
| | | /** |
| | | * Creates search parameters with provided id and excluded domain DNs. |
| | | * |
| | | * @param operationId |
| | | * The id of the operation. |
| | | * @param excludedBaseDNs |
| | | * Set of DNs to exclude from search. |
| | | */ |
| | | SearchParams(final String operationId, final Set<String> excludedBaseDNs) |
| | | SearchParams(final Set<String> excludedBaseDNs) |
| | | { |
| | | this.operationId = operationId; |
| | | this.excludedBaseDNs = excludedBaseDNs; |
| | | } |
| | | |
| | |
| | | private void searchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation) |
| | | throws DirectoryException, ChangelogException |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = replicationServer.getChangelogDB().getReplicationDomainDB(); |
| | | validateProvidedCookie(searchParams); |
| | | final boolean isPersistentSearch = isPersistentSearch(searchOperation); |
| | | |
| | | boolean hasReturnedBaseEntry = false; |
| | | final EntrySender entrySender = new EntrySender(searchOperation, searchParams.cookie); |
| | | if (isPersistentSearch) |
| | | { |
| | | searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender); |
| | | } |
| | | |
| | | ECLMultiDomainDBCursor replicaUpdatesCursor = null; |
| | | try |
| | | { |
| | | final ReplicationDomainDB replicationDomainDB = getChangelogDB().getReplicationDomainDB(); |
| | | final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( |
| | | searchParams.multiDomainServerState, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | searchParams.cookie, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); |
| | | replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); |
| | | |
| | | MultiDomainServerState cookie = searchParams.multiDomainServerState; |
| | | boolean continueSearch = true; |
| | | while (continueSearch && replicaUpdatesCursor.next()) |
| | | { |
| | | // Handle creation of base changelog entry on first update message found |
| | | if (!hasReturnedBaseEntry) |
| | | { |
| | | if (!returnBaseChangelogEntry(searchOperation, true)) |
| | | { |
| | | return; |
| | | } |
| | | hasReturnedBaseEntry = true; |
| | | } |
| | | // Handle the update message |
| | | final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); |
| | | final DN domainBaseDN = replicaUpdatesCursor.getData(); |
| | | cookie.update(domainBaseDN, updateMsg.getCSN()); |
| | | final Entry entry = createEntryFromMsg(domainBaseDN, 0L, cookie.toString(), updateMsg); |
| | | if (matchBaseAndScopeAndFilter(entry, searchOperation)) |
| | | { |
| | | Control control = new EntryChangelogNotificationControl(true, cookie.toString()); |
| | | continueSearch = searchOperation.returnEntry(entry, Arrays.asList(control)); |
| | | } |
| | | searchParams.cookie.update(domainBaseDN, updateMsg.getCSN()); |
| | | final String cookieString = searchParams.cookie.toString(); |
| | | |
| | | final Entry entry = createEntryFromMsg(domainBaseDN, 0, cookieString, updateMsg); |
| | | continueSearch = entrySender.sendEntryIfMatches(entry, cookieString); |
| | | } |
| | | // Handle creation of base changelog entry when no update message is found |
| | | if (!hasReturnedBaseEntry) |
| | | |
| | | if (!isPersistentSearch) |
| | | { |
| | | returnBaseChangelogEntry(searchOperation, false); |
| | | // send the base changelog entry if no update message is found |
| | | entrySender.sendBaseChangelogEntry(false); |
| | | } |
| | | } |
| | | finally |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Indicates whether the provided operation is the search operation of one of the |
| | | * registered persistent searches. The comparison is done on object identity. |
| | | */ |
| | | private boolean isPersistentSearch(SearchOperation op) |
| | | { |
| | |   boolean registered = false; |
| | |   for (PersistentSearch candidate : getPersistentSearches()) |
| | |   { |
| | |     if (candidate.getSearchOperation() == op) |
| | |     { |
| | |       registered = true; |
| | |       break; |
| | |     } |
| | |   } |
| | |   return registered; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * For a "changes only" persistent search, an {@code EntrySender} initialized with |
| | | * the newest cookie is also attached to the search operation. |
| | | */ |
| | | @Override |
| | | public void registerPersistentSearch(PersistentSearch pSearch) |
| | | { |
| | |   super.registerPersistentSearch(pSearch); |
| | | |
| | |   final SearchOperation pSearchOp = pSearch.getSearchOperation(); |
| | |   if (!pSearch.isChangesOnly()) |
| | |   { |
| | |     return; |
| | |   } |
| | | |
| | |   // this persistent search will not go through #search0() down below |
| | |   // so we must initialize the cookie here |
| | |   final MultiDomainServerState newestCookie = getNewestCookie(pSearchOp); |
| | |   final EntrySender sender = new EntrySender(pSearchOp, newestCookie); |
| | |   pSearchOp.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, sender); |
| | | } |
| | | |
| | | /** |
| | | * Builds a cookie from the newest CSNs of every domain of the replication server. |
| | | * |
| | | * @param searchOp |
| | | *          the search operation the cookie is built for |
| | | * @return the newly built cookie, or {@code null} when the search operation is |
| | | *         not cookie based |
| | | */ |
| | | private MultiDomainServerState getNewestCookie(SearchOperation searchOp) |
| | | { |
| | |   if (!isCookieBased(searchOp)) |
| | |   { |
| | |     return null; |
| | |   } |
| | | |
| | |   final MultiDomainServerState newestCookie = new MultiDomainServerState(); |
| | |   final Iterator<ReplicationServerDomain> domains = replicationServer.getDomainIterator(); |
| | |   while (domains.hasNext()) |
| | |   { |
| | |     final DN domainBaseDN = domains.next().getBaseDN(); |
| | |     final ServerState newestCSNs = |
| | |         getChangelogDB().getReplicationDomainDB().getDomainNewestCSNs(domainBaseDN); |
| | |     newestCookie.update(domainBaseDN, newestCSNs); |
| | |   } |
| | |   return newestCookie; |
| | | } |
| | | |
| | | /** |
| | | * Validates the cookie contained in search parameters by checking its content |
| | | * with the actual replication server state. |
| | |
| | | */ |
| | | private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException |
| | | { |
| | | final MultiDomainServerState state = searchParams.multiDomainServerState; |
| | | final MultiDomainServerState state = searchParams.cookie; |
| | | if (state != null && !state.isEmpty()) |
| | | { |
| | | replicationServer.validateServerState(state, searchParams.getExcludedBaseDNs()); |
| | |
| | | /** |
| | | * Runs a change number based search: walks the change number index starting |
| | | * from {@code params.lowestChangeNumber} and sends one entry per index record |
| | | * for which a matching replica update message is found. |
| | | * <p> |
| | | * All entries go through a single {@code EntrySender}. When the operation is a |
| | | * registered persistent search, the sender is attached to it under |
| | | * {@code OID_ECL_COOKIE_EXCHANGE_CONTROL}. For a non persistent search, the |
| | | * sender is finally asked to send the base changelog entry with |
| | | * {@code hasSubordinates} set to {@code false}; this is a no-op when the base |
| | | * entry was already sent. |
| | | * |
| | | * @param params |
| | | *          the search parameters holding the change number range |
| | | * @param searchOperation |
| | | *          the search operation to send the entries to |
| | | * @throws ChangelogException |
| | | *           if raised by one of the changelog cursors |
| | | * @throws DirectoryException |
| | | *           if raised while building or sending an entry |
| | | */ |
| | | private void searchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation) |
| | |     throws ChangelogException, DirectoryException |
| | | { |
| | |   final EntrySender entrySender = new EntrySender(searchOperation, null); |
| | |   final boolean isPersistentSearch = isPersistentSearch(searchOperation); |
| | |   if (isPersistentSearch) |
| | |   { |
| | |     searchOperation.setAttachment(OID_ECL_COOKIE_EXCHANGE_CONTROL, entrySender); |
| | |   } |
| | | |
| | |   DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null; |
| | |   MultiDomainDBCursor replicaUpdatesCursor = null; |
| | |   try |
| | |   { |
| | |     cnIndexDBCursor = getCNIndexDBCursor(params.lowestChangeNumber); |
| | |     boolean continueSearch = true; |
| | |     while (continueSearch && cnIndexDBCursor.next()) |
| | |     { |
| | |       // Handle the current cnIndex record |
| | |       final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord(); |
| | |       if (replicaUpdatesCursor == null) |
| | |       { |
| | |         // created lazily, from the first cnIndex record found |
| | |         replicaUpdatesCursor = initializeReplicaUpdatesCursor(cnIndexRecord); |
| | |       } |
| | |       continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); |
| | |       if (continueSearch) |
| | |       { |
| | |         final UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor); |
| | |         if (updateMsg != null) |
| | |         { |
| | |           continueSearch = sendEntryForUpdateMessage(entrySender, cnIndexRecord, updateMsg); |
| | |           replicaUpdatesCursor.next(); |
| | |         } |
| | |       } |
| | |     } |
| | | |
| | |     if (!isPersistentSearch) |
| | |     { |
| | |       // send the base changelog entry if no update message is found |
| | |       entrySender.sendBaseChangelogEntry(false); |
| | |     } |
| | |   } |
| | |   finally |
| | |   { |
| | |     StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor); |
| | |   } |
| | | } |
| | | |
| | | /** |
| | | * Create and returns the base changelog entry to provided search operation. |
| | | * <p> |
| | | * The entry is only returned when the changelog base DN matches the search base |
| | | * DN and scope, and the entry matches the search filter. |
| | | * |
| | | * @param searchOperation |
| | | *          the search operation to return the base entry to |
| | | * @param hasSubordinates |
| | | *          the value to give to the hasSubordinates attribute of the entry |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | * @throws DirectoryException |
| | | *           if raised while matching or returning the entry |
| | | */ |
| | | private boolean returnBaseChangelogEntry(final SearchOperation searchOperation, boolean hasSubordinates) |
| | |     throws DirectoryException |
| | | { |
| | |   final DN baseDN = searchOperation.getBaseDN(); |
| | |   final SearchFilter filter = searchOperation.getFilter(); |
| | |   final SearchScope scope = searchOperation.getScope(); |
| | | |
| | |   if (baseChangelogDN.matchesBaseAndScope(baseDN, scope)) |
| | |   { |
| | |     final Entry entry = buildBaseChangelogEntry(hasSubordinates); |
| | |     if (filter.matchesEntry(entry) && !searchOperation.returnEntry(entry, null)) |
| | |     { |
| | |       // Abandon, size limit reached. |
| | |       return false; |
| | |     } |
| | |   } |
| | |   if (baseDN.equals(baseChangelogDN) && scope.equals(SearchScope.BASE_OBJECT)) |
| | |   { |
| | |     // Only the change log root entry was requested |
| | |     return false; |
| | |   } |
| | |   return true; |
| | | } |
| | | |
| | | /** |
| | | * Builds the changelog entry for the provided change number index record and |
| | | * update message, and returns it to the search operation when it matches the |
| | | * search base DN, scope and filter. |
| | | * |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | */ |
| | | private boolean returnEntryForUpdateMessage( |
| | |     final SearchOperation searchOperation, |
| | |     final ChangeNumberIndexRecord cnIndexRecord, |
| | |     final UpdateMsg updateMsg) |
| | |     throws DirectoryException |
| | | { |
| | |   final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie()); |
| | |   final DN changeDN = cnIndexRecord.getBaseDN(); |
| | |   cookie.update(changeDN, cnIndexRecord.getCSN()); |
| | |   final Entry entry = createEntryFromMsg(changeDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg); |
| | |   if (matchBaseAndScopeAndFilter(entry, searchOperation)) |
| | |   { |
| | |     return searchOperation.returnEntry(entry, null); |
| | |   } |
| | |   return true; |
| | | } |
| | | |
| | | /** |
| | | * Builds the changelog entry for the provided change number index record and |
| | | * update message, and hands it to the entry sender. |
| | | * <p> |
| | | * The cookie stored in the entry is the previous cookie of the index record |
| | | * updated with the base DN and CSN of that record. No cookie is passed to the |
| | | * sender itself. |
| | | * |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | */ |
| | | private boolean sendEntryForUpdateMessage(EntrySender entrySender, |
| | |     ChangeNumberIndexRecord cnIndexRecord, UpdateMsg updateMsg) throws DirectoryException |
| | | { |
| | |   final DN baseDN = cnIndexRecord.getBaseDN(); |
| | |   final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie()); |
| | |   cookie.update(baseDN, cnIndexRecord.getCSN()); |
| | |   final String cookieString = cookie.toString(); |
| | | |
| | |   final Entry entry = createEntryFromMsg(baseDN, cnIndexRecord.getChangeNumber(), cookieString, updateMsg); |
| | |   return entrySender.sendEntryIfMatches(entry, null); |
| | | } |
| | | |
| | | private MultiDomainDBCursor initializeReplicaUpdatesCursor(final ChangelogDB changelogDB, |
| | | private MultiDomainDBCursor initializeReplicaUpdatesCursor( |
| | | final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException |
| | | { |
| | | final MultiDomainServerState state = new MultiDomainServerState(); |
| | |
| | | // No need for ECLMultiDomainDBCursor in this case |
| | | // as updateMsg will be matched with cnIndexRecord |
| | | final MultiDomainDBCursor replicaUpdatesCursor = |
| | | changelogDB.getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY); |
| | | getChangelogDB().getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY); |
| | | replicaUpdatesCursor.next(); |
| | | return replicaUpdatesCursor; |
| | | } |
| | |
| | | } |
| | | |
| | | /** Returns a cursor on CNIndexDB for the provided first change number. */ |
| | | private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(final ChangelogDB changelogDB, |
| | | private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor( |
| | | final long firstChangeNumber) throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexDB cnIndexDB = changelogDB.getChangeNumberIndexDB(); |
| | | final ChangeNumberIndexDB cnIndexDB = getChangelogDB().getChangeNumberIndexDB(); |
| | | long changeNumberToUse = firstChangeNumber; |
| | | if (changeNumberToUse <= 1) |
| | | { |
| | |
| | | return cnIndexDB.getCursorFrom(changeNumberToUse); |
| | | } |
| | | |
| | | /** |
| | | * Tells whether the entry is within the base DN and scope of the search |
| | | * operation and also satisfies its filter. |
| | | */ |
| | | private boolean matchBaseAndScopeAndFilter(Entry entry, SearchOperation searchOp) throws DirectoryException |
| | | { |
| | |   if (!entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())) |
| | |   { |
| | |     // outside of the searched subtree: the filter is not evaluated |
| | |     return false; |
| | |   } |
| | |   return searchOp.getFilter().matchesEntry(entry); |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the base changelog entry. |
| | | */ |
| | | private Entry buildBaseChangelogEntry(boolean hasSubordinates) |
| | | { |
| | | final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType,List<Attribute>>(); |
| | | final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType,List<Attribute>>(); |
| | | |
| | | addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs); |
| | | addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY, |
| | | ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs); |
| | | addAttributeByUppercaseName("hassubordinates", "hasSubordinates", Boolean.toString(hasSubordinates), |
| | | userAttrs, operationalAttrs); |
| | | addAttributeByUppercaseName("entrydn", "entryDN", baseChangelogDN.toString(), |
| | | userAttrs, operationalAttrs); |
| | | return new Entry(baseChangelogDN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs); |
| | | } |
| | | |
| | | /** |
| | | * Creates a changelog entry. |
| | | */ |
| | |
| | | { |
| | | final CSN csn = msg.getCSN(); |
| | | String dnString; |
| | | if (changeNumber == 0) |
| | | { |
| | | // Cookie mode |
| | | dnString = "replicationCSN=" + csn + "," + baseDN.toString() + "," + DN_EXTERNAL_CHANGELOG_ROOT; |
| | | } |
| | | else |
| | | if (changeNumber > 0) |
| | | { |
| | | // Draft compat mode |
| | | dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT; |
| | | } |
| | | else |
| | | { |
| | | // Cookie mode |
| | | dnString = "replicationCSN=" + csn + "," + baseDN + "," + DN_EXTERNAL_CHANGELOG_ROOT; |
| | | } |
| | | |
| | | final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>(); |
| | | final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>(); |
| | |
| | | addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs); |
| | | |
| | | // REQUIRED attributes |
| | | if (changeNumber != 0) |
| | | if (changeNumber > 0) |
| | | { |
| | | addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs); |
| | | } |
| | |
| | | { |
| | | addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs); |
| | | } |
| | | addAttributeByType("changelogcookie", "changeLogCookie", cookie, userAttrs, opAttrs); |
| | | final String cookie2 = cookie != null ? cookie : ""; |
| | | addAttributeByType("changelogcookie", "changeLogCookie", cookie2, userAttrs, opAttrs); |
| | | |
| | | final List<RawAttribute> includedAttributes = msg.getEclIncludes(); |
| | | if (includedAttributes != null && !includedAttributes.isEmpty()) |
| | |
| | | return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs); |
| | | } |
| | | |
| | | /** |
| | | * Used to send entries to searches on cn=changelog. This class ensures the |
| | | * base changelog entry is sent before sending any other entry. It is also |
| | | * used as a store when going from the "initial search" phase to the |
| | | * "persistent search" phase. |
| | | */ |
| | | private static class EntrySender |
| | | { |
| | | |
| | | private final SearchOperation searchOp; |
| | | /** |
| | | * Used by the cookie-based searches to communicate the cookie between the |
| | | * initial search phase and the persistent search phase. This is unused with |
| | | * draft change number searches. |
| | | */ |
| | | private final MultiDomainServerState cookie; |
| | | private final AtomicBoolean hasReturnedBaseEntry = new AtomicBoolean(); |
| | | |
| | | public EntrySender(SearchOperation searchOp, MultiDomainServerState cookie) |
| | | { |
| | | this.searchOp = searchOp; |
| | | this.cookie = cookie; |
| | | } |
| | | |
| | | /** |
| | | * Sends the entry if it matches the base, scope and filter of the current search operation. |
| | | * It will also send the base changelog entry if it needs to be sent and was not sent before. |
| | | * |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | */ |
| | | private boolean sendEntryIfMatches(Entry entry, String cookie) throws DirectoryException |
| | | { |
| | | // About to send one entry: ensure the base changelog entry is sent first |
| | | if (!sendBaseChangelogEntry(true)) |
| | | { |
| | | // only return the base entry: stop here |
| | | return false; |
| | | } |
| | | if (matchBaseAndScopeAndFilter(entry)) |
| | | { |
| | | return searchOp.returnEntry(entry, getControls(cookie)); |
| | | } |
| | | // maybe the next entry will match? |
| | | return true; |
| | | } |
| | | |
| | | /** Indicates if the provided entry matches the filter, base and scope. */ |
| | | private boolean matchBaseAndScopeAndFilter(Entry entry) throws DirectoryException |
| | | { |
| | | return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope()) |
| | | && searchOp.getFilter().matchesEntry(entry); |
| | | } |
| | | |
| | | private List<Control> getControls(String cookie) |
| | | { |
| | | if (cookie != null) |
| | | { |
| | | Control c = new EntryChangelogNotificationControl(true, cookie); |
| | | return Arrays.asList(c); |
| | | } |
| | | return Collections.emptyList(); |
| | | } |
| | | |
| | | /** |
| | | * Create and returns the base changelog entry to the underlying search operation. |
| | | * |
| | | * @return {@code true} if search should continue, {@code false} otherwise |
| | | */ |
| | | private boolean sendBaseChangelogEntry(boolean hasSubordinates) throws DirectoryException |
| | | { |
| | | if (hasReturnedBaseEntry.compareAndSet(false, true)) |
| | | { |
| | | final DN baseDN = searchOp.getBaseDN(); |
| | | final SearchFilter filter = searchOp.getFilter(); |
| | | final SearchScope scope = searchOp.getScope(); |
| | | |
| | | if (ChangelogBackend.CHANGELOG_BASE_DN.matchesBaseAndScope(baseDN, scope)) |
| | | { |
| | | final Entry entry = buildBaseChangelogEntry(hasSubordinates); |
| | | if (filter.matchesEntry(entry) && !searchOp.returnEntry(entry, null)) |
| | | { |
| | | // Abandon, size limit reached. |
| | | return false; |
| | | } |
| | | } |
| | | return !baseDN.equals(ChangelogBackend.CHANGELOG_BASE_DN) |
| | | || !scope.equals(SearchScope.BASE_OBJECT); |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | private Entry buildBaseChangelogEntry(boolean hasSubordinates) |
| | | { |
| | | final Map<AttributeType, List<Attribute>> userAttrs = |
| | | new LinkedHashMap<AttributeType, List<Attribute>>(); |
| | | final Map<AttributeType, List<Attribute>> operationalAttrs = |
| | | new LinkedHashMap<AttributeType, List<Attribute>>(); |
| | | |
| | | addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, |
| | | ChangelogBackend.BACKEND_ID, userAttrs, operationalAttrs); |
| | | addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY, |
| | | ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs); |
| | | addAttributeByUppercaseName("hassubordinates", "hasSubordinates", |
| | | Boolean.toString(hasSubordinates), userAttrs, operationalAttrs); |
| | | addAttributeByUppercaseName("entrydn", "entryDN", |
| | | ServerConstants.DN_EXTERNAL_CHANGELOG_ROOT, userAttrs, operationalAttrs); |
| | | return new Entry(CHANGELOG_BASE_DN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs); |
| | | } |
| | | } |
| | | |
| | | private static void addAttribute(final Entry e, final String attrType, final String attrValue) |
| | | { |
| | | e.addAttribute(Attributes.create(attrType, attrValue), null); |
| | |
| | | addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true); |
| | | } |
| | | |
| | | private void addAttributeByUppercaseName(String attrNameLowercase, |
| | | private static void addAttributeByUppercaseName(String attrNameLowercase, |
| | | String attrNameUppercase, String attrValue, |
| | | Map<AttributeType, List<Attribute>> userAttrs, |
| | | Map<AttributeType, List<Attribute>> operationalAttrs) |
| | |
| | | { |
| | | attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase); |
| | | } |
| | | final Attribute a = addByType ? |
| | | Attributes.create(attrType, attrValue) : Attributes.create(attrNameUppercase, attrValue); |
| | | final Attribute a = addByType |
| | | ? Attributes.create(attrType, attrValue) |
| | | : Attributes.create(attrNameUppercase, attrValue); |
| | | final List<Attribute> attrList = Collections.singletonList(a); |
| | | if (attrType.isOperational()) |
| | | { |