From 13f31d030c3b205931b63c29b0d6bc1d4eefd163 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 21 Aug 2014 13:07:07 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend   to support cn=changelog CR-4083

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  152 ++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 146 insertions(+), 6 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 652ce4f..93a9f57 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,6 +42,7 @@
 import org.opends.server.admin.std.server.ReplicationServerCfg;
 import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
 import org.opends.server.api.VirtualAttributeProvider;
+import org.opends.server.backends.ChangelogBackend;
 import org.opends.server.config.ConfigException;
 import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.WorkflowImpl;
@@ -56,6 +57,7 @@
 import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.file.FileChangelogDB;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
 import org.opends.server.replication.server.changelog.je.JEChangelogDB;
 import org.opends.server.replication.service.DSRSShutdownSync;
 import org.opends.server.types.*;
@@ -63,6 +65,7 @@
 import org.opends.server.util.StaticUtils;
 import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
 
+import static org.opends.messages.ConfigMessages.*;
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -95,11 +98,19 @@
   private final Map<DN, ReplicationServerDomain> baseDNs =
       new HashMap<DN, ReplicationServerDomain>();
 
+  /** The database storing the changes. */
   private final ChangelogDB changelogDB;
+
+  /** The backend that allow to search the changes (external changelog). */
+  private ChangelogBackend changelogBackend;
+
   private final AtomicBoolean shutdown = new AtomicBoolean();
   private boolean stopListen = false;
   private final ReplSessionSecurity replSessionSecurity;
 
+  /** To know whether a domain is enabled for the external changelog. */
+  private final ECLEnabledDomainPredicate domainPredicate;
+
   /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
 
@@ -136,26 +147,41 @@
    */
   public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
   {
-    this(cfg, new DSRSShutdownSync());
+    this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
   }
 
   /**
-   * Creates a new Replication server using the provided configuration entry.
+   * Creates a new Replication server using the provided configuration entry and shutdown
+   * synchronization object.
    *
    * @param cfg The configuration of this replication server.
    * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
    * @throws ConfigException When Configuration is invalid.
    */
-  public ReplicationServer(ReplicationServerCfg cfg,
-      DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+  public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+  {
+    this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
+  }
+
+  /**
+   * Creates a new Replication server using the provided configuration entry, shutdown
+   * synchronization object and domain predicate.
+   *
+   * @param cfg The configuration of this replication server.
+   * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
+   * @param predicate Indicates whether a domain is enabled for the external changelog.
+   * @throws ConfigException When Configuration is invalid.
+   */
+  public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
+      final ECLEnabledDomainPredicate predicate) throws ConfigException
   {
     this.config = cfg;
     this.dsrsShutdownSync = dsrsShutdownSync;
+    this.domainPredicate = predicate;
     ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
     if (DebugLogger.debugEnabled())
     {
-      TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl
-          + " as DB implementation for changelog DB");
+      TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB");
     }
     this.changelogDB = dbImpl == ReplicationDBImplementation.JE
         ? new JEChangelogDB(this, cfg)
@@ -165,6 +191,9 @@
     initialize();
     cfg.addChangeListener(this);
 
+    // TODO : uncomment to branch changelog backend
+    //enableExternalChangeLog();
+
     localPorts.add(getReplicationPort());
 
     // Keep track of this new instance
@@ -501,6 +530,57 @@
     registerVirtualAttributeRules();
   }
 
+  /**
+   * Enable the external changelog if it is not already enabled.
+   * <p>
+   * The external changelog is provided by the changelog backend.
+   *
+   * @throws ConfigException
+   *            If an error occurs.
+   */
+  private void enableExternalChangeLog() throws ConfigException
+  {
+    if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
+    {
+      // Backend has already been created and initialized
+      // This can occurs in tests
+      return;
+    }
+    try
+    {
+      changelogBackend = new ChangelogBackend(this, domainPredicate);
+      changelogBackend.initializeBackend();
+      try
+      {
+        DirectoryServer.registerBackend(changelogBackend);
+      }
+      catch (Exception e)
+      {
+        logError(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
+            getExceptionMessage(e)));
+      }
+
+      registerVirtualAttributeRules();
+    }
+    catch (Exception e)
+    {
+      // TODO : I18N with correct message + what kind of exception should we really throw ?
+      // (Directory/Initialization/Config Exception)
+      throw new ConfigException(Message.raw("Error when enabling external changelog"), e);
+    }
+  }
+
+  private void shutdownExternalChangelog()
+  {
+    if (changelogBackend != null)
+    {
+      DirectoryServer.deregisterBackend(changelogBackend);
+      changelogBackend.finalizeBackend();
+      changelogBackend = null;
+    }
+    deregisterVirtualAttributeRules();
+  }
+
   private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
   {
     final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>();
@@ -609,6 +689,64 @@
     return getReplicationServerDomain(baseDN, false);
   }
 
+  /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
+  private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
+  {
+    Set<DN> domains = null;
+    synchronized (baseDNs)
+    {
+      domains = new HashSet<DN>(baseDNs.keySet());
+    }
+    domains.removeAll(excludedBaseDNs);
+    return domains;
+  }
+
+  /**
+   * Validate that provided state is coherent with this replication server,
+   * when ignoring the provided set of DNs.
+   * <p>
+   * The state is coherent if and only if it exactly has the set of DNs corresponding to
+   * the replication domains.
+   *
+   * @param state
+   *            The multi domain state (cookie) to validate.
+   * @param ignoredBaseDNs
+   *            The set of DNs to ignore when validating
+   * @throws DirectoryException
+   *            If the state is not valid
+   */
+  public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException
+  {
+    // TODO : should skip unused domains, where domain.getLatestServerState(); is empty
+    final Set<DN> domains = getDomainDNs(ignoredBaseDNs);
+    final Set<DN> stateDomains = state.getSnapshot().keySet();
+    final Set<DN> domainsCopy = new HashSet<DN>(domains);
+    final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains);
+    domainsCopy.removeAll(stateDomains);
+    if (!domainsCopy.isEmpty())
+    {
+      final StringBuilder missingDomains = new StringBuilder();
+      for (DN dn : domainsCopy)
+      {
+        missingDomains.append(dn).append(":;");
+      }
+      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
+              missingDomains, "<" + state.toString() + missingDomains + ">"));
+    }
+    stateDomainsCopy.removeAll(domains);
+    if (!stateDomainsCopy.isEmpty())
+    {
+      final StringBuilder startState = new StringBuilder();
+      for (DN dn : domains) {
+        startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";");
+      }
+      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
+              stateDomainsCopy.toString(), startState));
+    }
+  }
+
   /**
    * Get the ReplicationServerDomain associated to the base DN given in
    * parameter.
@@ -706,7 +844,9 @@
       domain.shutdown();
     }
 
+    // TODO : switch to second method when changelog backend is branched
     shutdownECL();
+    //shutdownExternalChangelog();
 
     try
     {

--
Gitblit v1.10.0