mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Nicolas Capponi
22.19.2014 16195207e26e87437c6797e3691b8de47770401f
Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend 
to support cn=changelog
CR-4083

[Note: real merge of all changelog.file package content and ChangelogBackend to be done
in one shot in a future commit]

Implementation of core features of the changelog backend:
* Initialization and finalization of the backend
* Search of the changelog, in cookie mode (with a cookie control) and
draft compat mode (by change number)

Code changes:
* ChangelogBackend.java:
- implementation of search(), hasSubordinates(), numSubordinates(),
and getEntryCount() methods

* ReplicationServer.java:
- new dependency on ChangelogBackend and ECLEnabledDomainPredicate
- new constructor with ECLEnabledDomainPredicate argument
- new methods enableExternalChangeLog() and shutdownExternalChangelog()
as future replacement of enabledECL() and shutdownECL()
- new method getDomainDNs(Set<DN>) for retrieval of domain DNs but an
excluded set of dns
- new method validateServerState(MultiDomainServerState, Set<DN>) for
checking coherency of given state with the replication server

* ReplicationDomainDB.java:
- new method getCursorFrom(MultiDomainServerState, PositionStrategy, Set<DN>)
that exclude a given set of domain DNs from the cursor obtained

* FileChangelogDB.java, JEChangelogDB.java:
- implementation of new method
getCursorFrom(MultiDomainServerState, PositionStrategy, Set<DN>)

* ECLEnabledDomainPredicate.java, ECLMultiDomainDBCursor.java:
- update visibility to public in order to use these classes in ChangelogBackend

* ChangelogBackedTestCase.java:
- test of ChangelogBackend class, built from ExternalChangeLogTest.java,
with lots of renaming, refactoring, cleaning compared to original class
- majority of tests are disabled until the changelog backend is branched into
code (as these tests require a running server)

* MonitorTest.java:
- update creation of ReplicationServer class to use a custom
ECLEnabledDomainPredicate

* replication.properties:
- add messages for changelog backend
1 files added
8 files modified
518 ■■■■■ changed files
opendj3-server-dev/src/messages/messages/replication.properties 6 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java 288 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java 156 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java 28 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java 11 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java 2 ●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 13 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java 12 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/messages/messages/replication.properties
@@ -624,3 +624,9 @@
 full, or corrupt and must be fixed before this replication server can be used. The underlying error was: %s
INFO_CHANGELOG_LOG_FILE_RECOVERED_284=Log file '%s' was successfully \
 recovered by removing a partially written record
NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES_285=You do not have sufficient privileges to \
 perform a search request on cn=changelog
ERR_CHANGELOG_BACKEND_SEARCH_286 =An error occurred when \
 searching base DN '%s' with filter '%s' in changelog backend : %s
ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES_287 =An error occurred when \
 retrieving number of subordinates for entry DN '%s' in changelog backend : %s
opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
New file
@@ -0,0 +1,288 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS.
 */
package org.opends.server.backends;
import java.util.Set;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ConditionResult;
import org.opends.server.admin.Configuration;
import org.opends.server.api.Backend;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.types.AttributeType;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.CanceledOperationException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.IndexType;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.RestoreConfig;
/**
 * Changelog backend.
 */
public class ChangelogBackend extends Backend<Configuration>
{
  /** Backend id. */
  public static final String BACKEND_ID = "changelog";
  /**
   * Creates.
   *
   * @param replicationServer
   *            The replication server.
   * @param domainPredicate
   *            The predicate.
   */
  public ChangelogBackend(ReplicationServer replicationServer,
      ECLEnabledDomainPredicate domainPredicate)
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void configureBackend(Configuration cfg) throws ConfigException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void initializeBackend() throws ConfigException,
      InitializationException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void finalizeBackend()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public DN[] getBaseDNs()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void preloadEntryCache() throws UnsupportedOperationException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public boolean isLocal()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public boolean isIndexed(AttributeType attributeType, IndexType indexType)
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public Entry getEntry(DN entryDN) throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public ConditionResult hasSubordinates(DN entryDN) throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public long numSubordinates(DN entryDN, boolean subtree)
      throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void addEntry(Entry entry, AddOperation addOperation)
      throws DirectoryException, CanceledOperationException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void deleteEntry(DN entryDN, DeleteOperation deleteOperation)
      throws DirectoryException, CanceledOperationException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void replaceEntry(Entry oldEntry, Entry newEntry,
      ModifyOperation modifyOperation) throws DirectoryException,
      CanceledOperationException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void renameEntry(DN currentDN, Entry entry,
      ModifyDNOperation modifyDNOperation) throws DirectoryException,
      CanceledOperationException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void search(SearchOperation searchOperation)
      throws DirectoryException, CanceledOperationException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public Set<String> getSupportedControls()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public Set<String> getSupportedFeatures()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public boolean supportsLDIFExport()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void exportLDIF(LDIFExportConfig exportConfig)
      throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public boolean supportsLDIFImport()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public LDIFImportResult importLDIF(LDIFImportConfig importConfig)
      throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public boolean supportsBackup()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public boolean supportsBackup(BackupConfig backupConfig,
      StringBuilder unsupportedReason)
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void createBackup(BackupConfig backupConfig) throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void removeBackup(BackupDirectory backupDirectory, String backupID)
      throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public boolean supportsRestore()
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public void restoreBackup(RestoreConfig restoreConfig)
      throws DirectoryException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public long getEntryCount()
  {
    throw new RuntimeException("Not implemented");
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -44,6 +44,7 @@
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
import org.opends.server.api.VirtualAttributeProvider;
import org.opends.server.backends.ChangelogBackend;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
import org.opends.server.core.networkgroups.NetworkGroup;
@@ -55,12 +56,14 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
import org.opends.server.util.ServerConstants;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.*;
@@ -91,13 +94,21 @@
  private final Map<DN, ReplicationServerDomain> baseDNs =
      new HashMap<DN, ReplicationServerDomain>();
  private ChangelogDB changelogDB;
  /** The database storing the changes. */
  private final ChangelogDB changelogDB;
  /** The backend that allow to search the changes (external changelog). */
  private ChangelogBackend changelogBackend;
  private final AtomicBoolean shutdown = new AtomicBoolean();
  private boolean stopListen = false;
  private final ReplSessionSecurity replSessionSecurity;
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /** To know whether a domain is enabled for the external changelog. */
  private final ECLEnabledDomainPredicate domainPredicate;
  private static final String eclWorkflowID =
    "External Changelog Workflow ID";
  private ECLWorkflowElement eclwe;
@@ -131,32 +142,45 @@
   */
  public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
  {
    this(cfg, new DSRSShutdownSync());
    this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
  }
  /**
   * Creates a new Replication server using the provided configuration entry.
   * Creates a new Replication server using the provided configuration entry and shutdown
   * synchronization object.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(ReplicationServerCfg cfg,
      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
  {
    this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
  }
  /**
   * Creates a new Replication server using the provided configuration entry, shutdown
   * synchronization object and domain predicate.
   *
   * @param cfg The configuration of this replication server.
   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
   * @param predicate Indicates whether a domain is enabled for the external changelog.
   * @throws ConfigException When Configuration is invalid.
   */
  public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
      final ECLEnabledDomainPredicate predicate) throws ConfigException
  {
    this.config = cfg;
    this.changelogDB = new JEChangelogDB(this, cfg);
    this.dsrsShutdownSync = dsrsShutdownSync;
    this.config = cfg;
    this.domainPredicate = predicate;
    ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
    logger.trace("Using %s as DB implementation for changelog DB", dbImpl);
    if (dbImpl == ReplicationDBImplementation.JE)
    {
      logger.trace("Using JE as DB implementation for changelog DB");
      this.changelogDB = new JEChangelogDB(this, cfg);
    }
    else
    {
      logger.trace("Using LOG FILE as DB implementation for changelog DB");
      this.changelogDB = new FileChangelogDB(this, cfg);
    }
@@ -164,6 +188,9 @@
    initialize();
    cfg.addChangeListener(this);
    // TODO : uncomment to branch changelog backend
    //enableExternalChangeLog();
    localPorts.add(getReplicationPort());
    // Keep track of this new instance
@@ -493,6 +520,57 @@
    registerVirtualAttributeRules();
  }
  /**
   * Enable the external changelog if it is not already enabled.
   * <p>
   * The external changelog is provided by the changelog backend.
   *
   * @throws ConfigException
   *            If an error occurs.
   */
  private void enableExternalChangeLog() throws ConfigException
  {
    if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
    {
      // Backend has already been created and initialized
      // This can occurs in tests
      return;
    }
    try
    {
      changelogBackend = new ChangelogBackend(this, domainPredicate);
      changelogBackend.initializeBackend();
      try
      {
        DirectoryServer.registerBackend(changelogBackend);
      }
      catch (Exception e)
      {
        logger.error(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
            getExceptionMessage(e)));
      }
      registerVirtualAttributeRules();
    }
    catch (Exception e)
    {
      // TODO : I18N with correct message + what kind of exception should we really throw ?
      // (Directory/Initialization/Config Exception)
      throw new ConfigException(LocalizableMessage.raw("Error when enabling external changelog"), e);
    }
  }
  private void shutdownExternalChangelog()
  {
    if (changelogBackend != null)
    {
      DirectoryServer.deregisterBackend(changelogBackend);
      changelogBackend.finalizeBackend();
      changelogBackend = null;
    }
    deregisterVirtualAttributeRules();
  }
  private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
  {
    final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>();
@@ -601,6 +679,64 @@
    return getReplicationServerDomain(baseDN, false);
  }
  /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
  private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
  {
    Set<DN> domains = null;
    synchronized (baseDNs)
    {
      domains = new HashSet<DN>(baseDNs.keySet());
    }
    domains.removeAll(excludedBaseDNs);
    return domains;
  }
  /**
   * Validate that provided state is coherent with this replication server,
   * when ignoring the provided set of DNs.
   * <p>
   * The state is coherent if and only if it exactly has the set of DNs corresponding to
   * the replication domains.
   *
   * @param state
   *            The multi domain state (cookie) to validate.
   * @param ignoredBaseDNs
   *            The set of DNs to ignore when validating
   * @throws DirectoryException
   *            If the state is not valid
   */
  public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException
  {
    // TODO : should skip unused domains, where domain.getLatestServerState(); is empty
    final Set<DN> domains = getDomainDNs(ignoredBaseDNs);
    final Set<DN> stateDomains = state.getSnapshot().keySet();
    final Set<DN> domainsCopy = new HashSet<DN>(domains);
    final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains);
    domainsCopy.removeAll(stateDomains);
    if (!domainsCopy.isEmpty())
    {
      final StringBuilder missingDomains = new StringBuilder();
      for (DN dn : domainsCopy)
      {
        missingDomains.append(dn).append(":;");
      }
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
              missingDomains, "<" + state.toString() + missingDomains + ">"));
    }
    stateDomainsCopy.removeAll(domains);
    if (!stateDomainsCopy.isEmpty())
    {
      final StringBuilder startState = new StringBuilder();
      for (DN dn : domains) {
        startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";");
      }
      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
              stateDomainsCopy.toString(), startState));
    }
  }
  /**
   * Get the ReplicationServerDomain associated to the base DN given in
   * parameter.
@@ -698,7 +834,9 @@
      domain.shutdown();
    }
    // TODO : switch to second method when changelog backend is branched
    shutdownECL();
    //shutdownExternalChangelog();
    try
    {
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -25,6 +25,8 @@
 */
package org.opends.server.replication.server.changelog.api;
import java.util.Set;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
@@ -115,6 +117,32 @@
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
      throws ChangelogException;
  /**
   * Generates a {@link DBCursor} across all the domains starting at or after
   * the provided {@link MultiDomainServerState} for each domain, excluding a
   * provided set of domain DNs.
   * <p>
   * When the cursor is not used anymore, client code MUST call the
   * {@link DBCursor#close()} method to free the resources and locks used by the
   * cursor.
   *
   * @param startState
   *          Starting point for each domain cursor. If any {@link ServerState}
   *          for a domain is null, then start from the oldest CSN for each
   *          replicaDBs
   * @param positionStrategy
   *          Cursor position strategy, which allow to indicates at which exact
   *          position the cursor must start
   * @param excludedDomainDns
   *          Every domain appearing in this set is excluded from the cursor
   * @return a non null {@link DBCursor}
   * @throws ChangelogException
   *           If a database problem happened
   * @see #getCursorFrom(DN, ServerState, PositionStrategy)
   */
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy,
      Set<DN> excludedDomainDns) throws ChangelogException;
  // serverId methods
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -25,6 +25,8 @@
 */
package org.opends.server.replication.server.changelog.file;
import java.util.Set;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
@@ -98,6 +100,15 @@
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
      PositionStrategy positionStrategy, Set<DN> excludedDomainDns)
      throws ChangelogException
  {
    throw new RuntimeException("Not implemented");
  }
  /** {@inheritDoc} */
  @Override
  public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState,
      PositionStrategy positionStrategy) throws ChangelogException
  {
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
@@ -32,7 +32,7 @@
 *
 * @FunctionalInterface
 */
class ECLEnabledDomainPredicate
public class ECLEnabledDomainPredicate
{
  /**
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
@@ -33,7 +33,7 @@
 * Multi domain DB cursor that only returns updates for the domains which have
 * been enabled for the external changelog.
 */
class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
{
  private final ECLEnabledDomainPredicate predicate;
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -701,11 +701,22 @@
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    final Set<DN> excludedDomainDns = Collections.emptySet();
    return getCursorFrom(startState, positionStrategy, excludedDomainDns);
  }
  /** {@inheritDoc} */
  @Override
  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
      final PositionStrategy positionStrategy, final  Set<DN> excludedDomainDns) throws ChangelogException
  {
    final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
    registeredMultiDomainCursors.add(cursor);
    for (DN baseDN : domainToReplicaDBs.keySet())
    {
      cursor.addDomain(baseDN, startState.getServerState(baseDN));
      if (!excludedDomainDns.contains(baseDN)) {
        cursor.addDomain(baseDN, startState.getServerState(baseDN));
      }
    }
    return cursor;
  }
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -39,6 +39,8 @@
import org.opends.server.replication.plugin.LDAPReplicationDomain;
import org.opends.server.replication.protocol.AddMsg;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.tools.LDAPSearch;
import org.opends.server.types.Attribute;
@@ -170,7 +172,15 @@
    ReplServerFakeConfiguration conf =
        new ReplServerFakeConfiguration(chPort, chDir, replicationDbImplementation, 0, changelogId, 0,
            100, servers);
    ReplicationServer replicationServer = new ReplicationServer(conf);
    final DN testBaseDN = this.baseDN;
    ReplicationServer replicationServer = new ReplicationServer(conf, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()
    {
      @Override
      public boolean isECLEnabledDomain(DN baseDN)
      {
        return testBaseDN.equals(baseDN);
      }
    });
    Thread.sleep(1000);
    return replicationServer;