| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Portions Copyright 2006-2007 Sun Microsystems, Inc. |
| | | * Portions Copyright 2006-2008 Sun Microsystems, Inc. |
| | | */ |
| | | package org.opends.server.replication.plugin; |
| | | |
| | | import java.util.ArrayList; |
| | | import static org.opends.server.replication.plugin. |
| | | ReplicationRepairRequestControl.*; |
| | | |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | |
| | | import org.opends.messages.Message; |
| | | import org.opends.server.admin.server.ConfigurationAddListener; |
| | | import org.opends.server.admin.server.ConfigurationChangeListener; |
| | | import org.opends.server.admin.server.ConfigurationDeleteListener; |
| | | import org.opends.server.admin.std.server.ReplicationDomainCfg; |
| | | import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg; |
| | |
| | | extends SynchronizationProvider<ReplicationSynchronizationProviderCfg> |
| | | implements ConfigurationAddListener<ReplicationDomainCfg>, |
| | | ConfigurationDeleteListener<ReplicationDomainCfg>, |
| | | ConfigurationChangeListener |
| | | <ReplicationSynchronizationProviderCfg>, |
| | | BackupTaskListener, RestoreTaskListener, ImportTaskListener, |
| | | ExportTaskListener |
| | | { |
| | |
| | | private static Map<DN, ReplicationDomain> domains = |
| | | new HashMap<DN, ReplicationDomain>() ; |
| | | |
| | | /** |
| | | * The queue of received update messages, to be treated by the ReplayThread |
| | | * threads. This same queue instance is passed to the ReplicationDomain and |
| | | * ReplayThread constructors. |
| | | */ |
| | | private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue = |
| | | new LinkedBlockingQueue<UpdateToReplay>(); |
| | | |
| | | /** |
| | | * The list of ReplayThread threads. It is filled by createReplayThreads() |
| | | * and emptied by stopReplayThreads(). |
| | | */ |
| | | private static List<ReplayThread> replayThreads = |
| | | new ArrayList<ReplayThread>(); |
| | | |
| | | /** |
| | | * The configurable number of replay threads. It starts at 10 and is |
| | | * overwritten with the getNumUpdateReplayThreads() value read from the |
| | | * provider configuration. |
| | | */ |
| | | private static int replayThreadNumber = 10; |
| | | |
| | | /** |
| | | * Finds the domain for a given DN. |
| | |
| | | throws ConfigException |
| | | { |
| | | ReplicationDomain domain; |
| | | domain = new ReplicationDomain(configuration); |
| | | domain = new ReplicationDomain(configuration, updateToReplayQueue); |
| | | |
| | | if (domains.size() == 0) |
| | | { |
| | | /* |
| | | * Create the threads that will process incoming update messages |
| | | */ |
| | | createReplayThreads(); |
| | | } |
| | | |
| | | domains.put(domain.getBaseDN(), domain); |
| | | domain.start(); |
| | | return domain; |
| | |
| | | public static void deleteDomain(DN dn) |
| | | { |
| | | final ReplicationDomain removedDomain = domains.remove(dn); |
| | | |
| | | // Once no domain is registered any more, the replay threads are stopped. |
| | | if (domains.isEmpty()) |
| | | { |
| | | stopReplayThreads(); |
| | | } |
| | | |
| | | // The DN may not have matched any registered domain. |
| | | if (removedDomain == null) |
| | | { |
| | | return; |
| | | } |
| | | removedDomain.shutdown(); |
| | | } |
| | |
| | | configuration.addReplicationDomainAddListener(this); |
| | | configuration.addReplicationDomainDeleteListener(this); |
| | | |
| | | // Register as a root configuration listener so that we are notified when |
| | | // the number of replay threads is changed and can apply the new value. |
| | | configuration.addReplicationChangeListener(this); |
| | | |
| | | replayThreadNumber = configuration.getNumUpdateReplayThreads(); |
| | | |
| | | // Create the list of domains that are already defined. |
| | | for (String name : configuration.listReplicationDomains()) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Starts a fresh set of replay threads. The list of replay threads is |
| | | * emptied first, then replayThreadNumber new threads are created on the |
| | | * update queue, started, and recorded in the list. |
| | | */ |
| | | private synchronized static void createReplayThreads() |
| | | { |
| | | replayThreads.clear(); |
| | | |
| | | int started = 0; |
| | | while (started < replayThreadNumber) |
| | | { |
| | | ReplayThread worker = new ReplayThread(updateToReplayQueue); |
| | | worker.start(); |
| | | replayThreads.add(worker); |
| | | started++; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Stops the threads that are waiting for incoming update messages and |
| | | * empties the list of replay threads. |
| | | */ |
| | | private synchronized static void stopReplayThreads() |
| | | { |
| | | // First pass: ask every replay thread to shut down. |
| | | for (ReplayThread worker : replayThreads) |
| | | { |
| | | worker.shutdown(); |
| | | } |
| | | |
| | | // Second pass: wait for each thread, only after all have been asked. |
| | | for (ReplayThread worker : replayThreads) |
| | | { |
| | | worker.waitForShutdown(); |
| | | } |
| | | |
| | | replayThreads.clear(); |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean isConfigurationAddAcceptable( |
| | |
| | | @Override |
| | | public void finalizeSynchronizationProvider() |
| | | { |
| | | // Stop replay threads |
| | | stopReplayThreads(); |
| | | |
| | | // shutdown all the domains |
| | | for (ReplicationDomain domain : domains.values()) |
| | | { |
| | |
| | | { |
| | | return replicationServerListener; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public boolean |
| | | isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg |
| | | configuration, |
| | | List<Message> unacceptableReasons) |
| | | { |
| | | return true; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public ConfigChangeResult |
| | | applyConfigurationChange |
| | | (ReplicationSynchronizationProviderCfg configuration) |
| | | { |
| | | int numUpdateRepayThread = configuration.getNumUpdateReplayThreads(); |
| | | |
| | | // Stop threads then restart new number of threads |
| | | stopReplayThreads(); |
| | | replayThreadNumber = numUpdateRepayThread; |
| | | if (domains.size() > 0) |
| | | { |
| | | createReplayThreads(); |
| | | } |
| | | |
| | | return new ConfigChangeResult(ResultCode.SUCCESS, false); |
| | | } |
| | | } |
| | | |
| | | |