mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.00.2014 5106a9c251d6bb6896af88a1b156ee6114df49f7
OPENDJ-1262 NPE in ChangeNumberIndex during server startup

Code review: Matthew Swift

Caused by r10049.
Problem is down to the initialization sequence:
1. thread 1 - MultimasterReplication.initializeSynchronizationProvider()
1.1. it creates the ReplicationServerListener
1.1.1. the ReplicationServerListener in turn creates the ReplicationServer
1.1.1.1. the ReplicationServer in turn creates the ChangelogDB
1.1.1.1.1. the ChangelogDB in turn creates the ChangeNumberIndexer thread and STARTs it
1.1.1.1.2. the ChangelogDB starts the ChangeNumberIndexer thread
1.2. it proceeds with creating the LDAPReplicationDomain objects one by one
2. thread 2 - ChangeNumberIndexer.run()
2.1. it calls ChangeNumberIndexer.initialize()
2.1.1. ChangeNumberIndexer.initialize() calls MultimasterReplication.isECLEnabledDomain(baseDN)

Steps 1.2. and 2.1.1. are running concurrently.
If 2.1.1. runs before 1.2. has completed, then in ChangeNumberIndexer.initialize():
1) MultimasterReplication.isECLEnabledDomain(baseDN) returns false, hence a cursor to the relevant replica DBs is not created
2) then the call to nextChangeForInsertDBCursor.getRecord() returns null, later throwing a NullPointerException because the ChangeNumberIndexer thread is in an illegal state: it was expecting to find an UpdateMsg with the correct CSN stamped on it.



MultimasterReplication.java:
Added State enum + static state member to tell whether MultimasterReplication is ready for work.
Removed isRegistered instance member, superseded by the static state member.
In isECLEnabledDomain(), completeSynchronizationProvider() and finalizeSynchronizationProvider(), deal with thread waits.

DomainFakeCfg.java:
Implemented toString().
2 files modified
85 ■■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 57 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java 28 ●●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -30,6 +30,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.forgerock.i18n.LocalizableMessage;
import org.opends.server.admin.server.ConfigurationAddListener;
@@ -67,7 +68,7 @@
                  ExportTaskListener
{
  private ReplicationServerListener replicationServerListener = null;
  private static Map<DN, LDAPReplicationDomain> domains =
  private static final Map<DN, LDAPReplicationDomain> domains =
    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
  /**
@@ -80,7 +81,7 @@
  /**
   * The list of ReplayThread threads.
   */
  private static List<ReplayThread> replayThreads =
  private static final List<ReplayThread> replayThreads =
    new ArrayList<ReplayThread>();
  /**
@@ -88,7 +89,16 @@
   */
  private static int replayThreadNumber = 10;
  private boolean isRegistered = false;
  /**
   * enum that symbolizes the state of the multimaster replication.
   */
  private static enum State
  {
    STARTING, RUNNING, STOPPING
  }
  /**
   * Current lifecycle state, initially {@link State#STARTING}. The reference
   * object itself also serves as the monitor that threads wait on and are
   * notified through when the state changes.
   */
  private static final AtomicReference<State> state =
      new AtomicReference<State>(State.STARTING);
  /**
   * The configurable connection/handshake timeout.
@@ -338,9 +348,12 @@
    try
    {
      LDAPReplicationDomain rd = createNewDomain(configuration);
      if (isRegistered)
      if (State.RUNNING.equals(state.get()))
      {
        rd.start();
        if (State.STOPPING.equals(state.get())) {
          rd.shutdown();
        }
      }
      return new ConfigChangeResult(ResultCode.SUCCESS, false);
    } catch (ConfigException e)
@@ -540,19 +553,16 @@
  @Override
  public void finalizeSynchronizationProvider()
  {
    isRegistered = false;
    setState(State.STOPPING);
    // shutdown all the domains
    for (LDAPReplicationDomain domain : domains.values())
    {
      domain.shutdown();
    }
    domains.clear();
    // Stop replay threads
    stopReplayThreads();
    // shutdown the ReplicationServer Service if necessary
    if (replicationServerListener != null)
      replicationServerListener.shutdown();
@@ -757,13 +767,20 @@
  @Override
  public void completeSynchronizationProvider()
  {
    isRegistered = true;
    // start all the domains
    for (LDAPReplicationDomain domain : domains.values())
    {
      domain.start();
    }
    // Switch to RUNNING only after every domain has been started: threads
    // blocked in isECLEnabledDomain() while the state is STARTING are
    // released by this state change.
    setState(State.RUNNING);
  }
  private void setState(State newState)
  {
    state.set(newState);
    synchronized (state)
    {
      state.notifyAll();
    }
  }
  /**
@@ -806,6 +823,24 @@
   */
  public static boolean isECLEnabledDomain(DN baseDN)
  {
    if (State.STARTING.equals(state.get()))
    {
      synchronized (state)
      {
        while (State.STARTING.equals(state.get()))
        {
          try
          {
            state.wait();
          }
          catch (InterruptedException ignored)
          {
            // loop and check state again
          }
        }
      }
    }
    // if state is STOPPING, then we need to return from this method
    for (LDAPReplicationDomain domain : domains.values())
    {
      if (domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -389,14 +389,11 @@
   * @throws ConfigException
   *          If the add listener could not be registered.
   */
  public
  void addECLDomainAddListener(
  public void addECLDomainAddListener(
      ConfigurationAddListener<ExternalChangelogDomainCfg> listener)
  throws ConfigException
      throws ConfigException
  {}
  /**
   * Deregisters an existing ECL Domain configuration add listener.
   *
@@ -404,12 +401,9 @@
   *          The ECL Domain configuration add listener.
   */
  public void removeECLDomainAddListener(
      ConfigurationAddListener<ExternalChangelogDomainCfg>
  listener)
      ConfigurationAddListener<ExternalChangelogDomainCfg> listener)
  {}
  /**
   * Registers to be notified the ECL Domain is deleted.
   *
@@ -418,22 +412,18 @@
   * @throws ConfigException
   *          If the delete listener could not be registered.
   */
  public void
  addECLDomainDeleteListener(
  public void addECLDomainDeleteListener(
      ConfigurationDeleteListener<ExternalChangelogDomainCfg> listener)
  throws ConfigException
      throws ConfigException
  {}
  /**
   * Deregisters an existing ECL Domain configuration delete listener.
   *
   * @param listener
   *          The ECL Domain configuration delete listener.
   */
  public void
  removeECLDomainDeleteListener(
  public void removeECLDomainDeleteListener(
      ConfigurationDeleteListener<ExternalChangelogDomainCfg> listener)
  {}
@@ -457,4 +447,10 @@
    return 1440;
  }
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + ", baseDN=" + baseDN + ", serverId="
        + serverId + ", replicationServers=" + replicationServers;
  }
}