opendj-sdk/opends/src/messages/messages/replication.properties
@@ -616,3 +616,7 @@ recovered by removing a partially written record NOTICE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES_285=You do not have sufficient privileges to \ perform a search request on cn=changelog SEVERE_ERR_CHANGELOG_BACKEND_SEARCH_286 =An error occurred when \ searching base DN '%s' with filter '%s' in changelog backend : %s SEVERE_ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES_287 =An error occurred when \ retrieving number of subordinates for entry DN '%s' in changelog backend : %s opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -25,51 +25,115 @@ */ package org.opends.server.backends; import java.util.Collections; import java.util.Set; import org.opends.server.admin.Configuration; import org.opends.server.api.Backend; import org.opends.server.config.ConfigEntry; import org.opends.server.config.ConfigException; import org.opends.server.core.AddOperation; import org.opends.server.core.DeleteOperation; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ModifyDNOperation; import org.opends.server.core.ModifyOperation; import org.opends.server.core.SearchOperation; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.types.AttributeType; import org.opends.server.types.BackupConfig; import org.opends.server.types.BackupDirectory; import org.opends.server.types.CanceledOperationException; import org.opends.server.types.ConditionResult; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.IndexType; import org.opends.server.types.InitializationException; import org.opends.server.types.LDIFExportConfig; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.LDIFImportResult; import org.opends.server.types.RestoreConfig; import org.opends.server.types.ResultCode; import org.opends.server.util.Validator; import static org.opends.messages.BackendMessages.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.config.ConfigConstants.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.debugEnabled; import static org.opends.server.loggers.debug.DebugLogger.getTracer; import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*; import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; import static org.opends.server.util.LDIFWriter.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; import java.text.SimpleDateFormat; import java.util.*; import org.opends.messages.Category; import org.opends.messages.Message; import org.opends.messages.Severity; import org.opends.server.admin.Configuration; import org.opends.server.api.Backend; import org.opends.server.config.ConfigConstants; import org.opends.server.config.ConfigException; import org.opends.server.controls.EntryChangelogNotificationControl; import org.opends.server.controls.ExternalChangelogRequestControl; import org.opends.server.core.*; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.LDAPUpdateMsg; import org.opends.server.replication.protocol.ModifyCommonMsg; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.api.ReplicationDomainDB; import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor; import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor; import org.opends.server.types.*; import org.opends.server.util.StaticUtils; /** * A backend that provides access to the changelog, ie the "cn=changelog" suffix. * It is a read-only backend. * A backend that provides access to the changelog, ie the "cn=changelog" * suffix. It is a read-only backend that is created by a * {@code ReplicationServer} and is not configurable. * <p> * There are two modes to search the changelog: * <ul> * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the * request. The cookie provided in the control is used to retrieve entries from * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with * the entries.</li> * <li>Draft compat mode: when no "ECL Cookie Exchange Control" is provided with * the request. The entries are retrieved using the ChangeNumberIndexDB (or * DraftDB, hence the name) and their attributes are set with the information * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set * from the content of ChangeNumberIndexDB.</li> * </ul> * * @see ReplicationServer */ public class ChangelogBackend extends Backend public class ChangelogBackend extends Backend<Configuration> { private static final DebugTracer TRACER = getTracer(); /** The id of this backend. */ public static final String BACKEND_ID = "changelog"; private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L; private static final String CHANGE_NUMBER_ATTR = "changeNumber"; private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase(); /** The set of objectclasses that will be used in root entry. */ private static final Map<ObjectClass, String> CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2); static { CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP); CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container"); } /** The set of objectclasses that will be used in ECL entries. */ private static final Map<ObjectClass, String> CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2); static { CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP); CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY); } /** The attribute type for the "creatorsName" attribute. */ private static final AttributeType CREATORS_NAME_TYPE = DirectoryConfig.getAttributeType(OP_ATTR_CREATORS_NAME_LC, true); /** The attribute type for the "modifiersName" attribute. */ private static final AttributeType MODIFIERS_NAME_TYPE = DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true); /** The DN for the base changelog entry. */ private DN baseChangelogDN; @@ -77,52 +141,61 @@ private DN[] baseDNs; /** The set of supported controls for this backend. */ private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL); private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL); /** The replication server on which the changelog is read. */ private final ReplicationServer replicationServer; private final ECLEnabledDomainPredicate domainPredicate; /** * Creates a new backend with the provided repication server. * * @param replicationServer * The replication server on which the changes are read. * @param domainPredicate * Returns whether a domain is enabled for the external changelog. */ public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate) { this.replicationServer = replicationServer; this.domainPredicate = domainPredicate; setBackendID(BACKEND_ID); setWritabilityMode(WritabilityMode.DISABLED); setPrivateBackend(true); } /** {@inheritDoc} */ @Override public void configureBackend(final Configuration cfg) throws ConfigException public void configureBackend(final Configuration config) throws ConfigException { Validator.ensureNotNull(cfg); throw new UnsupportedOperationException("The changelog backend is not configurable"); } final ConfigEntry configEntry = DirectoryServer.getConfigEntry(cfg.dn()); // Make sure that a configuration entry was provided. If not, then we will // not be able to complete initialization. if (configEntry == null) { throw new ConfigException(ERR_BACKEND_CONFIG_ENTRY_NULL.get(getBackendID())); } // Create the set of base DNs that we will handle. In this case, it's just // the DN of the base changelog entry. /** {@inheritDoc} */ @Override public void initializeBackend() throws InitializationException { try { baseChangelogDN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT); baseDNs = new DN[] { baseChangelogDN }; } catch (final Exception e) catch (final DirectoryException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } throw new ConfigException( throw new InitializationException( ERR_BACKEND_CANNOT_DECODE_BACKEND_ROOT_DN.get(getBackendID(), getExceptionMessage(e)), e); } this.baseDNs = new DN[] { baseChangelogDN }; } /** {@inheritDoc} */ @Override public void initializeBackend() throws ConfigException, InitializationException { try { DirectoryServer.registerBaseDN(baseChangelogDN, this, true); } catch (final Exception e) catch (final DirectoryException e) { throw new InitializationException( ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(baseChangelogDN.toString(), getExceptionMessage(e)), e); @@ -137,7 +210,7 @@ { DirectoryServer.deregisterBaseDN(baseChangelogDN); } catch (final Exception e) catch (final DirectoryException e) { if (debugEnabled()) { @@ -157,26 +230,26 @@ @Override public void preloadEntryCache() throws UnsupportedOperationException { throw new RuntimeException("Not implemented"); throw new UnsupportedOperationException("Operation not supported."); } /** {@inheritDoc} */ @Override public boolean isLocal() { throw new RuntimeException("Not implemented"); return true; } /** {@inheritDoc} */ @Override public boolean isIndexed(AttributeType attributeType, IndexType indexType) public boolean isIndexed(final AttributeType attributeType, final IndexType indexType) { throw new RuntimeException("Not implemented"); return true; } /** {@inheritDoc} */ @Override public Entry getEntry(DN entryDN) throws DirectoryException public Entry getEntry(final DN entryDN) throws DirectoryException { if (entryDN == null) { @@ -188,17 +261,90 @@ /** {@inheritDoc} */ @Override public ConditionResult hasSubordinates(DN entryDN) throws DirectoryException public ConditionResult hasSubordinates(final DN entryDN) throws DirectoryException { throw new RuntimeException("Not implemented"); final long num = numSubordinates(entryDN, false); if (num < 0) { return ConditionResult.UNDEFINED; } else if (num == 0) { return ConditionResult.FALSE; } else { return ConditionResult.TRUE; } } /** Specific search operation to count number of entries. */ private final class NumSubordinatesSearchOperation extends SearchOperationWrapper { private long numSubordinates = -1; private NumSubordinatesSearchOperation() { super(null); } @Override public boolean returnEntry(Entry entry, List<Control> controls) { numSubordinates++; return true; } @Override public DN getBaseDN() { return baseChangelogDN; } @Override public SearchFilter getFilter() { return LDAPURL.DEFAULT_SEARCH_FILTER; } @Override public SearchScope getScope() { return SearchScope.WHOLE_SUBTREE; } } /** {@inheritDoc} */ @Override public long numSubordinates(DN entryDN, boolean subtree) throws DirectoryException public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException { throw new RuntimeException("Not implemented"); // Compute the num subordinates only for the base DN if (entryDN == null || !baseChangelogDN.equals(entryDN)) { return -1; } if (!subtree) { return 1; } // Search with cookie mode to count all update messages final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT); SearchParams params = new SearchParams("0", excludedDomains); params.requestType = REQUEST_TYPE_FROM_COOKIE; params.multiDomainServerState = new MultiDomainServerState(); NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation(); try { search0(params, searchOp); } catch (ChangelogException e) { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get( baseChangelogDN.toString(), stackTraceToSingleLineString(e))); } return searchOp.numSubordinates; } /** {@inheritDoc} */ @@ -241,10 +387,35 @@ /** {@inheritDoc} */ @Override public void search(SearchOperation searchOperation) throws DirectoryException, CanceledOperationException public void search(final SearchOperation searchOperation) throws DirectoryException { throw new RuntimeException("Not implemented"); final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains(); excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT); SearchParams params = new SearchParams(searchOperation.toString(), excludedDomains); final ExternalChangelogRequestControl eclRequestControl = searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER); if (eclRequestControl == null) { params.requestType = REQUEST_TYPE_FROM_CHANGE_NUMBER; } else { params.requestType = REQUEST_TYPE_FROM_COOKIE; params.multiDomainServerState = eclRequestControl.getCookie(); } optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter()); try { search0(params, searchOperation); } catch (ChangelogException e) { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get( searchOperation.getBaseDN().toString(), searchOperation.getFilter().toString(), stackTraceToSingleLineString(e))); } } /** {@inheritDoc} */ @@ -270,7 +441,7 @@ /** {@inheritDoc} */ @Override public void exportLDIF(LDIFExportConfig exportConfig) public void exportLDIF(final LDIFExportConfig exportConfig) throws DirectoryException { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, @@ -297,15 +468,14 @@ @Override public boolean supportsBackup() { throw new RuntimeException("Not implemented"); return false; } /** {@inheritDoc} */ @Override public boolean supportsBackup(BackupConfig backupConfig, StringBuilder unsupportedReason) public boolean supportsBackup(BackupConfig backupConfig, StringBuilder unsupportedReason) { throw new RuntimeException("Not implemented"); return false; } /** {@inheritDoc} */ @@ -318,34 +488,843 @@ /** {@inheritDoc} */ @Override public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); } /** {@inheritDoc} */ @Override public boolean supportsRestore() { throw new RuntimeException("Not implemented"); return false; } /** {@inheritDoc} */ @Override public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException { throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID())); } /** {@inheritDoc} */ @Override public long getEntryCount() { throw new RuntimeException("Not implemented"); try { return numSubordinates(baseChangelogDN, true) + 1; } catch (DirectoryException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } return -1; } } /** * Represent the search parameters specific to the changelog. * * This class should be visible for tests. */ static class SearchParams { private ECLRequestType requestType; private final String operationId; private final Set<String> excludedBaseDNs; private long lowestChangeNumber = -1; private long highestChangeNumber = -1; private CSN csn = new CSN(0, 0, 0); private MultiDomainServerState multiDomainServerState; /** * Creates search parameters. */ SearchParams() { operationId = ""; excludedBaseDNs = Collections.emptySet(); } /** * Creates search parameters with provided id and excluded domain DNs. * * @param operationId * The id of the operation. * @param excludedBaseDNs * Set of DNs to exclude from search. */ SearchParams(final String operationId, final Set<String> excludedBaseDNs) { this.operationId = operationId; this.excludedBaseDNs = excludedBaseDNs; } /** * Indicates if provided change number is compatible with last change * number. * * @param changeNumber * The change number to test. * @return {@code true} if and only if the provided change number is in the * range of the last change number. */ boolean changeNumberIsInRange(long changeNumber) { return highestChangeNumber == -1 || changeNumber <= highestChangeNumber; } /** * Returns the lowest change number to retrieve (inclusive). * * @return the lowest change number */ long getLowestChangeNumber() { return lowestChangeNumber; } /** * Returns the highest change number to retrieve (inclusive). * * @return the highest change number */ long getHighestChangeNumber() { return highestChangeNumber; } /** * Returns the CSN to retrieve. * * @return the CSN, which may be the default CSN with zero values. */ CSN getCSN() { return csn; } /** * Returns the set of DNs to exclude from the search. * * @return the DNs corresponding to domains to exclude from the search. * @throws DirectoryException * If a DN can't be decoded. */ Set<DN> getExcludedBaseDNs() throws DirectoryException { final Set<DN> excludedDNs = new HashSet<DN>(); for (String dn : excludedBaseDNs) { excludedDNs.add(DN.decode(dn)); } return excludedDNs; } } /** * Optimize the search parameters by analyzing the DN and filter. * Populate the provided SearchParams with optimizations found. * * @param params the search parameters that are specific to external changelog * @param baseDN the provided search baseDN. * @param userFilter the provided search filter. * @throws DirectoryException when an exception occurs. */ void optimizeSearchParameters(final SearchParams params, final DN baseDN, final SearchFilter userFilter) throws DirectoryException { SearchFilter equalityFilter = null; switch (baseDN.getNumComponents()) { case 1: // "cn=changelog" : use user-provided search filter. break; case 2: // It is probably "changeNumber=xxx,cn=changelog", use equality filter // But it also could be "<service-id>,cn=changelog" so need to check on attribute equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR); break; default: // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN"); break; } final SearchParams optimized = optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter); params.lowestChangeNumber = optimized.lowestChangeNumber; params.highestChangeNumber = optimized.highestChangeNumber; params.csn = optimized.csn; } /** * Build a search filter from given DN and attribute. * * @return the search filter or {@code null} if attribute is not present in * the provided DN */ private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr) { final RDN rdn = baseDN.getRDN(); AttributeType attrType = DirectoryServer.getAttributeType(lowerCaseAttr); if (attrType == null) { attrType = DirectoryServer.getDefaultAttributeType(upperCaseAttr); } final AttributeValue attrValue = rdn.getAttributeValue(attrType); if (attrValue != null) { return SearchFilter.createEqualityFilter(attrType, attrValue); } return null; } private SearchParams optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException { final SearchParams params = new SearchParams(); if (filter == null) { return params; } if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR)) { params.lowestChangeNumber = decodeChangeNumber(filter.getAssertionValue()); } else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR)) { params.highestChangeNumber = decodeChangeNumber(filter.getAssertionValue()); } else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR)) { final long number = decodeChangeNumber(filter.getAssertionValue()); params.lowestChangeNumber = number; params.highestChangeNumber = number; } else if (matches(filter, FilterType.EQUALITY, "replicationcsn")) { // == exact CSN params.csn = new CSN(filter.getAssertionValue().toString()); } else if (filter.getFilterType() == FilterType.AND) { // TODO: it looks like it could be generalized to N components, not only two final Collection<SearchFilter> components = filter.getFilterComponents(); final SearchFilter filters[] = components.toArray(new SearchFilter[0]); long last1 = -1; long first1 = -1; long last2 = -1; long first2 = -1; if (filters.length > 0) { SearchParams msg1 = optimizeSearchUsingFilter(filters[0]); last1 = msg1.highestChangeNumber; first1 = msg1.lowestChangeNumber; } if (filters.length > 1) { SearchParams msg2 = optimizeSearchUsingFilter(filters[1]); last2 = msg2.highestChangeNumber; first2 = msg2.lowestChangeNumber; } if (last1 == -1) { params.highestChangeNumber = last2; } else if (last2 == -1) { params.highestChangeNumber = last1; } else { params.highestChangeNumber = Math.min(last1, last2); } params.lowestChangeNumber = Math.max(first1, first2); } return params; } private static long decodeChangeNumber(final AttributeValue assertionValue) throws DirectoryException { try { return Long.decode(assertionValue.getNormalizedValue().toString()); } catch (NumberFormatException e) { throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX, Message.raw("Could not convert value '%s' to long", assertionValue.getNormalizedValue().toString())); } } private boolean matches(SearchFilter filter, FilterType filterType, String primaryName) { return filter.getFilterType() == filterType && filter.getAttributeType() != null && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName); } private void search0(final SearchParams searchParams, final SearchOperation searchOperation) throws DirectoryException, ChangelogException { switch (searchParams.requestType) { case REQUEST_TYPE_FROM_CHANGE_NUMBER: searchFromChangeNumber(searchParams, searchOperation); break; case REQUEST_TYPE_FROM_COOKIE: searchFromCookie(searchParams, searchOperation); break; default: // not handled } } /** * Search the changelog when a cookie control is provided. */ private void searchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation) throws DirectoryException, ChangelogException { final ReplicationDomainDB replicationDomainDB = replicationServer.getChangelogDB().getReplicationDomainDB(); validateProvidedCookie(searchParams); boolean hasReturnedBaseEntry = false; ECLMultiDomainDBCursor replicaUpdatesCursor = null; try { final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom( searchParams.multiDomainServerState, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs()); replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor); MultiDomainServerState cookie = searchParams.multiDomainServerState; boolean continueSearch = true; while (continueSearch && replicaUpdatesCursor.next()) { // Handle creation of base changelog entry on first update message found if (!hasReturnedBaseEntry) { if (!returnBaseChangelogEntry(searchOperation, true)) { return; } hasReturnedBaseEntry = true; } // Handle the update message final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); final DN domainBaseDN = replicaUpdatesCursor.getData(); cookie.update(domainBaseDN, updateMsg.getCSN()); final Entry entry = createEntryFromMsg(domainBaseDN, 0L, cookie.toString(), updateMsg); if (matchBaseAndScopeAndFilter(entry, searchOperation)) { Control control = new EntryChangelogNotificationControl(true, cookie.toString()); continueSearch = searchOperation.returnEntry(entry, Arrays.asList(control)); } } // Handle creation of base changelog entry when no update message is found if (!hasReturnedBaseEntry) { returnBaseChangelogEntry(searchOperation, false); } } finally { StaticUtils.close(replicaUpdatesCursor); } } /** * Validates the cookie contained in search parameters by checking its content * with the actual replication server state. * * @throws DirectoryException * If the state is not valid */ private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException { final MultiDomainServerState state = searchParams.multiDomainServerState; if (state != null && !state.isEmpty()) { replicationServer.validateServerState(state, searchParams.getExcludedBaseDNs()); } } /** * Search the changelog using change number(s). */ private void searchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation) throws ChangelogException, DirectoryException { boolean hasReturnedBaseEntry = false; final ChangelogDB changelogDB = replicationServer.getChangelogDB(); DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null; MultiDomainDBCursor replicaUpdatesCursor = null; try { cnIndexDBCursor = getCNIndexDBCursor(changelogDB, params.lowestChangeNumber); boolean continueSearch = true; while (continueSearch && cnIndexDBCursor.next()) { // Handle creation of base changelog entry on cnIndex record found if (!hasReturnedBaseEntry) { if (!returnBaseChangelogEntry(searchOperation, true)) { return; } hasReturnedBaseEntry = true; } // Handle the current cnIndex record final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord(); if (replicaUpdatesCursor == null) { replicaUpdatesCursor = initializeReplicaUpdatesCursor(changelogDB, cnIndexRecord); } continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber()); if (continueSearch) { UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor); if (updateMsg != null) { continueSearch = returnEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg); replicaUpdatesCursor.next(); } } } // Handle creation of base changelog entry when no update message is found if (!hasReturnedBaseEntry) { returnBaseChangelogEntry(searchOperation, false); } } finally { StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor); } } /** * Create and returns the base changelog entry to provided search operation. * * @return {@code true} if search should continue, {@code false} otherwise */ private boolean returnBaseChangelogEntry(final SearchOperation searchOperation, boolean hasSubordinates) throws DirectoryException { final DN baseDN = searchOperation.getBaseDN(); final SearchFilter filter = searchOperation.getFilter(); final SearchScope scope = searchOperation.getScope(); if (baseChangelogDN.matchesBaseAndScope(baseDN, scope)) { final Entry entry = buildBaseChangelogEntry(hasSubordinates); if (filter.matchesEntry(entry) && !searchOperation.returnEntry(entry, null)) { // Abandon, size limit reached. return false; } } if (baseDN.equals(baseChangelogDN) && scope.equals(SearchScope.BASE_OBJECT)) { // Only the change log root entry was requested return false; } return true; } /** * @return {@code true} if search should continue, {@code false} otherwise */ private boolean returnEntryForUpdateMessage( final SearchOperation searchOperation, final ChangeNumberIndexRecord cnIndexRecord, final UpdateMsg updateMsg) throws DirectoryException { final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie()); final DN changeDN = cnIndexRecord.getBaseDN(); cookie.update(changeDN, cnIndexRecord.getCSN()); final Entry entry = createEntryFromMsg(changeDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg); if (matchBaseAndScopeAndFilter(entry, searchOperation)) { return searchOperation.returnEntry(entry, null); } return true; } private MultiDomainDBCursor initializeReplicaUpdatesCursor(final ChangelogDB changelogDB, final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException { final MultiDomainServerState state = new MultiDomainServerState(); state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN()); // No need for ECLMultiDomainDBCursor in this case // as updateMsg will be matched with cnIndexRecord final MultiDomainDBCursor replicaUpdatesCursor = changelogDB.getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY); replicaUpdatesCursor.next(); return replicaUpdatesCursor; } /** * Returns the replica update message corresponding to the provided * cnIndexRecord. * * @return the update message, which may be {@code null} if the update message * could not be found because it was purged or because corresponding * baseDN was removed from the changelog * @throws DirectoryException * If inconsistency is detected between the available update * messages and the provided cnIndexRecord */ private UpdateMsg findReplicaUpdateMessage( final ChangeNumberIndexRecord cnIndexRecord, final MultiDomainDBCursor replicaUpdatesCursor) throws DirectoryException, ChangelogException { while (true) { final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord(); final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN()); if (compareIndexWithUpdateMsg < 0) { // Either update message has been purged or baseDN has been removed from changelogDB, // ignore current index record and go to the next one return null; } else if (compareIndexWithUpdateMsg == 0) { // Found the matching update message return updateMsg; } // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet if (!replicaUpdatesCursor.next()) { // Should never happen, as it means some messages have disappeared // TODO : put the correct I18N message throw new DirectoryException(ResultCode.OPERATIONS_ERROR, Message.raw("Could not find replica update message matching index record. " + "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist.")); } } } /** Returns a cursor on CNIndexDB for the provided first change number. */ private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(final ChangelogDB changelogDB, final long firstChangeNumber) throws ChangelogException { final ChangeNumberIndexDB cnIndexDB = changelogDB.getChangeNumberIndexDB(); long changeNumberToUse = firstChangeNumber; if (changeNumberToUse <= 1) { final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord(); changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber(); } return cnIndexDB.getCursorFrom(changeNumberToUse); } /** Indicates if the provided entry matches the filter, base and scope. */ private boolean matchBaseAndScopeAndFilter(Entry entry, SearchOperation searchOp) throws DirectoryException { return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope()) && searchOp.getFilter().matchesEntry(entry); } /** * Retrieves the base changelog entry. */ private Entry buildBaseChangelogEntry(boolean hasSubordinates) { final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType,List<Attribute>>(); final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType,List<Attribute>>(); addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs); addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY, ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs); addAttributeByUppercaseName("hassubordinates", "hasSubordinates", Boolean.toString(hasSubordinates), userAttrs, operationalAttrs); addAttributeByUppercaseName("entrydn", "entryDN", baseChangelogDN.toString(), userAttrs, operationalAttrs); return new Entry(baseChangelogDN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs); } /** * Creates a changelog entry. */ private Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) throws DirectoryException { if (msg instanceof AddMsg) { return createAddMsg(baseDN, changeNumber, cookie, msg); } else if (msg instanceof ModifyCommonMsg) { return createModifyMsg(baseDN, changeNumber, cookie, msg); } else if (msg instanceof DeleteMsg) { final DeleteMsg delMsg = (DeleteMsg) msg; return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName()); } throw new DirectoryException(ResultCode.OPERATIONS_ERROR, Message.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN.toString(), msg.getClass().toString())); } /** * Creates an entry from an add message. * <p> * Map addMsg to an LDIF string for the 'changes' attribute, and pull out * change initiators name if available which is contained in the creatorsName * attribute. */ private Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) throws DirectoryException { final AddMsg addMsg = (AddMsg) msg; String changeInitiatorsName = null; String ldifChanges = null; try { final StringBuilder builder = new StringBuilder(256); for (Attribute attr : addMsg.getAttributes()) { if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty()) { // This attribute is not multi-valued. changeInitiatorsName = attr.iterator().next().toString(); } final String attrName = attr.getNameWithOptions(); for (AttributeValue value : attr) { builder.append(attrName); appendLDIFSeparatorAndValue(builder, value.getValue()); builder.append('\n'); } } ldifChanges = builder.toString(); } catch (Exception e) { logEncodingMessageError("add", addMsg.getDN(), e); } return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName); } /** * Creates an entry from a modify message. * <p> * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull * out change initiators name if available which is contained in the * modifiersName attribute. */ private Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg) throws DirectoryException { final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg; String changeInitiatorsName = null; String ldifChanges = null; try { final StringBuilder builder = new StringBuilder(128); for (Modification mod : modifyMsg.getMods()) { final Attribute attr = mod.getAttribute(); if (mod.getModificationType() == ModificationType.REPLACE && attr.getAttributeType().equals(MODIFIERS_NAME_TYPE) && !attr.isEmpty()) { // This attribute is not multi-valued. changeInitiatorsName = attr.iterator().next().toString(); } final String attrName = attr.getNameWithOptions(); builder.append(mod.getModificationType().getLDIFName()); builder.append(": "); builder.append(attrName); builder.append('\n'); for (AttributeValue value : attr) { builder.append(attrName); appendLDIFSeparatorAndValue(builder, value.getValue()); builder.append('\n'); } builder.append("-\n"); } ldifChanges = builder.toString(); } catch (Exception e) { logEncodingMessageError("modify", modifyMsg.getDN(), e); } final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg; final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges, isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName); if (isModifyDNMsg) { final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg; addAttribute(entry, "newrdn", modDNMsg.getNewRDN()); if (modDNMsg.getNewSuperior() != null) { addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior()); } addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn())); } return entry; } /** * Log an encoding message error. * * @param messageType * String identifying type of message. Should be "add" or "modify". * @param entryDN * DN of original entry */ private void logEncodingMessageError(String messageType, DN entryDN, Exception exception) { TRACER.debugCaught(DebugLogLevel.ERROR, exception); logError(Message.raw(Category.SYNC, Severity.MILD_ERROR, "An exception was encountered while trying to encode a replication " + messageType + " message for entry \"" + entryDN + "\" into an External Change Log entry: " + exception.getMessage())); } /** * Create a changelog entry from a set of provided information. This is the part of * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN). */ private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie, final LDAPUpdateMsg msg, final String ldifChanges, final String changeType, final String changeInitiatorsName) throws DirectoryException { final CSN csn = msg.getCSN(); String dnString; if (changeNumber == 0) { // Cookie mode dnString = "replicationCSN=" + csn + "," + baseDN.toString() + "," + DN_EXTERNAL_CHANGELOG_ROOT; } else { // Draft compat mode dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT; } final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>(); final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>(); // Operational standard attributes addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC, ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs); addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs); addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs); addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs); // REQUIRED attributes if (changeNumber != 0) { addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs); } SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME); dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ?? final String format = dateFormat.format(new Date(csn.getTime())); addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs); addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs); addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs); // NON REQUESTED attributes addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs); addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()), userAttrs, opAttrs); if (ldifChanges != null) { addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs); } if (changeInitiatorsName != null) { addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs); } final String targetUUID = msg.getEntryUUID(); if (targetUUID != null) { addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs); } addAttributeByType("changelogcookie", "changeLogCookie", cookie, userAttrs, opAttrs); final List<RawAttribute> includedAttributes = msg.getEclIncludes(); if (includedAttributes != null && !includedAttributes.isEmpty()) { final StringBuilder builder = new StringBuilder(256); for (final RawAttribute includedAttribute : includedAttributes) { final String name = includedAttribute.getAttributeType(); for (final ByteString value : includedAttribute.getValues()) { builder.append(name); appendLDIFSeparatorAndValue(builder, value); builder.append('\n'); } } final String includedAttributesLDIF = builder.toString(); addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs); } return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs); } private static void addAttribute(final Entry e, final String attrType, final String attrValue) { e.addAttribute(Attributes.create(attrType, attrValue), null); } private static void addAttributeByType(String attrNameLowercase, String attrNameUppercase, String attrValue, Map<AttributeType, List<Attribute>> userAttrs, Map<AttributeType, List<Attribute>> operationalAttrs) { addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true); } private void addAttributeByUppercaseName(String attrNameLowercase, String attrNameUppercase, String attrValue, Map<AttributeType, List<Attribute>> userAttrs, Map<AttributeType, List<Attribute>> operationalAttrs) { addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false); } private static void addAttribute(final String attrNameLowercase, final String attrNameUppercase, final String attrValue, final Map<AttributeType, List<Attribute>> userAttrs, final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType) { AttributeType attrType = DirectoryServer.getAttributeType(attrNameLowercase); if (attrType == null) { attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase); } final Attribute a = addByType ? Attributes.create(attrType, attrValue) : Attributes.create(attrNameUppercase, attrValue); final List<Attribute> attrList = Collections.singletonList(a); if (attrType.isOperational()) { operationalAttrs.put(attrType, attrList); } else { userAttrs.put(attrType, attrList); } } } opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,6 +42,7 @@ import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; import org.opends.server.api.VirtualAttributeProvider; import org.opends.server.backends.ChangelogBackend; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.core.WorkflowImpl; @@ -56,6 +57,7 @@ import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.file.FileChangelogDB; import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; import org.opends.server.replication.server.changelog.je.JEChangelogDB; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.types.*; @@ -63,6 +65,7 @@ import org.opends.server.util.StaticUtils; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import static org.opends.messages.ConfigMessages.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; @@ -95,11 +98,19 @@ private final Map<DN, ReplicationServerDomain> baseDNs = new HashMap<DN, ReplicationServerDomain>(); /** The database storing the changes. */ private final ChangelogDB changelogDB; /** The backend that allow to search the changes (external changelog). */ private ChangelogBackend changelogBackend; private final AtomicBoolean shutdown = new AtomicBoolean(); private boolean stopListen = false; private final ReplSessionSecurity replSessionSecurity; /** To know whether a domain is enabled for the external changelog. */ private final ECLEnabledDomainPredicate domainPredicate; /** The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); @@ -136,26 +147,41 @@ */ public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException { this(cfg, new DSRSShutdownSync()); this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()); } /** * Creates a new Replication server using the provided configuration entry. * Creates a new Replication server using the provided configuration entry and shutdown * synchronization object. * * @param cfg The configuration of this replication server. * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. * @throws ConfigException When Configuration is invalid. */ public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException { this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate()); } /** * Creates a new Replication server using the provided configuration entry, shutdown * synchronization object and domain predicate. * * @param cfg The configuration of this replication server. * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. * @param predicate Indicates whether a domain is enabled for the external changelog. * @throws ConfigException When Configuration is invalid. */ public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync, final ECLEnabledDomainPredicate predicate) throws ConfigException { this.config = cfg; this.dsrsShutdownSync = dsrsShutdownSync; this.domainPredicate = predicate; ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation(); if (DebugLogger.debugEnabled()) { TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB"); TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB"); } this.changelogDB = dbImpl == ReplicationDBImplementation.JE ? new JEChangelogDB(this, cfg) @@ -165,6 +191,9 @@ initialize(); cfg.addChangeListener(this); // TODO : uncomment to branch changelog backend //enableExternalChangeLog(); localPorts.add(getReplicationPort()); // Keep track of this new instance @@ -501,6 +530,57 @@ registerVirtualAttributeRules(); } /** * Enable the external changelog if it is not already enabled. * <p> * The external changelog is provided by the changelog backend. * * @throws ConfigException * If an error occurs. */ private void enableExternalChangeLog() throws ConfigException { if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID)) { // Backend has already been created and initialized // This can occurs in tests return; } try { changelogBackend = new ChangelogBackend(this, domainPredicate); changelogBackend.initializeBackend(); try { DirectoryServer.registerBackend(changelogBackend); } catch (Exception e) { logError(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(), getExceptionMessage(e))); } registerVirtualAttributeRules(); } catch (Exception e) { // TODO : I18N with correct message + what kind of exception should we really throw ? // (Directory/Initialization/Config Exception) throw new ConfigException(Message.raw("Error when enabling external changelog"), e); } } private void shutdownExternalChangelog() { if (changelogBackend != null) { DirectoryServer.deregisterBackend(changelogBackend); changelogBackend.finalizeBackend(); changelogBackend = null; } deregisterVirtualAttributeRules(); } private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException { final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>(); @@ -609,6 +689,64 @@ return getReplicationServerDomain(baseDN, false); } /** Returns the replicated domain DNs minus the provided set of excluded DNs. */ private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException { Set<DN> domains = null; synchronized (baseDNs) { domains = new HashSet<DN>(baseDNs.keySet()); } domains.removeAll(excludedBaseDNs); return domains; } /** * Validate that provided state is coherent with this replication server, * when ignoring the provided set of DNs. * <p> * The state is coherent if and only if it exactly has the set of DNs corresponding to * the replication domains. * * @param state * The multi domain state (cookie) to validate. * @param ignoredBaseDNs * The set of DNs to ignore when validating * @throws DirectoryException * If the state is not valid */ public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException { // TODO : should skip unused domains, where domain.getLatestServerState(); is empty final Set<DN> domains = getDomainDNs(ignoredBaseDNs); final Set<DN> stateDomains = state.getSnapshot().keySet(); final Set<DN> domainsCopy = new HashSet<DN>(domains); final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains); domainsCopy.removeAll(stateDomains); if (!domainsCopy.isEmpty()) { final StringBuilder missingDomains = new StringBuilder(); for (DN dn : domainsCopy) { missingDomains.append(dn).append(":;"); } throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get( missingDomains, "<" + state.toString() + missingDomains + ">")); } stateDomainsCopy.removeAll(domains); if (!stateDomainsCopy.isEmpty()) { final StringBuilder startState = new StringBuilder(); for (DN dn : domains) { startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";"); } throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get( stateDomainsCopy.toString(), startState)); } } /** * Get the ReplicationServerDomain associated to the base DN given in * parameter. @@ -706,7 +844,9 @@ domain.shutdown(); } // TODO : switch to second method when changelog backend is branched shutdownECL(); //shutdownExternalChangelog(); try { opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -25,6 +25,8 @@ */ package org.opends.server.replication.server.changelog.api; import java.util.Set; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; @@ -115,6 +117,32 @@ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy) throws ChangelogException; /** * Generates a {@link DBCursor} across all the domains starting at or after * the provided {@link MultiDomainServerState} for each domain, excluding a * provided set of domain DNs. * <p> * When the cursor is not used anymore, client code MUST call the * {@link DBCursor#close()} method to free the resources and locks used by the * cursor. * * @param startState * Starting point for each domain cursor. If any {@link ServerState} * for a domain is null, then start from the oldest CSN for each * replicaDBs * @param positionStrategy * Cursor position strategy, which allow to indicates at which exact * position the cursor must start * @param excludedDomainDns * Every domain appearing in this set is excluded from the cursor * @return a non null {@link DBCursor} * @throws ChangelogException * If a database problem happened * @see #getCursorFrom(DN, ServerState, PositionStrategy) */ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy, Set<DN> excludedDomainDns) throws ChangelogException; // serverId methods /** opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -661,11 +661,24 @@ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, final PositionStrategy positionStrategy) throws ChangelogException { final Set<DN> excludedDomainDns = Collections.emptySet(); return getCursorFrom(startState, positionStrategy, excludedDomainDns); } /** {@inheritDoc} */ @Override public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) throws ChangelogException { final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); registeredMultiDomainCursors.add(cursor); for (DN baseDN : domainToReplicaDBs.keySet()) { cursor.addDomain(baseDN, startState.getServerState(baseDN)); if (!excludedDomainDns.contains(baseDN)) { cursor.addDomain(baseDN, startState.getServerState(baseDN)); } } return cursor; } opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
@@ -32,7 +32,7 @@ * * @FunctionalInterface */ class ECLEnabledDomainPredicate public class ECLEnabledDomainPredicate { /** opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
@@ -33,7 +33,7 @@ * Multi domain DB cursor that only returns updates for the domains which have * been enabled for the external changelog. */ class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg> public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg> { private final ECLEnabledDomainPredicate predicate; opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -712,11 +712,22 @@ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, final PositionStrategy positionStrategy) throws ChangelogException { final Set<DN> excludedDomainDns = Collections.emptySet(); return getCursorFrom(startState, positionStrategy, excludedDomainDns); } /** {@inheritDoc} */ @Override public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) throws ChangelogException { final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); registeredMultiDomainCursors.add(cursor); for (DN baseDN : domainToReplicaDBs.keySet()) { cursor.addDomain(baseDN, startState.getServerState(baseDN)); if (!excludedDomainDns.contains(baseDN)) { cursor.addDomain(baseDN, startState.getServerState(baseDN)); } } return cursor; } opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java
New file @@ -0,0 +1,968 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2014 ForgeRock AS. */ package org.opends.server.backends; import static org.assertj.core.api.Assertions.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.TestCaseUtils.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.replication.protocol.OperationContext.*; import static org.opends.server.types.ResultCode.*; import static org.opends.server.util.ServerConstants.*; import static org.testng.Assert.*; import java.io.ByteArrayOutputStream; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; import java.util.List; import java.util.Set; import java.util.SortedSet; import org.opends.server.TestCaseUtils; import org.opends.server.backends.ChangelogBackend.SearchParams; import org.opends.server.controls.ExternalChangelogRequestControl; import org.opends.server.core.ModifyDNOperation; import org.opends.server.core.ModifyDNOperationBasis; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.protocols.internal.InternalClientConnection; import org.opends.server.protocols.internal.InternalSearchListener; import org.opends.server.protocols.internal.InternalSearchOperation; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.plugin.DomainFakeCfg; import org.opends.server.replication.plugin.ExternalChangelogDomainFakeCfg; import org.opends.server.replication.plugin.LDAPReplicationDomain; import org.opends.server.replication.plugin.MultimasterReplication; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.DeleteMsg; import org.opends.server.replication.protocol.ModifyDNMsg; import org.opends.server.replication.protocol.ModifyDnContext; import org.opends.server.replication.protocol.ModifyMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.types.Attribute; import org.opends.server.types.AttributeValue; import org.opends.server.types.Attributes; import org.opends.server.types.AuthenticationInfo; import org.opends.server.types.Control; import org.opends.server.types.DN; import org.opends.server.types.DereferencePolicy; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.LDIFExportConfig; import org.opends.server.types.Modification; import org.opends.server.types.ModificationType; import org.opends.server.types.Operation; import org.opends.server.types.RDN; import org.opends.server.types.ResultCode; import org.opends.server.types.SearchFilter; import org.opends.server.types.SearchResultEntry; import org.opends.server.types.SearchScope; import org.opends.server.util.LDIFWriter; import org.opends.server.util.TimeThread; import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import com.forgerock.opendj.util.Pair; @SuppressWarnings("javadoc") public class ChangelogBackendTestCase extends ReplicationTestCase { private static final DebugTracer TRACER = getTracer(); private static final String USER1_ENTRY_UUID = "11111111-1111-1111-1111-111111111111"; private static final long CHANGENUMBER_ZERO = 0L; private static final int SERVER_ID_1 = 1201; private static final int SERVER_ID_2 = 1202; private static final String TEST_ROOT_DN_STRING2 = "o=test2"; private static DN ROOT_DN_OTEST; private static DN ROOT_DN_OTEST2; private final int brokerSessionTimeout = 5000; private final int maxWindow = 100; /** The replicationServer that will be used in this test. */ private ReplicationServer replicationServer; /** The port of the replicationServer. */ private int replicationServerPort; /** * When used in a search operation, it includes all attributes (user and * operational) */ private static final Set<String> ALL_ATTRIBUTES = newSet("*", "+"); private static final List<Control> NO_CONTROL = null; @BeforeClass @Override public void setUp() throws Exception { super.setUp(); ROOT_DN_OTEST = DN.decode(TEST_ROOT_DN_STRING); ROOT_DN_OTEST2 = DN.decode(TEST_ROOT_DN_STRING2); // This test suite depends on having the schema available. configureReplicationServer(); } @Override @AfterClass public void classCleanUp() throws Exception { callParanoiaCheck = false; super.classCleanUp(); remove(replicationServer); replicationServer = null; paranoiaCheck(); } @AfterMethod public void clearReplicationDb() throws Exception { clearChangelogDB(replicationServer); } /** Configure a replicationServer for test. */ private void configureReplicationServer() throws Exception { replicationServerPort = TestCaseUtils.findFreePort(); ReplServerFakeConfiguration config = new ReplServerFakeConfiguration( replicationServerPort, "ChangelogBackendTestDB", replicationDbImplementation, 0, // purge delay 71, // server id 0, // queue size maxWindow, // window size null // servers ); config.setComputeChangeNumber(true); replicationServer = new ReplicationServer(config, new DSRSShutdownSync(), new ECLEnabledDomainPredicate() { @Override public boolean isECLEnabledDomain(DN baseDN) { return baseDN.equals(ROOT_DN_OTEST); } }); debugInfo("configure", "ReplicationServer created:" + replicationServer); } /** Enable replication on provided domain DN and serverid, using provided port. */ private Pair<ReplicationBroker, LDAPReplicationDomain> enableReplication(DN domainDN, int serverId, int replicationPort, int timeout) throws Exception { ReplicationBroker broker = openReplicationSession(domainDN, serverId, 100, replicationPort, timeout); DomainFakeCfg domainConf = newFakeCfg(domainDN, serverId, replicationPort); LDAPReplicationDomain replicationDomain = startNewReplicationDomain(domainConf, null, null); return Pair.of(broker, replicationDomain); } /** Start a new replication domain on the directory server side. */ private LDAPReplicationDomain startNewReplicationDomain( DomainFakeCfg domainConf, SortedSet<String> eclInclude, SortedSet<String> eclIncludeForDeletes) throws Exception { domainConf.setExternalChangelogDomain(new ExternalChangelogDomainFakeCfg(true, eclInclude, eclIncludeForDeletes)); // Set a Changetime heartbeat interval low enough // (less than default value that is 1000 ms) // for the test to be sure to consider all changes as eligible. domainConf.setChangetimeHeartbeatInterval(10); LDAPReplicationDomain newDomain = MultimasterReplication.createNewDomain(domainConf); newDomain.start(); return newDomain; } private void removeReplicationDomains(LDAPReplicationDomain... domains) { for (LDAPReplicationDomain domain : domains) { if (domain != null) { domain.shutdown(); MultimasterReplication.deleteDomain(domain.getBaseDN()); } } } @Test(enabled=false) public void searchChangesOnOneSuffixUsingEmptyCookie() throws Exception { String testName = "FourChangesCookie"; debugInfo(testName, "Starting test\n\n"); CSN[] csns = generateAndPublishChangesForEachOperationType(testName); searchChangesForEachOperationTypeUsingEmptyCookie(csns, testName); assertChangelogAttributesInRootDSE(true, 1, 4); debugInfo(testName, "Ending search with success"); } @Test(enabled=false) public void searchChangesOnOneSuffixUsingDraftMode() throws Exception { long firstChangeNumber = 1; String testName = "FourChanges/" + firstChangeNumber; debugInfo(testName, "Starting test\n\n"); CSN[] csns = generateAndPublishChangesForEachOperationType(testName); searchChangesForEachOperationTypeUsingDraftMode(firstChangeNumber, csns, testName); assertChangelogAttributesInRootDSE(true, 1, 4); debugInfo(testName, "Ending search with success"); } @Test(enabled=false) public void searchChangesOnOneSuffixMultipleTimesUsingDraftMode() throws Exception { replicationServer.getChangelogDB().setPurgeDelay(0); // write 4 changes starting from changenumber 1, and search them String testName = "Multiple/1"; CSN[] csns = generateAndPublishChangesForEachOperationType(testName); searchChangesForEachOperationTypeUsingDraftMode(1, csns, testName); // write 4 more changes starting from changenumber 5, and search them testName = "Multiple/5"; csns = generateAndPublishChangesForEachOperationType(testName); searchChangesForEachOperationTypeUsingDraftMode(5, csns, testName); // search from the provided change number: 6 (should be the add msg) CSN csnOfLastAddMsg = csns[1]; searchChangelogForOneChangeNumber(6, csnOfLastAddMsg); // search from a provided change number interval: 5-7 searchChangelogFromToChangeNumber(5,7); // check first and last change number assertChangelogAttributesInRootDSE(true, 1, 8); // add a new change, then check again first and last change number without previous search CSN csn = new CSN(TimeThread.getTime(), 10, SERVER_ID_1); publishChanges(testName, generateDeleteMsg(TEST_ROOT_DN_STRING, csn, testName, 1)); assertChangelogAttributesInRootDSE(true, 1, 9); } /** * Verifies that is not possible to read the changelog without the changelog-read privilege */ // TODO : enable when code is checking the privileges correctly @Test(enabled=false) public void searchingChangelogWithoutPrivilegeShouldFail() throws Exception { AuthenticationInfo nonPrivilegedUser = new AuthenticationInfo(); InternalClientConnection conn = new InternalClientConnection(nonPrivilegedUser); InternalSearchOperation op = conn.processSearch("cn=changelog", SearchScope.WHOLE_SUBTREE, "(objectclass=*)"); assertEquals(op.getResultCode(), ResultCode.INSUFFICIENT_ACCESS_RIGHTS); assertEquals(op.getErrorMessage().toMessage(), NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get()); } /** * With an empty RS, a search should return only root entry. */ @Test(enabled=false) public void searchWhenNoChangesShouldReturnRootEntryOnly() throws Exception { String testName = "EmptyRS"; debugInfo(testName, "Starting test\n\n"); searchChangelog("(objectclass=*)", 1, SUCCESS, testName); debugInfo(testName, "Ending test successfully"); } @Test(enabled=false) public void searchWithUnknownChangeNumberShouldReturnNoResult() throws Exception { String testName = "UnknownChangeNumber"; debugInfo(testName, "Starting test\n\n"); searchChangelog("(changenumber=1000)", 0, SUCCESS, testName); debugInfo(testName, "Ending test with success"); } @Test(enabled=false) public void operationalAndVirtualAttributesShouldNotBeVisibleOutsideRootDSE() throws Exception { String testName = "attributesVisibleOutsideRootDSE"; debugInfo(testName, "Starting test \n\n"); Set<String> attributes = newSet("firstchangenumber", "lastchangenumber", "changelog", "lastExternalChangelogCookie"); InternalSearchOperation searchOp = searchDNWithBaseScope(TEST_ROOT_DN_STRING, attributes); waitForSearchOpResult(searchOp, ResultCode.SUCCESS); final List<SearchResultEntry> entries = searchOp.getSearchEntries(); assertThat(entries).hasSize(1); debugAndWriteEntries(null, entries, testName); SearchResultEntry entry = entries.get(0); assertNull(getAttributeValue(entry, "firstchangenumber")); assertNull(getAttributeValue(entry, "lastchangenumber")); assertNull(getAttributeValue(entry, "changelog")); assertNull(getAttributeValue(entry, "lastExternalChangelogCookie")); debugInfo(testName, "Ending test with success"); } @DataProvider() public Object[][] getFilters() { return new Object[][] { // base DN, filter, expected first change number, expected last change number { "cn=changelog", "(objectclass=*)", -1, -1 }, { "cn=changelog", "(changenumber>=2)", 2, -1 }, { "cn=changelog", "(&(changenumber>=2)(changenumber<=5))", 2, 5 }, { "cn=changelog", "(&(dc=x)(&(changenumber>=2)(changenumber<=5)))", 2, 5 }, { "cn=changelog", "(&(&(changenumber>=3)(changenumber<=4))(&(|(dc=y)(dc=x))(&(changenumber>=2)(changenumber<=5))))", 3, 4 }, { "cn=changelog", "(|(objectclass=*)(&(changenumber>=2)(changenumber<=5)))", -1, -1 }, { "cn=changelog", "(changenumber=8)", 8, 8 }, { "changeNumber=8,cn=changelog", "(objectclass=*)", 8, 8 }, { "changeNumber=8,cn=changelog", "(changenumber>=2)", 8, 8 }, { "changeNumber=8,cn=changelog", "(&(changenumber>=2)(changenumber<=5))", 8, 8 }, }; } @Test(dataProvider="getFilters") public void optimizeFiltersWithChangeNumber(String dn, String filter, long expectedFirstCN, long expectedLastCN) throws Exception { final ChangelogBackend backend = new ChangelogBackend(null, null); final DN baseDN = DN.decode(dn); final SearchParams searchParams = new SearchParams(); backend.optimizeSearchParameters(searchParams, baseDN, SearchFilter.createFilterFromString(filter)); assertSearchParameters(searchParams, expectedFirstCN, expectedLastCN, null); } @Test public void optimizeFiltersWithReplicationCsn() throws Exception { final ChangelogBackend backend = new ChangelogBackend(null, null); final DN baseDN = DN.decode("cn=changelog"); final CSN csn = new CSNGenerator(1, 0).newCSN(); final SearchParams searchParams = new SearchParams(); backend.optimizeSearchParameters(searchParams, baseDN, SearchFilter.createFilterFromString("(replicationcsn=" + csn + ")")); assertSearchParameters(searchParams, -1, -1, csn); } private List<SearchResultEntry> assertChangelogAttributesInRootDSE(boolean isECLEnabled, int expectedFirstChangeNumber, int expectedLastChangeNumber) throws Exception { AssertionError error = null; for (int count = 0 ; count < 30; count++) { try { final Set<String> attributes = new LinkedHashSet<String>(); if (expectedFirstChangeNumber > 0) { attributes.add("firstchangenumber"); } attributes.add("lastchangenumber"); attributes.add("changelog"); attributes.add("lastExternalChangelogCookie"); final InternalSearchOperation searchOp = searchDNWithBaseScope("", attributes); final List<SearchResultEntry> entries = searchOp.getSearchEntries(); assertThat(entries).hasSize(1); final SearchResultEntry entry = entries.get(0); if (isECLEnabled) { if (expectedFirstChangeNumber > 0) { assertAttributeValue(entry, "firstchangenumber", String.valueOf(expectedFirstChangeNumber)); } assertAttributeValue(entry, "lastchangenumber", String.valueOf(expectedLastChangeNumber)); assertAttributeValue(entry, "changelog", String.valueOf("cn=changelog")); assertNotNull(getAttributeValue(entry, "lastExternalChangelogCookie")); } else { if (expectedFirstChangeNumber > 0) { assertNull(getAttributeValue(entry, "firstchangenumber")); } assertNull(getAttributeValue(entry, "lastchangenumber")); assertNull(getAttributeValue(entry, "changelog")); assertNull(getAttributeValue(entry, "lastExternalChangelogCookie")); } return entries; } catch (AssertionError ae) { // try again to see if changes have been persisted error = ae; } Thread.sleep(100); } assertNotNull(error); throw error; } private void assertSearchParameters(SearchParams searchParams, long firstChangeNumber, long lastChangeNumber, CSN csn) throws Exception { assertEquals(searchParams.getLowestChangeNumber(), firstChangeNumber); assertEquals(searchParams.getHighestChangeNumber(), lastChangeNumber); assertEquals(searchParams.getCSN(), csn == null ? new CSN(0, 0, 0) : csn); } private CSN[] generateAndPublishChangesForEachOperationType(String testName) throws Exception { CSN[] csns = generateCSNs(4, SERVER_ID_1); List<UpdateMsg> messages = new ArrayList<UpdateMsg>(); messages.add(generateDeleteMsg(TEST_ROOT_DN_STRING, csns[0], testName, 1)); messages.add(generateAddMsg(TEST_ROOT_DN_STRING, csns[1], USER1_ENTRY_UUID, testName)); messages.add(generateModMsg(TEST_ROOT_DN_STRING, csns[2], testName)); messages.add(generateModDNMsg(TEST_ROOT_DN_STRING, csns[3], testName)); publishChanges(testName, messages.toArray(new UpdateMsg[4])); return csns; } /** Publish a list changes to the default replication broker used by tests. */ private void publishChanges(String testName, UpdateMsg...messages) throws Exception { Pair<ReplicationBroker, LDAPReplicationDomain> replicationObjects = null; try { replicationObjects = enableReplication(ROOT_DN_OTEST, SERVER_ID_1, replicationServerPort, brokerSessionTimeout); ReplicationBroker broker = replicationObjects.getFirst(); for (UpdateMsg msg : messages) { debugInfo(testName, " publishes " + msg.getCSN()); broker.publish(msg); } } finally { if (replicationObjects != null) { removeReplicationDomains(replicationObjects.getSecond()); stop(replicationObjects.getFirst()); } } } private void searchChangesForEachOperationTypeUsingEmptyCookie(CSN[] csns, String testName) throws Exception { int nbEntries = 4; String cookie= ""; InternalSearchOperation searchOp = searchChangelogUsingCookie("(targetdn=*" + testName + "*,o=test)", cookie, nbEntries, SUCCESS, testName); final String[] cookies = new String[nbEntries]; for (int j = 0; j < cookies.length; j++) { cookies[j] = "o=test:" + csns[j] + ";"; } final List<SearchResultEntry> searchEntries = searchOp.getSearchEntries(); assertDelEntry(searchEntries.get(0), testName + 1, testName + "uuid1", CHANGENUMBER_ZERO, csns[0], cookies[0]); assertAddEntry(searchEntries.get(1), testName + 2, USER1_ENTRY_UUID, CHANGENUMBER_ZERO, csns[1], cookies[1]); assertModEntry(searchEntries.get(2), testName + 3, testName + "uuid3", CHANGENUMBER_ZERO, csns[2], cookies[2]); assertModDNEntry(searchEntries.get(3), testName + 4, testName + "new4", testName+"uuid4", CHANGENUMBER_ZERO, csns[3], cookies[3]); assertResultsContainCookieControl(searchOp, cookies); } private void searchChangesForEachOperationTypeUsingDraftMode(long firstChangeNumber, CSN[] csns, String testName) throws Exception { // Search the changelog and check 4 entries are returned String filter = "(targetdn=*" + testName + "*,o=test)"; InternalSearchOperation searchOp = searchChangelog(filter, 4, SUCCESS, testName); assertContainsNoControl(searchOp); assertEntriesForEachOperationType(searchOp.getSearchEntries(), firstChangeNumber, testName, USER1_ENTRY_UUID, csns); // Search the changelog with filter on change number and check 4 entries are returned filter = "(&(targetdn=*" + testName + "*,o=test)" + "(&(changenumber>=" + firstChangeNumber + ")" + "(changenumber<=" + (firstChangeNumber + 3) + ")))"; searchOp = searchChangelog(filter, 4, SUCCESS, testName); assertContainsNoControl(searchOp); assertEntriesForEachOperationType(searchOp.getSearchEntries(), firstChangeNumber, testName, USER1_ENTRY_UUID, csns); } /** * Search on the provided change number and check the result. * * @param changeNumber * Change number to search * @param expectedCsn * Expected CSN in the entry corresponding to the change number */ private void searchChangelogForOneChangeNumber(long changeNumber, CSN expectedCsn) throws Exception { String testName = "searchOneChangeNumber/" + changeNumber; debugInfo(testName, "Starting search\n\n"); InternalSearchOperation searchOp = searchChangelog("(changenumber=" + changeNumber + ")", 1, SUCCESS, testName); SearchResultEntry entry = searchOp.getSearchEntries().get(0); String uncheckedUid = null; assertEntryCommonAttributes(entry, uncheckedUid, USER1_ENTRY_UUID, changeNumber, expectedCsn, "o=test:" + expectedCsn + ";"); debugInfo(testName, "Ending search with success"); } private void searchChangelogFromToChangeNumber(int firstChangeNumber, int lastChangeNumber) throws Exception { String testName = "searchFromToChangeNumber/" + firstChangeNumber + "/" + lastChangeNumber; debugInfo(testName, "Starting search\n\n"); String filter = "(&(changenumber>=" + firstChangeNumber + ")" + "(changenumber<=" + lastChangeNumber + "))"; final int expectedNbEntries = lastChangeNumber - firstChangeNumber + 1; searchChangelog(filter, expectedNbEntries, SUCCESS, testName); debugInfo(testName, "Ending search with success"); } private InternalSearchOperation searchChangelogUsingCookie(String filterString, String cookie, int expectedNbEntries, ResultCode expectedResultCode, String testName) throws Exception { debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]"); return searchChangelog(filterString, ALL_ATTRIBUTES, createCookieControl(cookie), expectedNbEntries, expectedResultCode, testName); } private InternalSearchOperation searchChangelog(String filterString, int expectedNbEntries, ResultCode expectedResultCode, String testName) throws Exception { return searchChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, expectedResultCode, testName); } private InternalSearchOperation searchChangelog(String filterString, Set<String> attributes, List<Control> controls, int expectedNbEntries, ResultCode expectedResultCode, String testName) throws Exception { InternalSearchOperation searchOperation = null; int sizeLimitZero = 0; int timeLimitZero = 0; InternalSearchListener noSearchListener = null; int count = 0; do { Thread.sleep(10); boolean typesOnlyFalse = false; searchOperation = connection.processSearch("cn=changelog", SearchScope.WHOLE_SUBTREE, DereferencePolicy.NEVER_DEREF_ALIASES, sizeLimitZero, timeLimitZero, typesOnlyFalse, filterString, attributes, controls, noSearchListener); count++; } while (count < 300 && searchOperation.getSearchEntries().size() != expectedNbEntries); final List<SearchResultEntry> entries = searchOperation.getSearchEntries(); assertThat(entries).hasSize(expectedNbEntries); debugAndWriteEntries(getLDIFWriter(), entries, testName); waitForSearchOpResult(searchOperation, expectedResultCode); return searchOperation; } private InternalSearchOperation searchDNWithBaseScope(String dn, Set<String> attributes) throws Exception { final InternalSearchOperation searchOp = connection.processSearch( dn, SearchScope.BASE_OBJECT, DereferencePolicy.NEVER_DEREF_ALIASES, 0, // Size limit 0, // Time limit false, // Types only "(objectclass=*)", attributes); waitForSearchOpResult(searchOp, ResultCode.SUCCESS); return searchOp; } /** Build a list of controls including the cookie provided. */ private List<Control> createCookieControl(String cookie) throws DirectoryException { final MultiDomainServerState state = new MultiDomainServerState(cookie); final Control cookieControl = new ExternalChangelogRequestControl(true, state); return newList(cookieControl); } private static LDIFWriter getLDIFWriter() throws Exception { ByteArrayOutputStream stream = new ByteArrayOutputStream(); LDIFExportConfig exportConfig = new LDIFExportConfig(stream); return new LDIFWriter(exportConfig); } private CSN[] generateCSNs(int numberOfCsns, int serverId) { long startTime = TimeThread.getTime(); CSN[] csns = new CSN[numberOfCsns]; for (int i = 0; i < numberOfCsns; i++) { // seqNum must be greater than 0, so start at 1 csns[i] = new CSN(startTime + i, i + 1, serverId); } return csns; } private UpdateMsg generateDeleteMsg(String baseDn, CSN csn, String testName, int testIndex) throws Exception { String dn = "uid=" + testName + testIndex + "," + baseDn; return new DeleteMsg(DN.decode(dn), csn, testName + "uuid" + testIndex); } private UpdateMsg generateAddMsg(String baseDn, CSN csn, String user1entryUUID, String testName) throws Exception { String baseUUID = "22222222-2222-2222-2222-222222222222"; String entryLdif = "dn: uid="+ testName + "2," + baseDn + "\n" + "objectClass: top\n" + "objectClass: domain\n" + "entryUUID: "+ user1entryUUID +"\n"; Entry entry = TestCaseUtils.entryFromLdifString(entryLdif); return new AddMsg( csn, DN.decode("uid="+testName+"2," + baseDn), user1entryUUID, baseUUID, entry.getObjectClassAttribute(), entry.getAttributes(), Collections.<Attribute> emptyList()); } private UpdateMsg generateModMsg(String baseDn, CSN csn, String testName) throws Exception { DN baseDN = DN.decode("uid=" + testName + "3," + baseDn); List<Modification> mods = createAttributeModif("description", "new value"); return new ModifyMsg(csn, baseDN, mods, testName + "uuid3"); } private List<Modification> createAttributeModif(String attributeName, String valueString) { Attribute attr = Attributes.create(attributeName, valueString); return newList(new Modification(ModificationType.REPLACE, attr)); } private UpdateMsg generateModDNMsg(String baseDn, CSN csn, String testName) throws Exception { final DN newSuperior = ROOT_DN_OTEST2; ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null, DN.decode("uid=" + testName + "4," + baseDn), // entryDN RDN.decode("uid=" + testName + "new4"), // new rdn true, // deleteoldrdn newSuperior); op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csn, testName + "uuid4", "newparentId")); LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op); return new ModifyDNMsg(localOp); } //TODO : share this code with other classes ? private void waitForSearchOpResult(Operation operation, ResultCode expectedResult) throws Exception { int i = 0; while (operation.getResultCode() == ResultCode.UNDEFINED || operation.getResultCode() != expectedResult) { Thread.sleep(50); i++; if (i > 10) { assertEquals(operation.getResultCode(), expectedResult, operation.getErrorMessage().toString()); } } } /** Verify that no entry contains the ChangeLogCookie control. */ private void assertContainsNoControl(InternalSearchOperation searchOp) { for (SearchResultEntry entry : searchOp.getSearchEntries()) { assertTrue(entry.getControls().isEmpty(), "result entry " + entry.toString() + " should contain no control(s)"); } } /** Verify that all entries contains the ChangeLogCookie control with the correct cookie value. */ private void assertResultsContainCookieControl(InternalSearchOperation searchOp, String[] cookies) throws Exception { for (SearchResultEntry entry : searchOp.getSearchEntries()) { boolean cookieControlFound = false; for (Control control : entry.getControls()) { if (control.getOID().equals(OID_ECL_COOKIE_EXCHANGE_CONTROL)) { String cookieString = searchOp.getRequestControl(ExternalChangelogRequestControl.DECODER).getCookie().toString(); assertThat(cookieString).isIn((Object[]) cookies); cookieControlFound = true; } } assertTrue(cookieControlFound, "result entry " + entry.toString() + " should contain the cookie control"); } } /** Check the DEL entry has the right content. */ private void assertDelEntry(SearchResultEntry entry, String uid, String entryUUID, long changeNumber, CSN csn, String cookie) { assertAttributeValue(entry, "changetype", "delete"); assertAttributeValue(entry, "targetuniqueid", entryUUID); assertAttributeValue(entry, "targetentryuuid", entryUUID); assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie); } /** Check the ADD entry has the right content. */ private void assertAddEntry(SearchResultEntry entry, String uid, String entryUUID, long changeNumber, CSN csn, String cookie) { assertAttributeValue(entry, "changetype", "add"); assertEntryMatchesLDIF(entry, "changes", "objectClass: domain", "objectClass: top", "entryUUID: " + entryUUID); assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie); } private void assertModEntry(SearchResultEntry entry, String uid, String entryUUID, long changeNumber, CSN csn, String cookie) { assertAttributeValue(entry, "changetype", "modify"); assertEntryMatchesLDIF(entry, "changes", "replace: description", "description: new value", "-"); assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie); } private void assertModDNEntry(SearchResultEntry entry, String uid, String newUid, String entryUUID, long changeNumber, CSN csn, String cookie) { assertAttributeValue(entry, "changetype", "modrdn"); assertAttributeValue(entry, "newrdn", "uid=" + newUid); assertAttributeValue(entry, "newsuperior", TEST_ROOT_DN_STRING2); assertAttributeValue(entry, "deleteoldrdn", "true"); assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie); } private void assertEntryCommonAttributes(SearchResultEntry resultEntry, String uid, String entryUUID, long changeNumber, CSN csn, String cookie) { if (changeNumber == 0) { assertDNWithCSN(resultEntry, csn); } else { assertDNWithChangeNumber(resultEntry, changeNumber); assertAttributeValue(resultEntry, "changenumber", String.valueOf(changeNumber)); } assertAttributeValue(resultEntry, "targetentryuuid", entryUUID); assertAttributeValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1)); assertAttributeValue(resultEntry, "replicationcsn", csn.toString()); assertAttributeValue(resultEntry, "changelogcookie", cookie); // A null value can be provided for uid if it should not be checked if (uid != null) { final String targetDN = "uid=" + uid + "," + TEST_ROOT_DN_STRING; assertAttributeValue(resultEntry, "targetdn", targetDN); } } private void assertEntriesForEachOperationType(List<SearchResultEntry> entries, long firstChangeNumber, String testName, String entryUUID, CSN... csns) throws Exception { debugAndWriteEntries(getLDIFWriter(), entries, testName); assertThat(entries).hasSize(4); CSN csn = csns[0]; assertDelEntry(entries.get(0), testName + "1", testName + "uuid1", firstChangeNumber, csn, "o=test:" + csn + ";"); csn = csns[1]; assertAddEntry(entries.get(1), testName + "2", entryUUID, firstChangeNumber+1, csn, "o=test:" + csn + ";"); csn = csns[2]; assertModEntry(entries.get(2), testName + "3", testName + "uuid3", firstChangeNumber+2, csn, "o=test:" + csn + ";"); csn = csns[3]; assertModDNEntry(entries.get(3), testName + "4", testName + "new4", testName + "uuid4", firstChangeNumber+3, csn, "o=test:" + csn + ";"); } /** * Asserts the attribute value as LDIF to ignore lines ordering. */ private static void assertEntryMatchesLDIF(Entry entry, String attrName, String... expectedLDIFLines) { final String actualVal = getAttributeValue(entry, attrName); final Set<Set<String>> actual = toLDIFEntries(actualVal.split("\n")); final Set<Set<String>> expected = toLDIFEntries(expectedLDIFLines); assertThat(actual) .as("In entry " + entry + " incorrect value for attr '" + attrName + "'") .isEqualTo(expected); } private static void assertAttributeValue(Entry entry, String attrName, String expectedValue) { assertFalse(expectedValue.contains("\n"), "You should use assertEntryMatchesLDIF() method for asserting on this value: \"" + expectedValue + "\""); final String actualValue = getAttributeValue(entry, attrName); assertThat(actualValue) .as("In entry " + entry + " incorrect value for attr '" + attrName + "'") .isEqualToIgnoringCase(expectedValue); } private void assertDNWithChangeNumber(SearchResultEntry resultEntry, long changeNumber) { String actualDN = resultEntry.getDN().toNormalizedString(); String expectedDN = "changenumber=" + changeNumber + ",cn=changelog"; assertThat(actualDN).isEqualToIgnoringCase(expectedDN); } private void assertDNWithCSN(SearchResultEntry resultEntry, CSN csn) { String actualDN = resultEntry.getDN().toNormalizedString(); String expectedDN = "replicationcsn=" + csn + "," + TEST_ROOT_DN_STRING + ",cn=changelog"; assertThat(actualDN).isEqualToIgnoringCase(expectedDN); } /** * Returns a data structure allowing to compare arbitrary LDIF lines. The * algorithm splits LDIF entries on lines containing only a dash ("-"). It * then returns LDIF entries and lines in an LDIF entry in ordering * insensitive data structures. * <p> * Note: a last line with only a dash ("-") is significant. i.e.: * * <pre> * <code> * boolean b = toLDIFEntries("-").equals(toLDIFEntries())); * System.out.println(b); // prints "false" * </code> * </pre> */ private static Set<Set<String>> toLDIFEntries(String... ldifLines) { final Set<Set<String>> results = new HashSet<Set<String>>(); Set<String> ldifEntryLines = new HashSet<String>(); for (String ldifLine : ldifLines) { if (!"-".equals(ldifLine)) { // same entry keep adding ldifEntryLines.add(ldifLine); } else { // this is a new entry results.add(ldifEntryLines); ldifEntryLines = new HashSet<String>(); } } results.add(ldifEntryLines); return results; } // TODO : share this code with other classes private static String getAttributeValue(Entry entry, String attrName) { List<Attribute> attrs = entry.getAttribute(attrName.toLowerCase()); if (attrs == null) { return null; } Attribute a = attrs.iterator().next(); AttributeValue av = a.iterator().next(); return av.toString(); } private void debugAndWriteEntries(LDIFWriter ldifWriter,List<SearchResultEntry> entries, String tn) throws Exception { if (entries != null) { for (SearchResultEntry entry : entries) { // Can use entry.toSingleLineString() debugInfo(tn, " RESULT entry returned:" + entry.toLDIFString()); if (ldifWriter != null) { ldifWriter.writeEntry(entry); } } } } /** * Utility - log debug message - highlight it is from the test and not * from the server code. Makes easier to observe the test steps. */ private void debugInfo(String testName, String message) { if (debugEnabled()) { TRACER.debugInfo("** TEST " + testName + " ** " + message); } } } opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -41,6 +41,8 @@ import org.opends.server.replication.plugin.LDAPReplicationDomain; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.tools.LDAPSearch; import org.opends.server.types.Attribute; @@ -174,7 +176,15 @@ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(chPort, chDir, replicationDbImplementation, 0, changelogId, 0, 100, servers); ReplicationServer replicationServer = new ReplicationServer(conf); final DN testBaseDN = this.baseDN; ReplicationServer replicationServer = new ReplicationServer(conf, new DSRSShutdownSync(), new ECLEnabledDomainPredicate() { @Override public boolean isECLEnabledDomain(DN baseDN) { return testBaseDN.equals(baseDN); } }); Thread.sleep(1000); return replicationServer;