mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
12.52.2013 c5ab3254dc665b820ca65ea919c5c2f5f5e0d110
CR-2563 JEChangelogDB removing a synchronized block in a heavily hit path

JEChangelogDB.java:
Used ConcurrentMap for domainToReplicaDBs.
Added shutdown field to better protect various code paths against shutdown.
In getOrCreateReplicaDB(), replaced the global synchronization block with fine-grained synchronization code that is only used on JEReplicaDB creation.
Added getExistingOrNewDomainMap(), getExistingOrNewReplicaDB().
Extracted innerShutdownDomain() from shutdownDomain().
In shutdownDB(), ensured shutdown of all the replicaDBs.
In shutdownDB(), shutdownDomain() and getOrCreateReplicaDB() check whether the DB is shutting down.
In removeDomain() and innerShutdownDomain() ensured the domainMap is removed before shutting down the replicaDBs.

replication.properties:
Added an error message to be used when the DB is shutting down.
Extracted and separated method init() from JEReplicaDB ctor.

JEReplicaDB.java:
In ctor, renamed id parameter to serverId + removed queueSize parameter.

JEReplicaDBTest.java:
Consequence of the change to JEReplicaDB.
4 files modified
169 ■■■■ changed files
opends/src/messages/messages/replication.properties 2 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 148 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 15 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 4 ●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -527,3 +527,5 @@
NOTICE_BEST_RS_233=RS(%d) has been evaluated to be the best replication server \
 for DS(%d) to connect to because it was the only one standing after all tests
NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
SEVERE_ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN_235=Could not \
 create replica database because the changelog database is shutting down
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,8 @@
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -165,9 +167,21 @@
  /**
   * This map contains the List of updates received from each LDAP server.
   * <p>
   * When removing a domainMap, code:
   * <ol>
   * <li>first get the domainMap</li>
   * <li>synchronized on the domainMap</li>
   * <li>remove the domainMap</li>
   * <li>then check it's not null</li>
   * <li>then close all inside</li>
   * </ol>
   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
   * concurrent shutdown.
   */
  private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
      new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>();
  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
      domainToReplicaDBs =
          new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
  private ReplicationDbEnv dbEnv;
  private final String dbDirectoryName;
  private final File dbDirectory;
@@ -185,6 +199,7 @@
  /** The local replication server. */
  private final ReplicationServer replicationServer;
  private AtomicBoolean shutdown = new AtomicBoolean();
  private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
      new DBCursor<UpdateMsg>()
@@ -305,27 +320,83 @@
  Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
      int serverId, ReplicationServer rs) throws ChangelogException
  {
    synchronized (domainToReplicaDBs)
    while (!shutdown.get())
    {
      Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
      if (domainMap == null)
      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
          getExistingOrNewDomainMap(baseDN);
      final Pair<JEReplicaDB, Boolean> result =
          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
      if (result != null)
      {
        domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>();
        domainToReplicaDBs.put(baseDN, domainMap);
        return result;
      }
    }
    throw new ChangelogException(
        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
      }
      JEReplicaDB replicaDB = domainMap.get(serverId);
      if (replicaDB == null)
  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
      DN baseDN)
      {
        replicaDB =
            new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
        domainMap.put(serverId, replicaDB);
        return Pair.of(replicaDB, true);
      }
      return Pair.of(replicaDB, false);
    }
    // happy path: the domainMap already exists
    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
        domainToReplicaDBs.get(baseDN);
    if (currentValue != null)
    {
      return currentValue;
  }
    // unlucky, the domainMap does not exist: take the hit and create the
    // newValue, even though the same could be done concurrently by another
    // thread
    final ConcurrentMap<Integer, JEReplicaDB> newValue =
        new ConcurrentHashMap<Integer, JEReplicaDB>();
    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
    if (previousValue != null)
    {
      // there was already a value associated to the key, let's use it
      return previousValue;
    }
    return newValue;
  }
  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
      DN baseDN, ReplicationServer rs) throws ChangelogException
  {
    // happy path: the JEReplicaDB already exists
    JEReplicaDB currentValue = domainMap.get(serverId);
    if (currentValue != null)
    {
      return Pair.of(currentValue, false);
    }
    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
    // on the domainMap to create a new ReplicaDB
    synchronized (domainMap)
    {
      // double-check
      currentValue = domainMap.get(serverId);
      if (currentValue != null)
      {
        return Pair.of(currentValue, false);
      }
      if (domainToReplicaDBs.get(baseDN) != domainMap)
      {
        // the domainMap could have been concurrently removed because
        // 1) a shutdown was initiated or 2) an initialize was called.
        // Return will allow the code to:
        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
        return null;
      }
      final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
      domainMap.put(serverId, newValue);
      return Pair.of(newValue, true);
    }
  }
  /** {@inheritDoc} */
  @Override
@@ -378,6 +449,11 @@
  @Override
  public void shutdownDB() throws ChangelogException
  {
    if (!this.shutdown.compareAndSet(false, true))
    { // shutdown has already been initiated
      return;
    }
    // Remember the first exception because :
    // - we want to try to remove everything we want to remove
    // - then throw the first encountered exception
@@ -392,6 +468,17 @@
      firstException = e;
    }
    for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
    {
      final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
      synchronized (domainMap)
      {
        it.remove();
        innerShutdownDomain(domainMap);
      }
    }
    if (dbEnv != null)
    {
      dbEnv.shutdown();
@@ -498,20 +585,31 @@
  @Override
  public void shutdownDomain(DN baseDN)
  {
    shutdownReplicaDBs(baseDN, getDomainMap(baseDN));
    if (this.shutdown.get())
    { // shutdown has already been initiated
      return;
  }
  private void shutdownReplicaDBs(DN baseDN,
      Map<Integer, JEReplicaDB> domainMap)
    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
  {
    synchronized (domainMap)
    {
        innerShutdownDomain(domainToReplicaDBs.remove(baseDN));
      }
    }
  }
  /**
   * This method assumes the domainMap is synchronized by calling code and that
   * the domainMap is not null.
   */
  private void innerShutdownDomain(final Map<Integer, JEReplicaDB> domainMap)
  {
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        replicaDB.shutdown();
      }
      domainToReplicaDBs.remove(baseDN);
    }
  }
  /** {@inheritDoc} */
@@ -548,9 +646,12 @@
    ChangelogException firstException = null;
    // 1- clear the replica DBs
    final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN);
    Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
    if (domainMap != null)
    {
    synchronized (domainMap)
    {
        domainMap = domainToReplicaDBs.remove(baseDN);
      for (JEReplicaDB replicaDB : domainMap.values())
      {
        try
@@ -561,8 +662,9 @@
        {
          firstException = e;
        }
          replicaDB.shutdown();
      }
      shutdownReplicaDBs(baseDN, domainMap);
      }
    }
    // 2- clear the ChangeNumber index DB
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -133,33 +133,34 @@
  /**
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
   * @param id Identifier of the DB.
   * @param serverId The serverId for which changes will be stored in the DB.
   * @param baseDN the baseDN for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this ReplicaDB.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @param queueSize The queueSize to use when creating the ReplicaDB.
   * @throws ChangelogException If a database problem happened
   */
  public JEReplicaDB(int id, DN baseDN, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv, int queueSize) throws ChangelogException
  public JEReplicaDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbenv)
      throws ChangelogException
  {
    this.replicationServer = replicationServer;
    serverId = id;
    this.serverId = serverId;
    this.baseDN = baseDN;
    trimAge = replicationServer.getTrimAge();
    final int queueSize = replicationServer.getQueueSize();
    queueMaxSize = queueSize;
    queueLowmark = queueSize / 5;
    queueHimark = queueSize * 4 / 5;
    queueMaxBytes = 200 * queueMaxSize;
    queueLowmarkBytes = 200 * queueLowmark;
    queueHimarkBytes = 200 * queueLowmark;
    db = new ReplicationDB(id, baseDN, replicationServer, dbenv);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    oldestCSN = db.readOldestCSN();
    newestCSN = db.readNewestCSN();
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + id
        + ") changelog checkpointer for Replica DS(" + serverId
        + ") for domain \"" + baseDN + "\"");
    thread.start();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -408,7 +408,7 @@
      testRoot = createCleanDir();
      dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer);
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      replicaDB.setCounterRecordWindowSize(counterWindow);
      // Populate the db with 'max' msg
@@ -467,7 +467,7 @@
      debugInfo(tn, "SHUTDOWN replicaDB and recreate");
      replicaDB.shutdown();
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
      replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      replicaDB.setCounterRecordWindowSize(counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");