From e53ece2d372bc18e22340da7152291f93b45cb8c Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 22 Sep 2014 14:19:46 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend   to support cn=changelog CR-4083

---
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java                                 |  288 ++++++++++++++++++++++++++++++++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java             |   13 +
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java    |   12 +
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java                      |  156 ++++++++++++++++-
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java      |   28 +++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java    |    2 
 opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties                                                |    6 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java         |   11 +
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java |    2 
 9 files changed, 505 insertions(+), 13 deletions(-)

diff --git a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
index 23c1a2e..5a7de06 100644
--- a/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
+++ b/opendj-sdk/opendj3-server-dev/src/messages/messages/replication.properties
@@ -624,3 +624,9 @@
  full, or corrupt and must be fixed before this replication server can be used. The underlying error was: %s
 INFO_CHANGELOG_LOG_FILE_RECOVERED_284=Log file '%s' was successfully \
  recovered by removing a partially written record
+NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES_285=You do not have sufficient privileges to \
+ perform a search request on cn=changelog
+ERR_CHANGELOG_BACKEND_SEARCH_286 =An error occurred when \
+ searching base DN '%s' with filter '%s' in changelog backend : %s
+ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES_287 =An error occurred when \
+ retrieving number of subordinates for entry DN '%s' in changelog backend : %s
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
new file mode 100644
index 0000000..12d6b8f
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
@@ -0,0 +1,288 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *
+ *      Copyright 2014 ForgeRock AS.
+ */
+package org.opends.server.backends;
+
+import java.util.Set;
+
+import org.forgerock.opendj.config.server.ConfigException;
+import org.forgerock.opendj.ldap.ConditionResult;
+import org.opends.server.admin.Configuration;
+import org.opends.server.api.Backend;
+import org.opends.server.core.AddOperation;
+import org.opends.server.core.DeleteOperation;
+import org.opends.server.core.ModifyDNOperation;
+import org.opends.server.core.ModifyOperation;
+import org.opends.server.core.SearchOperation;
+import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
+import org.opends.server.types.AttributeType;
+import org.opends.server.types.BackupConfig;
+import org.opends.server.types.BackupDirectory;
+import org.opends.server.types.CanceledOperationException;
+import org.opends.server.types.DN;
+import org.opends.server.types.DirectoryException;
+import org.opends.server.types.Entry;
+import org.opends.server.types.IndexType;
+import org.opends.server.types.InitializationException;
+import org.opends.server.types.LDIFExportConfig;
+import org.opends.server.types.LDIFImportConfig;
+import org.opends.server.types.LDIFImportResult;
+import org.opends.server.types.RestoreConfig;
+
+/**
+ * Changelog backend.
+ */
+public class ChangelogBackend extends Backend<Configuration>
+{
+
+  /** Backend id. */
+  public static final String BACKEND_ID = "changelog";
+
+  /**
+   * Creates.
+   *
+   * @param replicationServer
+   *            The replication server.
+   * @param domainPredicate
+   *            The predicate.
+   */
+  public ChangelogBackend(ReplicationServer replicationServer,
+      ECLEnabledDomainPredicate domainPredicate)
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void configureBackend(Configuration cfg) throws ConfigException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void initializeBackend() throws ConfigException,
+      InitializationException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void finalizeBackend()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public DN[] getBaseDNs()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void preloadEntryCache() throws UnsupportedOperationException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean isLocal()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean isIndexed(AttributeType attributeType, IndexType indexType)
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Entry getEntry(DN entryDN) throws DirectoryException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public ConditionResult hasSubordinates(DN entryDN) throws DirectoryException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long numSubordinates(DN entryDN, boolean subtree)
+      throws DirectoryException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void addEntry(Entry entry, AddOperation addOperation)
+      throws DirectoryException, CanceledOperationException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void deleteEntry(DN entryDN, DeleteOperation deleteOperation)
+      throws DirectoryException, CanceledOperationException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void replaceEntry(Entry oldEntry, Entry newEntry,
+      ModifyOperation modifyOperation) throws DirectoryException,
+      CanceledOperationException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void renameEntry(DN currentDN, Entry entry,
+      ModifyDNOperation modifyDNOperation) throws DirectoryException,
+      CanceledOperationException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void search(SearchOperation searchOperation)
+      throws DirectoryException, CanceledOperationException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Set<String> getSupportedControls()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public Set<String> getSupportedFeatures()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean supportsLDIFExport()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void exportLDIF(LDIFExportConfig exportConfig)
+      throws DirectoryException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean supportsLDIFImport()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public LDIFImportResult importLDIF(LDIFImportConfig importConfig)
+      throws DirectoryException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean supportsBackup()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean supportsBackup(BackupConfig backupConfig,
+      StringBuilder unsupportedReason)
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void createBackup(BackupConfig backupConfig) throws DirectoryException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void removeBackup(BackupDirectory backupDirectory, String backupID)
+      throws DirectoryException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean supportsRestore()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void restoreBackup(RestoreConfig restoreConfig)
+      throws DirectoryException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public long getEntryCount()
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
index c1f52c6..0998c10 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -44,6 +44,7 @@
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
 import org.opends.server.api.VirtualAttributeProvider;
+import org.opends.server.backends.ChangelogBackend;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.WorkflowImpl;
 import org.opends.server.core.networkgroups.NetworkGroup;
@@ -55,12 +56,14 @@
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.file.FileChangelogDB;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
 import org.opends.server.replication.server.changelog.je.JEChangelogDB;
 import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.types.*;
 import org.opends.server.util.ServerConstants;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
+import static org.opends.messages.ConfigMessages.*;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.util.StaticUtils.*;
 
@@ -91,13 +94,21 @@
   private final Map<DN, ReplicationServerDomain> baseDNs =
       new HashMap<DN, ReplicationServerDomain>();
 
-  private ChangelogDB changelogDB;
+  /** The database storing the changes. */
+  private final ChangelogDB changelogDB;
+
+  /** The backend that allow to search the changes (external changelog). */
+  private ChangelogBackend changelogBackend;
+
   private final AtomicBoolean shutdown = new AtomicBoolean();
   private boolean stopListen = false;
   private final ReplSessionSecurity replSessionSecurity;
 
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
+  /** To know whether a domain is enabled for the external changelog. */
+  private final ECLEnabledDomainPredicate domainPredicate;
+
   private static final String eclWorkflowID =
     "External Changelog Workflow ID";
   private ECLWorkflowElement eclwe;
@@ -131,32 +142,45 @@
    */
   public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
   {
-    this(cfg, new DSRSShutdownSync());
+    this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
   }
 
   /**
-   * Creates a new Replication server using the provided configuration entry.
+   * Creates a new Replication server using the provided configuration entry and shutdown
+   * synchronization object.
    *
    * @param cfg The configuration of this replication server.
    * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
    * @throws ConfigException When Configuration is invalid.
    */
-  public ReplicationServer(ReplicationServerCfg cfg,
-      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+  public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+  {
+    this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
+  }
+
+  /**
+   * Creates a new Replication server using the provided configuration entry, shutdown
+   * synchronization object and domain predicate.
+   *
+   * @param cfg The configuration of this replication server.
+   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
+   * @param predicate Indicates whether a domain is enabled for the external changelog.
+   * @throws ConfigException When Configuration is invalid.
+   */
+  public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
+      final ECLEnabledDomainPredicate predicate) throws ConfigException
   {
     this.config = cfg;
-    this.changelogDB = new JEChangelogDB(this, cfg);
     this.dsrsShutdownSync = dsrsShutdownSync;
-    this.config = cfg;
+    this.domainPredicate = predicate;
     ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
+    logger.trace("Using %s as DB implementation for changelog DB", dbImpl);
     if (dbImpl == ReplicationDBImplementation.JE)
     {
-      logger.trace("Using JE as DB implementation for changelog DB");
       this.changelogDB = new JEChangelogDB(this, cfg);
     }
     else
     {
-      logger.trace("Using LOG FILE as DB implementation for changelog DB");
       this.changelogDB = new FileChangelogDB(this, cfg);
     }
 
@@ -164,6 +188,9 @@
     initialize();
     cfg.addChangeListener(this);
 
+    // TODO : uncomment to branch changelog backend
+    //enableExternalChangeLog();
+
     localPorts.add(getReplicationPort());
 
     // Keep track of this new instance
@@ -493,6 +520,57 @@
     registerVirtualAttributeRules();
   }
 
+  /**
+   * Enable the external changelog if it is not already enabled.
+   * <p>
+   * The external changelog is provided by the changelog backend.
+   *
+   * @throws ConfigException
+   *            If an error occurs.
+   */
+  private void enableExternalChangeLog() throws ConfigException
+  {
+    if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
+    {
+      // Backend has already been created and initialized
+      // This can occurs in tests
+      return;
+    }
+    try
+    {
+      changelogBackend = new ChangelogBackend(this, domainPredicate);
+      changelogBackend.initializeBackend();
+      try
+      {
+        DirectoryServer.registerBackend(changelogBackend);
+      }
+      catch (Exception e)
+      {
+        logger.error(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
+            getExceptionMessage(e)));
+      }
+
+      registerVirtualAttributeRules();
+    }
+    catch (Exception e)
+    {
+      // TODO : I18N with correct message + what kind of exception should we really throw ?
+      // (Directory/Initialization/Config Exception)
+      throw new ConfigException(LocalizableMessage.raw("Error when enabling external changelog"), e);
+    }
+  }
+
+  private void shutdownExternalChangelog()
+  {
+    if (changelogBackend != null)
+    {
+      DirectoryServer.deregisterBackend(changelogBackend);
+      changelogBackend.finalizeBackend();
+      changelogBackend = null;
+    }
+    deregisterVirtualAttributeRules();
+  }
+
   private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
   {
     final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>();
@@ -601,6 +679,64 @@
     return getReplicationServerDomain(baseDN, false);
   }
 
+  /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
+  private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
+  {
+    Set<DN> domains = null;
+    synchronized (baseDNs)
+    {
+      domains = new HashSet<DN>(baseDNs.keySet());
+    }
+    domains.removeAll(excludedBaseDNs);
+    return domains;
+  }
+
+  /**
+   * Validate that provided state is coherent with this replication server,
+   * when ignoring the provided set of DNs.
+   * <p>
+   * The state is coherent if and only if it exactly has the set of DNs corresponding to
+   * the replication domains.
+   *
+   * @param state
+   *            The multi domain state (cookie) to validate.
+   * @param ignoredBaseDNs
+   *            The set of DNs to ignore when validating
+   * @throws DirectoryException
+   *            If the state is not valid
+   */
+  public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException
+  {
+    // TODO : should skip unused domains, where domain.getLatestServerState(); is empty
+    final Set<DN> domains = getDomainDNs(ignoredBaseDNs);
+    final Set<DN> stateDomains = state.getSnapshot().keySet();
+    final Set<DN> domainsCopy = new HashSet<DN>(domains);
+    final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains);
+    domainsCopy.removeAll(stateDomains);
+    if (!domainsCopy.isEmpty())
+    {
+      final StringBuilder missingDomains = new StringBuilder();
+      for (DN dn : domainsCopy)
+      {
+        missingDomains.append(dn).append(":;");
+      }
+      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
+              missingDomains, "<" + state.toString() + missingDomains + ">"));
+    }
+    stateDomainsCopy.removeAll(domains);
+    if (!stateDomainsCopy.isEmpty())
+    {
+      final StringBuilder startState = new StringBuilder();
+      for (DN dn : domains) {
+        startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";");
+      }
+      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
+              stateDomainsCopy.toString(), startState));
+    }
+  }
+
   /**
    * Get the ReplicationServerDomain associated to the base DN given in
    * parameter.
@@ -698,7 +834,9 @@
       domain.shutdown();
     }
 
+    // TODO : switch to second method when changelog backend is branched
     shutdownECL();
+    //shutdownExternalChangelog();
 
     try
     {
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index e275020..8970417 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -25,6 +25,8 @@
  */
 package org.opends.server.replication.server.changelog.api;
 
+import java.util.Set;
+
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
 import org.opends.server.replication.common.ServerState;
@@ -115,6 +117,32 @@
   public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
       throws ChangelogException;
 
+  /**
+   * Generates a {@link DBCursor} across all the domains starting at or after
+   * the provided {@link MultiDomainServerState} for each domain, excluding a
+   * provided set of domain DNs.
+   * <p>
+   * When the cursor is not used anymore, client code MUST call the
+   * {@link DBCursor#close()} method to free the resources and locks used by the
+   * cursor.
+   *
+   * @param startState
+   *          Starting point for each domain cursor. If any {@link ServerState}
+   *          for a domain is null, then start from the oldest CSN for each
+   *          replicaDBs
+   * @param positionStrategy
+   *          Cursor position strategy, which allow to indicates at which exact
+   *          position the cursor must start
+   * @param excludedDomainDns
+   *          Every domain appearing in this set is excluded from the cursor
+   * @return a non null {@link DBCursor}
+   * @throws ChangelogException
+   *           If a database problem happened
+   * @see #getCursorFrom(DN, ServerState, PositionStrategy)
+   */
+  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy,
+      Set<DN> excludedDomainDns) throws ChangelogException;
+
   // serverId methods
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index ad0f2fe..26a0bfe 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -25,6 +25,8 @@
  */
 package org.opends.server.replication.server.changelog.file;
 
+import java.util.Set;
+
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.MultiDomainServerState;
@@ -98,6 +100,15 @@
 
   /** {@inheritDoc} */
   @Override
+  public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
+      PositionStrategy positionStrategy, Set<DN> excludedDomainDns)
+      throws ChangelogException
+  {
+    throw new RuntimeException("Not implemented");
+  }
+
+  /** {@inheritDoc} */
+  @Override
   public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState,
       PositionStrategy positionStrategy) throws ChangelogException
   {
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
index d1a4721..9a9972d 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
@@ -32,7 +32,7 @@
  *
  * @FunctionalInterface
  */
-class ECLEnabledDomainPredicate
+public class ECLEnabledDomainPredicate
 {
 
   /**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
index a035657..97a1806 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
@@ -33,7 +33,7 @@
  * Multi domain DB cursor that only returns updates for the domains which have
  * been enabled for the external changelog.
  */
-class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
+public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg>
 {
 
   private final ECLEnabledDomainPredicate predicate;
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index b39b6d5..2504200 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -701,11 +701,22 @@
   public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
       final PositionStrategy positionStrategy) throws ChangelogException
   {
+    final Set<DN> excludedDomainDns = Collections.emptySet();
+    return getCursorFrom(startState, positionStrategy, excludedDomainDns);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+      final PositionStrategy positionStrategy, final  Set<DN> excludedDomainDns) throws ChangelogException
+  {
     final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
     registeredMultiDomainCursors.add(cursor);
     for (DN baseDN : domainToReplicaDBs.keySet())
     {
-      cursor.addDomain(baseDN, startState.getServerState(baseDN));
+      if (!excludedDomainDns.contains(baseDN)) {
+        cursor.addDomain(baseDN, startState.getServerState(baseDN));
+      }
     }
     return cursor;
   }
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
index c133267..1404da9 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -39,6 +39,8 @@
 import org.opends.server.replication.plugin.LDAPReplicationDomain;
 import org.opends.server.replication.protocol.AddMsg;
 import org.opends.server.replication.protocol.ReplicationMsg;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
+import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.replication.service.ReplicationBroker;
 import org.opends.server.tools.LDAPSearch;
 import org.opends.server.types.Attribute;
@@ -170,7 +172,15 @@
     ReplServerFakeConfiguration conf =
         new ReplServerFakeConfiguration(chPort, chDir, replicationDbImplementation, 0, changelogId, 0,
             100, servers);
-    ReplicationServer replicationServer = new ReplicationServer(conf);
+    final DN testBaseDN = this.baseDN;
+    ReplicationServer replicationServer = new ReplicationServer(conf, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()
+    {
+      @Override
+      public boolean isECLEnabledDomain(DN baseDN)
+      {
+        return testBaseDN.equals(baseDN);
+      }
+    });
     Thread.sleep(1000);
 
     return replicationServer;

--
Gitblit v1.10.0