mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

mrossign
18.18.2008 4c5133331fbc83cb3fd98f0c100104b7f9bf0359
opendj-sdk/opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,19 +22,22 @@
 * CDDL HEADER END
 *
 *
 *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
 *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
 */
package org.opends.server.replication.plugin;
import java.util.ArrayList;
import static org.opends.server.replication.plugin.
ReplicationRepairRequestControl.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.server.ConfigurationDeleteListener;
import org.opends.server.admin.std.server.ReplicationDomainCfg;
import org.opends.server.admin.std.server.ReplicationSynchronizationProviderCfg;
@@ -82,6 +85,8 @@
       extends SynchronizationProvider<ReplicationSynchronizationProviderCfg>
       implements ConfigurationAddListener<ReplicationDomainCfg>,
                  ConfigurationDeleteListener<ReplicationDomainCfg>,
                  ConfigurationChangeListener
                  <ReplicationSynchronizationProviderCfg>,
                  BackupTaskListener, RestoreTaskListener, ImportTaskListener,
                  ExportTaskListener
{
@@ -89,6 +94,23 @@
  private static Map<DN, ReplicationDomain> domains =
    new HashMap<DN, ReplicationDomain>() ;
  /**
   * The queue of received update messages, to be treated by the ReplayThread
   * threads.
   */
  private static LinkedBlockingQueue<UpdateToReplay> updateToReplayQueue =
    new LinkedBlockingQueue<UpdateToReplay>();
  /**
   * The list of ReplayThread threads.
   */
  private static List<ReplayThread> replayThreads =
    new ArrayList<ReplayThread>();
  /**
   * The configurable number of replay threads.
   */
  private static int replayThreadNumber = 10;
  /**
   * Finds the domain for a given DN.
@@ -167,7 +189,16 @@
      throws ConfigException
  {
    ReplicationDomain domain;
    domain = new ReplicationDomain(configuration);
    domain = new ReplicationDomain(configuration, updateToReplayQueue);
    if (domains.size() == 0)
    {
      /*
       * Create the threads that will process incoming update messages
       */
      createReplayThreads();
    }
    domains.put(domain.getBaseDN(), domain);
    domain.start();
    return domain;
@@ -180,6 +211,12 @@
  public static void deleteDomain(DN dn)
  {
    ReplicationDomain domain = domains.remove(dn);
    // No replay threads running if no replication need
    if (domains.size() == 0) {
      stopReplayThreads();
    }
    if (domain != null)
      domain.shutdown();
  }
@@ -200,6 +237,12 @@
    configuration.addReplicationDomainAddListener(this);
    configuration.addReplicationDomainDeleteListener(this);
    // Register as a root configuration listener so that we can be notified if
    // number of replay threads is changed and apply changes.
    configuration.addReplicationChangeListener(this);
    replayThreadNumber = configuration.getNumUpdateReplayThreads();
    //  Create the list of domains that are already defined.
    for (String name : configuration.listReplicationDomains())
    {
@@ -228,6 +271,39 @@
  }
  /**
   * Create the threads that will wait for incoming update messages.
   */
  private synchronized static void createReplayThreads()
  {
    replayThreads.clear();
    for (int i = 0; i < replayThreadNumber; i++)
    {
      ReplayThread replayThread = new ReplayThread(updateToReplayQueue);
      replayThread.start();
      replayThreads.add(replayThread);
    }
  }
  /**
   * Stope the threads that are waiting for incoming update messages.
   */
  private synchronized static void stopReplayThreads()
  {
    //  stop the replay threads
    for (ReplayThread replayThread : replayThreads)
    {
      replayThread.shutdown();
    }
    for (ReplayThread replayThread : replayThreads)
    {
      replayThread.waitForShutdown();
    }
    replayThreads.clear();
  }
  /**
   * {@inheritDoc}
   */
  public boolean isConfigurationAddAcceptable(
@@ -431,6 +507,9 @@
  @Override
  public void finalizeSynchronizationProvider()
  {
    // Stop replay threads
    stopReplayThreads();
    // shutdown all the domains
    for (ReplicationDomain domain : domains.values())
    {
@@ -621,6 +700,37 @@
  {
    return replicationServerListener;
  }
  /**
   * {@inheritDoc}
   */
  public boolean
    isConfigurationChangeAcceptable(ReplicationSynchronizationProviderCfg
    configuration,
    List<Message> unacceptableReasons)
  {
    return true;
  }
  /**
   * {@inheritDoc}
   */
  public ConfigChangeResult
    applyConfigurationChange
    (ReplicationSynchronizationProviderCfg configuration)
  {
    int numUpdateRepayThread = configuration.getNumUpdateReplayThreads();
    // Stop threads then restart new number of threads
    stopReplayThreads();
    replayThreadNumber = numUpdateRepayThread;
    if (domains.size() > 0)
    {
      createReplayThreads();
    }
    return new ConfigChangeResult(ResultCode.SUCCESS, false);
  }
}