mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.00.2014 5106a9c251d6bb6896af88a1b156ee6114df49f7
opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -30,6 +30,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
import org.opends.server.admin.server.ConfigurationAddListener;
@@ -67,7 +68,7 @@
                  ExportTaskListener
{
  private ReplicationServerListener replicationServerListener = null;
  private static Map<DN, LDAPReplicationDomain> domains =
  private static final Map<DN, LDAPReplicationDomain> domains =
    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
  /**
@@ -80,7 +81,7 @@
  /**
   * The list of ReplayThread threads.
   */
  private static List<ReplayThread> replayThreads =
  private static final List<ReplayThread> replayThreads =
    new ArrayList<ReplayThread>();
  /**
@@ -88,7 +89,16 @@
   */
  private static int replayThreadNumber = 10;
  private boolean isRegistered = false;
  /**
   * enum that symbolizes the state of the multimaster replication.
   */
  private static enum State
  {
    STARTING, RUNNING, STOPPING
  }
  private static final AtomicReference<State> state =
      new AtomicReference<State>(State.STARTING);
  /**
   * The configurable connection/handshake timeout.
@@ -338,9 +348,12 @@
    try
    {
      LDAPReplicationDomain rd = createNewDomain(configuration);
      if (isRegistered)
      if (State.RUNNING.equals(state.get()))
      {
        rd.start();
        if (State.STOPPING.equals(state.get())) {
          rd.shutdown();
        }
      }
      return new ConfigChangeResult(ResultCode.SUCCESS, false);
    } catch (ConfigException e)
@@ -540,19 +553,16 @@
  @Override
  public void finalizeSynchronizationProvider()
  {
    isRegistered = false;
    setState(State.STOPPING);
    // shutdown all the domains
    for (LDAPReplicationDomain domain : domains.values())
    {
      domain.shutdown();
    }
    domains.clear();
    // Stop replay threads
    stopReplayThreads();
    // shutdown the ReplicationServer Service if necessary
    if (replicationServerListener != null)
      replicationServerListener.shutdown();
@@ -757,13 +767,20 @@
  @Override
  public void completeSynchronizationProvider()
  {
    isRegistered = true;
    // start all the domains
    for (LDAPReplicationDomain domain : domains.values())
    {
      domain.start();
    }
    setState(State.RUNNING);
  }
  private void setState(State newState)
  {
    state.set(newState);
    synchronized (state)
    {
      state.notifyAll();
    }
  }
  /**
@@ -806,6 +823,24 @@
   */
  public static boolean isECLEnabledDomain(DN baseDN)
  {
    if (State.STARTING.equals(state.get()))
    {
      synchronized (state)
      {
        while (State.STARTING.equals(state.get()))
        {
          try
          {
            state.wait();
          }
          catch (InterruptedException ignored)
          {
            // loop and check state again
          }
        }
      }
    }
    // if state is STOPPING, then we need to return from this method
    for (LDAPReplicationDomain domain : domains.values())
    {
      if (domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))