mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.33.2014 08727ec559e7018316d3b25b04a828ee0a127b2a
OPENDJ-1262 NPE in ChangeNumberIndex during server startup

Code review: Matthew Swift

Caused by r10049.
Problem is down to the initialization sequence:
1. thread 1 - MultimasterReplication.initializeSynchronizationProvider()
1.1. it creates the ReplicationServerListener
1.1.1. the ReplicationServerListener in turn creates the ReplicationServer
1.1.1.1. the ReplicationServer in turn creates the ChangelogDB
1.1.1.1.1. the ChangelogDB in turn creates the ChangeNumberIndexer thread
1.1.1.1.2. the ChangelogDB then STARTs the ChangeNumberIndexer thread
1.2. it proceeds with creating the LDAPReplicationDomain objects one by one
2. thread 2 - ChangeNumberIndexer.run()
2.1. it calls ChangeNumberIndexer.initialize()
2.1.1. ChangeNumberIndexer.initialize() calls MultimasterReplication.isECLEnabledDomain(baseDN)

Steps 1.2. and 2.1.1. are running concurrently.
If 2.1.1. runs before 1.2. has completed, then in ChangeNumberIndexer.initialize():
1) MultimasterReplication.isECLEnabledDomain(baseDN) returns false, hence a cursor to the relevant replica DBs is not created
2) then the call to nextChangeForInsertDBCursor.getRecord() returns null, which later causes a NullPointerException because the ChangeNumberIndexer thread is in an illegal state: it expected to find an UpdateMsg with the correct CSN stamped on it.



MultimasterReplication.java:
Added State enum + state instance member to tell whether MultimasterReplication is ready for work.
Removed isRegistered instance member superseded by state instance member.
In isECLEnabledDomain(), completeSynchronizationProvider() and finalizeSynchronizationProvider(), deal with thread waits.

DomainFakeCfg.java:
Implemented toString().
2 files modified
85 ■■■■■ changed files
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java 59 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java 26 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/MultimasterReplication.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
@@ -30,6 +30,7 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.opends.messages.Message;
import org.opends.server.admin.server.ConfigurationAddListener;
@@ -67,7 +68,7 @@
                  ExportTaskListener
{
  private ReplicationServerListener replicationServerListener = null;
  private static Map<DN, LDAPReplicationDomain> domains =
  private static final Map<DN, LDAPReplicationDomain> domains =
    new ConcurrentHashMap<DN, LDAPReplicationDomain>(4) ;
  /**
@@ -80,7 +81,7 @@
  /**
   * The list of ReplayThread threads.
   */
  private static List<ReplayThread> replayThreads =
  private static final List<ReplayThread> replayThreads =
    new ArrayList<ReplayThread>();
  /**
@@ -88,7 +89,16 @@
   */
  private static int replayThreadNumber = 10;
  private boolean isRegistered = false;
  /**
   * Lifecycle state of the multimaster replication.
   * <p>
   * The state starts as {@code STARTING}, is switched to {@code RUNNING} at
   * the end of {@code completeSynchronizationProvider()} and is switched to
   * {@code STOPPING} at the beginning of
   * {@code finalizeSynchronizationProvider()}.
   */
  private static enum State
  {
    STARTING, RUNNING, STOPPING
  }
  /**
   * Current state. The reference object is also used as the monitor on which
   * threads wait for, and are notified of, state changes.
   */
  private static final AtomicReference<State> state =
      new AtomicReference<State>(State.STARTING);
  /**
   * The configurable connection/handshake timeout.
@@ -338,9 +348,12 @@
    try
    {
      LDAPReplicationDomain rd = createNewDomain(configuration);
      if (isRegistered)
      if (State.RUNNING.equals(state.get()))
      {
        rd.start();
        if (State.STOPPING.equals(state.get())) {
          rd.shutdown();
        }
      }
      return new ConfigChangeResult(ResultCode.SUCCESS, false);
    } catch (ConfigException e)
@@ -540,19 +553,16 @@
  @Override
  public void finalizeSynchronizationProvider()
  {
    isRegistered = false;
    setState(State.STOPPING);
    // shutdown all the domains
    for (LDAPReplicationDomain domain : domains.values())
    {
      domain.shutdown();
    }
    domains.clear();
    // Stop replay threads
    stopReplayThreads();
    // shutdown the ReplicationServer Service if necessary
    if (replicationServerListener != null)
      replicationServerListener.shutdown();
@@ -757,13 +767,20 @@
  @Override
  public void completeSynchronizationProvider()
  {
    isRegistered = true;
    // start all the domains
    for (LDAPReplicationDomain domain : domains.values())
    {
      domain.start();
    }
    // switch to RUNNING only once every domain has been started; this also
    // wakes up the threads blocked waiting for the STARTING state to end
    setState(State.RUNNING);
  }
  /**
   * Records the new state and wakes up every thread waiting on the
   * {@code state} monitor.
   *
   * @param newState
   *          the state to switch to
   */
  private void setState(State newState)
  {
    // publish the new value first so that woken threads observe it
    state.set(newState);
    synchronized (state)
    {
      // notifyAll() may only be called while holding the monitor
      state.notifyAll();
    }
  }
  /**
@@ -806,6 +823,24 @@
   */
  public static boolean isECLEnabledDomain(DN baseDN)
  {
    if (State.STARTING.equals(state.get()))
    {
      synchronized (state)
      {
        while (State.STARTING.equals(state.get()))
        {
          try
          {
            state.wait();
          }
          catch (InterruptedException ignored)
          {
            // loop and check state again
          }
        }
      }
    }
    // if state is STOPPING, then we need to return from this method
    for (LDAPReplicationDomain domain : domains.values())
    {
      if (domain.isECLEnabled() && domain.getBaseDN().equals(baseDN))
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/plugin/DomainFakeCfg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2007-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.plugin;
@@ -389,14 +389,11 @@
   * @throws ConfigException
   *          If the add listener could not be registered.
   */
  public
  void addECLDomainAddListener(
  public void addECLDomainAddListener(
      ConfigurationAddListener<ExternalChangelogDomainCfg> listener)
  throws ConfigException
  {}
  /**
   * Deregisters an existing ECL Domain configuration add listener.
   *
@@ -404,12 +401,9 @@
   *          The ECL Domain configuration add listener.
   */
  public void removeECLDomainAddListener(
      ConfigurationAddListener<ExternalChangelogDomainCfg>
  listener)
      ConfigurationAddListener<ExternalChangelogDomainCfg> listener)
  {}
  /**
   * Registers to be notified the ECL Domain is deleted.
   *
@@ -418,22 +412,18 @@
   * @throws ConfigException
   *          If the delete listener could not be registered.
   */
  public void
  addECLDomainDeleteListener(
  public void addECLDomainDeleteListener(
      ConfigurationDeleteListener<ExternalChangelogDomainCfg> listener)
  throws ConfigException
  {}
  /**
   * Deregisters an existing ECL Domain configuration delete listener.
   *
   * @param listener
   *          The ECL Domain configuration delete listener.
   */
  public void
  removeECLDomainDeleteListener(
  public void removeECLDomainDeleteListener(
      ConfigurationDeleteListener<ExternalChangelogDomainCfg> listener)
  {}
@@ -457,4 +447,10 @@
    return 1440;
  }
  /**
   * Returns the simple class name followed by the values of baseDN, serverId
   * and replicationServers.
   */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + ", baseDN=" + baseDN + ", serverId="
        + serverId + ", replicationServers=" + replicationServers;
  }
}