From 13f31d030c3b205931b63c29b0d6bc1d4eefd163 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Thu, 21 Aug 2014 13:07:07 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend to support cn=changelog CR-4083
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 152 ++++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 146 insertions(+), 6 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 652ce4f..93a9f57 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -42,6 +42,7 @@
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg;
import org.opends.server.api.VirtualAttributeProvider;
+import org.opends.server.backends.ChangelogBackend;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.WorkflowImpl;
@@ -56,6 +57,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.file.FileChangelogDB;
+import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.replication.server.changelog.je.JEChangelogDB;
import org.opends.server.replication.service.DSRSShutdownSync;
import org.opends.server.types.*;
@@ -63,6 +65,7 @@
import org.opends.server.util.StaticUtils;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
+import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -95,11 +98,19 @@
private final Map<DN, ReplicationServerDomain> baseDNs =
new HashMap<DN, ReplicationServerDomain>();
+ /** The database storing the changes. */
private final ChangelogDB changelogDB;
+
+ /** The backend that allow to search the changes (external changelog). */
+ private ChangelogBackend changelogBackend;
+
private final AtomicBoolean shutdown = new AtomicBoolean();
private boolean stopListen = false;
private final ReplSessionSecurity replSessionSecurity;
+ /** To know whether a domain is enabled for the external changelog. */
+ private final ECLEnabledDomainPredicate domainPredicate;
+
/** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
@@ -136,26 +147,41 @@
*/
public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException
{
- this(cfg, new DSRSShutdownSync());
+ this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate());
}
/**
- * Creates a new Replication server using the provided configuration entry.
+ * Creates a new Replication server using the provided configuration entry and shutdown
+ * synchronization object.
*
* @param cfg The configuration of this replication server.
* @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
* @throws ConfigException When Configuration is invalid.
*/
- public ReplicationServer(ReplicationServerCfg cfg,
- DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+ public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException
+ {
+ this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate());
+ }
+
+ /**
+ * Creates a new Replication server using the provided configuration entry, shutdown
+ * synchronization object and domain predicate.
+ *
+ * @param cfg The configuration of this replication server.
+ * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances.
+ * @param predicate Indicates whether a domain is enabled for the external changelog.
+ * @throws ConfigException When Configuration is invalid.
+ */
+ public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync,
+ final ECLEnabledDomainPredicate predicate) throws ConfigException
{
this.config = cfg;
this.dsrsShutdownSync = dsrsShutdownSync;
+ this.domainPredicate = predicate;
ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation();
if (DebugLogger.debugEnabled())
{
- TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl
- + " as DB implementation for changelog DB");
+ TRACER.debugMessage(DebugLogLevel.INFO, "Using " + dbImpl + " as DB implementation for changelog DB");
}
this.changelogDB = dbImpl == ReplicationDBImplementation.JE
? new JEChangelogDB(this, cfg)
@@ -165,6 +191,9 @@
initialize();
cfg.addChangeListener(this);
+ // TODO : uncomment to branch changelog backend
+ //enableExternalChangeLog();
+
localPorts.add(getReplicationPort());
// Keep track of this new instance
@@ -501,6 +530,57 @@
registerVirtualAttributeRules();
}
+ /**
+ * Enable the external changelog if it is not already enabled.
+ * <p>
+ * The external changelog is provided by the changelog backend.
+ *
+ * @throws ConfigException
+ * If an error occurs.
+ */
+ private void enableExternalChangeLog() throws ConfigException
+ {
+ if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID))
+ {
+ // Backend has already been created and initialized
+ // This can occurs in tests
+ return;
+ }
+ try
+ {
+ changelogBackend = new ChangelogBackend(this, domainPredicate);
+ changelogBackend.initializeBackend();
+ try
+ {
+ DirectoryServer.registerBackend(changelogBackend);
+ }
+ catch (Exception e)
+ {
+ logError(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(),
+ getExceptionMessage(e)));
+ }
+
+ registerVirtualAttributeRules();
+ }
+ catch (Exception e)
+ {
+ // TODO : I18N with correct message + what kind of exception should we really throw ?
+ // (Directory/Initialization/Config Exception)
+ throw new ConfigException(Message.raw("Error when enabling external changelog"), e);
+ }
+ }
+
+ private void shutdownExternalChangelog()
+ {
+ if (changelogBackend != null)
+ {
+ DirectoryServer.deregisterBackend(changelogBackend);
+ changelogBackend.finalizeBackend();
+ changelogBackend = null;
+ }
+ deregisterVirtualAttributeRules();
+ }
+
private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException
{
final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>();
@@ -609,6 +689,64 @@
return getReplicationServerDomain(baseDN, false);
}
+ /** Returns the replicated domain DNs minus the provided set of excluded DNs. */
+ private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException
+ {
+ Set<DN> domains = null;
+ synchronized (baseDNs)
+ {
+ domains = new HashSet<DN>(baseDNs.keySet());
+ }
+ domains.removeAll(excludedBaseDNs);
+ return domains;
+ }
+
+ /**
+ * Validate that provided state is coherent with this replication server,
+ * when ignoring the provided set of DNs.
+ * <p>
+ * The state is coherent if and only if it exactly has the set of DNs corresponding to
+ * the replication domains.
+ *
+ * @param state
+ * The multi domain state (cookie) to validate.
+ * @param ignoredBaseDNs
+ * The set of DNs to ignore when validating
+ * @throws DirectoryException
+ * If the state is not valid
+ */
+ public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException
+ {
+ // TODO : should skip unused domains, where domain.getLatestServerState(); is empty
+ final Set<DN> domains = getDomainDNs(ignoredBaseDNs);
+ final Set<DN> stateDomains = state.getSnapshot().keySet();
+ final Set<DN> domainsCopy = new HashSet<DN>(domains);
+ final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains);
+ domainsCopy.removeAll(stateDomains);
+ if (!domainsCopy.isEmpty())
+ {
+ final StringBuilder missingDomains = new StringBuilder();
+ for (DN dn : domainsCopy)
+ {
+ missingDomains.append(dn).append(":;");
+ }
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
+ missingDomains, "<" + state.toString() + missingDomains + ">"));
+ }
+ stateDomainsCopy.removeAll(domains);
+ if (!stateDomainsCopy.isEmpty())
+ {
+ final StringBuilder startState = new StringBuilder();
+ for (DN dn : domains) {
+ startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";");
+ }
+ throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+ ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
+ stateDomainsCopy.toString(), startState));
+ }
+ }
+
/**
* Get the ReplicationServerDomain associated to the base DN given in
* parameter.
@@ -706,7 +844,9 @@
domain.shutdown();
}
+ // TODO : switch to second method when changelog backend is branched
shutdownECL();
+ //shutdownExternalChangelog();
try
{
--
Gitblit v1.10.0