From c5ab3254dc665b820ca65ea919c5c2f5f5e0d110 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 12 Nov 2013 13:52:24 +0000
Subject: [PATCH] CR-2563 JEChangelogDB removing a synchronized block in a heavily hit path
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 174 ++++++++++++++++++++++++++++++++++++++++++++++------------
1 files changed, 138 insertions(+), 36 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index c4c614f..c3b2f8d 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,8 @@
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -165,9 +167,21 @@
/**
* This map contains the List of updates received from each LDAP server.
+ * <p>
+ * When removing a domainMap, code must:
+ * <ol>
+ * <li>first get the domainMap</li>
+ * <li>synchronize on the domainMap</li>
+ * <li>remove the domainMap</li>
+ * <li>then check it's not null</li>
+ * <li>then shut down all the JEReplicaDBs it contains</li>
+ * </ol>
+ * When creating a JEReplicaDB, code must synchronize on the domainMap to
+ * avoid a concurrent shutdown.
*/
- private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
- new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>();
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
+ domainToReplicaDBs =
+ new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
private ReplicationDbEnv dbEnv;
private final String dbDirectoryName;
private final File dbDirectory;
@@ -185,6 +199,7 @@
/** The local replication server. */
private final ReplicationServer replicationServer;
+ /** Set by shutdownDB() to flag that a shutdown has been initiated. */
+ private final AtomicBoolean shutdown = new AtomicBoolean();
private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
new DBCursor<UpdateMsg>()
@@ -305,27 +320,83 @@
Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
int serverId, ReplicationServer rs) throws ChangelogException
{
- synchronized (domainToReplicaDBs)
+ // keep retrying until a replica DB is obtained or a shutdown is initiated
+ while (!shutdown.get())
{
- Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
- if (domainMap == null)
+ final ConcurrentMap<Integer, JEReplicaDB> domainMap =
+ getExistingOrNewDomainMap(baseDN);
+ final Pair<JEReplicaDB, Boolean> result =
+ getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
+ // a null result means the domainMap was concurrently removed from
+ // domainToReplicaDBs: loop again to re-evaluate the shutdown flag
+ if (result != null)
{
- domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>();
- domainToReplicaDBs.put(baseDN, domainMap);
+ return result;
}
-
- JEReplicaDB replicaDB = domainMap.get(serverId);
- if (replicaDB == null)
- {
- replicaDB =
- new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
- domainMap.put(serverId, replicaDB);
- return Pair.of(replicaDB, true);
- }
- return Pair.of(replicaDB, false);
}
+ // the loop is only left without returning once shutdown has been flagged
+ throw new ChangelogException(
+ ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
}
+ /**
+  * Returns the domainMap registered for the supplied baseDN, registering a
+  * new empty one first when there is none yet.
+  *
+  * @param baseDN
+  *          the domain whose map of replica DBs is wanted
+  * @return the domainMap associated to the baseDN, never null
+  */
+ private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
+     DN baseDN)
+ {
+   // common case: the domain is already registered, nothing to allocate
+   final ConcurrentMap<Integer, JEReplicaDB> registered =
+       domainToReplicaDBs.get(baseDN);
+   if (registered != null)
+   {
+     return registered;
+   }
+
+   // rare case: no domainMap yet. Build a candidate and try to register it,
+   // accepting that another thread may be doing the very same thing and
+   // that this candidate may end up being thrown away
+   final ConcurrentMap<Integer, JEReplicaDB> candidate =
+       new ConcurrentHashMap<Integer, JEReplicaDB>();
+   final ConcurrentMap<Integer, JEReplicaDB> raceWinner =
+       domainToReplicaDBs.putIfAbsent(baseDN, candidate);
+   // a non null return means another thread registered its map first
+   return raceWinner != null ? raceWinner : candidate;
+ }
+
+ /**
+  * Returns the JEReplicaDB stored in the domainMap for the supplied serverId,
+  * creating and storing a new one when there is none yet.
+  *
+  * @param domainMap
+  *          the map of replica DBs previously obtained for the baseDN
+  * @param serverId
+  *          the key of the wanted JEReplicaDB inside the domainMap
+  * @param baseDN
+  *          the domain the domainMap is expected to be registered for
+  * @param rs
+  *          the replication server handed over to a newly created JEReplicaDB
+  * @return the JEReplicaDB paired with false when it already existed or with
+  *         true when this call created it, or null when the domainMap is no
+  *         longer the one registered for the baseDN
+  * @throws ChangelogException
+  *           presumably when the new JEReplicaDB cannot be created
+  */
+ private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
+     final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
+     DN baseDN, ReplicationServer rs) throws ChangelogException
+ {
+   // lock-free lookup first: most of the time the JEReplicaDB is there
+   JEReplicaDB replicaDB = domainMap.get(serverId);
+   if (replicaDB != null)
+   {
+     return Pair.of(replicaDB, false);
+   }
+
+   // not there: creation is serialized on the domainMap, the same lock the
+   // shutdown code takes before removing the domainMap
+   synchronized (domainMap)
+   {
+     // look again now that the lock is held, another thread may have
+     // created the JEReplicaDB in the meantime
+     replicaDB = domainMap.get(serverId);
+     if (replicaDB != null)
+     {
+       return Pair.of(replicaDB, false);
+     }
+
+     // the domainMap may have been concurrently removed, either because a
+     // shutdown was initiated or because an initialize was called. Giving
+     // up here lets the caller shut down properly in the first case and
+     // lazily recreate the JEReplicaDB in the second case
+     final boolean stillRegistered =
+         domainToReplicaDBs.get(baseDN) == domainMap;
+     if (!stillRegistered)
+     {
+       return null;
+     }
+
+     replicaDB = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
+     domainMap.put(serverId, replicaDB);
+     return Pair.of(replicaDB, true);
+   }
+ }
/** {@inheritDoc} */
@Override
@@ -378,6 +449,11 @@
@Override
public void shutdownDB() throws ChangelogException
{
+ if (!this.shutdown.compareAndSet(false, true))
+ { // shutdown has already been initiated
+ return;
+ }
+
// Remember the first exception because :
// - we want to try to remove everything we want to remove
// - then throw the first encountered exception
@@ -392,6 +468,17 @@
firstException = e;
}
+ for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
+ this.domainToReplicaDBs.values().iterator(); it.hasNext();)
+ {
+ final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
+ synchronized (domainMap)
+ {
+ it.remove();
+ innerShutdownDomain(domainMap);
+ }
+ }
+
if (dbEnv != null)
{
dbEnv.shutdown();
@@ -498,19 +585,30 @@
@Override
public void shutdownDomain(DN baseDN)
{
- shutdownReplicaDBs(baseDN, getDomainMap(baseDN));
+ if (this.shutdown.get())
+ { // shutdown has already been initiated
+ return;
+ }
+
+ final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+ if (domainMap != null)
+ {
+ synchronized (domainMap)
+ {
+ // another thread may have removed the domain between the get() above
+ // and the acquisition of the lock, in which case remove() returns null
+ // and there is nothing left to shut down
+ final Map<Integer, JEReplicaDB> removed =
+ domainToReplicaDBs.remove(baseDN);
+ if (removed != null)
+ {
+ innerShutdownDomain(removed);
+ }
+ }
+ }
}
- private void shutdownReplicaDBs(DN baseDN,
- Map<Integer, JEReplicaDB> domainMap)
+ /**
+ * Shuts down all the JEReplicaDBs contained in the supplied domainMap.
+ * <p>
+ * This method assumes the calling code is synchronized on the domainMap and
+ * that the domainMap is not null.
+ *
+ * @param domainMap
+ * the map whose JEReplicaDBs are shut down; removing it from
+ * domainToReplicaDBs is left to the calling code
+ */
+ private void innerShutdownDomain(final Map<Integer, JEReplicaDB> domainMap)
{
- synchronized (domainMap)
+ for (JEReplicaDB replicaDB : domainMap.values())
{
- for (JEReplicaDB replicaDB : domainMap.values())
- {
- replicaDB.shutdown();
- }
- domainToReplicaDBs.remove(baseDN);
+ replicaDB.shutdown();
}
}
@@ -548,21 +646,25 @@
ChangelogException firstException = null;
// 1- clear the replica DBs
- final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN);
- synchronized (domainMap)
+ Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+ if (domainMap != null)
{
- for (JEReplicaDB replicaDB : domainMap.values())
+ synchronized (domainMap)
{
- try
+ domainMap = domainToReplicaDBs.remove(baseDN);
+ for (JEReplicaDB replicaDB : domainMap.values())
{
- replicaDB.clear();
- }
- catch (ChangelogException e)
- {
- firstException = e;
+ try
+ {
+ replicaDB.clear();
+ }
+ catch (ChangelogException e)
+ {
+ firstException = e;
+ }
+ replicaDB.shutdown();
}
}
- shutdownReplicaDBs(baseDN, domainMap);
}
// 2- clear the ChangeNumber index DB
--
Gitblit v1.10.0