opendj3-server-dev/src/messages/messages/replication.properties
@@ -624,3 +624,9 @@ full, or corrupt and must be fixed before this replication server can be used. The underlying error was: %s INFO_CHANGELOG_LOG_FILE_RECOVERED_284=Log file '%s' was successfully \ recovered by removing a partially written record NOTE_SEARCH_CHANGELOG_INSUFFICIENT_PRIVILEGES_285=You do not have sufficient privileges to \ perform a search request on cn=changelog ERR_CHANGELOG_BACKEND_SEARCH_286 =An error occurred when \ searching base DN '%s' with filter '%s' in changelog backend : %s ERR_CHANGELOG_BACKEND_NUM_SUBORDINATES_287 =An error occurred when \ retrieving number of subordinates for entry DN '%s' in changelog backend : %s opendj3-server-dev/src/server/org/opends/server/backends/ChangelogBackend.java
New file @@ -0,0 +1,288 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 * Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 * Copyright 2014 ForgeRock AS.
 */
package org.opends.server.backends;

import java.util.Set;

import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ConditionResult;
import org.opends.server.admin.Configuration;
import org.opends.server.api.Backend;
import org.opends.server.core.AddOperation;
import org.opends.server.core.DeleteOperation;
import org.opends.server.core.ModifyDNOperation;
import org.opends.server.core.ModifyOperation;
import org.opends.server.core.SearchOperation;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate;
import org.opends.server.types.AttributeType;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.CanceledOperationException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.IndexType;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFExportConfig;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
import org.opends.server.types.RestoreConfig;

/**
 * Changelog backend.
 * <p>
 * This class is currently a placeholder: the constructor and every operation
 * throw an {@link UnsupportedOperationException}. It only fixes the backend
 * identifier and the shape of the API until the real implementation is
 * provided.
 */
public class ChangelogBackend extends Backend<Configuration>
{
  /** Backend id. */
  public static final String BACKEND_ID = "changelog";

  /**
   * Creates a new changelog backend.
   * <p>
   * Not implemented yet: this constructor always throws.
   *
   * @param replicationServer
   *          The replication server this backend is created for.
   * @param domainPredicate
   *          The predicate indicating whether a domain is enabled for the
   *          external changelog.
   * @throws UnsupportedOperationException
   *           Always, as the backend is not implemented yet.
   */
  public ChangelogBackend(ReplicationServer replicationServer, ECLEnabledDomainPredicate domainPredicate)
  {
    throw notImplemented();
  }

  /**
   * Builds the exception thrown by every placeholder operation of this
   * backend.
   * <p>
   * {@link UnsupportedOperationException} is a {@link RuntimeException}, so
   * callers catching the latter keep working, while the type now states
   * explicitly that the operation is not available.
   *
   * @return the exception to throw, never {@code null}
   */
  private static UnsupportedOperationException notImplemented()
  {
    return new UnsupportedOperationException("Not implemented");
  }

  /** {@inheritDoc} */
  @Override
  public void configureBackend(Configuration cfg) throws ConfigException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void initializeBackend() throws ConfigException, InitializationException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void finalizeBackend()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public DN[] getBaseDNs()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void preloadEntryCache() throws UnsupportedOperationException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public boolean isLocal()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public boolean isIndexed(AttributeType attributeType, IndexType indexType)
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public Entry getEntry(DN entryDN) throws DirectoryException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public ConditionResult hasSubordinates(DN entryDN) throws DirectoryException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public long numSubordinates(DN entryDN, boolean subtree) throws DirectoryException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void addEntry(Entry entry, AddOperation addOperation)
      throws DirectoryException, CanceledOperationException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void deleteEntry(DN entryDN, DeleteOperation deleteOperation)
      throws DirectoryException, CanceledOperationException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void replaceEntry(Entry oldEntry, Entry newEntry, ModifyOperation modifyOperation)
      throws DirectoryException, CanceledOperationException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void renameEntry(DN currentDN, Entry entry, ModifyDNOperation modifyDNOperation)
      throws DirectoryException, CanceledOperationException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void search(SearchOperation searchOperation)
      throws DirectoryException, CanceledOperationException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public Set<String> getSupportedControls()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public Set<String> getSupportedFeatures()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public boolean supportsLDIFExport()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void exportLDIF(LDIFExportConfig exportConfig) throws DirectoryException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public boolean supportsLDIFImport()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public boolean supportsBackup()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public boolean supportsBackup(BackupConfig backupConfig, StringBuilder unsupportedReason)
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void createBackup(BackupConfig backupConfig) throws DirectoryException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public boolean supportsRestore()
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
  {
    throw notImplemented();
  }

  /** {@inheritDoc} */
  @Override
  public long getEntryCount()
  {
    throw notImplemented();
  }
}
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -44,6 +44,7 @@ import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.admin.std.server.UserDefinedVirtualAttributeCfg; import org.opends.server.api.VirtualAttributeProvider; import org.opends.server.backends.ChangelogBackend; import org.opends.server.core.DirectoryServer; import org.opends.server.core.WorkflowImpl; import org.opends.server.core.networkgroups.NetworkGroup; @@ -55,12 +56,14 @@ import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.file.FileChangelogDB; import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; import org.opends.server.replication.server.changelog.je.JEChangelogDB; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.types.*; import org.opends.server.util.ServerConstants; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import static org.opends.messages.ConfigMessages.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.util.StaticUtils.*; @@ -91,13 +94,21 @@ private final Map<DN, ReplicationServerDomain> baseDNs = new HashMap<DN, ReplicationServerDomain>(); private ChangelogDB changelogDB; /** The database storing the changes. */ private final ChangelogDB changelogDB; /** The backend that allow to search the changes (external changelog). */ private ChangelogBackend changelogBackend; private final AtomicBoolean shutdown = new AtomicBoolean(); private boolean stopListen = false; private final ReplSessionSecurity replSessionSecurity; private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); /** To know whether a domain is enabled for the external changelog. 
*/ private final ECLEnabledDomainPredicate domainPredicate; private static final String eclWorkflowID = "External Changelog Workflow ID"; private ECLWorkflowElement eclwe; @@ -131,32 +142,45 @@ */ public ReplicationServer(ReplicationServerCfg cfg) throws ConfigException { this(cfg, new DSRSShutdownSync()); this(cfg, new DSRSShutdownSync(), new ECLEnabledDomainPredicate()); } /** * Creates a new Replication server using the provided configuration entry. * Creates a new Replication server using the provided configuration entry and shutdown * synchronization object. * * @param cfg The configuration of this replication server. * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. * @throws ConfigException When Configuration is invalid. */ public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException public ReplicationServer(ReplicationServerCfg cfg, DSRSShutdownSync dsrsShutdownSync) throws ConfigException { this(cfg, dsrsShutdownSync, new ECLEnabledDomainPredicate()); } /** * Creates a new Replication server using the provided configuration entry, shutdown * synchronization object and domain predicate. * * @param cfg The configuration of this replication server. * @param dsrsShutdownSync Synchronization object for shutdown of combined DS/RS instances. * @param predicate Indicates whether a domain is enabled for the external changelog. * @throws ConfigException When Configuration is invalid. 
*/ public ReplicationServer(final ReplicationServerCfg cfg, final DSRSShutdownSync dsrsShutdownSync, final ECLEnabledDomainPredicate predicate) throws ConfigException { this.config = cfg; this.changelogDB = new JEChangelogDB(this, cfg); this.dsrsShutdownSync = dsrsShutdownSync; this.config = cfg; this.domainPredicate = predicate; ReplicationDBImplementation dbImpl = cfg.getReplicationDBImplementation(); logger.trace("Using %s as DB implementation for changelog DB", dbImpl); if (dbImpl == ReplicationDBImplementation.JE) { logger.trace("Using JE as DB implementation for changelog DB"); this.changelogDB = new JEChangelogDB(this, cfg); } else { logger.trace("Using LOG FILE as DB implementation for changelog DB"); this.changelogDB = new FileChangelogDB(this, cfg); } @@ -164,6 +188,9 @@ initialize(); cfg.addChangeListener(this); // TODO : uncomment to branch changelog backend //enableExternalChangeLog(); localPorts.add(getReplicationPort()); // Keep track of this new instance @@ -493,6 +520,57 @@ registerVirtualAttributeRules(); } /** * Enable the external changelog if it is not already enabled. * <p> * The external changelog is provided by the changelog backend. * * @throws ConfigException * If an error occurs. */ private void enableExternalChangeLog() throws ConfigException { if (DirectoryServer.hasBackend(ChangelogBackend.BACKEND_ID)) { // Backend has already been created and initialized // This can occurs in tests return; } try { changelogBackend = new ChangelogBackend(this, domainPredicate); changelogBackend.initializeBackend(); try { DirectoryServer.registerBackend(changelogBackend); } catch (Exception e) { logger.error(WARN_CONFIG_BACKEND_CANNOT_REGISTER_BACKEND.get(changelogBackend.getBackendID(), getExceptionMessage(e))); } registerVirtualAttributeRules(); } catch (Exception e) { // TODO : I18N with correct message + what kind of exception should we really throw ? 
// (Directory/Initialization/Config Exception) throw new ConfigException(LocalizableMessage.raw("Error when enabling external changelog"), e); } } private void shutdownExternalChangelog() { if (changelogBackend != null) { DirectoryServer.deregisterBackend(changelogBackend); changelogBackend.finalizeBackend(); changelogBackend = null; } deregisterVirtualAttributeRules(); } private List<VirtualAttributeRule> getVirtualAttributesRules() throws DirectoryException { final List<VirtualAttributeRule> rules = new ArrayList<VirtualAttributeRule>(); @@ -601,6 +679,64 @@ return getReplicationServerDomain(baseDN, false); } /** Returns the replicated domain DNs minus the provided set of excluded DNs. */ private Set<DN> getDomainDNs(Set<DN> excludedBaseDNs) throws DirectoryException { Set<DN> domains = null; synchronized (baseDNs) { domains = new HashSet<DN>(baseDNs.keySet()); } domains.removeAll(excludedBaseDNs); return domains; } /** * Validate that provided state is coherent with this replication server, * when ignoring the provided set of DNs. * <p> * The state is coherent if and only if it exactly has the set of DNs corresponding to * the replication domains. * * @param state * The multi domain state (cookie) to validate. 
* @param ignoredBaseDNs * The set of DNs to ignore when validating * @throws DirectoryException * If the state is not valid */ public void validateServerState(MultiDomainServerState state, Set<DN> ignoredBaseDNs) throws DirectoryException { // TODO : should skip unused domains, where domain.getLatestServerState(); is empty final Set<DN> domains = getDomainDNs(ignoredBaseDNs); final Set<DN> stateDomains = state.getSnapshot().keySet(); final Set<DN> domainsCopy = new HashSet<DN>(domains); final Set<DN> stateDomainsCopy = new HashSet<DN>(stateDomains); domainsCopy.removeAll(stateDomains); if (!domainsCopy.isEmpty()) { final StringBuilder missingDomains = new StringBuilder(); for (DN dn : domainsCopy) { missingDomains.append(dn).append(":;"); } throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get( missingDomains, "<" + state.toString() + missingDomains + ">")); } stateDomainsCopy.removeAll(domains); if (!stateDomainsCopy.isEmpty()) { final StringBuilder startState = new StringBuilder(); for (DN dn : domains) { startState.append(dn).append(":").append(state.getServerState(dn).toString()).append(";"); } throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM, ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get( stateDomainsCopy.toString(), startState)); } } /** * Get the ReplicationServerDomain associated to the base DN given in * parameter. @@ -698,7 +834,9 @@ domain.shutdown(); } // TODO : switch to second method when changelog backend is branched shutdownECL(); //shutdownExternalChangelog(); try { opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -25,6 +25,8 @@ */ package org.opends.server.replication.server.changelog.api; import java.util.Set; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; @@ -115,6 +117,32 @@ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy) throws ChangelogException; /** * Generates a {@link DBCursor} across all the domains starting at or after * the provided {@link MultiDomainServerState} for each domain, excluding a * provided set of domain DNs. * <p> * When the cursor is not used anymore, client code MUST call the * {@link DBCursor#close()} method to free the resources and locks used by the * cursor. * * @param startState * Starting point for each domain cursor. If any {@link ServerState} * for a domain is null, then start from the oldest CSN for each * replicaDBs * @param positionStrategy * Cursor position strategy, which allow to indicates at which exact * position the cursor must start * @param excludedDomainDns * Every domain appearing in this set is excluded from the cursor * @return a non null {@link DBCursor} * @throws ChangelogException * If a database problem happened * @see #getCursorFrom(DN, ServerState, PositionStrategy) */ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy, Set<DN> excludedDomainDns) throws ChangelogException; // serverId methods /** opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -25,6 +25,8 @@ */ package org.opends.server.replication.server.changelog.file; import java.util.Set; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; @@ -98,6 +100,15 @@ /** {@inheritDoc} */ @Override public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy, Set<DN> excludedDomainDns) throws ChangelogException { throw new RuntimeException("Not implemented"); } /** {@inheritDoc} */ @Override public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy) throws ChangelogException { opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLEnabledDomainPredicate.java
@@ -32,7 +32,7 @@ * * @FunctionalInterface */ class ECLEnabledDomainPredicate public class ECLEnabledDomainPredicate { /** opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
@@ -33,7 +33,7 @@ * Multi domain DB cursor that only returns updates for the domains which have * been enabled for the external changelog. */ class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg> public final class ECLMultiDomainDBCursor implements DBCursor<UpdateMsg> { private final ECLEnabledDomainPredicate predicate; opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -701,11 +701,22 @@ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, final PositionStrategy positionStrategy) throws ChangelogException { final Set<DN> excludedDomainDns = Collections.emptySet(); return getCursorFrom(startState, positionStrategy, excludedDomainDns); } /** {@inheritDoc} */ @Override public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState, final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) throws ChangelogException { final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy); registeredMultiDomainCursors.add(cursor); for (DN baseDN : domainToReplicaDBs.keySet()) { cursor.addDomain(baseDN, startState.getServerState(baseDN)); if (!excludedDomainDns.contains(baseDN)) { cursor.addDomain(baseDN, startState.getServerState(baseDN)); } } return cursor; } opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/MonitorTest.java
@@ -39,6 +39,8 @@ import org.opends.server.replication.plugin.LDAPReplicationDomain; import org.opends.server.replication.protocol.AddMsg; import org.opends.server.replication.protocol.ReplicationMsg; import org.opends.server.replication.server.changelog.je.ECLEnabledDomainPredicate; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.replication.service.ReplicationBroker; import org.opends.server.tools.LDAPSearch; import org.opends.server.types.Attribute; @@ -170,7 +172,15 @@ ReplServerFakeConfiguration conf = new ReplServerFakeConfiguration(chPort, chDir, replicationDbImplementation, 0, changelogId, 0, 100, servers); ReplicationServer replicationServer = new ReplicationServer(conf); final DN testBaseDN = this.baseDN; ReplicationServer replicationServer = new ReplicationServer(conf, new DSRSShutdownSync(), new ECLEnabledDomainPredicate() { @Override public boolean isECLEnabledDomain(DN baseDN) { return testBaseDN.equals(baseDN); } }); Thread.sleep(1000); return replicationServer;