From 13f31d030c3b205931b63c29b0d6bc1d4eefd163 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 21 Aug 2014 13:07:07 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend to support cn=changelog CR-4083
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 13
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java | 968 ++++++++++++++++++++++++
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 152 +++
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java | 2
opendj-sdk/opends/src/messages/messages/replication.properties | 4
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 28
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java | 12
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java | 2
opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java | 1159 ++++++++++++++++++++++++++--
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 15
10 files changed, 2,254 insertions(+), 101 deletions(-)
diff --git a/opendj-sdk/opends/src/messages/messages/replication.properties b/opendj-sdk/opends/src/messages/messages/replication.properties
index e8a0ac6..11702ec 100644
--- a/opendj-sdk/opends/src/messages/messages/replication.properties
+++ b/opendj-sdk/opends/src/messages/messages/replication.properties
@@ -616,3 +616,7 @@
recovered by removing a partially written record
NOTICE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES_285=You do not have sufficient privileges to \
perform a search request on cn=changelog
+SEVERE_ERR_CHANGELOG_BACKEND_SEARCH_286 =An error occurred when \
+ searching base DN '%s' with filter '%s' in changelog backend : %s
+SEVERE_ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES_287 =An error occurred when \
+ retrieving number of subordinates for entry DN '%s' in changelog backend : %s
\ No newline at end of file
diff --git a/opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java b/opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
index f281c8c..1a1ec5c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -25,51 +25,115 @@
*/
package org.opends.server.backends;
-import java.util.Collections;
-import java.util.Set;
-
-import org.opends.server.admin.Configuration;
-import org.opends.server.api.Backend;
-import org.opends.server.config.ConfigEntry;
-import org.opends.server.config.ConfigException;
-import org.opends.server.core.AddOperation;
-import org.opends.server.core.DeleteOperation;
-import org.opends.server.core.DirectoryServer;
-import org.opends.server.core.ModifyDNOperation;
-import org.opends.server.core.ModifyOperation;
-import org.opends.server.core.SearchOperation;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.types.AttributeType;
-import org.opends.server.types.BackupConfig;
-import org.opends.server.types.BackupDirectory;
-import org.opends.server.types.CanceledOperationException;
-import org.opends.server.types.ConditionResult;
-import org.opends.server.types.DebugLogLevel;
-import org.opends.server.types.DN;
-import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
-import org.opends.server.types.IndexType;
-import org.opends.server.types.InitializationException;
-import org.opends.server.types.LDIFExportConfig;
-import org.opends.server.types.LDIFImportConfig;
-import org.opends.server.types.LDIFImportResult;
-import org.opends.server.types.RestoreConfig;
-import org.opends.server.types.ResultCode;
-import org.opends.server.util.Validator;
-
import static org.opends.messages.BackendMessages.*;
-import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.config.ConfigConstants.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
+import static org.opends.server.loggers.debug.DebugLogger.getTracer;
+import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.LDIFWriter.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+import org.opends.messages.Category;
+import org.opends.messages.Message;
+import org.opends.messages.Severity;
+import org.opends.server.admin.Configuration;
+import org.opends.server.api.Backend;
+import org.opends.server.config.ConfigConstants;
+import org.opends.server.config.ConfigException;
+import org.opends.server.controls.EntryChangelogNotificationControl;
+import org.opends.server.controls.ExternalChangelogRequestControl;
+import org.opends.server.core.*;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.LDAPUpdateMsg;
+import org.opends.server.replication.protocol.ModifyCommonMsg;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
+import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor;
+import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
+import org.opends.server.types.*;
+import org.opends.server.util.StaticUtils;
+
/**
- * A backend that provides access to the changelog, ie the "cn=changelog" suffix.
- * It is a read-only backend.
+ * A backend that provides access to the changelog, ie the "cn=changelog"
+ * suffix. It is a read-only backend that is created by a
+ * {@code ReplicationServer} and is not configurable.
+ * <p>
+ * There are two modes to search the changelog:
+ * <ul>
+ * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the
+ * request. The cookie provided in the control is used to retrieve entries from
+ * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
+ * the entries.</li>
+ * <li>Draft compat mode: when no "ECL Cookie Exchange Control" is provided with
+ * the request. The entries are retrieved using the ChangeNumberIndexDB (or
+ * DraftDB, hence the name) and their attributes are set with the information
+ * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
+ * from the content of ChangeNumberIndexDB.</li>
+ * </ul>
+ *
+ * @see ReplicationServer
*/
-public class ChangelogBackend extends Backend
+public class ChangelogBackend extends Backend<Configuration>
{
private static final DebugTracer TRACER = getTracer();
+ /** The id of this backend. */
+ public static final String BACKEND_ID = "changelog";
+
+ private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L;
+
+ private static final String CHANGE_NUMBER_ATTR = "changeNumber";
+ private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
+
+ /** The set of objectclasses that will be used in root entry. */
+ private static final Map<ObjectClass, String>
+ CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
+
+ static
+ {
+ CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
+ CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container");
+ }
+
+ /** The set of objectclasses that will be used in ECL entries. */
+ private static final Map<ObjectClass, String>
+ CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
+
+ static
+ {
+ CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
+ CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY);
+ }
+
+ /** The attribute type for the "creatorsName" attribute. */
+ private static final AttributeType CREATORS_NAME_TYPE =
+ DirectoryConfig.getAttributeType(OP_ATTR_CREATORS_NAME_LC, true);
+
+ /** The attribute type for the "modifiersName" attribute. */
+ private static final AttributeType MODIFIERS_NAME_TYPE =
+ DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true);
+
/** The DN for the base changelog entry. */
private DN baseChangelogDN;
@@ -77,52 +141,61 @@
private DN[] baseDNs;
/** The set of supported controls for this backend. */
- private final Set<String> supportedControls =
- Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
+ private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
+
+ /** The replication server on which the changelog is read. */
+ private final ReplicationServer replicationServer;
+
+ private final ECLEnabledDomainPredicate domainPredicate;
+
+ /**
+ * Creates a new backend with the provided repication server.
+ *
+ * @param replicationServer
+ * The replication server on which the changes are read.
+ * @param domainPredicate
+ * Returns whether a domain is enabled for the external changelog.
+ */
+ public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate)
+ {
+ this.replicationServer = replicationServer;
+ this.domainPredicate = domainPredicate;
+ setBackendID(BACKEND_ID);
+ setWritabilityMode(WritabilityMode.DISABLED);
+ setPrivateBackend(true);
+ }
/** {@inheritDoc} */
@Override
- public void configureBackend(final Configuration cfg) throws ConfigException
+ public void configureBackend(final Configuration config) throws ConfigException
{
- Validator.ensureNotNull(cfg);
+ throw new UnsupportedOperationException("The changelog backend is not configurable");
+ }
- final ConfigEntry configEntry = DirectoryServer.getConfigEntry(cfg.dn());
-
- // Make sure that a configuration entry was provided. If not, then we will
- // not be able to complete initialization.
- if (configEntry == null)
- {
- throw new ConfigException(ERR_BACKEND_CONFIG_ENTRY_NULL.get(getBackendID()));
- }
-
- // Create the set of base DNs that we will handle. In this case, it's just
- // the DN of the base changelog entry.
+ /** {@inheritDoc} */
+ @Override
+ public void initializeBackend() throws InitializationException
+ {
try
{
baseChangelogDN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
+ baseDNs = new DN[] { baseChangelogDN };
}
- catch (final Exception e)
+ catch (final DirectoryException e)
{
if (debugEnabled())
{
TRACER.debugCaught(DebugLogLevel.ERROR, e);
}
- throw new ConfigException(
+ throw new InitializationException(
ERR_BACKEND_CANNOT_DECODE_BACKEND_ROOT_DN.get(getBackendID(), getExceptionMessage(e)), e);
}
- this.baseDNs = new DN[] { baseChangelogDN };
- }
-
- /** {@inheritDoc} */
- @Override
- public void initializeBackend() throws ConfigException, InitializationException
- {
try
{
DirectoryServer.registerBaseDN(baseChangelogDN, this, true);
}
- catch (final Exception e)
+ catch (final DirectoryException e)
{
throw new InitializationException(
ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(baseChangelogDN.toString(), getExceptionMessage(e)), e);
@@ -137,7 +210,7 @@
{
DirectoryServer.deregisterBaseDN(baseChangelogDN);
}
- catch (final Exception e)
+ catch (final DirectoryException e)
{
if (debugEnabled())
{
@@ -157,26 +230,26 @@
@Override
public void preloadEntryCache() throws UnsupportedOperationException
{
- throw new RuntimeException("Not implemented");
+ throw new UnsupportedOperationException("Operation not supported.");
}
/** {@inheritDoc} */
@Override
public boolean isLocal()
{
- throw new RuntimeException("Not implemented");
+ return true;
}
/** {@inheritDoc} */
@Override
- public boolean isIndexed(AttributeType attributeType, IndexType indexType)
+ public boolean isIndexed(final AttributeType attributeType, final IndexType indexType)
{
- throw new RuntimeException("Not implemented");
+ return true;
}
/** {@inheritDoc} */
@Override
- public Entry getEntry(DN entryDN) throws DirectoryException
+ public Entry getEntry(final DN entryDN) throws DirectoryException
{
if (entryDN == null)
{
@@ -188,17 +261,90 @@
/** {@inheritDoc} */
@Override
- public ConditionResult hasSubordinates(DN entryDN) throws DirectoryException
+ public ConditionResult hasSubordinates(final DN entryDN)
+ throws DirectoryException
{
- throw new RuntimeException("Not implemented");
+ final long num = numSubordinates(entryDN, false);
+ if (num < 0)
+ {
+ return ConditionResult.UNDEFINED;
+ }
+ else if (num == 0)
+ {
+ return ConditionResult.FALSE;
+ }
+ else
+ {
+ return ConditionResult.TRUE;
+ }
+ }
+
+ /** Specific search operation to count number of entries. */
+ private final class NumSubordinatesSearchOperation extends SearchOperationWrapper
+ {
+ private long numSubordinates = -1;
+
+ private NumSubordinatesSearchOperation()
+ {
+ super(null);
+ }
+
+ @Override
+ public boolean returnEntry(Entry entry, List<Control> controls)
+ {
+ numSubordinates++;
+ return true;
+ }
+
+ @Override
+ public DN getBaseDN()
+ {
+ return baseChangelogDN;
+ }
+
+ @Override
+ public SearchFilter getFilter()
+ {
+ return LDAPURL.DEFAULT_SEARCH_FILTER;
+ }
+
+ @Override
+ public SearchScope getScope()
+ {
+ return SearchScope.WHOLE_SUBTREE;
+ }
}
/** {@inheritDoc} */
@Override
- public long numSubordinates(DN entryDN, boolean subtree)
- throws DirectoryException
+ public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException
{
- throw new RuntimeException("Not implemented");
+ // Compute the num subordinates only for the base DN
+ if (entryDN == null || !baseChangelogDN.equals(entryDN))
+ {
+ return -1;
+ }
+ if (!subtree)
+ {
+ return 1;
+ }
+ // Search with cookie mode to count all update messages
+ final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
+ excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
+ SearchParams params = new SearchParams("0", excludedDomains);
+ params.requestType = REQUEST_TYPE_FROM_COOKIE;
+ params.multiDomainServerState = new MultiDomainServerState();
+ NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation();
+ try
+ {
+ search0(params, searchOp);
+ }
+ catch (ChangelogException e)
+ {
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get(
+ baseChangelogDN.toString(), stackTraceToSingleLineString(e)));
+ }
+ return searchOp.numSubordinates;
}
/** {@inheritDoc} */
@@ -241,10 +387,35 @@
/** {@inheritDoc} */
@Override
- public void search(SearchOperation searchOperation)
- throws DirectoryException, CanceledOperationException
+ public void search(final SearchOperation searchOperation) throws DirectoryException
{
- throw new RuntimeException("Not implemented");
+ final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
+ excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
+ SearchParams params = new SearchParams(searchOperation.toString(), excludedDomains);
+ final ExternalChangelogRequestControl eclRequestControl =
+ searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
+ if (eclRequestControl == null)
+ {
+ params.requestType = REQUEST_TYPE_FROM_CHANGE_NUMBER;
+ }
+ else
+ {
+ params.requestType = REQUEST_TYPE_FROM_COOKIE;
+ params.multiDomainServerState = eclRequestControl.getCookie();
+ }
+
+ optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
+ try
+ {
+ search0(params, searchOperation);
+ }
+ catch (ChangelogException e)
+ {
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get(
+ searchOperation.getBaseDN().toString(),
+ searchOperation.getFilter().toString(),
+ stackTraceToSingleLineString(e)));
+ }
}
/** {@inheritDoc} */
@@ -270,7 +441,7 @@
/** {@inheritDoc} */
@Override
- public void exportLDIF(LDIFExportConfig exportConfig)
+ public void exportLDIF(final LDIFExportConfig exportConfig)
throws DirectoryException
{
throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
@@ -297,15 +468,14 @@
@Override
public boolean supportsBackup()
{
- throw new RuntimeException("Not implemented");
+ return false;
}
/** {@inheritDoc} */
@Override
- public boolean supportsBackup(BackupConfig backupConfig,
- StringBuilder unsupportedReason)
+ public boolean supportsBackup(BackupConfig backupConfig, StringBuilder unsupportedReason)
{
- throw new RuntimeException("Not implemented");
+ return false;
}
/** {@inheritDoc} */
@@ -318,34 +488,843 @@
/** {@inheritDoc} */
@Override
- public void removeBackup(BackupDirectory backupDirectory, String backupID)
- throws DirectoryException
+ public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
{
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
}
/** {@inheritDoc} */
@Override
public boolean supportsRestore()
{
- throw new RuntimeException("Not implemented");
+ return false;
}
/** {@inheritDoc} */
@Override
- public void restoreBackup(RestoreConfig restoreConfig)
- throws DirectoryException
+ public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
{
- throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
- ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
}
/** {@inheritDoc} */
@Override
public long getEntryCount()
{
- throw new RuntimeException("Not implemented");
+ try
+ {
+ return numSubordinates(baseChangelogDN, true) + 1;
+ }
+ catch (DirectoryException e)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
+ return -1;
+ }
+ }
+
+ /**
+ * Represent the search parameters specific to the changelog.
+ *
+ * This class should be visible for tests.
+ */
+ static class SearchParams
+ {
+ private ECLRequestType requestType;
+ private final String operationId;
+ private final Set<String> excludedBaseDNs;
+ private long lowestChangeNumber = -1;
+ private long highestChangeNumber = -1;
+ private CSN csn = new CSN(0, 0, 0);
+ private MultiDomainServerState multiDomainServerState;
+
+ /**
+ * Creates search parameters.
+ */
+ SearchParams()
+ {
+ operationId = "";
+ excludedBaseDNs = Collections.emptySet();
+ }
+
+ /**
+ * Creates search parameters with provided id and excluded domain DNs.
+ *
+ * @param operationId
+ * The id of the operation.
+ * @param excludedBaseDNs
+ * Set of DNs to exclude from search.
+ */
+ SearchParams(final String operationId, final Set<String> excludedBaseDNs)
+ {
+ this.operationId = operationId;
+ this.excludedBaseDNs = excludedBaseDNs;
+ }
+
+ /**
+ * Indicates if provided change number is compatible with last change
+ * number.
+ *
+ * @param changeNumber
+ * The change number to test.
+ * @return {@code true} if and only if the provided change number is in the
+ * range of the last change number.
+ */
+ boolean changeNumberIsInRange(long changeNumber)
+ {
+ return highestChangeNumber == -1 || changeNumber <= highestChangeNumber;
+ }
+
+ /**
+ * Returns the lowest change number to retrieve (inclusive).
+ *
+ * @return the lowest change number
+ */
+ long getLowestChangeNumber()
+ {
+ return lowestChangeNumber;
+ }
+
+ /**
+ * Returns the highest change number to retrieve (inclusive).
+ *
+ * @return the highest change number
+ */
+ long getHighestChangeNumber()
+ {
+ return highestChangeNumber;
+ }
+
+ /**
+ * Returns the CSN to retrieve.
+ *
+ * @return the CSN, which may be the default CSN with zero values.
+ */
+ CSN getCSN()
+ {
+ return csn;
+ }
+
+ /**
+ * Returns the set of DNs to exclude from the search.
+ *
+ * @return the DNs corresponding to domains to exclude from the search.
+ * @throws DirectoryException
+ * If a DN can't be decoded.
+ */
+ Set<DN> getExcludedBaseDNs() throws DirectoryException
+ {
+ final Set<DN> excludedDNs = new HashSet<DN>();
+ for (String dn : excludedBaseDNs)
+ {
+ excludedDNs.add(DN.decode(dn));
+ }
+ return excludedDNs;
+ }
+
+ }
+
+ /**
+ * Optimize the search parameters by analyzing the DN and filter.
+ * Populate the provided SearchParams with optimizations found.
+ *
+ * @param params the search parameters that are specific to external changelog
+ * @param baseDN the provided search baseDN.
+ * @param userFilter the provided search filter.
+ * @throws DirectoryException when an exception occurs.
+ */
+ void optimizeSearchParameters(final SearchParams params, final DN baseDN, final SearchFilter userFilter)
+ throws DirectoryException
+ {
+ SearchFilter equalityFilter = null;
+ switch (baseDN.getNumComponents())
+ {
+ case 1:
+ // "cn=changelog" : use user-provided search filter.
+ break;
+ case 2:
+ // It is probably "changeNumber=xxx,cn=changelog", use equality filter
+ // But it also could be "<service-id>,cn=changelog" so need to check on attribute
+ equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR);
+ break;
+ default:
+ // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter
+ equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN");
+ break;
+ }
+
+ final SearchParams optimized = optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter);
+ params.lowestChangeNumber = optimized.lowestChangeNumber;
+ params.highestChangeNumber = optimized.highestChangeNumber;
+ params.csn = optimized.csn;
+ }
+
+ /**
+ * Build a search filter from given DN and attribute.
+ *
+ * @return the search filter or {@code null} if attribute is not present in
+ * the provided DN
+ */
+ private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr)
+ {
+ final RDN rdn = baseDN.getRDN();
+ AttributeType attrType = DirectoryServer.getAttributeType(lowerCaseAttr);
+ if (attrType == null)
+ {
+ attrType = DirectoryServer.getDefaultAttributeType(upperCaseAttr);
+ }
+ final AttributeValue attrValue = rdn.getAttributeValue(attrType);
+ if (attrValue != null)
+ {
+ return SearchFilter.createEqualityFilter(attrType, attrValue);
+ }
+ return null;
+ }
+
+ private SearchParams optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException
+ {
+ final SearchParams params = new SearchParams();
+ if (filter == null)
+ {
+ return params;
+ }
+
+ if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR))
+ {
+ params.lowestChangeNumber = decodeChangeNumber(filter.getAssertionValue());
+ }
+ else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR))
+ {
+ params.highestChangeNumber = decodeChangeNumber(filter.getAssertionValue());
+ }
+ else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR))
+ {
+ final long number = decodeChangeNumber(filter.getAssertionValue());
+ params.lowestChangeNumber = number;
+ params.highestChangeNumber = number;
+ }
+ else if (matches(filter, FilterType.EQUALITY, "replicationcsn"))
+ {
+ // == exact CSN
+ params.csn = new CSN(filter.getAssertionValue().toString());
+ }
+ else if (filter.getFilterType() == FilterType.AND)
+ {
+ // TODO: it looks like it could be generalized to N components, not only two
+ final Collection<SearchFilter> components = filter.getFilterComponents();
+ final SearchFilter filters[] = components.toArray(new SearchFilter[0]);
+ long last1 = -1;
+ long first1 = -1;
+ long last2 = -1;
+ long first2 = -1;
+ if (filters.length > 0)
+ {
+ SearchParams msg1 = optimizeSearchUsingFilter(filters[0]);
+ last1 = msg1.highestChangeNumber;
+ first1 = msg1.lowestChangeNumber;
+ }
+ if (filters.length > 1)
+ {
+ SearchParams msg2 = optimizeSearchUsingFilter(filters[1]);
+ last2 = msg2.highestChangeNumber;
+ first2 = msg2.lowestChangeNumber;
+ }
+ if (last1 == -1)
+ {
+ params.highestChangeNumber = last2;
+ }
+ else if (last2 == -1)
+ {
+ params.highestChangeNumber = last1;
+ }
+ else
+ {
+ params.highestChangeNumber = Math.min(last1, last2);
+ }
+
+ params.lowestChangeNumber = Math.max(first1, first2);
+ }
+ return params;
+ }
+
+ private static long decodeChangeNumber(final AttributeValue assertionValue)
+ throws DirectoryException
+ {
+ try
+ {
+ return Long.decode(assertionValue.getNormalizedValue().toString());
+ }
+ catch (NumberFormatException e)
+ {
+ throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX,
+ Message.raw("Could not convert value '%s' to long", assertionValue.getNormalizedValue().toString()));
+ }
+ }
+
+ private boolean matches(SearchFilter filter, FilterType filterType, String primaryName)
+ {
+ return filter.getFilterType() == filterType
+ && filter.getAttributeType() != null
+ && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName);
+ }
+
+ private void search0(final SearchParams searchParams, final SearchOperation searchOperation)
+ throws DirectoryException, ChangelogException
+ {
+ switch (searchParams.requestType)
+ {
+ case REQUEST_TYPE_FROM_CHANGE_NUMBER:
+ searchFromChangeNumber(searchParams, searchOperation);
+ break;
+ case REQUEST_TYPE_FROM_COOKIE:
+ searchFromCookie(searchParams, searchOperation);
+ break;
+ default:
+ // not handled
+ }
+ }
+
+ /**
+ * Search the changelog when a cookie control is provided.
+ */
+ private void searchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
+ throws DirectoryException, ChangelogException
+ {
+ final ReplicationDomainDB replicationDomainDB = replicationServer.getChangelogDB().getReplicationDomainDB();
+ validateProvidedCookie(searchParams);
+
+ boolean hasReturnedBaseEntry = false;
+ ECLMultiDomainDBCursor replicaUpdatesCursor = null;
+ try
+ {
+ final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
+ searchParams.multiDomainServerState, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
+ replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
+
+ MultiDomainServerState cookie = searchParams.multiDomainServerState;
+ boolean continueSearch = true;
+ while (continueSearch && replicaUpdatesCursor.next())
+ {
+ // Handle creation of base changelog entry on first update message found
+ if (!hasReturnedBaseEntry)
+ {
+ if (!returnBaseChangelogEntry(searchOperation, true))
+ {
+ return;
+ }
+ hasReturnedBaseEntry = true;
+ }
+ // Handle the update message
+ final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
+ final DN domainBaseDN = replicaUpdatesCursor.getData();
+ cookie.update(domainBaseDN, updateMsg.getCSN());
+ final Entry entry = createEntryFromMsg(domainBaseDN, 0L, cookie.toString(), updateMsg);
+ if (matchBaseAndScopeAndFilter(entry, searchOperation))
+ {
+ Control control = new EntryChangelogNotificationControl(true, cookie.toString());
+ continueSearch = searchOperation.returnEntry(entry, Arrays.asList(control));
+ }
+ }
+ // Handle creation of base changelog entry when no update message is found
+ if (!hasReturnedBaseEntry)
+ {
+ returnBaseChangelogEntry(searchOperation, false);
+ }
+ }
+ finally
+ {
+ StaticUtils.close(replicaUpdatesCursor);
+ }
+ }
+
+ /**
+ * Validates the cookie contained in search parameters by checking its content
+ * with the actual replication server state.
+ *
+ * @throws DirectoryException
+ * If the state is not valid
+ */
+ private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
+ {
+ final MultiDomainServerState state = searchParams.multiDomainServerState;
+ if (state != null && !state.isEmpty())
+ {
+ replicationServer.validateServerState(state, searchParams.getExcludedBaseDNs());
+ }
+ }
+
+ /**
+ * Search the changelog using change number(s).
+ */
+ private void searchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
+ throws ChangelogException, DirectoryException
+ {
+ boolean hasReturnedBaseEntry = false;
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
+ MultiDomainDBCursor replicaUpdatesCursor = null;
+ try {
+ cnIndexDBCursor = getCNIndexDBCursor(changelogDB, params.lowestChangeNumber);
+ boolean continueSearch = true;
+ while (continueSearch && cnIndexDBCursor.next())
+ {
+ // Handle creation of base changelog entry on cnIndex record found
+ if (!hasReturnedBaseEntry)
+ {
+ if (!returnBaseChangelogEntry(searchOperation, true))
+ {
+ return;
+ }
+ hasReturnedBaseEntry = true;
+ }
+ // Handle the current cnIndex record
+ final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
+ if (replicaUpdatesCursor == null)
+ {
+ replicaUpdatesCursor = initializeReplicaUpdatesCursor(changelogDB, cnIndexRecord);
+ }
+ continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
+ if (continueSearch)
+ {
+ UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
+ if (updateMsg != null)
+ {
+ continueSearch = returnEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
+ replicaUpdatesCursor.next();
+ }
+ }
+ }
+ // Handle creation of base changelog entry when no update message is found
+ if (!hasReturnedBaseEntry)
+ {
+ returnBaseChangelogEntry(searchOperation, false);
+ }
+ }
+ finally {
+ StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor);
+ }
+ }
+
+ /**
+ * Create and returns the base changelog entry to provided search operation.
+ *
+ * @return {@code true} if search should continue, {@code false} otherwise
+ */
+ private boolean returnBaseChangelogEntry(final SearchOperation searchOperation, boolean hasSubordinates)
+ throws DirectoryException
+ {
+ final DN baseDN = searchOperation.getBaseDN();
+ final SearchFilter filter = searchOperation.getFilter();
+ final SearchScope scope = searchOperation.getScope();
+
+ if (baseChangelogDN.matchesBaseAndScope(baseDN, scope))
+ {
+ final Entry entry = buildBaseChangelogEntry(hasSubordinates);
+ if (filter.matchesEntry(entry) && !searchOperation.returnEntry(entry, null))
+ {
+ // Abandon, size limit reached.
+ return false;
+ }
+ }
+ if (baseDN.equals(baseChangelogDN) && scope.equals(SearchScope.BASE_OBJECT))
+ {
+ // Only the change log root entry was requested
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * @return {@code true} if search should continue, {@code false} otherwise
+ */
+ private boolean returnEntryForUpdateMessage(
+ final SearchOperation searchOperation,
+ final ChangeNumberIndexRecord cnIndexRecord,
+ final UpdateMsg updateMsg)
+ throws DirectoryException
+ {
+ final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
+ final DN changeDN = cnIndexRecord.getBaseDN();
+ cookie.update(changeDN, cnIndexRecord.getCSN());
+ final Entry entry = createEntryFromMsg(changeDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
+ if (matchBaseAndScopeAndFilter(entry, searchOperation))
+ {
+ return searchOperation.returnEntry(entry, null);
+ }
+ return true;
+ }
+
+ private MultiDomainDBCursor initializeReplicaUpdatesCursor(final ChangelogDB changelogDB,
+ final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
+ {
+ final MultiDomainServerState state = new MultiDomainServerState();
+ state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
+
+ // No need for ECLMultiDomainDBCursor in this case
+ // as updateMsg will be matched with cnIndexRecord
+ final MultiDomainDBCursor replicaUpdatesCursor =
+ changelogDB.getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
+ replicaUpdatesCursor.next();
+ return replicaUpdatesCursor;
+ }
+
+ /**
+ * Returns the replica update message corresponding to the provided
+ * cnIndexRecord.
+ *
+ * @return the update message, which may be {@code null} if the update message
+ * could not be found because it was purged or because corresponding
+ * baseDN was removed from the changelog
+ * @throws DirectoryException
+ * If inconsistency is detected between the available update
+ * messages and the provided cnIndexRecord
+ */
+ private UpdateMsg findReplicaUpdateMessage(
+ final ChangeNumberIndexRecord cnIndexRecord,
+ final MultiDomainDBCursor replicaUpdatesCursor)
+ throws DirectoryException, ChangelogException
+ {
+ while (true)
+ {
+ final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
+ final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN());
+ if (compareIndexWithUpdateMsg < 0) {
+ // Either update message has been purged or baseDN has been removed from changelogDB,
+ // ignore current index record and go to the next one
+ return null;
+ }
+ else if (compareIndexWithUpdateMsg == 0)
+ {
+ // Found the matching update message
+ return updateMsg;
+ }
+ // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet
+ if (!replicaUpdatesCursor.next())
+ {
+ // Should never happen, as it means some messages have disappeared
+ // TODO : put the correct I18N message
+ throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
+ Message.raw("Could not find replica update message matching index record. " +
+ "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist."));
+ }
+ }
+ }
+
+ /** Returns a cursor on CNIndexDB for the provided first change number. */
+ private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(final ChangelogDB changelogDB,
+ final long firstChangeNumber) throws ChangelogException
+ {
+ final ChangeNumberIndexDB cnIndexDB = changelogDB.getChangeNumberIndexDB();
+ long changeNumberToUse = firstChangeNumber;
+ if (changeNumberToUse <= 1)
+ {
+ final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
+ changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber();
+ }
+ return cnIndexDB.getCursorFrom(changeNumberToUse);
+ }
+
+ /** Indicates if the provided entry matches the filter, base and scope. */
+ private boolean matchBaseAndScopeAndFilter(Entry entry, SearchOperation searchOp) throws DirectoryException
+ {
+ return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
+ && searchOp.getFilter().matchesEntry(entry);
+ }
+
+ /**
+ * Retrieves the base changelog entry.
+ */
+ private Entry buildBaseChangelogEntry(boolean hasSubordinates)
+ {
+ final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
+ final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
+
+ addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
+ addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
+ ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
+ addAttributeByUppercaseName("hassubordinates", "hasSubordinates", Boolean.toString(hasSubordinates),
+ userAttrs, operationalAttrs);
+ addAttributeByUppercaseName("entrydn", "entryDN", baseChangelogDN.toString(),
+ userAttrs, operationalAttrs);
+ return new Entry(baseChangelogDN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
+ }
+
+ /**
+ * Creates a changelog entry.
+ */
+ private Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
+ throws DirectoryException
+ {
+ if (msg instanceof AddMsg)
+ {
+ return createAddMsg(baseDN, changeNumber, cookie, msg);
+ }
+ else if (msg instanceof ModifyCommonMsg)
+ {
+ return createModifyMsg(baseDN, changeNumber, cookie, msg);
+ }
+ else if (msg instanceof DeleteMsg)
+ {
+ final DeleteMsg delMsg = (DeleteMsg) msg;
+ return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName());
+ }
+ throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
+ Message.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN.toString(),
+ msg.getClass().toString()));
+ }
+
+ /**
+ * Creates an entry from an add message.
+ * <p>
+ * Map addMsg to an LDIF string for the 'changes' attribute, and pull out
+ * change initiators name if available which is contained in the creatorsName
+ * attribute.
+ */
+ private Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
+ throws DirectoryException
+ {
+ final AddMsg addMsg = (AddMsg) msg;
+ String changeInitiatorsName = null;
+ String ldifChanges = null;
+ try
+ {
+ final StringBuilder builder = new StringBuilder(256);
+ for (Attribute attr : addMsg.getAttributes())
+ {
+ if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty())
+ {
+ // This attribute is not multi-valued.
+ changeInitiatorsName = attr.iterator().next().toString();
+ }
+ final String attrName = attr.getNameWithOptions();
+ for (AttributeValue value : attr)
+ {
+ builder.append(attrName);
+ appendLDIFSeparatorAndValue(builder, value.getValue());
+ builder.append('\n');
+ }
+ }
+ ldifChanges = builder.toString();
+ }
+ catch (Exception e)
+ {
+ logEncodingMessageError("add", addMsg.getDN(), e);
+ }
+
+ return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName);
+ }
+
+ /**
+ * Creates an entry from a modify message.
+ * <p>
+ * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull
+ * out change initiators name if available which is contained in the
+ * modifiersName attribute.
+ */
+ private Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
+ throws DirectoryException
+ {
+ final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
+ String changeInitiatorsName = null;
+ String ldifChanges = null;
+ try
+ {
+ final StringBuilder builder = new StringBuilder(128);
+ for (Modification mod : modifyMsg.getMods())
+ {
+ final Attribute attr = mod.getAttribute();
+ if (mod.getModificationType() == ModificationType.REPLACE
+ && attr.getAttributeType().equals(MODIFIERS_NAME_TYPE)
+ && !attr.isEmpty())
+ {
+ // This attribute is not multi-valued.
+ changeInitiatorsName = attr.iterator().next().toString();
+ }
+ final String attrName = attr.getNameWithOptions();
+ builder.append(mod.getModificationType().getLDIFName());
+ builder.append(": ");
+ builder.append(attrName);
+ builder.append('\n');
+
+ for (AttributeValue value : attr)
+ {
+ builder.append(attrName);
+ appendLDIFSeparatorAndValue(builder, value.getValue());
+ builder.append('\n');
+ }
+ builder.append("-\n");
+ }
+ ldifChanges = builder.toString();
+ }
+ catch (Exception e)
+ {
+ logEncodingMessageError("modify", modifyMsg.getDN(), e);
+ }
+
+ final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg;
+ final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges,
+ isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName);
+
+ if (isModifyDNMsg)
+ {
+ final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg;
+ addAttribute(entry, "newrdn", modDNMsg.getNewRDN());
+ if (modDNMsg.getNewSuperior() != null)
+ {
+ addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior());
+ }
+ addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn()));
+ }
+ return entry;
+ }
+
+ /**
+ * Log an encoding message error.
+ *
+ * @param messageType
+ * String identifying type of message. Should be "add" or "modify".
+ * @param entryDN
+ * DN of original entry
+ */
+ private void logEncodingMessageError(String messageType, DN entryDN, Exception exception)
+ {
+ TRACER.debugCaught(DebugLogLevel.ERROR, exception);
+ logError(Message.raw(Category.SYNC, Severity.MILD_ERROR,
+ "An exception was encountered while trying to encode a replication " + messageType + " message for entry \""
+ + entryDN + "\" into an External Change Log entry: " + exception.getMessage()));
+ }
+
+ /**
+ * Create a changelog entry from a set of provided information. This is the part of
+ * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN).
+ */
+ private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie,
+ final LDAPUpdateMsg msg, final String ldifChanges, final String changeType,
+ final String changeInitiatorsName) throws DirectoryException
+ {
+ final CSN csn = msg.getCSN();
+ String dnString;
+ if (changeNumber == 0)
+ {
+ // Cookie mode
+ dnString = "replicationCSN=" + csn + "," + baseDN.toString() + "," + DN_EXTERNAL_CHANGELOG_ROOT;
+ }
+ else
+ {
+ // Draft compat mode
+ dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
+ }
+
+ final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
+ final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
+
+ // Operational standard attributes
+ addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC,
+ ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs);
+ addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs);
+ addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs);
+ addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
+
+ // REQUIRED attributes
+ if (changeNumber != 0)
+ {
+ addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
+ }
+ SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME);
+ dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ??
+ final String format = dateFormat.format(new Date(csn.getTime()));
+ addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs);
+ addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs);
+ addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs);
+
+ // NON REQUESTED attributes
+ addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs);
+ addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()),
+ userAttrs, opAttrs);
+
+ if (ldifChanges != null)
+ {
+ addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs);
+ }
+ if (changeInitiatorsName != null)
+ {
+ addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs);
+ }
+
+ final String targetUUID = msg.getEntryUUID();
+ if (targetUUID != null)
+ {
+ addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
+ }
+ addAttributeByType("changelogcookie", "changeLogCookie", cookie, userAttrs, opAttrs);
+
+ final List<RawAttribute> includedAttributes = msg.getEclIncludes();
+ if (includedAttributes != null && !includedAttributes.isEmpty())
+ {
+ final StringBuilder builder = new StringBuilder(256);
+ for (final RawAttribute includedAttribute : includedAttributes)
+ {
+ final String name = includedAttribute.getAttributeType();
+ for (final ByteString value : includedAttribute.getValues())
+ {
+ builder.append(name);
+ appendLDIFSeparatorAndValue(builder, value);
+ builder.append('\n');
+ }
+ }
+ final String includedAttributesLDIF = builder.toString();
+ addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs);
+ }
+
+ return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
+ }
+
+ private static void addAttribute(final Entry e, final String attrType, final String attrValue)
+ {
+ e.addAttribute(Attributes.create(attrType, attrValue), null);
+ }
+
+ private static void addAttributeByType(String attrNameLowercase,
+ String attrNameUppercase, String attrValue,
+ Map<AttributeType, List<Attribute>> userAttrs,
+ Map<AttributeType, List<Attribute>> operationalAttrs)
+ {
+ addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
+ }
+
+ private void addAttributeByUppercaseName(String attrNameLowercase,
+ String attrNameUppercase, String attrValue,
+ Map<AttributeType, List<Attribute>> userAttrs,
+ Map<AttributeType, List<Attribute>> operationalAttrs)
+ {
+ addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false);
+ }
+
+ private static void addAttribute(final String attrNameLowercase,
+ final String attrNameUppercase, final String attrValue,
+ final Map<AttributeType, List<Attribute>> userAttrs,
+ final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType)
+ {
+ AttributeType attrType = DirectoryServer.getAttributeType(attrNameLowercase);
+ if (attrType == null)
+ {
+ attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
+ }
+ final Attribute a = addByType ?
+ Attributes.create(attrType, attrValue) : Attributes.create(attrNameUppercase, attrValue);
+ final List<Attribute> attrList = Collections.singletonList(a);
+ if (attrType.isOperational())
+ {
+ operationalAttrs.put(attrType, attrList);
+ }
+ else
+ {
+ userAttrs.put(attrType, attrList);
+ }
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 652ce4f..93a9f57 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,6 +42,7 @@
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
import org.opends.server.api.VirtualAttributeProvider;
+import org.opends.server.backends.ChangelogBackend;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
@@ -56,6 +57,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
@@ -63,6 +65,7 @@
import org.opends.server.util.StaticUtils;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
+import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -95,11 +98,19 @@
private final Map<DN, ReplicationServerDomain> baseDNs =
new HashMap<DN, ReplicationServerDomain>();
+ /** The database storing the changes. */
private final ChangelogDB changelogDB;
+
+ /** The backend that allow to search the changes (external changelog). */
+ private ChangelogBackend changelogBackend;
+
private final AtomicBoolean shutdown = new AtomicBoolean();
private boolean stopListen = false;
private final ReplSessionSecurity replSessionSecurity;
+ /** To know whether a domain is enabled for the external changelog. */
+ private final ECLEnabledDomainPredicate domainPredicate;
+
/** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
@@ -136,26 +147,41 @@
*/
public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
{
- this(cfg, new DSRSShutdownSync());
+ this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
}
/**
- * Creates a new Replication server using the provided configuration entry.
+ * Creates a new Replication server using the provided configuration entry and shutdown
+ * synchronization object.
*
* @param cfg The configuration of this replication server.
* @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
* @throws ConfigException When Configuration is invalid.
*/
- public ReplicationServer(ReplicationServerCfg cfg,
- DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+ public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+ {
+ this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
+ }
+
+ /**
+ * Creates a new Replication server using the provided configuration entry, shutdown
+ * synchronization object and domain predicate.
+ *
+ * @param cfg The configuration of this replication server.
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
+ * @param predicate Indicates whether a domain is enabled for the external changelog.
+ * @throws ConfigException When Configuration is invalid.
+ */
+ public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
+ final ECLEnabledDomainPredicate predicate) throws ConfigException
{
this.config = cfg;
this.dsrsShutdownSync = dsrsShutdownSync;
+ this.domainPredicate = predicate;
ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
if (DebugLogger.debugEnabled())
{
- TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl
- + " as DB implementation for changelog DB");
+ TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB");
}
this.changelogDB = dbImpl == ReplicationDBImplementation.JE
? new JEChangelogDB(this, cfg)
@@ -165,6 +191,9 @@
initialize();
cfg.addChangeListener(this);
+ // TODO : uncomment to branch changelog backend
+ //enableExternalChangeLog();
+
localPorts.add(getReplicationPort());
// Keep track of this new instance
@@ -501,6 +530,57 @@
registerVirtualAttributeRules();
}
+ /**
+ * Enable the external changelog if it is not already enabled.
+ * <p>
+ * The external changelog is provided by the changelog backend.
+ *
+ * @throws ConfigException
+ * If an error occurs.
+ */
+ private void enableExternalChangeLog() throws ConfigException
+ {
+ if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
+ {
+ // Backend has already been created and initialized
+ // This can occurs in tests
+ return;
+ }
+ try
+ {
+ changelogBackend = new ChangelogBackend(this, domainPredicate);
+ changelogBackend.initializeBackend();
+ try
+ {
+ DirectoryServer.registerBackend(changelogBackend);
+ }
+ catch (Exception e)
+ {
+ logError(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
+ getExceptionMessage(e)));
+ }
+
+ registerVirtualAttributeRules();
+ }
+ catch (Exception e)
+ {
+ // TODO : I18N with correct message + what kind of exception should we really throw ?
+ // (Directory/Initialization/Config Exception)
+ throw new ConfigException(Message.raw("Error when enabling external changelog"), e);
+ }
+ }
+
+ private void shutdownExternalChangelog()
+ {
+ if (changelogBackend != null)
+ {
+ DirectoryServer.deregisterBackend(changelogBackend);
+ changelogBackend.finalizeBackend();
+ changelogBackend = null;
+ }
+ deregisterVirtualAttributeRules();
+ }
+
private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
{
final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>();
@@ -609,6 +689,64 @@
return getReplicationServerDomain(baseDN, false);
}
+ /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
+ private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
+ {
+ Set<DN> domains = null;
+ synchronized (baseDNs)
+ {
+ domains = new HashSet<DN>(baseDNs.keySet());
+ }
+ domains.removeAll(excludedBaseDNs);
+ return domains;
+ }
+
+ /**
+ * Validate that provided state is coherent with this replication server,
+ * when ignoring the provided set of DNs.
+ * <p>
+ * The state is coherent if and only if it exactly has the set of DNs corresponding to
+ * the replication domains.
+ *
+ * @param state
+ * The multi domain state (cookie) to validate.
+ * @param ignoredBaseDNs
+ * The set of DNs to ignore when validating
+ * @throws DirectoryException
+ * If the state is not valid
+ */
+ public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException
+ {
+ // TODO : should skip unused domains, where domain.getLatestServerState(); is empty
+ final Set<DN> domains = getDomainDNs(ignoredBaseDNs);
+ final Set<DN> stateDomains = state.getSnapshot().keySet();
+ final Set<DN> domainsCopy = new HashSet<DN>(domains);
+ final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains);
+ domainsCopy.removeAll(stateDomains);
+ if (!domainsCopy.isEmpty())
+ {
+ final StringBuilder missingDomains = new StringBuilder();
+ for (DN dn : domainsCopy)
+ {
+ missingDomains.append(dn).append(":;");
+ }
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
+ missingDomains, "<" + state.toString() + missingDomains + ">"));
+ }
+ stateDomainsCopy.removeAll(domains);
+ if (!stateDomainsCopy.isEmpty())
+ {
+ final StringBuilder startState = new StringBuilder();
+ for (DN dn : domains) {
+ startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";");
+ }
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
+ stateDomainsCopy.toString(), startState));
+ }
+ }
+
/**
* Get the ReplicationServerDomain associated to the base DN given in
* parameter.
@@ -706,7 +844,9 @@
domain.shutdown();
}
+ // TODO : switch to second method when changelog backend is branched
shutdownECL();
+ //shutdownExternalChangelog();
try
{
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index e275020..8970417 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -25,6 +25,8 @@
*/
package org.opends.server.replication.server.changelog.api;
+import java.util.Set;
+
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
@@ -115,6 +117,32 @@
public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
throws ChangelogException;
+ /**
+ * Generates a {@link DBCursor} across all the domains starting at or after
+ * the provided {@link MultiDomainServerState} for each domain, excluding a
+ * provided set of domain DNs.
+ * <p>
+ * When the cursor is not used anymore, client code MUST call the
+ * {@link DBCursor#close()} method to free the resources and locks used by the
+ * cursor.
+ *
+ * @param startState
+ * Starting point for each domain cursor. If any {@link ServerState}
+ * for a domain is null, then start from the oldest CSN for each
+ * replicaDBs
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which exact
+ * position the cursor must start
+ * @param excludedDomainDns
+ * Every domain appearing in this set is excluded from the cursor
+ * @return a non null {@link DBCursor}
+ * @throws ChangelogException
+ * If a database problem happened
+ * @see #getCursorFrom(DN, ServerState, PositionStrategy)
+ */
+ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy,
+ Set<DN> excludedDomainDns) throws ChangelogException;
+
// serverId methods
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index af4cbb8..4db2e85 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -661,11 +661,24 @@
public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
final PositionStrategy positionStrategy) throws ChangelogException
{
+ final Set<DN> excludedDomainDns = Collections.emptySet();
+ return getCursorFrom(startState, positionStrategy, excludedDomainDns);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+ final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns)
+ throws ChangelogException
+ {
final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
registeredMultiDomainCursors.add(cursor);
for (DN baseDN : domainToReplicaDBs.keySet())
{
- cursor.addDomain(baseDN, startState.getServerState(baseDN));
+ if (!excludedDomainDns.contains(baseDN))
+ {
+ cursor.addDomain(baseDN, startState.getServerState(baseDN));
+ }
}
return cursor;
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
index d1a4721..9a9972d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
@@ -32,7 +32,7 @@
*
* @FunctionalInterface
*/
-class ECLEnabledDomainPredicate
+public class ECLEnabledDomainPredicate
{
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
index a035657..97a1806 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
@@ -33,7 +33,7 @@
* Multi domain DB cursor that only returns updates for the domains which have
* been enabled for the external changelog.
*/
-class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
+public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
{
private final ECLEnabledDomainPredicate predicate;
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 5b3cca6..2706782 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -712,11 +712,22 @@
public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
final PositionStrategy positionStrategy) throws ChangelogException
{
+ final Set<DN> excludedDomainDns = Collections.emptySet();
+ return getCursorFrom(startState, positionStrategy, excludedDomainDns);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+ final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) throws ChangelogException
+ {
final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
registeredMultiDomainCursors.add(cursor);
for (DN baseDN : domainToReplicaDBs.keySet())
{
- cursor.addDomain(baseDN, startState.getServerState(baseDN));
+ if (!excludedDomainDns.contains(baseDN)) {
+ cursor.addDomain(baseDN, startState.getServerState(baseDN));
+ }
}
return cursor;
}
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java
new file mode 100644
index 0000000..6e552a3
--- /dev/null
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java
@@ -0,0 +1,968 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ * Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.backends;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.protocol.OperationContext.*;
+import static org.opends.server.types.ResultCode.*;
+import static org.opends.server.util.ServerConstants.*;
+import static org.testng.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.opends.server.TestCaseUtils;
+import org.opends.server.backends.ChangelogBackend.SearchParams;
+import org.opends.server.controls.ExternalChangelogRequestControl;
+import org.opends.server.core.ModifyDNOperation;
+import org.opends.server.core.ModifyDNOperationBasis;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.protocols.internal.InternalClientConnection;
+import org.opends.server.protocols.internal.InternalSearchListener;
+import org.opends.server.protocols.internal.InternalSearchOperation;
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.CSNGenerator;
+import org.opends.server.replication.common.MultiDomainServerState;
+import org.opends.server.replication.plugin.DomainFakeCfg;
+import org.opends.server.replication.plugin.ExternalChangelogDomainFakeCfg;
+import org.opends.server.replication.plugin.LDAPReplicationDomain;
+import org.opends.server.replication.plugin.MultimasterReplication;
+import org.opends.server.replication.protocol.AddMsg;
+import org.opends.server.replication.protocol.DeleteMsg;
+import org.opends.server.replication.protocol.ModifyDNMsg;
+import org.opends.server.replication.protocol.ModifyDnContext;
+import org.opends.server.replication.protocol.ModifyMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.ReplServerFakeConfiguration;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
+import org.opends.server.replication.service.DSRSShutdownSync;
+import org.opends.server.replication.service.ReplicationBroker;
+import org.opends.server.types.Attribute;
+import org.opends.server.types.AttributeValue;
+import org.opends.server.types.Attributes;
+import org.opends.server.types.AuthenticationInfo;
+import org.opends.server.types.Control;
+import org.opends.server.types.DN;
+import org.opends.server.types.DereferencePolicy;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.LDIFExportConfig;
+import org.opends.server.types.Modification;
+import org.opends.server.types.ModificationType;
+import org.opends.server.types.Operation;
+import org.opends.server.types.RDN;
+import org.opends.server.types.ResultCode;
+import org.opends.server.types.SearchFilter;
+import org.opends.server.types.SearchResultEntry;
+import org.opends.server.types.SearchScope;
+import org.opends.server.util.LDIFWriter;
+import org.opends.server.util.TimeThread;
+import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import com.forgerock.opendj.util.Pair;
+
+@SuppressWarnings("javadoc")
+public class ChangelogBackendTestCase extends ReplicationTestCase
+{
+ private static final DebugTracer TRACER = getTracer();
+
+ private static final String USER1_ENTRY_UUID = "11111111-1111-1111-1111-111111111111";
+ private static final long CHANGENUMBER_ZERO = 0L;
+ private static final int SERVER_ID_1 = 1201;
+ private static final int SERVER_ID_2 = 1202;
+ private static final String TEST_ROOT_DN_STRING2 = "o=test2";
+ private static DN ROOT_DN_OTEST;
+ private static DN ROOT_DN_OTEST2;
+
+ private final int brokerSessionTimeout = 5000;
+ private final int maxWindow = 100;
+
+ /** The replicationServer that will be used in this test. */
+ private ReplicationServer replicationServer;
+
+ /** The port of the replicationServer. */
+ private int replicationServerPort;
+
+ /**
+ * When used in a search operation, it includes all attributes (user and
+ * operational)
+ */
+ private static final Set<String> ALL_ATTRIBUTES = newSet("*", "+");
+ private static final List<Control> NO_CONTROL = null;
+
+ @BeforeClass
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ ROOT_DN_OTEST = DN.decode(TEST_ROOT_DN_STRING);
+ ROOT_DN_OTEST2 = DN.decode(TEST_ROOT_DN_STRING2);
+
+ // This test suite depends on having the schema available.
+ configureReplicationServer();
+ }
+
+ @Override
+ @AfterClass
+ public void classCleanUp() throws Exception
+ {
+ callParanoiaCheck = false;
+ super.classCleanUp();
+
+ remove(replicationServer);
+ replicationServer = null;
+
+ paranoiaCheck();
+ }
+
+ @AfterMethod
+ public void clearReplicationDb() throws Exception
+ {
+ clearChangelogDB(replicationServer);
+ }
+
+ /** Configure a replicationServer for test. */
+ private void configureReplicationServer() throws Exception
+ {
+ replicationServerPort = TestCaseUtils.findFreePort();
+
+ ReplServerFakeConfiguration config = new ReplServerFakeConfiguration(
+ replicationServerPort,
+ "ChangelogBackendTestDB",
+ replicationDbImplementation,
+ 0, // purge delay
+ 71, // server id
+ 0, // queue size
+ maxWindow, // window size
+ null // servers
+ );
+ config.setComputeChangeNumber(true);
+ replicationServer = new ReplicationServer(config, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()
+ {
+ @Override
+ public boolean isECLEnabledDomain(DN baseDN)
+ {
+ return baseDN.equals(ROOT_DN_OTEST);
+ }
+ });
+ debugInfo("configure", "ReplicationServer created:" + replicationServer);
+ }
+
+ /** Enable replication on provided domain DN and serverid, using provided port. */
+ private Pair<ReplicationBroker, LDAPReplicationDomain> enableReplication(DN domainDN, int serverId,
+ int replicationPort, int timeout) throws Exception
+ {
+ ReplicationBroker broker = openReplicationSession(domainDN, serverId, 100, replicationPort, timeout);
+ DomainFakeCfg domainConf = newFakeCfg(domainDN, serverId, replicationPort);
+ LDAPReplicationDomain replicationDomain = startNewReplicationDomain(domainConf, null, null);
+ return Pair.of(broker, replicationDomain);
+ }
+
+ /** Start a new replication domain on the directory server side. */
+ private LDAPReplicationDomain startNewReplicationDomain(
+ DomainFakeCfg domainConf,
+ SortedSet<String> eclInclude,
+ SortedSet<String> eclIncludeForDeletes)
+ throws Exception
+ {
+ domainConf.setExternalChangelogDomain(new ExternalChangelogDomainFakeCfg(true, eclInclude, eclIncludeForDeletes));
+ // Set a Changetime heartbeat interval low enough
+ // (less than default value that is 1000 ms)
+ // for the test to be sure to consider all changes as eligible.
+ domainConf.setChangetimeHeartbeatInterval(10);
+ LDAPReplicationDomain newDomain = MultimasterReplication.createNewDomain(domainConf);
+ newDomain.start();
+ return newDomain;
+ }
+
+ private void removeReplicationDomains(LDAPReplicationDomain... domains)
+ {
+ for (LDAPReplicationDomain domain : domains)
+ {
+ if (domain != null)
+ {
+ domain.shutdown();
+ MultimasterReplication.deleteDomain(domain.getBaseDN());
+ }
+ }
+ }
+
+ @Test(enabled=false)
+ public void searchChangesOnOneSuffixUsingEmptyCookie() throws Exception
+ {
+ String testName = "FourChangesCookie";
+ debugInfo(testName, "Starting test\n\n");
+
+ CSN[] csns = generateAndPublishChangesForEachOperationType(testName);
+
+ searchChangesForEachOperationTypeUsingEmptyCookie(csns, testName);
+
+ assertChangelogAttributesInRootDSE(true, 1, 4);
+
+ debugInfo(testName, "Ending search with success");
+ }
+
+ @Test(enabled=false)
+ public void searchChangesOnOneSuffixUsingDraftMode() throws Exception
+ {
+ long firstChangeNumber = 1;
+ String testName = "FourChanges/" + firstChangeNumber;
+ debugInfo(testName, "Starting test\n\n");
+
+ CSN[] csns = generateAndPublishChangesForEachOperationType(testName);
+
+ searchChangesForEachOperationTypeUsingDraftMode(firstChangeNumber, csns, testName);
+
+ assertChangelogAttributesInRootDSE(true, 1, 4);
+
+ debugInfo(testName, "Ending search with success");
+ }
+
+ @Test(enabled=false)
+ public void searchChangesOnOneSuffixMultipleTimesUsingDraftMode() throws Exception
+ {
+ replicationServer.getChangelogDB().setPurgeDelay(0);
+
+ // write 4 changes starting from changenumber 1, and search them
+ String testName = "Multiple/1";
+ CSN[] csns = generateAndPublishChangesForEachOperationType(testName);
+ searchChangesForEachOperationTypeUsingDraftMode(1, csns, testName);
+
+ // write 4 more changes starting from changenumber 5, and search them
+ testName = "Multiple/5";
+ csns = generateAndPublishChangesForEachOperationType(testName);
+ searchChangesForEachOperationTypeUsingDraftMode(5, csns, testName);
+
+ // search from the provided change number: 6 (should be the add msg)
+ CSN csnOfLastAddMsg = csns[1];
+ searchChangelogForOneChangeNumber(6, csnOfLastAddMsg);
+
+ // search from a provided change number interval: 5-7
+ searchChangelogFromToChangeNumber(5,7);
+
+ // check first and last change number
+ assertChangelogAttributesInRootDSE(true, 1, 8);
+
+ // add a new change, then check again first and last change number without previous search
+ CSN csn = new CSN(TimeThread.getTime(), 10, SERVER_ID_1);
+ publishChanges(testName, generateDeleteMsg(TEST_ROOT_DN_STRING, csn, testName, 1));
+
+ assertChangelogAttributesInRootDSE(true, 1, 9);
+ }
+
+ /**
+ * Verifies that is not possible to read the changelog without the changelog-read privilege
+ */
+ // TODO : enable when code is checking the privileges correctly
+ @Test(enabled=false)
+ public void searchingChangelogWithoutPrivilegeShouldFail() throws Exception
+ {
+ AuthenticationInfo nonPrivilegedUser = new AuthenticationInfo();
+
+ InternalClientConnection conn = new InternalClientConnection(nonPrivilegedUser);
+ InternalSearchOperation op = conn.processSearch("cn=changelog", SearchScope.WHOLE_SUBTREE, "(objectclass=*)");
+
+ assertEquals(op.getResultCode(), ResultCode.INSUFFICIENT_ACCESS_RIGHTS);
+ assertEquals(op.getErrorMessage().toMessage(), NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
+ }
+
+ /**
+ * With an empty RS, a search should return only root entry.
+ */
+ @Test(enabled=false)
+ public void searchWhenNoChangesShouldReturnRootEntryOnly() throws Exception
+ {
+ String testName = "EmptyRS";
+ debugInfo(testName, "Starting test\n\n");
+
+ searchChangelog("(objectclass=*)", 1, SUCCESS, testName);
+
+ debugInfo(testName, "Ending test successfully");
+ }
+
+ @Test(enabled=false)
+ public void searchWithUnknownChangeNumberShouldReturnNoResult() throws Exception
+ {
+ String testName = "UnknownChangeNumber";
+ debugInfo(testName, "Starting test\n\n");
+
+ searchChangelog("(changenumber=1000)", 0, SUCCESS, testName);
+
+ debugInfo(testName, "Ending test with success");
+ }
+
+ @Test(enabled=false)
+ public void operationalAndVirtualAttributesShouldNotBeVisibleOutsideRootDSE() throws Exception
+ {
+ String testName = "attributesVisibleOutsideRootDSE";
+ debugInfo(testName, "Starting test \n\n");
+
+ Set<String> attributes =
+ newSet("firstchangenumber", "lastchangenumber", "changelog", "lastExternalChangelogCookie");
+
+ InternalSearchOperation searchOp = searchDNWithBaseScope(TEST_ROOT_DN_STRING, attributes);
+ waitForSearchOpResult(searchOp, ResultCode.SUCCESS);
+
+ final List<SearchResultEntry> entries = searchOp.getSearchEntries();
+ assertThat(entries).hasSize(1);
+ debugAndWriteEntries(null, entries, testName);
+ SearchResultEntry entry = entries.get(0);
+ assertNull(getAttributeValue(entry, "firstchangenumber"));
+ assertNull(getAttributeValue(entry, "lastchangenumber"));
+ assertNull(getAttributeValue(entry, "changelog"));
+ assertNull(getAttributeValue(entry, "lastExternalChangelogCookie"));
+
+ debugInfo(testName, "Ending test with success");
+ }
+
+ @DataProvider()
+ public Object[][] getFilters()
+ {
+ return new Object[][] {
+ // base DN, filter, expected first change number, expected last change number
+ { "cn=changelog", "(objectclass=*)", -1, -1 },
+ { "cn=changelog", "(changenumber>=2)", 2, -1 },
+ { "cn=changelog", "(&(changenumber>=2)(changenumber<=5))", 2, 5 },
+ { "cn=changelog", "(&(dc=x)(&(changenumber>=2)(changenumber<=5)))", 2, 5 },
+ { "cn=changelog",
+ "(&(&(changenumber>=3)(changenumber<=4))(&(|(dc=y)(dc=x))(&(changenumber>=2)(changenumber<=5))))", 3, 4 },
+ { "cn=changelog", "(|(objectclass=*)(&(changenumber>=2)(changenumber<=5)))", -1, -1 },
+ { "cn=changelog", "(changenumber=8)", 8, 8 },
+
+ { "changeNumber=8,cn=changelog", "(objectclass=*)", 8, 8 },
+ { "changeNumber=8,cn=changelog", "(changenumber>=2)", 8, 8 },
+ { "changeNumber=8,cn=changelog", "(&(changenumber>=2)(changenumber<=5))", 8, 8 },
+ };
+ }
+
+ @Test(dataProvider="getFilters")
+ public void optimizeFiltersWithChangeNumber(String dn, String filter, long expectedFirstCN, long expectedLastCN)
+ throws Exception
+ {
+ final ChangelogBackend backend = new ChangelogBackend(null, null);
+ final DN baseDN = DN.decode(dn);
+ final SearchParams searchParams = new SearchParams();
+
+ backend.optimizeSearchParameters(searchParams, baseDN, SearchFilter.createFilterFromString(filter));
+
+ assertSearchParameters(searchParams, expectedFirstCN, expectedLastCN, null);
+ }
+
+ @Test
+ public void optimizeFiltersWithReplicationCsn() throws Exception
+ {
+ final ChangelogBackend backend = new ChangelogBackend(null, null);
+ final DN baseDN = DN.decode("cn=changelog");
+ final CSN csn = new CSNGenerator(1, 0).newCSN();
+ final SearchParams searchParams = new SearchParams();
+
+ backend.optimizeSearchParameters(searchParams, baseDN,
+ SearchFilter.createFilterFromString("(replicationcsn=" + csn + ")"));
+
+ assertSearchParameters(searchParams, -1, -1, csn);
+ }
+
+ private List<SearchResultEntry> assertChangelogAttributesInRootDSE(boolean isECLEnabled,
+ int expectedFirstChangeNumber, int expectedLastChangeNumber) throws Exception
+ {
+ AssertionError error = null;
+ for (int count = 0 ; count < 30; count++)
+ {
+ try
+ {
+ final Set<String> attributes = new LinkedHashSet<String>();
+ if (expectedFirstChangeNumber > 0)
+ {
+ attributes.add("firstchangenumber");
+ }
+ attributes.add("lastchangenumber");
+ attributes.add("changelog");
+ attributes.add("lastExternalChangelogCookie");
+
+ final InternalSearchOperation searchOp = searchDNWithBaseScope("", attributes);
+ final List<SearchResultEntry> entries = searchOp.getSearchEntries();
+ assertThat(entries).hasSize(1);
+
+ final SearchResultEntry entry = entries.get(0);
+ if (isECLEnabled)
+ {
+ if (expectedFirstChangeNumber > 0)
+ {
+ assertAttributeValue(entry, "firstchangenumber", String.valueOf(expectedFirstChangeNumber));
+ }
+ assertAttributeValue(entry, "lastchangenumber", String.valueOf(expectedLastChangeNumber));
+ assertAttributeValue(entry, "changelog", String.valueOf("cn=changelog"));
+ assertNotNull(getAttributeValue(entry, "lastExternalChangelogCookie"));
+ }
+ else
+ {
+ if (expectedFirstChangeNumber > 0) {
+ assertNull(getAttributeValue(entry, "firstchangenumber"));
+ }
+ assertNull(getAttributeValue(entry, "lastchangenumber"));
+ assertNull(getAttributeValue(entry, "changelog"));
+ assertNull(getAttributeValue(entry, "lastExternalChangelogCookie"));
+ }
+ return entries;
+ }
+ catch (AssertionError ae)
+ {
+ // try again to see if changes have been persisted
+ error = ae;
+ }
+ Thread.sleep(100);
+ }
+ assertNotNull(error);
+ throw error;
+ }
+
+ private void assertSearchParameters(SearchParams searchParams, long firstChangeNumber,
+ long lastChangeNumber, CSN csn) throws Exception
+ {
+ assertEquals(searchParams.getLowestChangeNumber(), firstChangeNumber);
+ assertEquals(searchParams.getHighestChangeNumber(), lastChangeNumber);
+ assertEquals(searchParams.getCSN(), csn == null ? new CSN(0, 0, 0) : csn);
+ }
+
+ private CSN[] generateAndPublishChangesForEachOperationType(String testName) throws Exception
+ {
+ CSN[] csns = generateCSNs(4, SERVER_ID_1);
+
+ List<UpdateMsg> messages = new ArrayList<UpdateMsg>();
+ messages.add(generateDeleteMsg(TEST_ROOT_DN_STRING, csns[0], testName, 1));
+ messages.add(generateAddMsg(TEST_ROOT_DN_STRING, csns[1], USER1_ENTRY_UUID, testName));
+ messages.add(generateModMsg(TEST_ROOT_DN_STRING, csns[2], testName));
+ messages.add(generateModDNMsg(TEST_ROOT_DN_STRING, csns[3], testName));
+
+ publishChanges(testName, messages.toArray(new UpdateMsg[4]));
+ return csns;
+ }
+
+ /** Publish a list changes to the default replication broker used by tests. */
+ private void publishChanges(String testName, UpdateMsg...messages) throws Exception
+ {
+ Pair<ReplicationBroker, LDAPReplicationDomain> replicationObjects = null;
+ try
+ {
+ replicationObjects = enableReplication(ROOT_DN_OTEST, SERVER_ID_1, replicationServerPort, brokerSessionTimeout);
+ ReplicationBroker broker = replicationObjects.getFirst();
+ for (UpdateMsg msg : messages)
+ {
+ debugInfo(testName, " publishes " + msg.getCSN());
+ broker.publish(msg);
+ }
+ }
+ finally
+ {
+ if (replicationObjects != null)
+ {
+ removeReplicationDomains(replicationObjects.getSecond());
+ stop(replicationObjects.getFirst());
+ }
+ }
+ }
+
+ private void searchChangesForEachOperationTypeUsingEmptyCookie(CSN[] csns, String testName) throws Exception
+ {
+ int nbEntries = 4;
+ String cookie= "";
+
+ InternalSearchOperation searchOp =
+ searchChangelogUsingCookie("(targetdn=*" + testName + "*,o=test)", cookie, nbEntries, SUCCESS, testName);
+
+ final String[] cookies = new String[nbEntries];
+ for (int j = 0; j < cookies.length; j++)
+ {
+ cookies[j] = "o=test:" + csns[j] + ";";
+ }
+ final List<SearchResultEntry> searchEntries = searchOp.getSearchEntries();
+ assertDelEntry(searchEntries.get(0), testName + 1, testName + "uuid1", CHANGENUMBER_ZERO, csns[0], cookies[0]);
+ assertAddEntry(searchEntries.get(1), testName + 2, USER1_ENTRY_UUID, CHANGENUMBER_ZERO, csns[1], cookies[1]);
+ assertModEntry(searchEntries.get(2), testName + 3, testName + "uuid3", CHANGENUMBER_ZERO, csns[2], cookies[2]);
+ assertModDNEntry(searchEntries.get(3), testName + 4, testName + "new4", testName+"uuid4", CHANGENUMBER_ZERO,
+ csns[3], cookies[3]);
+ assertResultsContainCookieControl(searchOp, cookies);
+ }
+
+ private void searchChangesForEachOperationTypeUsingDraftMode(long firstChangeNumber, CSN[] csns, String testName)
+ throws Exception
+ {
+ // Search the changelog and check 4 entries are returned
+ String filter = "(targetdn=*" + testName + "*,o=test)";
+ InternalSearchOperation searchOp = searchChangelog(filter, 4, SUCCESS, testName);
+
+ assertContainsNoControl(searchOp);
+ assertEntriesForEachOperationType(searchOp.getSearchEntries(), firstChangeNumber, testName, USER1_ENTRY_UUID, csns);
+
+ // Search the changelog with filter on change number and check 4 entries are returned
+ filter =
+ "(&(targetdn=*" + testName + "*,o=test)"
+ + "(&(changenumber>=" + firstChangeNumber + ")"
+ + "(changenumber<=" + (firstChangeNumber + 3) + ")))";
+ searchOp = searchChangelog(filter, 4, SUCCESS, testName);
+
+ assertContainsNoControl(searchOp);
+ assertEntriesForEachOperationType(searchOp.getSearchEntries(), firstChangeNumber, testName, USER1_ENTRY_UUID, csns);
+ }
+
+ /**
+ * Search on the provided change number and check the result.
+ *
+ * @param changeNumber
+ * Change number to search
+ * @param expectedCsn
+ * Expected CSN in the entry corresponding to the change number
+ */
+ private void searchChangelogForOneChangeNumber(long changeNumber, CSN expectedCsn) throws Exception
+ {
+ String testName = "searchOneChangeNumber/" + changeNumber;
+ debugInfo(testName, "Starting search\n\n");
+
+ InternalSearchOperation searchOp =
+ searchChangelog("(changenumber=" + changeNumber + ")", 1, SUCCESS, testName);
+
+ SearchResultEntry entry = searchOp.getSearchEntries().get(0);
+ String uncheckedUid = null;
+ assertEntryCommonAttributes(entry, uncheckedUid, USER1_ENTRY_UUID, changeNumber, expectedCsn,
+ "o=test:" + expectedCsn + ";");
+
+ debugInfo(testName, "Ending search with success");
+ }
+
+ private void searchChangelogFromToChangeNumber(int firstChangeNumber, int lastChangeNumber) throws Exception
+ {
+ String testName = "searchFromToChangeNumber/" + firstChangeNumber + "/" + lastChangeNumber;
+ debugInfo(testName, "Starting search\n\n");
+
+ String filter = "(&(changenumber>=" + firstChangeNumber + ")" + "(changenumber<=" + lastChangeNumber + "))";
+ final int expectedNbEntries = lastChangeNumber - firstChangeNumber + 1;
+ searchChangelog(filter, expectedNbEntries, SUCCESS, testName);
+
+ debugInfo(testName, "Ending search with success");
+ }
+
+ private InternalSearchOperation searchChangelogUsingCookie(String filterString,
+ String cookie, int expectedNbEntries, ResultCode expectedResultCode, String testName)
+ throws Exception
+ {
+ debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]");
+ return searchChangelog(filterString, ALL_ATTRIBUTES, createCookieControl(cookie),
+ expectedNbEntries, expectedResultCode, testName);
+ }
+
+ private InternalSearchOperation searchChangelog(String filterString, int expectedNbEntries,
+ ResultCode expectedResultCode, String testName) throws Exception
+ {
+ return searchChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, expectedResultCode, testName);
+ }
+
+ private InternalSearchOperation searchChangelog(String filterString, Set<String> attributes,
+ List<Control> controls, int expectedNbEntries, ResultCode expectedResultCode, String testName) throws Exception
+ {
+ InternalSearchOperation searchOperation = null;
+ int sizeLimitZero = 0;
+ int timeLimitZero = 0;
+ InternalSearchListener noSearchListener = null;
+ int count = 0;
+ do
+ {
+ Thread.sleep(10);
+ boolean typesOnlyFalse = false;
+ searchOperation = connection.processSearch("cn=changelog", SearchScope.WHOLE_SUBTREE,
+ DereferencePolicy.NEVER_DEREF_ALIASES, sizeLimitZero, timeLimitZero, typesOnlyFalse, filterString,
+ attributes, controls, noSearchListener);
+ count++;
+ }
+ while (count < 300 && searchOperation.getSearchEntries().size() != expectedNbEntries);
+
+ final List<SearchResultEntry> entries = searchOperation.getSearchEntries();
+ assertThat(entries).hasSize(expectedNbEntries);
+ debugAndWriteEntries(getLDIFWriter(), entries, testName);
+ waitForSearchOpResult(searchOperation, expectedResultCode);
+ return searchOperation;
+ }
+
+ private InternalSearchOperation searchDNWithBaseScope(String dn, Set<String> attributes) throws Exception
+ {
+ final InternalSearchOperation searchOp = connection.processSearch(
+ dn,
+ SearchScope.BASE_OBJECT,
+ DereferencePolicy.NEVER_DEREF_ALIASES,
+ 0, // Size limit
+ 0, // Time limit
+ false, // Types only
+ "(objectclass=*)",
+ attributes);
+ waitForSearchOpResult(searchOp, ResultCode.SUCCESS);
+ return searchOp;
+ }
+
+ /** Build a list of controls including the cookie provided. */
+ private List<Control> createCookieControl(String cookie) throws DirectoryException
+ {
+ final MultiDomainServerState state = new MultiDomainServerState(cookie);
+ final Control cookieControl = new ExternalChangelogRequestControl(true, state);
+ return newList(cookieControl);
+ }
+
+ private static LDIFWriter getLDIFWriter() throws Exception
+ {
+ ByteArrayOutputStream stream = new ByteArrayOutputStream();
+ LDIFExportConfig exportConfig = new LDIFExportConfig(stream);
+ return new LDIFWriter(exportConfig);
+ }
+
+ private CSN[] generateCSNs(int numberOfCsns, int serverId)
+ {
+ long startTime = TimeThread.getTime();
+
+ CSN[] csns = new CSN[numberOfCsns];
+ for (int i = 0; i < numberOfCsns; i++)
+ {
+ // seqNum must be greater than 0, so start at 1
+ csns[i] = new CSN(startTime + i, i + 1, serverId);
+ }
+ return csns;
+ }
+
+ private UpdateMsg generateDeleteMsg(String baseDn, CSN csn, String testName, int testIndex)
+ throws Exception
+ {
+ String dn = "uid=" + testName + testIndex + "," + baseDn;
+ return new DeleteMsg(DN.decode(dn), csn, testName + "uuid" + testIndex);
+ }
+
+ private UpdateMsg generateAddMsg(String baseDn, CSN csn, String user1entryUUID, String testName)
+ throws Exception
+ {
+ String baseUUID = "22222222-2222-2222-2222-222222222222";
+ String entryLdif = "dn: uid="+ testName + "2," + baseDn + "\n"
+ + "objectClass: top\n" + "objectClass: domain\n"
+ + "entryUUID: "+ user1entryUUID +"\n";
+ Entry entry = TestCaseUtils.entryFromLdifString(entryLdif);
+ return new AddMsg(
+ csn,
+ DN.decode("uid="+testName+"2," + baseDn),
+ user1entryUUID,
+ baseUUID,
+ entry.getObjectClassAttribute(),
+ entry.getAttributes(),
+ Collections.<Attribute> emptyList());
+ }
+
+ private UpdateMsg generateModMsg(String baseDn, CSN csn, String testName) throws Exception
+ {
+ DN baseDN = DN.decode("uid=" + testName + "3," + baseDn);
+ List<Modification> mods = createAttributeModif("description", "new value");
+ return new ModifyMsg(csn, baseDN, mods, testName + "uuid3");
+ }
+
+ private List<Modification> createAttributeModif(String attributeName, String valueString)
+ {
+ Attribute attr = Attributes.create(attributeName, valueString);
+ return newList(new Modification(ModificationType.REPLACE, attr));
+ }
+
+ private UpdateMsg generateModDNMsg(String baseDn, CSN csn, String testName) throws Exception
+ {
+ final DN newSuperior = ROOT_DN_OTEST2;
+ ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
+ DN.decode("uid=" + testName + "4," + baseDn), // entryDN
+ RDN.decode("uid=" + testName + "new4"), // new rdn
+ true, // deleteoldrdn
+ newSuperior);
+ op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csn, testName + "uuid4", "newparentId"));
+ LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
+ return new ModifyDNMsg(localOp);
+ }
+
+ //TODO : share this code with other classes ?
+ private void waitForSearchOpResult(Operation operation, ResultCode expectedResult) throws Exception
+ {
+ int i = 0;
+ while (operation.getResultCode() == ResultCode.UNDEFINED || operation.getResultCode() != expectedResult)
+ {
+ Thread.sleep(50);
+ i++;
+ if (i > 10)
+ {
+ assertEquals(operation.getResultCode(), expectedResult, operation.getErrorMessage().toString());
+ }
+ }
+ }
+
+ /** Verify that no entry contains the ChangeLogCookie control. */
+ private void assertContainsNoControl(InternalSearchOperation searchOp)
+ {
+ for (SearchResultEntry entry : searchOp.getSearchEntries())
+ {
+ assertTrue(entry.getControls().isEmpty(), "result entry " + entry.toString() +
+ " should contain no control(s)");
+ }
+ }
+
+ /** Verify that all entries contains the ChangeLogCookie control with the correct cookie value. */
+ private void assertResultsContainCookieControl(InternalSearchOperation searchOp, String[] cookies) throws Exception
+ {
+ for (SearchResultEntry entry : searchOp.getSearchEntries())
+ {
+ boolean cookieControlFound = false;
+ for (Control control : entry.getControls())
+ {
+ if (control.getOID().equals(OID_ECL_COOKIE_EXCHANGE_CONTROL))
+ {
+ String cookieString =
+ searchOp.getRequestControl(ExternalChangelogRequestControl.DECODER).getCookie().toString();
+ assertThat(cookieString).isIn((Object[]) cookies);
+ cookieControlFound = true;
+ }
+ }
+ assertTrue(cookieControlFound, "result entry " + entry.toString() + " should contain the cookie control");
+ }
+ }
+
+ /** Check the DEL entry has the right content. */
+ private void assertDelEntry(SearchResultEntry entry, String uid, String entryUUID,
+ long changeNumber, CSN csn, String cookie)
+ {
+ assertAttributeValue(entry, "changetype", "delete");
+ assertAttributeValue(entry, "targetuniqueid", entryUUID);
+ assertAttributeValue(entry, "targetentryuuid", entryUUID);
+ assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie);
+ }
+
+ /** Check the ADD entry has the right content. */
+ private void assertAddEntry(SearchResultEntry entry, String uid, String entryUUID,
+ long changeNumber, CSN csn, String cookie)
+ {
+ assertAttributeValue(entry, "changetype", "add");
+ assertEntryMatchesLDIF(entry, "changes",
+ "objectClass: domain",
+ "objectClass: top",
+ "entryUUID: " + entryUUID);
+ assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie);
+ }
+
+ private void assertModEntry(SearchResultEntry entry, String uid, String entryUUID,
+ long changeNumber, CSN csn, String cookie)
+ {
+ assertAttributeValue(entry, "changetype", "modify");
+ assertEntryMatchesLDIF(entry, "changes",
+ "replace: description",
+ "description: new value",
+ "-");
+ assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie);
+ }
+
+ private void assertModDNEntry(SearchResultEntry entry, String uid, String newUid,
+ String entryUUID, long changeNumber, CSN csn, String cookie)
+ {
+ assertAttributeValue(entry, "changetype", "modrdn");
+ assertAttributeValue(entry, "newrdn", "uid=" + newUid);
+ assertAttributeValue(entry, "newsuperior", TEST_ROOT_DN_STRING2);
+ assertAttributeValue(entry, "deleteoldrdn", "true");
+ assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie);
+
+ }
+
+ private void assertEntryCommonAttributes(SearchResultEntry resultEntry,
+ String uid, String entryUUID, long changeNumber, CSN csn, String cookie)
+ {
+ if (changeNumber == 0)
+ {
+ assertDNWithCSN(resultEntry, csn);
+ }
+ else
+ {
+ assertDNWithChangeNumber(resultEntry, changeNumber);
+ assertAttributeValue(resultEntry, "changenumber", String.valueOf(changeNumber));
+ }
+ assertAttributeValue(resultEntry, "targetentryuuid", entryUUID);
+ assertAttributeValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1));
+ assertAttributeValue(resultEntry, "replicationcsn", csn.toString());
+ assertAttributeValue(resultEntry, "changelogcookie", cookie);
+ // A null value can be provided for uid if it should not be checked
+ if (uid != null)
+ {
+ final String targetDN = "uid=" + uid + "," + TEST_ROOT_DN_STRING;
+ assertAttributeValue(resultEntry, "targetdn", targetDN);
+ }
+ }
+
+ private void assertEntriesForEachOperationType(List<SearchResultEntry> entries, long firstChangeNumber,
+ String testName, String entryUUID, CSN... csns) throws Exception
+ {
+ debugAndWriteEntries(getLDIFWriter(), entries, testName);
+
+ assertThat(entries).hasSize(4);
+
+ CSN csn = csns[0];
+ assertDelEntry(entries.get(0), testName + "1", testName + "uuid1", firstChangeNumber, csn, "o=test:" + csn + ";");
+
+ csn = csns[1];
+ assertAddEntry(entries.get(1), testName + "2", entryUUID, firstChangeNumber+1, csn, "o=test:" + csn + ";");
+
+ csn = csns[2];
+ assertModEntry(entries.get(2), testName + "3", testName + "uuid3", firstChangeNumber+2, csn,
+ "o=test:" + csn + ";");
+
+ csn = csns[3];
+ assertModDNEntry(entries.get(3), testName + "4", testName + "new4", testName + "uuid4", firstChangeNumber+3, csn,
+ "o=test:" + csn + ";");
+ }
+
+ /**
+ * Asserts the attribute value as LDIF to ignore lines ordering.
+ */
+ private static void assertEntryMatchesLDIF(Entry entry, String attrName, String... expectedLDIFLines)
+ {
+ final String actualVal = getAttributeValue(entry, attrName);
+ final Set<Set<String>> actual = toLDIFEntries(actualVal.split("\n"));
+ final Set<Set<String>> expected = toLDIFEntries(expectedLDIFLines);
+ assertThat(actual)
+ .as("In entry " + entry + " incorrect value for attr '" + attrName + "'")
+ .isEqualTo(expected);
+ }
+
+ private static void assertAttributeValue(Entry entry, String attrName, String expectedValue)
+ {
+ assertFalse(expectedValue.contains("\n"),
+ "You should use assertEntryMatchesLDIF() method for asserting on this value: \"" + expectedValue + "\"");
+ final String actualValue = getAttributeValue(entry, attrName);
+ assertThat(actualValue)
+ .as("In entry " + entry + " incorrect value for attr '" + attrName + "'")
+ .isEqualToIgnoringCase(expectedValue);
+ }
+
+ private void assertDNWithChangeNumber(SearchResultEntry resultEntry, long changeNumber)
+ {
+ String actualDN = resultEntry.getDN().toNormalizedString();
+ String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
+ assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
+ }
+
+ private void assertDNWithCSN(SearchResultEntry resultEntry, CSN csn)
+ {
+ String actualDN = resultEntry.getDN().toNormalizedString();
+ String expectedDN = "replicationcsn=" + csn + "," + TEST_ROOT_DN_STRING + ",cn=changelog";
+ assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
+ }
+
+ /**
+ * Returns a data structure allowing to compare arbitrary LDIF lines. The
+ * algorithm splits LDIF entries on lines containing only a dash ("-"). It
+ * then returns LDIF entries and lines in an LDIF entry in ordering
+ * insensitive data structures.
+ * <p>
+ * Note: a last line with only a dash ("-") is significant. i.e.:
+ *
+ * <pre>
+ * <code>
+ * boolean b = toLDIFEntries("-").equals(toLDIFEntries()));
+ * System.out.println(b); // prints "false"
+ * </code>
+ * </pre>
+ */
+ private static Set<Set<String>> toLDIFEntries(String... ldifLines)
+ {
+ final Set<Set<String>> results = new HashSet<Set<String>>();
+ Set<String> ldifEntryLines = new HashSet<String>();
+ for (String ldifLine : ldifLines)
+ {
+ if (!"-".equals(ldifLine))
+ {
+ // same entry keep adding
+ ldifEntryLines.add(ldifLine);
+ }
+ else
+ {
+ // this is a new entry
+ results.add(ldifEntryLines);
+ ldifEntryLines = new HashSet<String>();
+ }
+ }
+ results.add(ldifEntryLines);
+ return results;
+ }
+
+ // TODO : share this code with other classes
+ private static String getAttributeValue(Entry entry, String attrName)
+ {
+ List<Attribute> attrs = entry.getAttribute(attrName.toLowerCase());
+ if (attrs == null)
+ {
+ return null;
+ }
+ Attribute a = attrs.iterator().next();
+ AttributeValue av = a.iterator().next();
+ return av.toString();
+ }
+
+ private void debugAndWriteEntries(LDIFWriter ldifWriter,List<SearchResultEntry> entries, String tn) throws Exception
+ {
+ if (entries != null)
+ {
+ for (SearchResultEntry entry : entries)
+ {
+ // Can use entry.toSingleLineString()
+ debugInfo(tn, " RESULT entry returned:" + entry.toLDIFString());
+ if (ldifWriter != null)
+ {
+ ldifWriter.writeEntry(entry);
+ }
+ }
+ }
+ }
+
+ /**
+ * Utility - log debug message - highlight it is from the test and not
+ * from the server code. Makes easier to observe the test steps.
+ */
+ private void debugInfo(String testName, String message)
+ {
+ if (debugEnabled())
+ {
+ TRACER.debugInfo("** TEST " + testName + " ** " + message);
+ }
+ }
+}
+
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
index 77c5dff..baf76db 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -41,6 +41,8 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
+import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.Attribute;
@@ -174,7 +176,15 @@
ReplServerFakeConfiguration conf =
new ReplServerFakeConfiguration(chPort, chDir, replicationDbImplementation, 0, changelogId, 0,
100, servers);
- ReplicationServer replicationServer = new ReplicationServer(conf);
+ final DN testBaseDN = this.baseDN;
+ ReplicationServer replicationServer = new ReplicationServer(conf, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()
+ {
+ @Override
+ public boolean isECLEnabledDomain(DN baseDN)
+ {
+ return testBaseDN.equals(baseDN);
+ }
+ });
Thread.sleep(1000);
return replicationServer;
--
Gitblit v1.10.0