From 9d5b1c7a628471604be4768f97fcdaf13cf0639f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Oct 2013 13:45:10 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB
---
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java | 150 +++++++++++++++++++++++++++++++++----------------
1 files changed, 101 insertions(+), 49 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index 8db0837..e10248f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -29,7 +29,10 @@
import java.io.File;
import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
@@ -41,7 +44,9 @@
import org.opends.server.types.DirectoryException;
import com.sleepycat.je.*;
+import com.sleepycat.je.config.EnvironmentParams;
+import static com.sleepycat.je.EnvironmentConfig.*;
import static com.sleepycat.je.LockMode.*;
import static com.sleepycat.je.OperationStatus.*;
@@ -51,19 +56,19 @@
import static org.opends.server.util.StaticUtils.*;
/**
- * This class is used to represent a Db environment that can be used
- * to create ReplicationDB.
+ * This class represents a DB environment that acts as a factory for
+ * ReplicationDBs.
*/
public class ReplicationDbEnv
{
private Environment dbEnvironment;
private Database changelogStateDb;
+ private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
private ReplicationServer replicationServer;
+ private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
private static final String GENERATION_ID_TAG = "GENID";
private static final String FIELD_SEPARATOR = " ";
- /**
- * The tracer object for the debug logger.
- */
+ /** The tracer object for the debug logger. */
private static final DebugTracer TRACER = getTracer();
/**
@@ -92,9 +97,9 @@
*/
envConfig.setAllowCreate(true);
envConfig.setTransactional(true);
- envConfig.setConfigParam("je.cleaner.threads", "2");
- envConfig.setConfigParam("je.checkpointer.highPriority", "true");
-
+ envConfig.setConfigParam(STATS_COLLECT, "false");
+ envConfig.setConfigParam(CLEANER_THREADS, "2");
+ envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
/*
* Tests have shown that since the parsing of the Replication log is
* always done sequentially, it is not necessary to use a large DB cache.
@@ -106,15 +111,15 @@
* read buffers. This will result in more scalable checkpointer and
* cleaner performance.
*/
- envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
- envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
- envConfig.setConfigParam("je.log.faultReadSize", kb(4));
+ envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
+ envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
+ envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
/*
* The cache size must be bigger in order to accommodate the larger
* buffers - see OPENDJ-943.
*/
- envConfig.setConfigParam("je.maxMemory", mb(16));
+ envConfig.setConfigParam(MAX_MEMORY, mb(16));
}
else
{
@@ -122,7 +127,7 @@
* Use 5M so that the replication can be used with 64M total for the
* JVM.
*/
- envConfig.setConfigParam("je.maxMemory", mb(5));
+ envConfig.setConfigParam(MAX_MEMORY, mb(5));
}
// Since records are always added at the end of the Replication log and
@@ -160,12 +165,27 @@
return String.valueOf(sizeInMb * 1024 * 1024);
}
- private Database openDatabase(String databaseName) throws RuntimeException
+ private Database openDatabase(String databaseName) throws ChangelogException,
+ RuntimeException
{
+ if (isShuttingDown.get())
+ {
+ // TODO JNR i18n
+ throw new ChangelogException(Message.raw("DB is closing"));
+ }
final DatabaseConfig dbConfig = new DatabaseConfig();
dbConfig.setAllowCreate(true);
dbConfig.setTransactional(true);
- return dbEnvironment.openDatabase(null, databaseName, dbConfig);
+ final Database db =
+ dbEnvironment.openDatabase(null, databaseName, dbConfig);
+ if (isShuttingDown.get())
+ {
+ closeDB(db);
+ // TODO JNR i18n
+ throw new ChangelogException(Message.raw("DB is closing"));
+ }
+ allDbs.add(db);
+ return db;
}
/**
@@ -395,44 +415,69 @@
}
}
- /**
- * Shutdown the Db environment.
- */
- public void shutdown()
+ /**
+ * Shutdown the Db environment.
+ */
+ public void shutdown()
+ {
+ isShuttingDown.set(true);
+ // CopyOnWriteArrayList iterator never throw ConcurrentModificationException
+ // This code rely on openDatabase() to close databases opened concurrently
+ // with this code
+ final Database[] allDbsCopy = allDbs.toArray(new Database[0]);
+ allDbs.clear();
+ for (Database db : allDbsCopy)
{
- try
- {
- changelogStateDb.close();
- } catch (DatabaseException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- }
-
- try
- {
- dbEnvironment.close();
- } catch (DatabaseException e)
- {
- MessageBuilder mb = new MessageBuilder();
- mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
- mb.append(" ");
- mb.append(stackTraceToSingleLineString(e));
- logError(mb.toMessage());
- }
+ closeDB(db);
}
+ try
+ {
+ dbEnvironment.close();
+ }
+ catch (DatabaseException e)
+ {
+ logError(newErrorMessage(e));
+ }
+ }
+
+ private void closeDB(Database db)
+ {
+ allDbs.remove(db);
+ try
+ {
+ db.close();
+ }
+ catch (DatabaseException e)
+ {
+ logError(newErrorMessage(e));
+ }
+ }
+
+ private Message newErrorMessage(DatabaseException e)
+ {
+ if (!isShuttingDown.get())
+ {
+ return NOTE_EXCEPTION_CLOSING_DATABASE
+ .get(stackTraceToSingleLineString(e));
+ }
+ MessageBuilder mb = new MessageBuilder();
+ mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
+ mb.append(" ");
+ mb.append(stackTraceToSingleLineString(e));
+ return mb.toMessage();
+ }
+
/**
* Clears the provided generationId associated to the provided baseDN from the
* state Db.
*
* @param baseDN
* The baseDN for which the generationID must be cleared.
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public void clearGenerationId(DN baseDN)
+ public void clearGenerationId(DN baseDN) throws ChangelogException
{
deleteFromChangelogStateDB(buildGenIdKey(baseDN),
"clearGenerationId(baseDN=" + baseDN + ")");
@@ -446,15 +491,17 @@
* The baseDN for which the serverId must be cleared.
* @param serverId
* The serverId to remove from the Db.
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public void clearServerId(DN baseDN, int serverId)
+ public void clearServerId(DN baseDN, int serverId) throws ChangelogException
{
deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
"clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
}
private void deleteFromChangelogStateDB(String keyString,
- String methodInvocation)
+ String methodInvocation) throws ChangelogException
{
if (debugEnabled())
debug(methodInvocation + " starting");
@@ -490,18 +537,23 @@
}
catch (RuntimeException dbe)
{
- // FIXME can actually happen (see catch above)
- // what should we do about it?
+ throw new ChangelogException(dbe);
}
}
/**
* Clears the database.
*
- * @param databaseName The name of the database to clear.
+ * @param db
+ * The database to clear.
*/
- public final void clearDb(String databaseName)
+ public final void clearDb(Database db)
{
+ String databaseName = db.getDatabaseName();
+
+ // Closing is requested by Berkeley JE before truncate
+ db.close();
+
Transaction txn = null;
try
{
--
Gitblit v1.10.0