From c5ab3254dc665b820ca65ea919c5c2f5f5e0d110 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 12 Nov 2013 13:52:24 +0000
Subject: [PATCH] CR-2563 JEChangelogDB removing a synchronized block in a heavily hit path
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 174 ++++++++++++++++++++++++++++++++++---------
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 15 ++-
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 4
opends/src/messages/messages/replication.properties | 2
4 files changed, 150 insertions(+), 45 deletions(-)
diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 7d8a3c1..f010c55 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -527,3 +527,5 @@
NOTICE_BEST_RS_233=RS(%d) has been evaluated to be the best replication server \
for DS(%d) to connect to because it was the only one standing after all tests
NOTICE_UNKNOWN_RS_234=RS(%d) could not be contacted by DS(%d)
+SEVERE_ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN_235=Could not \
+ create replica database because the changelog database is shutting down
\ No newline at end of file
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index c4c614f..c3b2f8d 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,8 @@
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -165,9 +167,21 @@
/**
* This map contains the List of updates received from each LDAP server.
+ * <p>
+ * When removing a domainMap, code:
+ * <ol>
+ * <li>first get the domainMap</li>
+ * <li>synchronized on the domainMap</li>
+ * <li>remove the domainMap</li>
+ * <li>then check it's not null</li>
+ * <li>then close all inside</li>
+ * </ol>
+ * When creating a JEReplicaDB, synchronize on the domainMap to avoid
+ * concurrent shutdown.
*/
- private final Map<DN, Map<Integer, JEReplicaDB>> domainToReplicaDBs =
- new ConcurrentHashMap<DN, Map<Integer, JEReplicaDB>>();
+ private final ConcurrentMap<DN, ConcurrentMap<Integer, JEReplicaDB>>
+ domainToReplicaDBs =
+ new ConcurrentHashMap<DN, ConcurrentMap<Integer, JEReplicaDB>>();
private ReplicationDbEnv dbEnv;
private final String dbDirectoryName;
private final File dbDirectory;
@@ -185,6 +199,7 @@
/** The local replication server. */
private final ReplicationServer replicationServer;
+ private AtomicBoolean shutdown = new AtomicBoolean();
private static final DBCursor<UpdateMsg> EMPTY_CURSOR =
new DBCursor<UpdateMsg>()
@@ -305,27 +320,83 @@
Pair<JEReplicaDB, Boolean> getOrCreateReplicaDB(DN baseDN,
int serverId, ReplicationServer rs) throws ChangelogException
{
- synchronized (domainToReplicaDBs)
+ while (!shutdown.get())
{
- Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
- if (domainMap == null)
+ final ConcurrentMap<Integer, JEReplicaDB> domainMap =
+ getExistingOrNewDomainMap(baseDN);
+ final Pair<JEReplicaDB, Boolean> result =
+ getExistingOrNewReplicaDB(domainMap, serverId, baseDN, rs);
+ if (result != null)
{
- domainMap = new ConcurrentHashMap<Integer, JEReplicaDB>();
- domainToReplicaDBs.put(baseDN, domainMap);
+ return result;
}
-
- JEReplicaDB replicaDB = domainMap.get(serverId);
- if (replicaDB == null)
- {
- replicaDB =
- new JEReplicaDB(serverId, baseDN, rs, dbEnv, rs.getQueueSize());
- domainMap.put(serverId, replicaDB);
- return Pair.of(replicaDB, true);
- }
- return Pair.of(replicaDB, false);
}
+ throw new ChangelogException(
+ ERR_CANNOT_CREATE_REPLICA_DB_BECAUSE_CHANGELOG_DB_SHUTDOWN.get());
}
+ private ConcurrentMap<Integer, JEReplicaDB> getExistingOrNewDomainMap(
+ DN baseDN)
+ {
+ // happy path: the domainMap already exists
+ final ConcurrentMap<Integer, JEReplicaDB> currentValue =
+ domainToReplicaDBs.get(baseDN);
+ if (currentValue != null)
+ {
+ return currentValue;
+ }
+
+ // unlucky, the domainMap does not exist: take the hit and create the
+ // newValue, even though the same could be done concurrently by another
+ // thread
+ final ConcurrentMap<Integer, JEReplicaDB> newValue =
+ new ConcurrentHashMap<Integer, JEReplicaDB>();
+ final ConcurrentMap<Integer, JEReplicaDB> previousValue =
+ domainToReplicaDBs.putIfAbsent(baseDN, newValue);
+ if (previousValue != null)
+ {
+ // there was already a value associated to the key, let's use it
+ return previousValue;
+ }
+ return newValue;
+ }
+
+ private Pair<JEReplicaDB, Boolean> getExistingOrNewReplicaDB(
+ final ConcurrentMap<Integer, JEReplicaDB> domainMap, int serverId,
+ DN baseDN, ReplicationServer rs) throws ChangelogException
+ {
+ // happy path: the JEReplicaDB already exists
+ JEReplicaDB currentValue = domainMap.get(serverId);
+ if (currentValue != null)
+ {
+ return Pair.of(currentValue, false);
+ }
+
+ // unlucky, the JEReplicaDB does not exist: take the hit and synchronize
+ // on the domainMap to create a new ReplicaDB
+ synchronized (domainMap)
+ {
+ // double-check
+ currentValue = domainMap.get(serverId);
+ if (currentValue != null)
+ {
+ return Pair.of(currentValue, false);
+ }
+
+ if (domainToReplicaDBs.get(baseDN) != domainMap)
+ {
+ // the domainMap could have been concurrently removed because
+ // 1) a shutdown was initiated or 2) an initialize was called.
+ // Return will allow the code to:
+ // 1) shutdown properly or 2) lazily recreate the JEReplicaDB
+ return null;
+ }
+
+ final JEReplicaDB newValue = new JEReplicaDB(serverId, baseDN, rs, dbEnv);
+ domainMap.put(serverId, newValue);
+ return Pair.of(newValue, true);
+ }
+ }
/** {@inheritDoc} */
@Override
@@ -378,6 +449,11 @@
@Override
public void shutdownDB() throws ChangelogException
{
+ if (!this.shutdown.compareAndSet(false, true))
+ { // shutdown has already been initiated
+ return;
+ }
+
// Remember the first exception because :
// - we want to try to remove everything we want to remove
// - then throw the first encountered exception
@@ -392,6 +468,17 @@
firstException = e;
}
+ for (Iterator<ConcurrentMap<Integer, JEReplicaDB>> it =
+ this.domainToReplicaDBs.values().iterator(); it.hasNext();)
+ {
+ final ConcurrentMap<Integer, JEReplicaDB> domainMap = it.next();
+ synchronized (domainMap)
+ {
+ it.remove();
+ innerShutdownDomain(domainMap);
+ }
+ }
+
if (dbEnv != null)
{
dbEnv.shutdown();
@@ -498,19 +585,30 @@
@Override
public void shutdownDomain(DN baseDN)
{
- shutdownReplicaDBs(baseDN, getDomainMap(baseDN));
+ if (this.shutdown.get())
+ { // shutdown has already been initiated
+ return;
+ }
+
+ final Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+ if (domainMap != null)
+ {
+ synchronized (domainMap)
+ {
+ innerShutdownDomain(domainToReplicaDBs.remove(baseDN));
+ }
+ }
}
- private void shutdownReplicaDBs(DN baseDN,
- Map<Integer, JEReplicaDB> domainMap)
+ /**
+ * This method assumes the domainMap is synchronized by calling code and that
+ * the domainMap is not null.
+ */
+ private void innerShutdownDomain(final Map<Integer, JEReplicaDB> domainMap)
{
- synchronized (domainMap)
+ for (JEReplicaDB replicaDB : domainMap.values())
{
- for (JEReplicaDB replicaDB : domainMap.values())
- {
- replicaDB.shutdown();
- }
- domainToReplicaDBs.remove(baseDN);
+ replicaDB.shutdown();
}
}
@@ -548,21 +646,25 @@
ChangelogException firstException = null;
// 1- clear the replica DBs
- final Map<Integer, JEReplicaDB> domainMap = getDomainMap(baseDN);
- synchronized (domainMap)
+ Map<Integer, JEReplicaDB> domainMap = domainToReplicaDBs.get(baseDN);
+ if (domainMap != null)
{
- for (JEReplicaDB replicaDB : domainMap.values())
+ synchronized (domainMap)
{
- try
+ domainMap = domainToReplicaDBs.remove(baseDN);
+ for (JEReplicaDB replicaDB : domainMap.values())
{
- replicaDB.clear();
- }
- catch (ChangelogException e)
- {
- firstException = e;
+ try
+ {
+ replicaDB.clear();
+ }
+ catch (ChangelogException e)
+ {
+ firstException = e;
+ }
+ replicaDB.shutdown();
}
}
- shutdownReplicaDBs(baseDN, domainMap);
}
// 2- clear the ChangeNumber index DB
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 032dcad..67d2404 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -133,33 +133,34 @@
/**
* Creates a new ReplicaDB associated to a given LDAP server.
*
- * @param id Identifier of the DB.
+ * @param serverId The serverId for which changes will be stored in the DB.
* @param baseDN the baseDN for which this DB was created.
* @param replicationServer The ReplicationServer that creates this ReplicaDB.
* @param dbenv the Database Env to use to create the ReplicationServer DB.
* server for this domain.
- * @param queueSize The queueSize to use when creating the ReplicaDB.
* @throws ChangelogException If a database problem happened
*/
- public JEReplicaDB(int id, DN baseDN, ReplicationServer replicationServer,
- ReplicationDbEnv dbenv, int queueSize) throws ChangelogException
+ public JEReplicaDB(int serverId, DN baseDN,
+ ReplicationServer replicationServer, ReplicationDbEnv dbenv)
+ throws ChangelogException
{
this.replicationServer = replicationServer;
- serverId = id;
+ this.serverId = serverId;
this.baseDN = baseDN;
trimAge = replicationServer.getTrimAge();
+ final int queueSize = replicationServer.getQueueSize();
queueMaxSize = queueSize;
queueLowmark = queueSize / 5;
queueHimark = queueSize * 4 / 5;
queueMaxBytes = 200 * queueMaxSize;
queueLowmarkBytes = 200 * queueLowmark;
queueHimarkBytes = 200 * queueLowmark;
- db = new ReplicationDB(id, baseDN, replicationServer, dbenv);
+ db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
oldestCSN = db.readOldestCSN();
newestCSN = db.readNewestCSN();
thread = new DirectoryThread(this, "Replication server RS("
+ replicationServer.getServerId()
- + ") changelog checkpointer for Replica DS(" + id
+ + ") changelog checkpointer for Replica DS(" + serverId
+ ") for domain \"" + baseDN + "\"");
thread.start();
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index ac9d8c7..2d62b93 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -408,7 +408,7 @@
testRoot = createCleanDir();
dbEnv = new ReplicationDbEnv(testRoot.getPath(), replicationServer);
- replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
+ replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
replicaDB.setCounterRecordWindowSize(counterWindow);
// Populate the db with 'max' msg
@@ -467,7 +467,7 @@
debugInfo(tn, "SHUTDOWN replicaDB and recreate");
replicaDB.shutdown();
- replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv, 10);
+ replicaDB = new JEReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
replicaDB.setCounterRecordWindowSize(counterWindow);
assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
--
Gitblit v1.10.0