From c5ab3254dc665b820ca65ea919c5c2f5f5e0d110 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 12 Nov 2013 13:52:24 +0000
Subject: [PATCH] CR-2563 JEChangelogDB removing a synchronized block in a heavily hit path

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java |  174 ++++++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 138 insertions(+), 36 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index c4c614f..c3b2f8d 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,8 @@
 import java.io.File;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
@@ -165,9 +167,21 @@
 
   /**
    * This map contains the List of updates received from each LDAP server.
+   * <p>
+   * When removing a domainMap, code:
+   * <ol>
+   * <li>first get the domainMap</li>
+   * <li>synchronized on the domainMap</li>
+   * <li>remove the domainMap</li>
+   * <li>then check it's not null</li>
+   * <li>then close all inside</li>
+   * </ol>
+   * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+   * concurrent shutdown.
    */
-  private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
-      new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>();
+  private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
+      domainToReplicaDBs =
+          new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
   private ReplicationDbEnv dbEnv;
   private final String dbDirectoryName;
   private final File dbDirectory;
@@ -185,6 +199,7 @@
 
   /** The local replication server. */
   private final ReplicationServer replicationServer;
+  private AtomicBoolean shutdown = new AtomicBoolean();
 
   private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
       new DBCursor<UpdateMsg>()
@@ -305,27 +320,83 @@
   Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
       int serverId, ReplicationServer rs) throws ChangelogException
   {
-    synchronized (domainToReplicaDBs)
+    while (!shutdown.get())
     {
-      Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
-      if (domainMap == null)
+      final ConcurrentMap<Integer, JEReplicaDB> domainMap =
+          getExistingOrNewDomainMap(baseDN);
+      final Pair<JEReplicaDB, Boolean> result =
+          getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
+      if (result != null)
       {
-        domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>();
-        domainToReplicaDBs.put(baseDN, domainMap);
+        return result;
       }
-
-      JEReplicaDB replicaDB = domainMap.get(serverId);
-      if (replicaDB == null)
-      {
-        replicaDB =
-            new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
-        domainMap.put(serverId, replicaDB);
-        return Pair.of(replicaDB, true);
-      }
-      return Pair.of(replicaDB, false);
     }
+    throw new ChangelogException(
+        ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
   }
 
+  private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
+      DN baseDN)
+  {
+    // happy path: the domainMap already exists
+    final ConcurrentMap<Integer, JEReplicaDB> currentValue =
+        domainToReplicaDBs.get(baseDN);
+    if (currentValue != null)
+    {
+      return currentValue;
+    }
+
+    // unlucky, the domainMap does not exist: take the hit and create the
+    // newValue, even though the same could be done concurrently by another
+    // thread
+    final ConcurrentMap<Integer, JEReplicaDB> newValue =
+        new ConcurrentHashMap<Integer, JEReplicaDB>();
+    final ConcurrentMap<Integer, JEReplicaDB> previousValue =
+        domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+    if (previousValue != null)
+    {
+      // there was already a value associated to the key, let's use it
+      return previousValue;
+    }
+    return newValue;
+  }
+
+  private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
+      final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
+      DN baseDN, ReplicationServer rs) throws ChangelogException
+  {
+    // happy path: the JEReplicaDB already exists
+    JEReplicaDB currentValue = domainMap.get(serverId);
+    if (currentValue != null)
+    {
+      return Pair.of(currentValue, false);
+    }
+
+    // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+    // on the domainMap to create a new ReplicaDB
+    synchronized (domainMap)
+    {
+      // double-check
+      currentValue = domainMap.get(serverId);
+      if (currentValue != null)
+      {
+        return Pair.of(currentValue, false);
+      }
+
+      if (domainToReplicaDBs.get(baseDN) != domainMap)
+      {
+        // the domainMap could have been concurrently removed because
+        // 1) a shutdown was initiated or 2) an initialize was called.
+        // Return will allow the code to:
+        // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+        return null;
+      }
+
+      final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
+      domainMap.put(serverId, newValue);
+      return Pair.of(newValue, true);
+    }
+  }
 
   /** {@inheritDoc} */
   @Override
@@ -378,6 +449,11 @@
   @Override
   public void shutdownDB() throws ChangelogException
   {
+    if (!this.shutdown.compareAndSet(false, true))
+    { // shutdown has already been initiated
+      return;
+    }
+
     // Remember the first exception because :
     // - we want to try to remove everything we want to remove
     // - then throw the first encountered exception
@@ -392,6 +468,17 @@
       firstException = e;
     }
 
+    for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
+        this.domainToReplicaDBs.values().iterator(); it.hasNext();)
+    {
+      final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
+      synchronized (domainMap)
+      {
+        it.remove();
+        innerShutdownDomain(domainMap);
+      }
+    }
+
     if (dbEnv != null)
     {
       dbEnv.shutdown();
@@ -498,19 +585,30 @@
   @Override
   public void shutdownDomain(DN baseDN)
   {
-    shutdownReplicaDBs(baseDN, getDomainMap(baseDN));
+    if (this.shutdown.get())
+    { // shutdown has already been initiated
+      return;
+    }
+
+    final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+    if (domainMap != null)
+    {
+      synchronized (domainMap)
+      {
+        innerShutdownDomain(domainToReplicaDBs.remove(baseDN));
+      }
+    }
   }
 
-  private void shutdownReplicaDBs(DN baseDN,
-      Map<Integer, JEReplicaDB> domainMap)
+  /**
+   * This method assumes the domainMap is synchronized by calling code and that
+   * the domainMap is not null.
+   */
+  private void innerShutdownDomain(final Map<Integer, JEReplicaDB> domainMap)
   {
-    synchronized (domainMap)
+    for (JEReplicaDB replicaDB : domainMap.values())
     {
-      for (JEReplicaDB replicaDB : domainMap.values())
-      {
-        replicaDB.shutdown();
-      }
-      domainToReplicaDBs.remove(baseDN);
+      replicaDB.shutdown();
     }
   }
 
@@ -548,21 +646,25 @@
     ChangelogException firstException = null;
 
     // 1- clear the replica DBs
-    final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN);
-    synchronized (domainMap)
+    Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+    if (domainMap != null)
     {
-      for (JEReplicaDB replicaDB : domainMap.values())
+      synchronized (domainMap)
       {
-        try
+        domainMap = domainToReplicaDBs.remove(baseDN);
+        for (JEReplicaDB replicaDB : domainMap.values())
         {
-          replicaDB.clear();
-        }
-        catch (ChangelogException e)
-        {
-          firstException = e;
+          try
+          {
+            replicaDB.clear();
+          }
+          catch (ChangelogException e)
+          {
+            firstException = e;
+          }
+          replicaDB.shutdown();
         }
       }
-      shutdownReplicaDBs(baseDN, domainMap);
     }
 
     // 2- clear the ChangeNumber index DB

--
Gitblit v1.10.0