/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.plugin; import java.util.*; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicReference; import org.opends.messages.Message; import org.opends.server.admin.server.ConfigurationAddListener; import org.opends.server.admin.server.ConfigurationChangeListener; import org.opends.server.admin.server.ConfigurationDeleteListener; import org.opends.server.admin.std.server.ReplicationDomainCfg; import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; import org.opends.server.api.*; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.replication.service.DSRSShutdownSync; import org.opends.server.types.*; import org.opends.server.types.operation.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.replication.plugin.ReplicationRepairRequestControl.*; import static org.opends.server.util.StaticUtils.*; /** * This class is used to load the Replication code inside the JVM * and to trigger initialization of the replication. * * It also extends the SynchronizationProvider class in order to have some * replication code running during the operation process * as pre-op, conflictResolution, and post-op. */ public class MultimasterReplication extends SynchronizationProvider implements ConfigurationAddListener, ConfigurationDeleteListener, ConfigurationChangeListener , BackupTaskListener, RestoreTaskListener, ImportTaskListener, ExportTaskListener { private ReplicationServerListener replicationServerListener; private static final Map domains = new ConcurrentHashMap(4); private static final DSRSShutdownSync dsrsShutdownSync = new DSRSShutdownSync(); /** * The queue of received update messages, to be treated by the ReplayThread * threads. */ private static final BlockingQueue updateToReplayQueue = new LinkedBlockingQueue(10000); /** * The list of ReplayThread threads. */ private static final List replayThreads = new ArrayList(); /** * The configurable number of replay threads. */ private static int replayThreadNumber = 10; /** * enum that symbolizes the state of the multimaster replication. */ private static enum State { STARTING, RUNNING, STOPPING } private static final AtomicReference state = new AtomicReference(State.STARTING); /** * The configurable connection/handshake timeout. */ private static volatile int connectionTimeoutMS = 5000; /** * Finds the domain for a given DN. * * @param dn The DN for which the domain must be returned. * @param pluginOp An optional operation for which the check is done. * Can be null is the request has no associated operation. * @return The domain for this DN. */ public static LDAPReplicationDomain findDomain(DN dn, PluginOperation pluginOp) { /* * Don't run the special replication code on Operation that are * specifically marked as don't synchronize. */ if (pluginOp instanceof Operation) { final Operation op = (Operation) pluginOp; if (op.dontSynchronize()) { return null; } /* * Check if the provided operation is a repair operation and set the * synchronization flags if necessary. * The repair operations are tagged as synchronization operations so * that the core server let the operation modify the entryuuid and * ds-sync-hist attributes. * They are also tagged as dontSynchronize so that the replication code * running later do not generate CSN, solve conflicts and forward the * operation to the replication server. */ final List controls = op.getRequestControls(); for (Iterator iter = controls.iterator(); iter.hasNext();) { Control c = iter.next(); if (OID_REPLICATION_REPAIR_CONTROL.equals(c.getOID())) { op.setSynchronizationOperation(true); op.setDontSynchronize(true); /* remove this control from the list of controls since it has now been processed and the local backend will fail if it finds a control that it does not know about and that is marked as critical. */ iter.remove(); return null; } } } LDAPReplicationDomain domain = null; DN temp = dn; while (domain == null && temp != null) { domain = domains.get(temp); temp = temp.getParentDNInSuffix(); } return domain; } /** * Creates a new domain from its configEntry, do the * necessary initialization and starts it so that it is * fully operational when this method returns. * @param configuration The entry with the configuration of this domain. * @return The domain created. * @throws ConfigException When the configuration is not valid. */ public static LDAPReplicationDomain createNewDomain( ReplicationDomainCfg configuration) throws ConfigException { try { final LDAPReplicationDomain domain = new LDAPReplicationDomain( configuration, updateToReplayQueue, dsrsShutdownSync); if (domains.size() == 0) { // Create the threads that will process incoming update messages createReplayThreads(); } domains.put(domain.getBaseDN(), domain); return domain; } catch (ConfigException e) { logError(ERR_COULD_NOT_START_REPLICATION.get( configuration.dn().toString(), e.getLocalizedMessage() + " " + stackTraceToSingleLineString(e))); } return null; } /** * Creates a new domain from its configEntry, do the necessary initialization * and starts it so that it is fully operational when this method returns. It * is only used for tests so far. * * @param configuration The entry with the configuration of this domain. * @param queue The BlockingQueue that this domain will use. * * @return The domain created. * * @throws ConfigException When the configuration is not valid. */ static LDAPReplicationDomain createNewDomain( ReplicationDomainCfg configuration, BlockingQueue queue) throws ConfigException { final LDAPReplicationDomain domain = new LDAPReplicationDomain(configuration, queue, dsrsShutdownSync); domains.put(domain.getBaseDN(), domain); return domain; } /** * Deletes a domain. * @param dn : the base DN of the domain to delete. */ public static void deleteDomain(DN dn) { LDAPReplicationDomain domain = domains.remove(dn); if (domain != null) { domain.delete(); } // No replay threads running if no replication need if (domains.size() == 0) { stopReplayThreads(); } } /** {@inheritDoc} */ @Override public void initializeSynchronizationProvider( ReplicationSynchronizationProviderCfg cfg) throws ConfigException { domains.clear(); replicationServerListener = new ReplicationServerListener(cfg, dsrsShutdownSync); // Register as an add and delete listener with the root configuration so we // can be notified if Multimaster domain entries are added or removed. cfg.addReplicationDomainAddListener(this); cfg.addReplicationDomainDeleteListener(this); // Register as a root configuration listener so that we can be notified if // number of replay threads is changed and apply changes. cfg.addReplicationChangeListener(this); replayThreadNumber = cfg.getNumUpdateReplayThreads(); connectionTimeoutMS = (int) Math.min(cfg.getConnectionTimeout(), Integer.MAX_VALUE); // Create the list of domains that are already defined. for (String name : cfg.listReplicationDomains()) { createNewDomain(cfg.getReplicationDomain(name)); } // If any schema changes were made with the server offline, then handle them now. List offlineSchemaChanges = DirectoryServer.getOfflineSchemaChanges(); if (offlineSchemaChanges != null && !offlineSchemaChanges.isEmpty()) { processSchemaChange(offlineSchemaChanges); } DirectoryServer.registerBackupTaskListener(this); DirectoryServer.registerRestoreTaskListener(this); DirectoryServer.registerExportTaskListener(this); DirectoryServer.registerImportTaskListener(this); DirectoryServer.registerSupportedControl( ReplicationRepairRequestControl.OID_REPLICATION_REPAIR_CONTROL); } /** * Create the threads that will wait for incoming update messages. */ private static synchronized void createReplayThreads() { replayThreads.clear(); for (int i = 0; i < replayThreadNumber; i++) { ReplayThread replayThread = new ReplayThread(updateToReplayQueue); replayThread.start(); replayThreads.add(replayThread); } } /** * Stop the threads that are waiting for incoming update messages. */ private static synchronized void stopReplayThreads() { // stop the replay threads for (ReplayThread replayThread : replayThreads) { replayThread.shutdown(); } for (ReplayThread replayThread : replayThreads) { try { replayThread.join(); } catch(InterruptedException e) { Thread.currentThread().interrupt(); } } replayThreads.clear(); } /** {@inheritDoc} */ @Override public boolean isConfigurationAddAcceptable( ReplicationDomainCfg configuration, List unacceptableReasons) { return LDAPReplicationDomain.isConfigurationAcceptable( configuration, unacceptableReasons); } /** {@inheritDoc} */ @Override public ConfigChangeResult applyConfigurationAdd( ReplicationDomainCfg configuration) { try { LDAPReplicationDomain rd = createNewDomain(configuration); if (State.RUNNING.equals(state.get())) { rd.start(); if (State.STOPPING.equals(state.get())) { rd.shutdown(); } } return new ConfigChangeResult(ResultCode.SUCCESS, false); } catch (ConfigException e) { // we should never get to this point because the configEntry has // already been validated in isConfigurationAddAcceptable() return new ConfigChangeResult(ResultCode.CONSTRAINT_VIOLATION, false); } } /** {@inheritDoc} */ @Override public void doPostOperation(PostOperationAddOperation addOperation) { DN dn = addOperation.getEntryDN(); genericPostOperation(addOperation, dn); } /** {@inheritDoc} */ @Override public void doPostOperation(PostOperationDeleteOperation deleteOperation) { DN dn = deleteOperation.getEntryDN(); genericPostOperation(deleteOperation, dn); } /** {@inheritDoc} */ @Override public void doPostOperation(PostOperationModifyDNOperation modifyDNOperation) { DN dn = modifyDNOperation.getEntryDN(); genericPostOperation(modifyDNOperation, dn); } /** {@inheritDoc} */ @Override public void doPostOperation(PostOperationModifyOperation modifyOperation) { DN dn = modifyOperation.getEntryDN(); genericPostOperation(modifyOperation, dn); } /** {@inheritDoc} */ @Override public SynchronizationProviderResult handleConflictResolution( PreOperationModifyOperation modifyOperation) { LDAPReplicationDomain domain = findDomain(modifyOperation.getEntryDN(), modifyOperation); if (domain != null) { return domain.handleConflictResolution(modifyOperation); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @Override public SynchronizationProviderResult handleConflictResolution( PreOperationAddOperation addOperation) throws DirectoryException { LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation); if (domain != null) { return domain.handleConflictResolution(addOperation); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @Override public SynchronizationProviderResult handleConflictResolution( PreOperationDeleteOperation deleteOperation) throws DirectoryException { LDAPReplicationDomain domain = findDomain(deleteOperation.getEntryDN(), deleteOperation); if (domain != null) { return domain.handleConflictResolution(deleteOperation); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @Override public SynchronizationProviderResult handleConflictResolution( PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException { LDAPReplicationDomain domain = findDomain(modifyDNOperation.getEntryDN(), modifyDNOperation); if (domain != null) { return domain.handleConflictResolution(modifyDNOperation); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @Override public SynchronizationProviderResult doPreOperation(PreOperationModifyOperation modifyOperation) { DN operationDN = modifyOperation.getEntryDN(); LDAPReplicationDomain domain = findDomain(operationDN, modifyOperation); if (domain == null || !domain.solveConflict()) { return new SynchronizationProviderResult.ContinueProcessing(); } EntryHistorical historicalInformation = (EntryHistorical) modifyOperation.getAttachment(EntryHistorical.HISTORICAL); if (historicalInformation == null) { Entry entry = modifyOperation.getModifiedEntry(); historicalInformation = EntryHistorical.newInstanceFromEntry(entry); modifyOperation.setAttachment(EntryHistorical.HISTORICAL, historicalInformation); } historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); historicalInformation.setHistoricalAttrToOperation(modifyOperation); if (modifyOperation.getModifications().isEmpty()) { /* * This operation becomes a no-op due to conflict resolution * stop the processing and send an OK result */ return new SynchronizationProviderResult.StopProcessing( ResultCode.SUCCESS, null); } return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @Override public SynchronizationProviderResult doPreOperation( PreOperationDeleteOperation deleteOperation) throws DirectoryException { return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @Override public SynchronizationProviderResult doPreOperation( PreOperationModifyDNOperation modifyDNOperation) throws DirectoryException { DN operationDN = modifyDNOperation.getEntryDN(); LDAPReplicationDomain domain = findDomain(operationDN, modifyDNOperation); if (domain == null || !domain.solveConflict()) { return new SynchronizationProviderResult.ContinueProcessing(); } // The historical object is retrieved from the attachment created // in the HandleConflictResolution phase. EntryHistorical historicalInformation = (EntryHistorical) modifyDNOperation.getAttachment(EntryHistorical.HISTORICAL); if (historicalInformation == null) { // When no Historical attached, create once by loading from the entry // and attach it to the operation Entry entry = modifyDNOperation.getUpdatedEntry(); historicalInformation = EntryHistorical.newInstanceFromEntry(entry); modifyDNOperation.setAttachment(EntryHistorical.HISTORICAL, historicalInformation); } historicalInformation.setPurgeDelay(domain.getHistoricalPurgeDelay()); // Add to the operation the historical attribute : "dn:changeNumber:moddn" historicalInformation.setHistoricalAttrToOperation(modifyDNOperation); return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @Override public SynchronizationProviderResult doPreOperation( PreOperationAddOperation addOperation) { // Check replication domain LDAPReplicationDomain domain = findDomain(addOperation.getEntryDN(), addOperation); if (domain == null) { return new SynchronizationProviderResult.ContinueProcessing(); } // For LOCAL op only, generate CSN and attach Context if (!addOperation.isSynchronizationOperation()) { domain.doPreOperation(addOperation); } // Add to the operation the historical attribute : "dn:changeNumber:add" EntryHistorical.setHistoricalAttrToOperation(addOperation); return new SynchronizationProviderResult.ContinueProcessing(); } /** {@inheritDoc} */ @Override public void finalizeSynchronizationProvider() { setState(State.STOPPING); for (LDAPReplicationDomain domain : domains.values()) { domain.shutdown(); } domains.clear(); stopReplayThreads(); if (replicationServerListener != null) { replicationServerListener.shutdown(); } DirectoryServer.deregisterBackupTaskListener(this); DirectoryServer.deregisterRestoreTaskListener(this); DirectoryServer.deregisterExportTaskListener(this); DirectoryServer.deregisterImportTaskListener(this); } /** * This method is called whenever the server detects a modification * of the schema done by directly modifying the backing files * of the schema backend. * Call the schema Domain if it exists. * * @param modifications The list of modifications that was * applied to the schema. * */ @Override public void processSchemaChange(List modifications) { LDAPReplicationDomain domain = findDomain(DirectoryServer.getSchemaDN(), null); if (domain != null) { domain.synchronizeModifications(modifications); } } /** {@inheritDoc} */ @Override public void processBackupBegin(Backend backend, BackupConfig config) { for (DN dn : backend.getBaseDNs()) { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.backupStart(); } } } /** {@inheritDoc} */ @Override public void processBackupEnd(Backend backend, BackupConfig config, boolean successful) { for (DN dn : backend.getBaseDNs()) { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.backupEnd(); } } } /** {@inheritDoc} */ @Override public void processRestoreBegin(Backend backend, RestoreConfig config) { for (DN dn : backend.getBaseDNs()) { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.disable(); } } } /** {@inheritDoc} */ @Override public void processRestoreEnd(Backend backend, RestoreConfig config, boolean successful) { for (DN dn : backend.getBaseDNs()) { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.enable(); } } } /** {@inheritDoc} */ @Override public void processImportBegin(Backend backend, LDIFImportConfig config) { for (DN dn : backend.getBaseDNs()) { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.disable(); } } } /** {@inheritDoc} */ @Override public void processImportEnd(Backend backend, LDIFImportConfig config, boolean successful) { for (DN dn : backend.getBaseDNs()) { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.enable(); } } } /** {@inheritDoc} */ @Override public void processExportBegin(Backend backend, LDIFExportConfig config) { for (DN dn : backend.getBaseDNs()) { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.backupStart(); } } } /** {@inheritDoc} */ @Override public void processExportEnd(Backend backend, LDIFExportConfig config, boolean successful) { for (DN dn : backend.getBaseDNs()) { LDAPReplicationDomain domain = findDomain(dn, null); if (domain != null) { domain.backupEnd(); } } } /** {@inheritDoc} */ @Override public ConfigChangeResult applyConfigurationDelete( ReplicationDomainCfg configuration) { deleteDomain(configuration.getBaseDN()); return new ConfigChangeResult(ResultCode.SUCCESS, false); } /** {@inheritDoc} */ @Override public boolean isConfigurationDeleteAcceptable( ReplicationDomainCfg configuration, List unacceptableReasons) { return true; } /** * Generic code for all the postOperation entry point. * * @param operation The Operation for which the post-operation is called. * @param dn The Dn for which the post-operation is called. */ private void genericPostOperation(PostOperationOperation operation, DN dn) { LDAPReplicationDomain domain = findDomain(dn, operation); if (domain != null) { domain.synchronize(operation); } } /** * Returns the replication server listener associated to that Multimaster * Replication. * @return the listener. */ public ReplicationServerListener getReplicationServerListener() { return replicationServerListener; } /** {@inheritDoc} */ @Override public boolean isConfigurationChangeAcceptable( ReplicationSynchronizationProviderCfg configuration, List unacceptableReasons) { return true; } /** {@inheritDoc} */ @Override public ConfigChangeResult applyConfigurationChange( ReplicationSynchronizationProviderCfg configuration) { int numUpdateRepayThread = configuration.getNumUpdateReplayThreads(); // Stop threads then restart new number of threads stopReplayThreads(); replayThreadNumber = numUpdateRepayThread; if (domains.size() > 0) { createReplayThreads(); } connectionTimeoutMS = (int) Math.min(configuration.getConnectionTimeout(), Integer.MAX_VALUE); return new ConfigChangeResult(ResultCode.SUCCESS, false); } /** {@inheritDoc} */ @Override public void completeSynchronizationProvider() { for (LDAPReplicationDomain domain : domains.values()) { domain.start(); } setState(State.RUNNING); } private void setState(State newState) { state.set(newState); synchronized (state) { state.notifyAll(); } } /** * Gets the number of handled domain objects. * @return The number of handled domain objects */ public static int getNumberOfDomains() { return domains.size(); } /** * Gets the Set of baseDN of the domains which are disabled for the external * changelog. * * @return The Set of baseDNs which are disabled for the external changelog. */ public static Set getECLDisabledDomains() { final Set disabledBaseDNs = new HashSet(domains.size()); for (LDAPReplicationDomain domain : domains.values()) { if (!domain.isECLEnabled()) { disabledBaseDNs.add(domain.getBaseDN().toNormalizedString()); } } return disabledBaseDNs; } /** * Returns whether the provided baseDN represents a replication domain enabled * for the external changelog. * * @param baseDN * the replication domain to check * @return true if the provided baseDN is enabled for the external changelog, * false if the provided baseDN is disabled for the external changelog * or unknown to multimaster replication. */ public static boolean isECLEnabledDomain(DN baseDN) { if (State.STARTING.equals(state.get())) { synchronized (state) { while (State.STARTING.equals(state.get())) { try { state.wait(); } catch (InterruptedException ignored) { // loop and check state again } } } } // if state is STOPPING, then we need to return from this method for (LDAPReplicationDomain domain : domains.values()) { if (domain.isECLEnabled() && domain.getBaseDN().equals(baseDN)) { return true; } } return false; } /** * Returns the connection timeout in milli-seconds. * * @return The connection timeout in milli-seconds. */ public static int getConnectionTimeoutMS() { return connectionTimeoutMS; } }