mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
21.07.2014 13f31d030c3b205931b63c29b0d6bc1d4eefd163
Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend 
to support cn=changelog
CR-4083

Implementation of core features of the changelog backend:
* Initialization and finalization of the backend
* Search of the changelog, in cookie mode (with a cookie control) and
draft compat mode (by change number)

Note that :
* The support for persistent searches is not implemented yet.
* The changelog backend is currently not branched into code, i.e
the directory server still uses the ECL workflow for "cn=changelog",
because it is not possible to enable both changelog backend and ECL
workflow at the same time (waiting for psearches implementation
before branching the new code)

Code changes:
* ChangelogBackend.java:
- implementation of search(), hasSubordinates(), numSubordinates(),
and getEntryCount() methods

* ReplicationServer.java:
- new dependency on ChangelogBackend and ECLEnabledDomainPredicate
- new constructor with ECLEnabledDomainPredicate argument
- new methods enableExternalChangeLog() and shutdownExternalChangelog()
as future replacement of enabledECL() and shutdownECL()
- new method getDomainDNs(Set<DN>) for retrieval of domain DNs but an
excluded set of dns
- new method validateServerState(MultiDomainServerState, Set<DN>) for
checking coherency of given state with the replication server

* ReplicationDomainDB.java:
- new method getCursorFrom(MultiDomainServerState, PositionStrategy, Set<DN>)
that exclude a given set of domain DNs from the cursor obtained

* FileChangelogDB.java, JEChangelogDB.java:
- implementation of new method
getCursorFrom(MultiDomainServerState, PositionStrategy, Set<DN>)

* ECLEnabledDomainPredicate.java, ECLMultiDomainDBCursor.java:
- update visibility to public in order to use these classes in ChangelogBackend

* ChangelogBackedTestCase.java:
- test of ChangelogBackend class, built from ExternalChangeLogTest.java,
with lots of renaming, refactoring, cleaning compared to original class
- majority of tests are disabled until the changelog backend is branched into
code (as these tests require a running server)

* MonitorTest.java:
- update creation of ReplicationServer class to use a custom
ECLEnabledDomainPredicate

* replication.properties:
- add messages for changelog backend
9 files modified
1 files added
2341 ■■■■■ changed files
opendj-sdk/opends/src/messages/messages/replication.properties 4 ●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java 1149 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java 152 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 28 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 13 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java 2 ●●● patch | view | raw | blame | history
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 11 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java 968 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java 12 ●●●●● patch | view | raw | blame | history
opendj-sdk/opends/src/messages/messages/replication.properties
@@ -616,3 +616,7 @@
 recovered by removing a partially written record
NOTICE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES_285=You do not have sufficient privileges to \
 perform a search request on cn=changelog
SEVERE_ERR_CHANGELOG_BACKEND_SEARCH_286 =An error occurred when \
 searching base DN '%s' with filter '%s' in changelog backend : %s
SEVERE_ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES_287 =An error occurred when \
 retrieving number of subordinates for entry DN '%s' in changelog backend : %s
opendj-sdk/opends/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -25,51 +25,115 @@
 */
package org.opends.server.backends;
import java.util.Collections;
import java.util.Set;
import org.opends.server.admin.Configuration;
import org.opends.server.api.Backend;
import org.opends.server.config.ConfigEntry;
import org.opends.server.config.ConfigException;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.AttributeType;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.CanceledOperationException;
import org.opends.server.types.ConditionResult;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.IndexType;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.RestoreConfig;
import org.opends.server.types.ResultCode;
import org.opends.server.util.Validator;
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.LDIFWriter.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.text.SimpleDateFormat;
import java.util.*;
import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.Configuration;
import org.opends.server.api.Backend;
import org.opends.server.config.ConfigConstants;
import org.opends.server.config.ConfigException;
import org.opends.server.controls.EntryChangelogNotificationControl;
import org.opends.server.controls.ExternalChangelogRequestControl;
import org.opends.server.core.*;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ModifyCommonMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.StartECLSessionMsg.ECLRequestType;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.server.changelog.je.ECLMultiDomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.*;
import org.opends.server.util.StaticUtils;
/**
 * A backend that provides access to the changelog, ie the "cn=changelog" suffix.
 * It is a read-only backend.
 * A backend that provides access to the changelog, ie the "cn=changelog"
 * suffix. It is a read-only backend that is created by a
 * {@code ReplicationServer} and is not configurable.
 * <p>
 * There are two modes to search the changelog:
 * <ul>
 * <li>Cookie mode: when a "ECL Cookie Exchange Control" is provided with the
 * request. The cookie provided in the control is used to retrieve entries from
 * the ReplicaDBs. The <code>changeNumber</code> attribute is not returned with
 * the entries.</li>
 * <li>Draft compat mode: when no "ECL Cookie Exchange Control" is provided with
 * the request. The entries are retrieved using the ChangeNumberIndexDB (or
 * DraftDB, hence the name) and their attributes are set with the information
 * from the ReplicasDBs. The <code>changeNumber</code> attribute value is set
 * from the content of ChangeNumberIndexDB.</li>
 * </ul>
 *
 * @see ReplicationServer
 */
public class ChangelogBackend extends Backend
public class ChangelogBackend extends Backend<Configuration>
{
  private static final DebugTracer TRACER = getTracer();
  /** The id of this backend. */
  public static final String BACKEND_ID = "changelog";
  private static final long CHANGE_NUMBER_FOR_EMPTY_CURSOR = 0L;
  private static final String CHANGE_NUMBER_ATTR = "changeNumber";
  private static final String CHANGE_NUMBER_ATTR_LC = CHANGE_NUMBER_ATTR.toLowerCase();
  /** The set of objectclasses that will be used in root entry. */
  private static final Map<ObjectClass, String>
    CHANGELOG_ROOT_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
  static
  {
    CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
    CHANGELOG_ROOT_OBJECT_CLASSES.put(DirectoryServer.getObjectClass("container", true), "container");
  }
  /** The set of objectclasses that will be used in ECL entries. */
  private static final Map<ObjectClass, String>
    CHANGELOG_ENTRY_OBJECT_CLASSES = new LinkedHashMap<ObjectClass, String>(2);
  static
  {
    CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_TOP, true), OC_TOP);
    CHANGELOG_ENTRY_OBJECT_CLASSES.put(DirectoryServer.getObjectClass(OC_CHANGELOG_ENTRY, true), OC_CHANGELOG_ENTRY);
  }
  /** The attribute type for the "creatorsName" attribute. */
  private static final AttributeType CREATORS_NAME_TYPE =
      DirectoryConfig.getAttributeType(OP_ATTR_CREATORS_NAME_LC, true);
  /** The attribute type for the "modifiersName" attribute. */
  private static final AttributeType MODIFIERS_NAME_TYPE =
      DirectoryConfig.getAttributeType(OP_ATTR_MODIFIERS_NAME_LC, true);
  /** The DN for the base changelog entry. */
  private DN baseChangelogDN;
@@ -77,52 +141,61 @@
  private DN[] baseDNs;
  /** The set of supported controls for this backend. */
  private final Set<String> supportedControls =
      Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
  private final Set<String> supportedControls = Collections.singleton(OID_ECL_COOKIE_EXCHANGE_CONTROL);
  /** The replication server on which the changelog is read. */
  private final ReplicationServer replicationServer;
  private final ECLEnabledDomainPredicate domainPredicate;
  /**
   * Creates a new backend with the provided repication server.
   *
   * @param replicationServer
   *          The replication server on which the changes are read.
   * @param domainPredicate
   *          Returns whether a domain is enabled for the external changelog.
   */
  public ChangelogBackend(final ReplicationServer replicationServer, final ECLEnabledDomainPredicate domainPredicate)
  {
    this.replicationServer = replicationServer;
    this.domainPredicate = domainPredicate;
    setBackendID(BACKEND_ID);
    setWritabilityMode(WritabilityMode.DISABLED);
    setPrivateBackend(true);
  }
  /** {@inheritDoc} */
  @Override
  public void configureBackend(final Configuration cfg) throws ConfigException
  public void configureBackend(final Configuration config) throws ConfigException
  {
    Validator.ensureNotNull(cfg);
    final ConfigEntry configEntry = DirectoryServer.getConfigEntry(cfg.dn());
    // Make sure that a configuration entry was provided. If not, then we will
    // not be able to complete initialization.
    if (configEntry == null)
    {
      throw new ConfigException(ERR_BACKEND_CONFIG_ENTRY_NULL.get(getBackendID()));
    throw new UnsupportedOperationException("The changelog backend is not configurable");
    }
    // Create the set of base DNs that we will handle. In this case, it's just
    // the DN of the base changelog entry.
  /** {@inheritDoc} */
  @Override
  public void initializeBackend() throws InitializationException
  {
    try
    {
      baseChangelogDN = DN.decode(DN_EXTERNAL_CHANGELOG_ROOT);
      baseDNs = new DN[] { baseChangelogDN };
    }
    catch (final Exception e)
    catch (final DirectoryException e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      throw new ConfigException(
      throw new InitializationException(
          ERR_BACKEND_CANNOT_DECODE_BACKEND_ROOT_DN.get(getBackendID(), getExceptionMessage(e)), e);
    }
    this.baseDNs = new DN[] { baseChangelogDN };
  }
  /** {@inheritDoc} */
  @Override
  public void initializeBackend() throws ConfigException, InitializationException
  {
    try
    {
      DirectoryServer.registerBaseDN(baseChangelogDN, this, true);
    }
    catch (final Exception e)
    catch (final DirectoryException e)
    {
      throw new InitializationException(
          ERR_BACKEND_CANNOT_REGISTER_BASEDN.get(baseChangelogDN.toString(), getExceptionMessage(e)), e);
@@ -137,7 +210,7 @@
    {
      DirectoryServer.deregisterBaseDN(baseChangelogDN);
    }
    catch (final Exception e)
    catch (final DirectoryException e)
    {
      if (debugEnabled())
      {
@@ -157,26 +230,26 @@
  @Override
  public void preloadEntryCache() throws UnsupportedOperationException
  {
    throw new RuntimeException("Not implemented");
    throw new UnsupportedOperationException("Operation not supported.");
  }
  /** {@inheritDoc} */
  @Override
  public boolean isLocal()
  {
    throw new RuntimeException("Not implemented");
    return true;
  }
  /** {@inheritDoc} */
  @Override
  public boolean isIndexed(AttributeType attributeType, IndexType indexType)
  public boolean isIndexed(final AttributeType attributeType, final IndexType indexType)
  {
    throw new RuntimeException("Not implemented");
    return true;
  }
  /** {@inheritDoc} */
  @Override
  public Entry getEntry(DN entryDN) throws DirectoryException
  public Entry getEntry(final DN entryDN) throws DirectoryException
  {
    if (entryDN == null)
    {
@@ -188,17 +261,90 @@
  /** {@inheritDoc} */
  @Override
  public ConditionResult hasSubordinates(DN entryDN) throws DirectoryException
  public ConditionResult hasSubordinates(final DN entryDN)
      throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
    final long num = numSubordinates(entryDN, false);
    if (num < 0)
    {
      return ConditionResult.UNDEFINED;
    }
    else if (num == 0)
    {
      return ConditionResult.FALSE;
    }
    else
    {
      return ConditionResult.TRUE;
    }
  }
  /** Specific search operation to count number of entries. */
  private final class NumSubordinatesSearchOperation extends SearchOperationWrapper
  {
    private long numSubordinates = -1;
    private NumSubordinatesSearchOperation()
    {
      super(null);
    }
    @Override
    public boolean returnEntry(Entry entry, List<Control> controls)
    {
      numSubordinates++;
      return true;
    }
    @Override
    public DN getBaseDN()
    {
      return baseChangelogDN;
    }
    @Override
    public SearchFilter getFilter()
    {
      return LDAPURL.DEFAULT_SEARCH_FILTER;
    }
    @Override
    public SearchScope getScope()
    {
      return SearchScope.WHOLE_SUBTREE;
    }
  }
  /** {@inheritDoc} */
  @Override
  public long numSubordinates(DN entryDN, boolean subtree)
      throws DirectoryException
  public long numSubordinates(final DN entryDN, final boolean subtree) throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
    // Compute the num subordinates only for the base DN
    if (entryDN == null || !baseChangelogDN.equals(entryDN))
    {
      return -1;
    }
    if (!subtree)
    {
      return 1;
    }
    // Search with cookie mode to count all update messages
    final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
    excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
    SearchParams params = new SearchParams("0", excludedDomains);
    params.requestType = REQUEST_TYPE_FROM_COOKIE;
    params.multiDomainServerState = new MultiDomainServerState();
    NumSubordinatesSearchOperation searchOp = new NumSubordinatesSearchOperation();
    try
    {
      search0(params, searchOp);
    }
    catch (ChangelogException e)
    {
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES.get(
          baseChangelogDN.toString(), stackTraceToSingleLineString(e)));
    }
    return searchOp.numSubordinates;
  }
  /** {@inheritDoc} */
@@ -241,10 +387,35 @@
  /** {@inheritDoc} */
  @Override
  public void search(SearchOperation searchOperation)
      throws DirectoryException, CanceledOperationException
  public void search(final SearchOperation searchOperation) throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
    final Set<String> excludedDomains = MultimasterReplication.getECLDisabledDomains();
    excludedDomains.add(DN_EXTERNAL_CHANGELOG_ROOT);
    SearchParams params = new SearchParams(searchOperation.toString(), excludedDomains);
    final ExternalChangelogRequestControl eclRequestControl =
        searchOperation.getRequestControl(ExternalChangelogRequestControl.DECODER);
    if (eclRequestControl == null)
    {
      params.requestType = REQUEST_TYPE_FROM_CHANGE_NUMBER;
    }
    else
    {
      params.requestType = REQUEST_TYPE_FROM_COOKIE;
      params.multiDomainServerState = eclRequestControl.getCookie();
    }
    optimizeSearchParameters(params, searchOperation.getBaseDN(), searchOperation.getFilter());
    try
    {
      search0(params, searchOperation);
    }
    catch (ChangelogException e)
    {
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_CHANGELOG_BACKEND_SEARCH.get(
          searchOperation.getBaseDN().toString(),
          searchOperation.getFilter().toString(),
          stackTraceToSingleLineString(e)));
    }
  }
  /** {@inheritDoc} */
@@ -270,7 +441,7 @@
  /** {@inheritDoc} */
  @Override
  public void exportLDIF(LDIFExportConfig exportConfig)
  public void exportLDIF(final LDIFExportConfig exportConfig)
      throws DirectoryException
  {
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
@@ -297,15 +468,14 @@
  @Override
  public boolean supportsBackup()
  {
    throw new RuntimeException("Not implemented");
    return false;
  }
  /** {@inheritDoc} */
  @Override
  public boolean supportsBackup(BackupConfig backupConfig,
      StringBuilder unsupportedReason)
  public boolean supportsBackup(BackupConfig backupConfig, StringBuilder unsupportedReason)
  {
    throw new RuntimeException("Not implemented");
    return false;
  }
  /** {@inheritDoc} */
@@ -318,8 +488,7 @@
  /** {@inheritDoc} */
  @Override
  public void removeBackup(BackupDirectory backupDirectory, String backupID)
      throws DirectoryException
  public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
  {
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
        ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
@@ -329,13 +498,12 @@
  @Override
  public boolean supportsRestore()
  {
    throw new RuntimeException("Not implemented");
    return false;
  }
  /** {@inheritDoc} */
  @Override
  public void restoreBackup(RestoreConfig restoreConfig)
      throws DirectoryException
  public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
  {
    throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
        ERR_BACKEND_BACKUP_AND_RESTORE_NOT_SUPPORTED.get(getBackendID()));
@@ -345,7 +513,818 @@
  @Override
  public long getEntryCount()
  {
    throw new RuntimeException("Not implemented");
    try
    {
      return numSubordinates(baseChangelogDN, true) + 1;
    }
    catch (DirectoryException e)
    {
      if (debugEnabled())
      {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
      return -1;
    }
  }
  /**
   * Represent the search parameters specific to the changelog.
   *
   * This class should be visible for tests.
   */
  static class SearchParams
  {
    private ECLRequestType requestType;
    private final String operationId;
    private final Set<String> excludedBaseDNs;
    private long lowestChangeNumber = -1;
    private long highestChangeNumber = -1;
    private CSN csn = new CSN(0, 0, 0);
    private MultiDomainServerState multiDomainServerState;
    /**
     * Creates search parameters.
     */
    SearchParams()
    {
      operationId = "";
      excludedBaseDNs = Collections.emptySet();
    }
    /**
     * Creates search parameters with provided id and excluded domain DNs.
     *
     * @param operationId
     *          The id of the operation.
     * @param excludedBaseDNs
     *          Set of DNs to exclude from search.
     */
    SearchParams(final String operationId, final Set<String> excludedBaseDNs)
    {
      this.operationId = operationId;
      this.excludedBaseDNs = excludedBaseDNs;
    }
    /**
     * Indicates if provided change number is compatible with last change
     * number.
     *
     * @param changeNumber
     *          The change number to test.
     * @return {@code true} if and only if the provided change number is in the
     *         range of the last change number.
     */
    boolean changeNumberIsInRange(long changeNumber)
    {
      return highestChangeNumber == -1 || changeNumber <= highestChangeNumber;
    }
    /**
     * Returns the lowest change number to retrieve (inclusive).
     *
     * @return the lowest change number
     */
    long getLowestChangeNumber()
    {
      return lowestChangeNumber;
    }
    /**
     * Returns the highest change number to retrieve (inclusive).
     *
     * @return the highest change number
     */
    long getHighestChangeNumber()
    {
      return highestChangeNumber;
    }
    /**
     * Returns the CSN to retrieve.
     *
     * @return the CSN, which may be the default CSN with zero values.
     */
    CSN getCSN()
    {
      return csn;
    }
    /**
     * Returns the set of DNs to exclude from the search.
     *
     * @return the DNs corresponding to domains to exclude from the search.
     * @throws DirectoryException
     *           If a DN can't be decoded.
     */
    Set<DN> getExcludedBaseDNs() throws DirectoryException
    {
      final Set<DN> excludedDNs = new HashSet<DN>();
      for (String dn : excludedBaseDNs)
      {
        excludedDNs.add(DN.decode(dn));
      }
      return excludedDNs;
    }
  }
  /**
   * Optimize the search parameters by analyzing the DN and filter.
   * Populate the provided SearchParams with optimizations found.
   *
   * @param params the search parameters that are specific to external changelog
   * @param baseDN the provided search baseDN.
   * @param userFilter the provided search filter.
   * @throws DirectoryException when an exception occurs.
   */
   void optimizeSearchParameters(final SearchParams params, final DN baseDN, final SearchFilter userFilter)
       throws DirectoryException
  {
    SearchFilter equalityFilter = null;
    switch (baseDN.getNumComponents())
    {
    case 1:
      // "cn=changelog" : use user-provided search filter.
      break;
    case 2:
      // It is probably "changeNumber=xxx,cn=changelog", use equality filter
      // But it also could be "<service-id>,cn=changelog" so need to check on attribute
      equalityFilter = buildSearchFilterFrom(baseDN, CHANGE_NUMBER_ATTR_LC, CHANGE_NUMBER_ATTR);
      break;
    default:
      // "replicationCSN=xxx,<service-id>,cn=changelog" : use equality filter
      equalityFilter = buildSearchFilterFrom(baseDN, "replicationcsn", "replicationCSN");
      break;
    }
    final SearchParams optimized = optimizeSearchUsingFilter(equalityFilter != null ? equalityFilter : userFilter);
    params.lowestChangeNumber = optimized.lowestChangeNumber;
    params.highestChangeNumber = optimized.highestChangeNumber;
    params.csn = optimized.csn;
  }
  /**
   * Build a search filter from given DN and attribute.
   *
   * @return the search filter or {@code null} if attribute is not present in
   *         the provided DN
   */
  private SearchFilter buildSearchFilterFrom(final DN baseDN, final String lowerCaseAttr, final String upperCaseAttr)
  {
    final RDN rdn = baseDN.getRDN();
    AttributeType attrType = DirectoryServer.getAttributeType(lowerCaseAttr);
    if (attrType == null)
    {
      attrType = DirectoryServer.getDefaultAttributeType(upperCaseAttr);
    }
    final AttributeValue attrValue = rdn.getAttributeValue(attrType);
    if (attrValue != null)
    {
      return SearchFilter.createEqualityFilter(attrType, attrValue);
    }
    return null;
  }
  private SearchParams optimizeSearchUsingFilter(final SearchFilter filter) throws DirectoryException
  {
    final SearchParams params = new SearchParams();
    if (filter == null)
    {
      return params;
    }
    if (matches(filter, FilterType.GREATER_OR_EQUAL, CHANGE_NUMBER_ATTR))
    {
      params.lowestChangeNumber = decodeChangeNumber(filter.getAssertionValue());
    }
    else if (matches(filter, FilterType.LESS_OR_EQUAL, CHANGE_NUMBER_ATTR))
    {
      params.highestChangeNumber = decodeChangeNumber(filter.getAssertionValue());
    }
    else if (matches(filter, FilterType.EQUALITY, CHANGE_NUMBER_ATTR))
    {
      final long number = decodeChangeNumber(filter.getAssertionValue());
      params.lowestChangeNumber = number;
      params.highestChangeNumber = number;
    }
    else if (matches(filter, FilterType.EQUALITY, "replicationcsn"))
    {
      // == exact CSN
      params.csn = new CSN(filter.getAssertionValue().toString());
    }
    else if (filter.getFilterType() == FilterType.AND)
    {
      // TODO: it looks like it could be generalized to N components, not only two
      final Collection<SearchFilter> components = filter.getFilterComponents();
      final SearchFilter filters[] = components.toArray(new SearchFilter[0]);
      long last1 = -1;
      long first1 = -1;
      long last2 = -1;
      long first2 = -1;
      if (filters.length > 0)
      {
        SearchParams msg1 = optimizeSearchUsingFilter(filters[0]);
        last1 = msg1.highestChangeNumber;
        first1 = msg1.lowestChangeNumber;
      }
      if (filters.length > 1)
      {
        SearchParams msg2 = optimizeSearchUsingFilter(filters[1]);
        last2 = msg2.highestChangeNumber;
        first2 = msg2.lowestChangeNumber;
      }
      if (last1 == -1)
      {
        params.highestChangeNumber = last2;
      }
      else if (last2 == -1)
      {
        params.highestChangeNumber = last1;
      }
      else
      {
        params.highestChangeNumber = Math.min(last1, last2);
      }
      params.lowestChangeNumber = Math.max(first1, first2);
    }
    return params;
  }
  private static long decodeChangeNumber(final AttributeValue assertionValue)
      throws DirectoryException
  {
    try
    {
      return Long.decode(assertionValue.getNormalizedValue().toString());
    }
    catch (NumberFormatException e)
    {
      throw new DirectoryException(ResultCode.INVALID_ATTRIBUTE_SYNTAX,
          Message.raw("Could not convert value '%s' to long", assertionValue.getNormalizedValue().toString()));
    }
  }
  private boolean matches(SearchFilter filter, FilterType filterType, String primaryName)
  {
    return filter.getFilterType() == filterType
           && filter.getAttributeType() != null
           && filter.getAttributeType().getPrimaryName().equalsIgnoreCase(primaryName);
  }
  private void search0(final SearchParams searchParams, final SearchOperation searchOperation)
      throws DirectoryException, ChangelogException
  {
    switch (searchParams.requestType)
    {
      case REQUEST_TYPE_FROM_CHANGE_NUMBER:
        searchFromChangeNumber(searchParams, searchOperation);
        break;
      case REQUEST_TYPE_FROM_COOKIE:
        searchFromCookie(searchParams, searchOperation);
        break;
      default:
        // not handled
    }
  }
  /**
   * Search the changelog when a cookie control is provided.
   */
  private void searchFromCookie(final SearchParams searchParams, final SearchOperation searchOperation)
      throws DirectoryException, ChangelogException
  {
    final ReplicationDomainDB replicationDomainDB = replicationServer.getChangelogDB().getReplicationDomainDB();
    validateProvidedCookie(searchParams);
    boolean hasReturnedBaseEntry = false;
    ECLMultiDomainDBCursor replicaUpdatesCursor = null;
    try
    {
      final MultiDomainDBCursor cursor = replicationDomainDB.getCursorFrom(
          searchParams.multiDomainServerState, AFTER_MATCHING_KEY, searchParams.getExcludedBaseDNs());
      replicaUpdatesCursor = new ECLMultiDomainDBCursor(domainPredicate, cursor);
      MultiDomainServerState cookie = searchParams.multiDomainServerState;
      boolean continueSearch = true;
      while (continueSearch && replicaUpdatesCursor.next())
      {
        // Handle creation of base changelog entry on first update message found
        if (!hasReturnedBaseEntry)
        {
          if (!returnBaseChangelogEntry(searchOperation, true))
          {
            return;
          }
          hasReturnedBaseEntry = true;
        }
        // Handle the update message
        final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
        final DN domainBaseDN = replicaUpdatesCursor.getData();
        cookie.update(domainBaseDN, updateMsg.getCSN());
        final Entry entry = createEntryFromMsg(domainBaseDN, 0L, cookie.toString(), updateMsg);
        if (matchBaseAndScopeAndFilter(entry, searchOperation))
        {
          Control control = new EntryChangelogNotificationControl(true, cookie.toString());
          continueSearch = searchOperation.returnEntry(entry, Arrays.asList(control));
        }
      }
      // Handle creation of base changelog entry when no update message is found
      if (!hasReturnedBaseEntry)
      {
        returnBaseChangelogEntry(searchOperation, false);
      }
    }
    finally
    {
      StaticUtils.close(replicaUpdatesCursor);
    }
  }
  /**
   * Validates the cookie contained in search parameters by checking its content
   * with the actual replication server state.
   *
   * @throws DirectoryException
   *           If the state is not valid
   */
  private void validateProvidedCookie(final SearchParams searchParams) throws DirectoryException
  {
    final MultiDomainServerState state = searchParams.multiDomainServerState;
    if (state != null && !state.isEmpty())
    {
      replicationServer.validateServerState(state, searchParams.getExcludedBaseDNs());
    }
  }
  /**
   * Search the changelog using change number(s).
   */
  private void searchFromChangeNumber(final SearchParams params, final SearchOperation searchOperation)
      throws ChangelogException, DirectoryException
  {
    boolean hasReturnedBaseEntry = false;
    final ChangelogDB changelogDB = replicationServer.getChangelogDB();
    DBCursor<ChangeNumberIndexRecord> cnIndexDBCursor = null;
    MultiDomainDBCursor replicaUpdatesCursor = null;
    try {
      cnIndexDBCursor = getCNIndexDBCursor(changelogDB, params.lowestChangeNumber);
      boolean continueSearch = true;
      while (continueSearch && cnIndexDBCursor.next())
      {
        // Handle creation of base changelog entry on cnIndex record found
        if (!hasReturnedBaseEntry)
        {
          if (!returnBaseChangelogEntry(searchOperation, true))
          {
            return;
          }
          hasReturnedBaseEntry = true;
        }
        // Handle the current cnIndex record
        final ChangeNumberIndexRecord cnIndexRecord = cnIndexDBCursor.getRecord();
        if (replicaUpdatesCursor == null)
        {
          replicaUpdatesCursor = initializeReplicaUpdatesCursor(changelogDB, cnIndexRecord);
        }
        continueSearch = params.changeNumberIsInRange(cnIndexRecord.getChangeNumber());
        if (continueSearch)
        {
           UpdateMsg updateMsg = findReplicaUpdateMessage(cnIndexRecord, replicaUpdatesCursor);
           if (updateMsg != null)
           {
             continueSearch = returnEntryForUpdateMessage(searchOperation, cnIndexRecord, updateMsg);
             replicaUpdatesCursor.next();
           }
        }
      }
      // Handle creation of base changelog entry when no update message is found
      if (!hasReturnedBaseEntry)
      {
        returnBaseChangelogEntry(searchOperation, false);
      }
    }
    finally {
      StaticUtils.close(cnIndexDBCursor, replicaUpdatesCursor);
    }
  }
  /**
   * Create and returns the base changelog entry to provided search operation.
   *
   * @return {@code true} if search should continue, {@code false} otherwise
   */
  private boolean returnBaseChangelogEntry(final SearchOperation searchOperation, boolean hasSubordinates)
      throws DirectoryException
  {
    final DN baseDN = searchOperation.getBaseDN();
    final SearchFilter filter = searchOperation.getFilter();
    final SearchScope scope = searchOperation.getScope();
    if (baseChangelogDN.matchesBaseAndScope(baseDN, scope))
    {
      final Entry entry = buildBaseChangelogEntry(hasSubordinates);
      if (filter.matchesEntry(entry) && !searchOperation.returnEntry(entry, null))
      {
        // Abandon, size limit reached.
        return false;
      }
    }
    if (baseDN.equals(baseChangelogDN) && scope.equals(SearchScope.BASE_OBJECT))
    {
      // Only the change log root entry was requested
      return false;
    }
    return true;
  }
  /**
   * @return {@code true} if search should continue, {@code false} otherwise
   */
  private boolean returnEntryForUpdateMessage(
      final SearchOperation searchOperation,
      final ChangeNumberIndexRecord cnIndexRecord,
      final UpdateMsg updateMsg)
          throws DirectoryException
  {
    final MultiDomainServerState cookie = new MultiDomainServerState(cnIndexRecord.getPreviousCookie());
    final DN changeDN = cnIndexRecord.getBaseDN();
    cookie.update(changeDN, cnIndexRecord.getCSN());
    final Entry entry = createEntryFromMsg(changeDN, cnIndexRecord.getChangeNumber(), cookie.toString(), updateMsg);
    if (matchBaseAndScopeAndFilter(entry, searchOperation))
    {
      return searchOperation.returnEntry(entry, null);
    }
    return true;
  }
  private MultiDomainDBCursor initializeReplicaUpdatesCursor(final ChangelogDB changelogDB,
      final ChangeNumberIndexRecord cnIndexRecord) throws ChangelogException
  {
    final MultiDomainServerState state = new MultiDomainServerState();
    state.update(cnIndexRecord.getBaseDN(), cnIndexRecord.getCSN());
    // No need for ECLMultiDomainDBCursor in this case
    // as updateMsg will be matched with cnIndexRecord
    final MultiDomainDBCursor replicaUpdatesCursor =
        changelogDB.getReplicationDomainDB().getCursorFrom(state, ON_MATCHING_KEY);
    replicaUpdatesCursor.next();
    return replicaUpdatesCursor;
  }
  /**
   * Returns the replica update message corresponding to the provided
   * cnIndexRecord.
   *
   * @return the update message, which may be {@code null} if the update message
   *         could not be found because it was purged or because corresponding
   *         baseDN was removed from the changelog
   * @throws DirectoryException
   *           If inconsistency is detected between the available update
   *           messages and the provided cnIndexRecord
   */
  private UpdateMsg findReplicaUpdateMessage(
      final ChangeNumberIndexRecord cnIndexRecord,
      final MultiDomainDBCursor replicaUpdatesCursor)
          throws DirectoryException, ChangelogException
  {
    while (true)
    {
      final UpdateMsg updateMsg = replicaUpdatesCursor.getRecord();
      final int compareIndexWithUpdateMsg = cnIndexRecord.getCSN().compareTo(updateMsg.getCSN());
      if (compareIndexWithUpdateMsg < 0) {
        // Either update message has been purged or baseDN has been removed from changelogDB,
        // ignore current index record and go to the next one
        return null;
      }
      else if (compareIndexWithUpdateMsg == 0)
      {
        // Found the matching update message
        return updateMsg;
      }
      // Case compareIndexWithUpdateMsg > 0 : the update message has not bean reached yet
      if (!replicaUpdatesCursor.next())
      {
        // Should never happen, as it means some messages have disappeared
        // TODO : put the correct I18N message
        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
            Message.raw("Could not find replica update message matching index record. " +
                "No more replica update messages with a csn newer than " + updateMsg.getCSN() + " exist."));
      }
    }
  }
  /** Returns a cursor on CNIndexDB for the provided first change number. */
  private DBCursor<ChangeNumberIndexRecord> getCNIndexDBCursor(final ChangelogDB changelogDB,
      final long firstChangeNumber) throws ChangelogException
  {
    final ChangeNumberIndexDB cnIndexDB = changelogDB.getChangeNumberIndexDB();
    long changeNumberToUse = firstChangeNumber;
    if (changeNumberToUse <= 1)
    {
      final ChangeNumberIndexRecord oldestRecord = cnIndexDB.getOldestRecord();
      changeNumberToUse = oldestRecord == null ? CHANGE_NUMBER_FOR_EMPTY_CURSOR : oldestRecord.getChangeNumber();
    }
    return cnIndexDB.getCursorFrom(changeNumberToUse);
  }
  /** Indicates if the provided entry matches the filter, base and scope. */
  private boolean matchBaseAndScopeAndFilter(Entry entry, SearchOperation searchOp) throws DirectoryException
  {
    return entry.matchesBaseAndScope(searchOp.getBaseDN(), searchOp.getScope())
        && searchOp.getFilter().matchesEntry(entry);
  }
  /**
   * Retrieves the base changelog entry.
   */
  private Entry buildBaseChangelogEntry(boolean hasSubordinates)
  {
    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
    final Map<AttributeType, List<Attribute>> operationalAttrs = new LinkedHashMap<AttributeType,List<Attribute>>();
    addAttributeByUppercaseName(ATTR_COMMON_NAME, ATTR_COMMON_NAME, BACKEND_ID, userAttrs, operationalAttrs);
    addAttributeByUppercaseName(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY,
        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, operationalAttrs);
    addAttributeByUppercaseName("hassubordinates", "hasSubordinates", Boolean.toString(hasSubordinates),
        userAttrs, operationalAttrs);
    addAttributeByUppercaseName("entrydn", "entryDN", baseChangelogDN.toString(),
        userAttrs, operationalAttrs);
    return new Entry(baseChangelogDN, CHANGELOG_ROOT_OBJECT_CLASSES, userAttrs, operationalAttrs);
  }
  /**
   * Creates a changelog entry.
   */
  private Entry createEntryFromMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
      throws DirectoryException
  {
    if (msg instanceof AddMsg)
    {
      return createAddMsg(baseDN, changeNumber, cookie, msg);
    }
    else if (msg instanceof ModifyCommonMsg)
    {
      return createModifyMsg(baseDN, changeNumber, cookie, msg);
    }
    else if (msg instanceof DeleteMsg)
    {
      final DeleteMsg delMsg = (DeleteMsg) msg;
      return createChangelogEntry(baseDN, changeNumber, cookie, delMsg, null, "delete", delMsg.getInitiatorsName());
    }
    throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
        Message.raw("Unexpected message type when trying to create changelog entry for dn %s : %s", baseDN.toString(),
            msg.getClass().toString()));
  }
  /**
   * Creates an entry from an add message.
   * <p>
   * Map addMsg to an LDIF string for the 'changes' attribute, and pull out
   * change initiators name if available which is contained in the creatorsName
   * attribute.
   */
  private Entry createAddMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
      throws DirectoryException
  {
    final AddMsg addMsg = (AddMsg) msg;
    String changeInitiatorsName = null;
    String ldifChanges = null;
    try
    {
      final StringBuilder builder = new StringBuilder(256);
      for (Attribute attr : addMsg.getAttributes())
      {
        if (attr.getAttributeType().equals(CREATORS_NAME_TYPE) && !attr.isEmpty())
        {
          // This attribute is not multi-valued.
          changeInitiatorsName = attr.iterator().next().toString();
        }
        final String attrName = attr.getNameWithOptions();
        for (AttributeValue value : attr)
        {
          builder.append(attrName);
          appendLDIFSeparatorAndValue(builder, value.getValue());
          builder.append('\n');
        }
      }
      ldifChanges = builder.toString();
    }
    catch (Exception e)
    {
      logEncodingMessageError("add", addMsg.getDN(), e);
    }
    return createChangelogEntry(baseDN, changeNumber, cookie, addMsg, ldifChanges, "add", changeInitiatorsName);
  }
  /**
   * Creates an entry from a modify message.
   * <p>
   * Map the modifyMsg to an LDIF string for the 'changes' attribute, and pull
   * out change initiators name if available which is contained in the
   * modifiersName attribute.
   */
  private Entry createModifyMsg(final DN baseDN, final long changeNumber, final String cookie, final UpdateMsg msg)
      throws DirectoryException
  {
    final ModifyCommonMsg modifyMsg = (ModifyCommonMsg) msg;
    String changeInitiatorsName = null;
    String ldifChanges = null;
    try
    {
      final StringBuilder builder = new StringBuilder(128);
      for (Modification mod : modifyMsg.getMods())
      {
        final Attribute attr = mod.getAttribute();
        if (mod.getModificationType() == ModificationType.REPLACE
            && attr.getAttributeType().equals(MODIFIERS_NAME_TYPE)
            && !attr.isEmpty())
        {
          // This attribute is not multi-valued.
          changeInitiatorsName = attr.iterator().next().toString();
        }
        final String attrName = attr.getNameWithOptions();
        builder.append(mod.getModificationType().getLDIFName());
        builder.append(": ");
        builder.append(attrName);
        builder.append('\n');
        for (AttributeValue value : attr)
        {
          builder.append(attrName);
          appendLDIFSeparatorAndValue(builder, value.getValue());
          builder.append('\n');
        }
        builder.append("-\n");
      }
      ldifChanges = builder.toString();
    }
    catch (Exception e)
    {
      logEncodingMessageError("modify", modifyMsg.getDN(), e);
    }
    final boolean isModifyDNMsg = modifyMsg instanceof ModifyDNMsg;
    final Entry entry = createChangelogEntry(baseDN, changeNumber, cookie, modifyMsg, ldifChanges,
        isModifyDNMsg ? "modrdn" : "modify", changeInitiatorsName);
    if (isModifyDNMsg)
    {
      final ModifyDNMsg modDNMsg = (ModifyDNMsg) modifyMsg;
      addAttribute(entry, "newrdn", modDNMsg.getNewRDN());
      if (modDNMsg.getNewSuperior() != null)
      {
        addAttribute(entry, "newsuperior", modDNMsg.getNewSuperior());
      }
      addAttribute(entry, "deleteoldrdn", String.valueOf(modDNMsg.deleteOldRdn()));
    }
    return entry;
  }
  /**
   * Log an encoding message error.
   *
   * @param messageType
   *            String identifying type of message. Should be "add" or "modify".
   * @param entryDN
   *            DN of original entry
   */
  private void logEncodingMessageError(String messageType, DN entryDN,  Exception exception)
  {
    TRACER.debugCaught(DebugLogLevel.ERROR, exception);
    logError(Message.raw(Category.SYNC, Severity.MILD_ERROR,
        "An exception was encountered while trying to encode a replication " + messageType + " message for entry \""
        + entryDN + "\" into an External Change Log entry: " + exception.getMessage()));
  }
  /**
   * Create a changelog entry from a set of provided information. This is the part of
   * entry creation common to all types of msgs (ADD, DEL, MOD, MODDN).
   */
  private static Entry createChangelogEntry(final DN baseDN, final long changeNumber, final String cookie,
      final LDAPUpdateMsg msg, final String ldifChanges, final String changeType,
      final String changeInitiatorsName) throws DirectoryException
  {
    final CSN csn = msg.getCSN();
    String dnString;
    if (changeNumber == 0)
    {
      // Cookie mode
      dnString = "replicationCSN=" + csn + "," + baseDN.toString() + "," + DN_EXTERNAL_CHANGELOG_ROOT;
    }
    else
    {
      // Draft compat mode
      dnString = "changeNumber=" + changeNumber + "," + DN_EXTERNAL_CHANGELOG_ROOT;
    }
    final Map<AttributeType, List<Attribute>> userAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
    final Map<AttributeType, List<Attribute>> opAttrs = new LinkedHashMap<AttributeType, List<Attribute>>();
    // Operational standard attributes
    addAttributeByType(ATTR_SUBSCHEMA_SUBENTRY_LC, ATTR_SUBSCHEMA_SUBENTRY_LC,
        ConfigConstants.DN_DEFAULT_SCHEMA_ROOT, userAttrs, opAttrs);
    addAttributeByType("numsubordinates", "numSubordinates", "0", userAttrs, opAttrs);
    addAttributeByType("hassubordinates", "hasSubordinates", "false", userAttrs, opAttrs);
    addAttributeByType("entrydn", "entryDN", dnString, userAttrs, opAttrs);
    // REQUIRED attributes
    if (changeNumber != 0)
    {
      addAttributeByType("changenumber", "changeNumber", String.valueOf(changeNumber), userAttrs, opAttrs);
    }
    SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT_GMT_TIME);
    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // ??
    final String format = dateFormat.format(new Date(csn.getTime()));
    addAttributeByType("changetime", "changeTime", format, userAttrs, opAttrs);
    addAttributeByType("changetype", "changeType", changeType, userAttrs, opAttrs);
    addAttributeByType("targetdn", "targetDN", msg.getDN().toString(), userAttrs, opAttrs);
    // NON REQUESTED attributes
    addAttributeByType("replicationcsn", "replicationCSN", csn.toString(), userAttrs, opAttrs);
    addAttributeByType("replicaidentifier", "replicaIdentifier", Integer.toString(csn.getServerId()),
        userAttrs, opAttrs);
    if (ldifChanges != null)
    {
      addAttributeByType("changes", "changes", ldifChanges, userAttrs, opAttrs);
    }
    if (changeInitiatorsName != null)
    {
      addAttributeByType("changeinitiatorsname", "changeInitiatorsName", changeInitiatorsName, userAttrs, opAttrs);
    }
    final String targetUUID = msg.getEntryUUID();
    if (targetUUID != null)
    {
      addAttributeByType("targetentryuuid", "targetEntryUUID", targetUUID, userAttrs, opAttrs);
    }
    addAttributeByType("changelogcookie", "changeLogCookie", cookie, userAttrs, opAttrs);
    final List<RawAttribute> includedAttributes = msg.getEclIncludes();
    if (includedAttributes != null && !includedAttributes.isEmpty())
    {
      final StringBuilder builder = new StringBuilder(256);
      for (final RawAttribute includedAttribute : includedAttributes)
      {
        final String name = includedAttribute.getAttributeType();
        for (final ByteString value : includedAttribute.getValues())
        {
          builder.append(name);
          appendLDIFSeparatorAndValue(builder, value);
          builder.append('\n');
        }
      }
      final String includedAttributesLDIF = builder.toString();
      addAttributeByType("includedattributes", "includedAttributes", includedAttributesLDIF, userAttrs, opAttrs);
    }
    return new Entry(DN.decode(dnString), CHANGELOG_ENTRY_OBJECT_CLASSES, userAttrs, opAttrs);
  }
  private static void addAttribute(final Entry e, final String attrType, final String attrValue)
  {
    e.addAttribute(Attributes.create(attrType, attrValue), null);
  }
  private static void addAttributeByType(String attrNameLowercase,
      String attrNameUppercase, String attrValue,
      Map<AttributeType, List<Attribute>> userAttrs,
      Map<AttributeType, List<Attribute>> operationalAttrs)
  {
    addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, true);
  }
  private void addAttributeByUppercaseName(String attrNameLowercase,
      String attrNameUppercase,  String attrValue,
      Map<AttributeType, List<Attribute>> userAttrs,
      Map<AttributeType, List<Attribute>> operationalAttrs)
  {
    addAttribute(attrNameLowercase, attrNameUppercase, attrValue, userAttrs, operationalAttrs, false);
  }
  private static void addAttribute(final String attrNameLowercase,
      final String attrNameUppercase, final String attrValue,
      final Map<AttributeType, List<Attribute>> userAttrs,
      final Map<AttributeType, List<Attribute>> operationalAttrs, final boolean addByType)
  {
    AttributeType attrType = DirectoryServer.getAttributeType(attrNameLowercase);
    if (attrType == null)
    {
      attrType = DirectoryServer.getDefaultAttributeType(attrNameUppercase);
    }
    final Attribute a = addByType ?
        Attributes.create(attrType, attrValue) : Attributes.create(attrNameUppercase, attrValue);
    final List<Attribute> attrList = Collections.singletonList(a);
    if (attrType.isOperational())
    {
      operationalAttrs.put(attrType, attrList);
    }
    else
    {
      userAttrs.put(attrType, attrList);
    }
  }
}
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,6 +42,7 @@
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
import org.opends.server.api.VirtualAttributeProvider;
import org.opends.server.backends.ChangelogBackend;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
@@ -56,6 +57,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
@@ -63,6 +65,7 @@
import org.opends.server.util.StaticUtils;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -95,11 +98,19 @@
  private final Map<DN, ReplicationServerDomain> baseDNs =
      new HashMap<DN, ReplicationServerDomain>();
  /** The database storing the changes. */
  private final ChangelogDB changelogDB;
  /** The backend that allow to search the changes (external changelog). */
  private ChangelogBackend changelogBackend;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private boolean stopListen = false;
  private final ReplSessionSecurity replSessionSecurity;
  /** To know whether a domain is enabled for the external changelog. */
  private final ECLEnabledDomainPredicate domainPredicate;
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
@@ -136,26 +147,41 @@
   */
  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
  {
    this(cfg, new DSRSShutdownSync());
    this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
  }
  /**
   * Creates a new Replication server using the provided configuration entry.
   * Creates a new Replication server using the provided configuration entry and shutdown
   * synchronization object.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg cfg,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
  }
  /**
   * Creates a new Replication server using the provided configuration entry, shutdown
   * synchronization object and domain predicate.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @param predicate Indicates whether a domain is enabled for the external changelog.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
      final ECLEnabledDomainPredicate predicate) throws ConfigException
  {
    this.config = cfg;
    this.dsrsShutdownSync = dsrsShutdownSync;
    this.domainPredicate = predicate;
    ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
    if (DebugLogger.debugEnabled())
    {
      TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl
          + " as DB implementation for changelog DB");
      TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB");
    }
    this.changelogDB = dbImpl == ReplicationDBImplementation.JE
        ? new JEChangelogDB(this, cfg)
@@ -165,6 +191,9 @@
    initialize();
    cfg.addChangeListener(this);
    // TODO : uncomment to branch changelog backend
    //enableExternalChangeLog();
    localPorts.add(getReplicationPort());
    // Keep track of this new instance
@@ -501,6 +530,57 @@
    registerVirtualAttributeRules();
  }
  /**
   * Enable the external changelog if it is not already enabled.
   * <p>
   * The external changelog is provided by the changelog backend.
   *
   * @throws ConfigException
   *            If an error occurs.
   */
  private void enableExternalChangeLog() throws ConfigException
  {
    if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
    {
      // Backend has already been created and initialized
      // This can occurs in tests
      return;
    }
    try
    {
      changelogBackend = new ChangelogBackend(this, domainPredicate);
      changelogBackend.initializeBackend();
      try
      {
        DirectoryServer.registerBackend(changelogBackend);
      }
      catch (Exception e)
      {
        logError(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
            getExceptionMessage(e)));
      }
      registerVirtualAttributeRules();
    }
    catch (Exception e)
    {
      // TODO : I18N with correct message + what kind of exception should we really throw ?
      // (Directory/Initialization/Config Exception)
      throw new ConfigException(Message.raw("Error when enabling external changelog"), e);
    }
  }
  private void shutdownExternalChangelog()
  {
    if (changelogBackend != null)
    {
      DirectoryServer.deregisterBackend(changelogBackend);
      changelogBackend.finalizeBackend();
      changelogBackend = null;
    }
    deregisterVirtualAttributeRules();
  }
  private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
  {
    final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>();
@@ -609,6 +689,64 @@
    return getReplicationServerDomain(baseDN, false);
  }
  /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
  private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
  {
    Set<DN> domains = null;
    synchronized (baseDNs)
    {
      domains = new HashSet<DN>(baseDNs.keySet());
    }
    domains.removeAll(excludedBaseDNs);
    return domains;
  }
  /**
   * Validate that provided state is coherent with this replication server,
   * when ignoring the provided set of DNs.
   * <p>
   * The state is coherent if and only if it exactly has the set of DNs corresponding to
   * the replication domains.
   *
   * @param state
   *            The multi domain state (cookie) to validate.
   * @param ignoredBaseDNs
   *            The set of DNs to ignore when validating
   * @throws DirectoryException
   *            If the state is not valid
   */
  public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException
  {
    // TODO : should skip unused domains, where domain.getLatestServerState(); is empty
    final Set<DN> domains = getDomainDNs(ignoredBaseDNs);
    final Set<DN> stateDomains = state.getSnapshot().keySet();
    final Set<DN> domainsCopy = new HashSet<DN>(domains);
    final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains);
    domainsCopy.removeAll(stateDomains);
    if (!domainsCopy.isEmpty())
    {
      final StringBuilder missingDomains = new StringBuilder();
      for (DN dn : domainsCopy)
      {
        missingDomains.append(dn).append(":;");
      }
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
              missingDomains, "<" + state.toString() + missingDomains + ">"));
    }
    stateDomainsCopy.removeAll(domains);
    if (!stateDomainsCopy.isEmpty())
    {
      final StringBuilder startState = new StringBuilder();
      for (DN dn : domains) {
        startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";");
      }
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
              stateDomainsCopy.toString(), startState));
    }
  }
  /**
   * Get the ReplicationServerDomain associated to the base DN given in
   * parameter.
@@ -706,7 +844,9 @@
      domain.shutdown();
    }
    // TODO : switch to second method when changelog backend is branched
    shutdownECL();
    //shutdownExternalChangelog();
    try
    {
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -25,6 +25,8 @@
 */
package org.opends.server.replication.server.changelog.api;
import java.util.Set;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
@@ -115,6 +117,32 @@
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
      throws ChangelogException;
  /**
   * Generates a {@link DBCursor} across all the domains starting at or after
   * the provided {@link MultiDomainServerState} for each domain, excluding a
   * provided set of domain DNs.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param startState
   *          Starting point for each domain cursor. If any {@link ServerState}
   *          for a domain is null, then start from the oldest CSN for each
   *          replicaDBs
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which exact
   *          position the cursor must start
   * @param excludedDomainDns
   *          Every domain appearing in this set is excluded from the cursor
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, ServerState, PositionStrategy)
   */
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy,
      Set<DN> excludedDomainDns) throws ChangelogException;
  // serverId methods
  /**
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -661,12 +661,25 @@
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final Set<DN> excludedDomainDns = Collections.emptySet();
    return getCursorFrom(startState, positionStrategy, excludedDomainDns);
  }
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns)
      throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      if (!excludedDomainDns.contains(baseDN))
      {
      cursor.addDomain(baseDN, startState.getServerState(baseDN));
    }
    }
    return cursor;
  }
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
@@ -32,7 +32,7 @@
 *
 * @FunctionalInterface
 */
class ECLEnabledDomainPredicate
public class ECLEnabledDomainPredicate
{
  /**
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
@@ -33,7 +33,7 @@
 * Multi domain DB cursor that only returns updates for the domains which have
 * been enabled for the external changelog.
 */
class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
{
  private final ECLEnabledDomainPredicate predicate;
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -712,12 +712,23 @@
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final Set<DN> excludedDomainDns = Collections.emptySet();
    return getCursorFrom(startState, positionStrategy, excludedDomainDns);
  }
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy, final  Set<DN> excludedDomainDns) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      if (!excludedDomainDns.contains(baseDN)) {
      cursor.addDomain(baseDN, startState.getServerState(baseDN));
    }
    }
    return cursor;
  }
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/backends/ChangelogBackendTestCase.java
New file
@@ -0,0 +1,968 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.backends;
import static org.assertj.core.api.Assertions.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.types.ResultCode.*;
import static org.opends.server.util.ServerConstants.*;
import static org.testng.Assert.*;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import org.opends.server.TestCaseUtils;
import org.opends.server.backends.ChangelogBackend.SearchParams;
import org.opends.server.controls.ExternalChangelogRequestControl;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.protocols.internal.InternalClientConnection;
import org.opends.server.protocols.internal.InternalSearchListener;
import org.opends.server.protocols.internal.InternalSearchOperation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.plugin.DomainFakeCfg;
import org.opends.server.replication.plugin.ExternalChangelogDomainFakeCfg;
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.plugin.MultimasterReplication;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.DeleteMsg;
import org.opends.server.replication.protocol.ModifyDNMsg;
import org.opends.server.replication.protocol.ModifyDnContext;
import org.opends.server.replication.protocol.ModifyMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.types.Attribute;
import org.opends.server.types.AttributeValue;
import org.opends.server.types.Attributes;
import org.opends.server.types.AuthenticationInfo;
import org.opends.server.types.Control;
import org.opends.server.types.DN;
import org.opends.server.types.DereferencePolicy;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.Modification;
import org.opends.server.types.ModificationType;
import org.opends.server.types.Operation;
import org.opends.server.types.RDN;
import org.opends.server.types.ResultCode;
import org.opends.server.types.SearchFilter;
import org.opends.server.types.SearchResultEntry;
import org.opends.server.types.SearchScope;
import org.opends.server.util.LDIFWriter;
import org.opends.server.util.TimeThread;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import com.forgerock.opendj.util.Pair;
@SuppressWarnings("javadoc")
public class ChangelogBackendTestCase extends ReplicationTestCase
{
  private static final DebugTracer TRACER = getTracer();
  private static final String USER1_ENTRY_UUID = "11111111-1111-1111-1111-111111111111";
  private static final long CHANGENUMBER_ZERO = 0L;
  private static final int SERVER_ID_1 = 1201;
  private static final int SERVER_ID_2 = 1202;
  private static final String TEST_ROOT_DN_STRING2 = "o=test2";
  private static DN ROOT_DN_OTEST;
  private static DN ROOT_DN_OTEST2;
  private final int brokerSessionTimeout = 5000;
  private final int maxWindow = 100;
  /** The replicationServer that will be used in this test. */
  private ReplicationServer replicationServer;
  /** The port of the replicationServer. */
  private int replicationServerPort;
  /**
   * When used in a search operation, it includes all attributes (user and
   * operational)
   */
  private static final Set<String> ALL_ATTRIBUTES = newSet("*", "+");
  private static final List<Control> NO_CONTROL = null;
  @BeforeClass
  @Override
  public void setUp() throws Exception
  {
    super.setUp();
    ROOT_DN_OTEST = DN.decode(TEST_ROOT_DN_STRING);
    ROOT_DN_OTEST2 = DN.decode(TEST_ROOT_DN_STRING2);
    // This test suite depends on having the schema available.
    configureReplicationServer();
  }
  @Override
  @AfterClass
  public void classCleanUp() throws Exception
  {
    callParanoiaCheck = false;
    super.classCleanUp();
    remove(replicationServer);
    replicationServer = null;
    paranoiaCheck();
  }
  @AfterMethod
  public void clearReplicationDb() throws Exception
  {
    clearChangelogDB(replicationServer);
  }
  /** Configure a replicationServer for test. */
  private void configureReplicationServer() throws Exception
  {
    replicationServerPort = TestCaseUtils.findFreePort();
    ReplServerFakeConfiguration config = new ReplServerFakeConfiguration(
          replicationServerPort,
          "ChangelogBackendTestDB",
          replicationDbImplementation,
          0,         // purge delay
          71,        // server id
          0,         // queue size
          maxWindow, // window size
          null       // servers
    );
    config.setComputeChangeNumber(true);
    replicationServer = new ReplicationServer(config, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()
    {
      @Override
      public boolean isECLEnabledDomain(DN baseDN)
      {
        return baseDN.equals(ROOT_DN_OTEST);
      }
    });
    debugInfo("configure", "ReplicationServer created:" + replicationServer);
  }
  /** Enable replication on provided domain DN and serverid, using provided port. */
  private Pair<ReplicationBroker, LDAPReplicationDomain> enableReplication(DN domainDN, int serverId,
      int replicationPort, int timeout) throws Exception
  {
    ReplicationBroker broker = openReplicationSession(domainDN, serverId, 100, replicationPort, timeout);
    DomainFakeCfg domainConf = newFakeCfg(domainDN, serverId, replicationPort);
    LDAPReplicationDomain replicationDomain = startNewReplicationDomain(domainConf, null, null);
    return Pair.of(broker, replicationDomain);
  }
  /** Start a new replication domain on the directory server side. */
  private LDAPReplicationDomain startNewReplicationDomain(
      DomainFakeCfg domainConf,
      SortedSet<String> eclInclude,
      SortedSet<String> eclIncludeForDeletes)
          throws Exception
  {
    domainConf.setExternalChangelogDomain(new ExternalChangelogDomainFakeCfg(true, eclInclude, eclIncludeForDeletes));
    // Set a Changetime heartbeat interval low enough
    // (less than default value that is 1000 ms)
    // for the test to be sure to consider all changes as eligible.
    domainConf.setChangetimeHeartbeatInterval(10);
    LDAPReplicationDomain newDomain = MultimasterReplication.createNewDomain(domainConf);
    newDomain.start();
    return newDomain;
  }
  private void removeReplicationDomains(LDAPReplicationDomain... domains)
  {
    for (LDAPReplicationDomain domain : domains)
    {
      if (domain != null)
      {
        domain.shutdown();
        MultimasterReplication.deleteDomain(domain.getBaseDN());
      }
    }
  }
  @Test(enabled=false)
  public void searchChangesOnOneSuffixUsingEmptyCookie() throws Exception
  {
    String testName = "FourChangesCookie";
    debugInfo(testName, "Starting test\n\n");
    CSN[] csns = generateAndPublishChangesForEachOperationType(testName);
    searchChangesForEachOperationTypeUsingEmptyCookie(csns, testName);
    assertChangelogAttributesInRootDSE(true, 1, 4);
    debugInfo(testName, "Ending search with success");
  }
  @Test(enabled=false)
  public void searchChangesOnOneSuffixUsingDraftMode() throws Exception
  {
    long firstChangeNumber = 1;
    String testName = "FourChanges/" + firstChangeNumber;
    debugInfo(testName, "Starting test\n\n");
    CSN[] csns = generateAndPublishChangesForEachOperationType(testName);
    searchChangesForEachOperationTypeUsingDraftMode(firstChangeNumber, csns, testName);
    assertChangelogAttributesInRootDSE(true, 1, 4);
    debugInfo(testName, "Ending search with success");
  }
  @Test(enabled=false)
  public void searchChangesOnOneSuffixMultipleTimesUsingDraftMode() throws Exception
  {
    replicationServer.getChangelogDB().setPurgeDelay(0);
    // write 4 changes starting from changenumber 1, and search them
    String testName = "Multiple/1";
    CSN[] csns = generateAndPublishChangesForEachOperationType(testName);
    searchChangesForEachOperationTypeUsingDraftMode(1, csns, testName);
    // write 4 more changes starting from changenumber 5, and search them
    testName = "Multiple/5";
    csns = generateAndPublishChangesForEachOperationType(testName);
    searchChangesForEachOperationTypeUsingDraftMode(5, csns, testName);
    // search from the provided change number: 6 (should be the add msg)
    CSN csnOfLastAddMsg = csns[1];
    searchChangelogForOneChangeNumber(6, csnOfLastAddMsg);
    // search from a provided change number interval: 5-7
    searchChangelogFromToChangeNumber(5,7);
    // check first and last change number
    assertChangelogAttributesInRootDSE(true, 1, 8);
    // add a new change, then check again first and last change number without previous search
    CSN csn = new CSN(TimeThread.getTime(), 10, SERVER_ID_1);
    publishChanges(testName, generateDeleteMsg(TEST_ROOT_DN_STRING, csn, testName, 1));
    assertChangelogAttributesInRootDSE(true, 1, 9);
  }
  /**
   * Verifies that is not possible to read the changelog without the changelog-read privilege
   */
  // TODO : enable when code is checking the privileges correctly
  @Test(enabled=false)
  public void searchingChangelogWithoutPrivilegeShouldFail() throws Exception
  {
    AuthenticationInfo nonPrivilegedUser = new AuthenticationInfo();
    InternalClientConnection conn = new InternalClientConnection(nonPrivilegedUser);
    InternalSearchOperation op = conn.processSearch("cn=changelog", SearchScope.WHOLE_SUBTREE, "(objectclass=*)");
    assertEquals(op.getResultCode(), ResultCode.INSUFFICIENT_ACCESS_RIGHTS);
    assertEquals(op.getErrorMessage().toMessage(), NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES.get());
  }
  /**
   * With an empty RS, a search should return only root entry.
   */
  @Test(enabled=false)
  public void searchWhenNoChangesShouldReturnRootEntryOnly() throws Exception
  {
    String testName = "EmptyRS";
    debugInfo(testName, "Starting test\n\n");
    searchChangelog("(objectclass=*)", 1, SUCCESS, testName);
    debugInfo(testName, "Ending test successfully");
  }
  @Test(enabled=false)
  public void searchWithUnknownChangeNumberShouldReturnNoResult() throws Exception
  {
    String testName = "UnknownChangeNumber";
    debugInfo(testName, "Starting test\n\n");
    searchChangelog("(changenumber=1000)", 0, SUCCESS, testName);
    debugInfo(testName, "Ending test with success");
  }
  @Test(enabled=false)
  public void operationalAndVirtualAttributesShouldNotBeVisibleOutsideRootDSE() throws Exception
  {
    String testName = "attributesVisibleOutsideRootDSE";
    debugInfo(testName, "Starting test \n\n");
    Set<String> attributes =
        newSet("firstchangenumber", "lastchangenumber", "changelog", "lastExternalChangelogCookie");
    InternalSearchOperation searchOp = searchDNWithBaseScope(TEST_ROOT_DN_STRING, attributes);
    waitForSearchOpResult(searchOp, ResultCode.SUCCESS);
    final List<SearchResultEntry> entries = searchOp.getSearchEntries();
    assertThat(entries).hasSize(1);
    debugAndWriteEntries(null, entries, testName);
    SearchResultEntry entry = entries.get(0);
    assertNull(getAttributeValue(entry, "firstchangenumber"));
    assertNull(getAttributeValue(entry, "lastchangenumber"));
    assertNull(getAttributeValue(entry, "changelog"));
    assertNull(getAttributeValue(entry, "lastExternalChangelogCookie"));
    debugInfo(testName, "Ending test with success");
  }
  @DataProvider()
  public Object[][] getFilters()
  {
    return new Object[][] {
      // base DN, filter, expected first change number, expected last change number
      { "cn=changelog", "(objectclass=*)", -1, -1 },
      { "cn=changelog", "(changenumber>=2)", 2, -1 },
      { "cn=changelog", "(&(changenumber>=2)(changenumber<=5))", 2, 5 },
      { "cn=changelog", "(&(dc=x)(&(changenumber>=2)(changenumber<=5)))", 2, 5 },
      { "cn=changelog",
          "(&(&(changenumber>=3)(changenumber<=4))(&(|(dc=y)(dc=x))(&(changenumber>=2)(changenumber<=5))))", 3, 4 },
      { "cn=changelog", "(|(objectclass=*)(&(changenumber>=2)(changenumber<=5)))", -1, -1 },
      { "cn=changelog", "(changenumber=8)", 8, 8 },
      { "changeNumber=8,cn=changelog", "(objectclass=*)", 8, 8 },
      { "changeNumber=8,cn=changelog", "(changenumber>=2)", 8, 8 },
      { "changeNumber=8,cn=changelog", "(&(changenumber>=2)(changenumber<=5))", 8, 8 },
    };
  }
  @Test(dataProvider="getFilters")
  public void optimizeFiltersWithChangeNumber(String dn, String filter, long expectedFirstCN, long expectedLastCN)
      throws Exception
  {
    final ChangelogBackend backend = new ChangelogBackend(null, null);
    final DN baseDN = DN.decode(dn);
    final SearchParams searchParams = new SearchParams();
    backend.optimizeSearchParameters(searchParams, baseDN, SearchFilter.createFilterFromString(filter));
    assertSearchParameters(searchParams, expectedFirstCN, expectedLastCN, null);
  }
  @Test
  public void optimizeFiltersWithReplicationCsn() throws Exception
  {
    final ChangelogBackend backend = new ChangelogBackend(null, null);
    final DN baseDN = DN.decode("cn=changelog");
    final CSN csn = new CSNGenerator(1, 0).newCSN();
    final SearchParams searchParams = new SearchParams();
    backend.optimizeSearchParameters(searchParams, baseDN,
        SearchFilter.createFilterFromString("(replicationcsn=" + csn + ")"));
    assertSearchParameters(searchParams, -1, -1, csn);
  }
  private List<SearchResultEntry> assertChangelogAttributesInRootDSE(boolean isECLEnabled,
      int expectedFirstChangeNumber, int expectedLastChangeNumber) throws Exception
  {
    AssertionError error = null;
    for (int count = 0 ; count < 30; count++)
    {
      try
      {
        final Set<String> attributes = new LinkedHashSet<String>();
        if (expectedFirstChangeNumber > 0)
        {
          attributes.add("firstchangenumber");
        }
        attributes.add("lastchangenumber");
        attributes.add("changelog");
        attributes.add("lastExternalChangelogCookie");
        final InternalSearchOperation searchOp = searchDNWithBaseScope("", attributes);
        final List<SearchResultEntry> entries = searchOp.getSearchEntries();
        assertThat(entries).hasSize(1);
        final SearchResultEntry entry = entries.get(0);
        if (isECLEnabled)
        {
          if (expectedFirstChangeNumber > 0)
          {
            assertAttributeValue(entry, "firstchangenumber", String.valueOf(expectedFirstChangeNumber));
          }
          assertAttributeValue(entry, "lastchangenumber", String.valueOf(expectedLastChangeNumber));
          assertAttributeValue(entry, "changelog", String.valueOf("cn=changelog"));
          assertNotNull(getAttributeValue(entry, "lastExternalChangelogCookie"));
        }
        else
        {
          if (expectedFirstChangeNumber > 0) {
            assertNull(getAttributeValue(entry, "firstchangenumber"));
          }
          assertNull(getAttributeValue(entry, "lastchangenumber"));
          assertNull(getAttributeValue(entry, "changelog"));
          assertNull(getAttributeValue(entry, "lastExternalChangelogCookie"));
        }
        return entries;
      }
      catch (AssertionError ae)
      {
        // try again to see if changes have been persisted
        error = ae;
      }
      Thread.sleep(100);
    }
    assertNotNull(error);
    throw error;
  }
  private void assertSearchParameters(SearchParams searchParams, long firstChangeNumber,
      long lastChangeNumber, CSN csn) throws Exception
  {
    assertEquals(searchParams.getLowestChangeNumber(), firstChangeNumber);
    assertEquals(searchParams.getHighestChangeNumber(), lastChangeNumber);
    assertEquals(searchParams.getCSN(), csn == null ? new CSN(0, 0, 0) : csn);
  }
  private CSN[] generateAndPublishChangesForEachOperationType(String testName) throws Exception
  {
    CSN[] csns = generateCSNs(4, SERVER_ID_1);
    List<UpdateMsg> messages = new ArrayList<UpdateMsg>();
    messages.add(generateDeleteMsg(TEST_ROOT_DN_STRING, csns[0], testName, 1));
    messages.add(generateAddMsg(TEST_ROOT_DN_STRING, csns[1], USER1_ENTRY_UUID, testName));
    messages.add(generateModMsg(TEST_ROOT_DN_STRING, csns[2], testName));
    messages.add(generateModDNMsg(TEST_ROOT_DN_STRING, csns[3], testName));
    publishChanges(testName, messages.toArray(new UpdateMsg[4]));
    return csns;
  }
  /** Publish a list changes to the default replication broker used by tests. */
  private void publishChanges(String testName, UpdateMsg...messages) throws Exception
  {
    Pair<ReplicationBroker, LDAPReplicationDomain> replicationObjects = null;
    try
    {
      replicationObjects = enableReplication(ROOT_DN_OTEST, SERVER_ID_1, replicationServerPort, brokerSessionTimeout);
      ReplicationBroker broker = replicationObjects.getFirst();
      for (UpdateMsg msg : messages)
      {
        debugInfo(testName, " publishes " + msg.getCSN());
        broker.publish(msg);
      }
    }
    finally
    {
      if (replicationObjects != null)
      {
        removeReplicationDomains(replicationObjects.getSecond());
        stop(replicationObjects.getFirst());
      }
    }
  }
  private void searchChangesForEachOperationTypeUsingEmptyCookie(CSN[] csns, String testName) throws Exception
  {
    int nbEntries = 4;
    String cookie= "";
    InternalSearchOperation searchOp =
        searchChangelogUsingCookie("(targetdn=*" + testName + "*,o=test)", cookie, nbEntries, SUCCESS, testName);
    final String[] cookies = new String[nbEntries];
    for (int j = 0; j < cookies.length; j++)
    {
      cookies[j] = "o=test:" + csns[j] + ";";
    }
    final List<SearchResultEntry> searchEntries = searchOp.getSearchEntries();
    assertDelEntry(searchEntries.get(0), testName + 1, testName + "uuid1", CHANGENUMBER_ZERO, csns[0], cookies[0]);
    assertAddEntry(searchEntries.get(1), testName + 2, USER1_ENTRY_UUID, CHANGENUMBER_ZERO, csns[1], cookies[1]);
    assertModEntry(searchEntries.get(2), testName + 3, testName + "uuid3", CHANGENUMBER_ZERO, csns[2], cookies[2]);
    assertModDNEntry(searchEntries.get(3), testName + 4, testName + "new4", testName+"uuid4", CHANGENUMBER_ZERO,
        csns[3], cookies[3]);
    assertResultsContainCookieControl(searchOp, cookies);
  }
  private void searchChangesForEachOperationTypeUsingDraftMode(long firstChangeNumber, CSN[] csns, String testName)
      throws Exception
  {
    // Search the changelog and check 4 entries are returned
    String filter = "(targetdn=*" + testName + "*,o=test)";
    InternalSearchOperation searchOp = searchChangelog(filter, 4, SUCCESS, testName);
    assertContainsNoControl(searchOp);
    assertEntriesForEachOperationType(searchOp.getSearchEntries(), firstChangeNumber, testName, USER1_ENTRY_UUID, csns);
    // Search the changelog with filter on change number and check 4 entries are returned
    filter =
        "(&(targetdn=*" + testName + "*,o=test)"
          + "(&(changenumber>=" + firstChangeNumber + ")"
            + "(changenumber<=" + (firstChangeNumber + 3) + ")))";
    searchOp = searchChangelog(filter, 4, SUCCESS, testName);
    assertContainsNoControl(searchOp);
    assertEntriesForEachOperationType(searchOp.getSearchEntries(), firstChangeNumber, testName, USER1_ENTRY_UUID, csns);
  }
  /**
   * Search on the provided change number and check the result.
   *
   * @param changeNumber
   *          Change number to search
   * @param expectedCsn
   *          Expected CSN in the entry corresponding to the change number
   */
  private void searchChangelogForOneChangeNumber(long changeNumber, CSN expectedCsn) throws Exception
  {
    String testName = "searchOneChangeNumber/" + changeNumber;
    debugInfo(testName, "Starting search\n\n");
    InternalSearchOperation searchOp =
        searchChangelog("(changenumber=" + changeNumber + ")", 1, SUCCESS, testName);
    SearchResultEntry entry = searchOp.getSearchEntries().get(0);
    String uncheckedUid = null;
    assertEntryCommonAttributes(entry, uncheckedUid, USER1_ENTRY_UUID, changeNumber, expectedCsn,
        "o=test:" + expectedCsn + ";");
    debugInfo(testName, "Ending search with success");
  }
  private void searchChangelogFromToChangeNumber(int firstChangeNumber, int lastChangeNumber) throws Exception
  {
    String testName = "searchFromToChangeNumber/" + firstChangeNumber + "/" + lastChangeNumber;
    debugInfo(testName, "Starting search\n\n");
    String filter = "(&(changenumber>=" + firstChangeNumber + ")" + "(changenumber<=" + lastChangeNumber + "))";
    final int expectedNbEntries = lastChangeNumber - firstChangeNumber + 1;
    searchChangelog(filter, expectedNbEntries, SUCCESS, testName);
    debugInfo(testName, "Ending search with success");
  }
  private InternalSearchOperation searchChangelogUsingCookie(String filterString,
      String cookie, int expectedNbEntries, ResultCode expectedResultCode, String testName)
      throws Exception
  {
    debugInfo(testName, "Search with cookie=[" + cookie + "] filter=[" + filterString + "]");
    return searchChangelog(filterString, ALL_ATTRIBUTES, createCookieControl(cookie),
        expectedNbEntries, expectedResultCode, testName);
  }
  private InternalSearchOperation searchChangelog(String filterString, int expectedNbEntries,
      ResultCode expectedResultCode, String testName) throws Exception
  {
    return searchChangelog(filterString, ALL_ATTRIBUTES, NO_CONTROL, expectedNbEntries, expectedResultCode, testName);
  }
  private InternalSearchOperation searchChangelog(String filterString, Set<String> attributes,
      List<Control> controls, int expectedNbEntries, ResultCode expectedResultCode, String testName) throws Exception
  {
    InternalSearchOperation searchOperation = null;
    int sizeLimitZero = 0;
    int timeLimitZero = 0;
    InternalSearchListener noSearchListener = null;
    int count = 0;
    do
    {
      Thread.sleep(10);
      boolean typesOnlyFalse = false;
      searchOperation = connection.processSearch("cn=changelog", SearchScope.WHOLE_SUBTREE,
          DereferencePolicy.NEVER_DEREF_ALIASES, sizeLimitZero, timeLimitZero, typesOnlyFalse, filterString,
          attributes, controls, noSearchListener);
      count++;
    }
    while (count < 300 && searchOperation.getSearchEntries().size() != expectedNbEntries);
    final List<SearchResultEntry> entries = searchOperation.getSearchEntries();
    assertThat(entries).hasSize(expectedNbEntries);
    debugAndWriteEntries(getLDIFWriter(), entries, testName);
    waitForSearchOpResult(searchOperation, expectedResultCode);
    return searchOperation;
  }
  private InternalSearchOperation searchDNWithBaseScope(String dn, Set<String> attributes) throws Exception
  {
    final InternalSearchOperation searchOp = connection.processSearch(
        dn,
        SearchScope.BASE_OBJECT,
        DereferencePolicy.NEVER_DEREF_ALIASES,
        0,     // Size limit
        0,     // Time limit
        false, // Types only
        "(objectclass=*)",
        attributes);
    waitForSearchOpResult(searchOp, ResultCode.SUCCESS);
    return searchOp;
  }
  /** Build a list of controls including the cookie provided. */
  private List<Control> createCookieControl(String cookie) throws DirectoryException
  {
    final MultiDomainServerState state = new MultiDomainServerState(cookie);
    final Control cookieControl = new ExternalChangelogRequestControl(true, state);
    return newList(cookieControl);
  }
  private static LDIFWriter getLDIFWriter() throws Exception
  {
    ByteArrayOutputStream stream = new ByteArrayOutputStream();
    LDIFExportConfig exportConfig = new LDIFExportConfig(stream);
    return new LDIFWriter(exportConfig);
  }
  private CSN[] generateCSNs(int numberOfCsns, int serverId)
  {
    long startTime = TimeThread.getTime();
    CSN[] csns = new CSN[numberOfCsns];
    for (int i = 0; i < numberOfCsns; i++)
    {
      // seqNum must be greater than 0, so start at 1
      csns[i] = new CSN(startTime + i, i + 1, serverId);
    }
    return csns;
  }
  private UpdateMsg generateDeleteMsg(String baseDn, CSN csn, String testName, int testIndex)
      throws Exception
  {
    String dn = "uid=" + testName + testIndex + "," + baseDn;
    return new DeleteMsg(DN.decode(dn), csn, testName + "uuid" + testIndex);
  }
  private UpdateMsg generateAddMsg(String baseDn, CSN csn, String user1entryUUID, String testName)
      throws Exception
  {
    String baseUUID = "22222222-2222-2222-2222-222222222222";
    String entryLdif = "dn: uid="+ testName + "2," + baseDn + "\n"
        + "objectClass: top\n" + "objectClass: domain\n"
        + "entryUUID: "+ user1entryUUID +"\n";
    Entry entry = TestCaseUtils.entryFromLdifString(entryLdif);
    return new AddMsg(
        csn,
        DN.decode("uid="+testName+"2," + baseDn),
        user1entryUUID,
        baseUUID,
        entry.getObjectClassAttribute(),
        entry.getAttributes(),
        Collections.<Attribute> emptyList());
  }
  private UpdateMsg generateModMsg(String baseDn, CSN csn, String testName) throws Exception
  {
    DN baseDN = DN.decode("uid=" + testName + "3," + baseDn);
    List<Modification> mods = createAttributeModif("description", "new value");
    return new ModifyMsg(csn, baseDN, mods, testName + "uuid3");
  }
  private List<Modification> createAttributeModif(String attributeName, String valueString)
  {
    Attribute attr = Attributes.create(attributeName, valueString);
    return newList(new Modification(ModificationType.REPLACE, attr));
  }
  private UpdateMsg generateModDNMsg(String baseDn, CSN csn, String testName) throws Exception
  {
    final DN newSuperior = ROOT_DN_OTEST2;
    ModifyDNOperation op = new ModifyDNOperationBasis(connection, 1, 1, null,
        DN.decode("uid=" + testName + "4," + baseDn), // entryDN
        RDN.decode("uid=" + testName + "new4"), // new rdn
        true,  // deleteoldrdn
        newSuperior);
    op.setAttachment(SYNCHROCONTEXT, new ModifyDnContext(csn, testName + "uuid4", "newparentId"));
    LocalBackendModifyDNOperation localOp = new LocalBackendModifyDNOperation(op);
    return new ModifyDNMsg(localOp);
  }
  //TODO : share this code with other classes ?
  private void waitForSearchOpResult(Operation operation, ResultCode expectedResult) throws Exception
  {
    int i = 0;
    while (operation.getResultCode() == ResultCode.UNDEFINED || operation.getResultCode() != expectedResult)
    {
      Thread.sleep(50);
      i++;
      if (i > 10)
      {
        assertEquals(operation.getResultCode(), expectedResult, operation.getErrorMessage().toString());
      }
    }
  }
  /** Verify that no entry contains the ChangeLogCookie control. */
  private void assertContainsNoControl(InternalSearchOperation searchOp)
  {
    for (SearchResultEntry entry : searchOp.getSearchEntries())
    {
      assertTrue(entry.getControls().isEmpty(), "result entry " + entry.toString() +
          " should contain no control(s)");
    }
  }
  /** Verify that all entries contains the ChangeLogCookie control with the correct cookie value. */
  private void assertResultsContainCookieControl(InternalSearchOperation searchOp, String[] cookies) throws Exception
  {
    for (SearchResultEntry entry : searchOp.getSearchEntries())
    {
      boolean cookieControlFound = false;
      for (Control control : entry.getControls())
      {
        if (control.getOID().equals(OID_ECL_COOKIE_EXCHANGE_CONTROL))
        {
          String cookieString =
              searchOp.getRequestControl(ExternalChangelogRequestControl.DECODER).getCookie().toString();
          assertThat(cookieString).isIn((Object[]) cookies);
          cookieControlFound = true;
        }
      }
      assertTrue(cookieControlFound, "result entry " + entry.toString() + " should contain the cookie control");
    }
  }
  /** Check the DEL entry has the right content. */
  private void assertDelEntry(SearchResultEntry entry, String uid, String entryUUID,
      long changeNumber, CSN csn, String cookie)
  {
    assertAttributeValue(entry, "changetype", "delete");
    assertAttributeValue(entry, "targetuniqueid", entryUUID);
    assertAttributeValue(entry, "targetentryuuid", entryUUID);
    assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie);
  }
  /** Check the ADD entry has the right content. */
  private void assertAddEntry(SearchResultEntry entry, String uid, String entryUUID,
      long changeNumber, CSN csn, String cookie)
  {
    assertAttributeValue(entry, "changetype", "add");
    assertEntryMatchesLDIF(entry, "changes",
        "objectClass: domain",
        "objectClass: top",
        "entryUUID: " + entryUUID);
    assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie);
  }
  private void assertModEntry(SearchResultEntry entry, String uid, String entryUUID,
      long changeNumber, CSN csn, String cookie)
  {
    assertAttributeValue(entry, "changetype", "modify");
    assertEntryMatchesLDIF(entry, "changes",
        "replace: description",
        "description: new value",
        "-");
    assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie);
  }
  private void assertModDNEntry(SearchResultEntry entry, String uid, String newUid,
      String entryUUID, long changeNumber, CSN csn, String cookie)
  {
    assertAttributeValue(entry, "changetype", "modrdn");
    assertAttributeValue(entry, "newrdn", "uid=" + newUid);
    assertAttributeValue(entry, "newsuperior", TEST_ROOT_DN_STRING2);
    assertAttributeValue(entry, "deleteoldrdn", "true");
    assertEntryCommonAttributes(entry, uid, entryUUID, changeNumber, csn, cookie);
  }
  private void assertEntryCommonAttributes(SearchResultEntry resultEntry,
      String uid, String entryUUID, long changeNumber, CSN csn, String cookie)
  {
    if (changeNumber == 0)
    {
      assertDNWithCSN(resultEntry, csn);
    }
    else
    {
      assertDNWithChangeNumber(resultEntry, changeNumber);
      assertAttributeValue(resultEntry, "changenumber", String.valueOf(changeNumber));
    }
    assertAttributeValue(resultEntry, "targetentryuuid", entryUUID);
    assertAttributeValue(resultEntry, "replicaidentifier", String.valueOf(SERVER_ID_1));
    assertAttributeValue(resultEntry, "replicationcsn", csn.toString());
    assertAttributeValue(resultEntry, "changelogcookie", cookie);
    // A null value can be provided for uid if it should not be checked
    if (uid != null)
    {
      final String targetDN = "uid=" + uid + "," + TEST_ROOT_DN_STRING;
      assertAttributeValue(resultEntry, "targetdn", targetDN);
    }
  }
  private void assertEntriesForEachOperationType(List<SearchResultEntry> entries, long firstChangeNumber,
      String testName, String entryUUID, CSN... csns) throws Exception
  {
    debugAndWriteEntries(getLDIFWriter(), entries, testName);
    assertThat(entries).hasSize(4);
    CSN csn = csns[0];
    assertDelEntry(entries.get(0), testName + "1", testName + "uuid1", firstChangeNumber, csn, "o=test:" + csn + ";");
    csn = csns[1];
    assertAddEntry(entries.get(1), testName + "2", entryUUID, firstChangeNumber+1, csn, "o=test:" + csn + ";");
    csn = csns[2];
    assertModEntry(entries.get(2), testName + "3", testName + "uuid3", firstChangeNumber+2, csn,
        "o=test:" + csn + ";");
    csn = csns[3];
    assertModDNEntry(entries.get(3), testName + "4", testName + "new4", testName + "uuid4", firstChangeNumber+3, csn,
        "o=test:" + csn + ";");
  }
  /**
   * Asserts the attribute value as LDIF to ignore lines ordering.
   */
  private static void assertEntryMatchesLDIF(Entry entry, String attrName, String... expectedLDIFLines)
  {
    final String actualVal = getAttributeValue(entry, attrName);
    final Set<Set<String>> actual = toLDIFEntries(actualVal.split("\n"));
    final Set<Set<String>> expected = toLDIFEntries(expectedLDIFLines);
    assertThat(actual)
        .as("In entry " + entry + " incorrect value for attr '" + attrName + "'")
        .isEqualTo(expected);
  }
  private static void assertAttributeValue(Entry entry, String attrName, String expectedValue)
  {
    assertFalse(expectedValue.contains("\n"),
        "You should use assertEntryMatchesLDIF() method for asserting on this value: \"" + expectedValue + "\"");
    final String actualValue = getAttributeValue(entry, attrName);
    assertThat(actualValue)
        .as("In entry " + entry + " incorrect value for attr '" + attrName + "'")
        .isEqualToIgnoringCase(expectedValue);
  }
  private void assertDNWithChangeNumber(SearchResultEntry resultEntry, long changeNumber)
  {
    String actualDN = resultEntry.getDN().toNormalizedString();
    String expectedDN = "changenumber=" + changeNumber + ",cn=changelog";
    assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
  }
  private void assertDNWithCSN(SearchResultEntry resultEntry, CSN csn)
  {
    String actualDN = resultEntry.getDN().toNormalizedString();
    String expectedDN = "replicationcsn=" + csn + "," + TEST_ROOT_DN_STRING + ",cn=changelog";
    assertThat(actualDN).isEqualToIgnoringCase(expectedDN);
  }
  /**
   * Returns a data structure allowing to compare arbitrary LDIF lines. The
   * algorithm splits LDIF entries on lines containing only a dash ("-"). It
   * then returns LDIF entries and lines in an LDIF entry in ordering
   * insensitive data structures.
   * <p>
   * Note: a last line with only a dash ("-") is significant. i.e.:
   *
   * <pre>
   * <code>
   * boolean b = toLDIFEntries("-").equals(toLDIFEntries()));
   * System.out.println(b); // prints "false"
   * </code>
   * </pre>
   */
  private static Set<Set<String>> toLDIFEntries(String... ldifLines)
  {
    final Set<Set<String>> results = new HashSet<Set<String>>();
    Set<String> ldifEntryLines = new HashSet<String>();
    for (String ldifLine : ldifLines)
    {
      if (!"-".equals(ldifLine))
      {
        // same entry keep adding
        ldifEntryLines.add(ldifLine);
      }
      else
      {
        // this is a new entry
        results.add(ldifEntryLines);
        ldifEntryLines = new HashSet<String>();
      }
    }
    results.add(ldifEntryLines);
    return results;
  }
  // TODO : share this code with other classes
  private static String getAttributeValue(Entry entry, String attrName)
  {
    List<Attribute> attrs = entry.getAttribute(attrName.toLowerCase());
    if (attrs == null)
    {
      return null;
    }
    Attribute a = attrs.iterator().next();
    AttributeValue av = a.iterator().next();
    return av.toString();
  }
  private void debugAndWriteEntries(LDIFWriter ldifWriter,List<SearchResultEntry> entries, String tn) throws Exception
  {
    if (entries != null)
    {
      for (SearchResultEntry entry : entries)
      {
        // Can use entry.toSingleLineString()
        debugInfo(tn, " RESULT entry returned:" + entry.toLDIFString());
        if (ldifWriter != null)
        {
          ldifWriter.writeEntry(entry);
        }
      }
    }
  }
  /**
   * Utility - log debug message - highlight it is from the test and not
   * from the server code. Makes easier to observe the test steps.
   */
  private void debugInfo(String testName, String message)
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("** TEST " + testName + " ** " + message);
    }
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -41,6 +41,8 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.Attribute;
@@ -174,7 +176,15 @@
    ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, chDir, replicationDbImplementation, 0, changelogId, 0,
            100, servers);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    final DN testBaseDN = this.baseDN;
    ReplicationServer replicationServer = new ReplicationServer(conf, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()
    {
      @Override
      public boolean isECLEnabledDomain(DN baseDN)
      {
        return testBaseDN.equals(baseDN);
      }
    });
    Thread.sleep(1000);
    return replicationServer;