From 9d5b1c7a628471604be4768f97fcdaf13cf0639f Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Oct 2013 13:45:10 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java |  150 +++++++++++++++++++++++++++++++++----------------
 1 files changed, 101 insertions(+), 49 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index 8db0837..e10248f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -29,7 +29,10 @@
 
 import java.io.File;
 import java.io.UnsupportedEncodingException;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
@@ -41,7 +44,9 @@
 import org.opends.server.types.DirectoryException;
 
 import com.sleepycat.je.*;
+import com.sleepycat.je.config.EnvironmentParams;
 
+import static com.sleepycat.je.EnvironmentConfig.*;
 import static com.sleepycat.je.LockMode.*;
 import static com.sleepycat.je.OperationStatus.*;
 
@@ -51,19 +56,19 @@
 import static org.opends.server.util.StaticUtils.*;
 
 /**
- * This class is used to represent a Db environment that can be used
- * to create ReplicationDB.
+ * This class represents a DB environment that acts as a factory for
+ * ReplicationDBs.
  */
 public class ReplicationDbEnv
 {
   private Environment dbEnvironment;
   private Database changelogStateDb;
+  private final List<Database> allDbs = new CopyOnWriteArrayList<Database>();
   private ReplicationServer replicationServer;
+  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
   private static final String GENERATION_ID_TAG = "GENID";
   private static final String FIELD_SEPARATOR = " ";
-  /**
-   * The tracer object for the debug logger.
-   */
+  /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
 
   /**
@@ -92,9 +97,9 @@
        */
       envConfig.setAllowCreate(true);
       envConfig.setTransactional(true);
-      envConfig.setConfigParam("je.cleaner.threads", "2");
-      envConfig.setConfigParam("je.checkpointer.highPriority", "true");
-
+      envConfig.setConfigParam(STATS_COLLECT, "false");
+      envConfig.setConfigParam(CLEANER_THREADS, "2");
+      envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
       /*
        * Tests have shown that since the parsing of the Replication log is
        * always done sequentially, it is not necessary to use a large DB cache.
@@ -106,15 +111,15 @@
          * read buffers. This will result in more scalable checkpointer and
          * cleaner performance.
          */
-        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
-        envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
-        envConfig.setConfigParam("je.log.faultReadSize", kb(4));
+        envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
+        envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
+        envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
 
         /*
          * The cache size must be bigger in order to accommodate the larger
          * buffers - see OPENDJ-943.
          */
-        envConfig.setConfigParam("je.maxMemory", mb(16));
+        envConfig.setConfigParam(MAX_MEMORY, mb(16));
       }
       else
       {
@@ -122,7 +127,7 @@
          * Use 5M so that the replication can be used with 64M total for the
          * JVM.
          */
-        envConfig.setConfigParam("je.maxMemory", mb(5));
+        envConfig.setConfigParam(MAX_MEMORY, mb(5));
       }
 
       // Since records are always added at the end of the Replication log and
@@ -160,12 +165,27 @@
     return String.valueOf(sizeInMb * 1024 * 1024);
   }
 
-  private Database openDatabase(String databaseName) throws RuntimeException
+  private Database openDatabase(String databaseName) throws ChangelogException,
+      RuntimeException
   {
+    if (isShuttingDown.get())
+    {
+      // TODO JNR i18n
+      throw new ChangelogException(Message.raw("DB is closing"));
+    }
     final DatabaseConfig dbConfig = new DatabaseConfig();
     dbConfig.setAllowCreate(true);
     dbConfig.setTransactional(true);
-    return dbEnvironment.openDatabase(null, databaseName, dbConfig);
+    final Database db =
+        dbEnvironment.openDatabase(null, databaseName, dbConfig);
+    if (isShuttingDown.get())
+    {
+      closeDB(db);
+      // TODO JNR i18n
+      throw new ChangelogException(Message.raw("DB is closing"));
+    }
+    allDbs.add(db);
+    return db;
   }
 
   /**
@@ -395,44 +415,69 @@
       }
     }
 
-    /**
-     * Shutdown the Db environment.
-     */
-    public void shutdown()
+  /**
+   * Shutdown the Db environment.
+   */
+  public void shutdown()
+  {
+    isShuttingDown.set(true);
+    // CopyOnWriteArrayList iterator never throw ConcurrentModificationException
+    // This code rely on openDatabase() to close databases opened concurrently
+    // with this code
+    final Database[] allDbsCopy = allDbs.toArray(new Database[0]);
+    allDbs.clear();
+    for (Database db : allDbsCopy)
     {
-      try
-      {
-        changelogStateDb.close();
-      } catch (DatabaseException e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
-        mb.append(" ");
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-      }
-
-      try
-      {
-        dbEnvironment.close();
-      } catch (DatabaseException e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
-        mb.append(" ");
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-      }
+      closeDB(db);
     }
 
+    try
+    {
+      dbEnvironment.close();
+    }
+    catch (DatabaseException e)
+    {
+      logError(newErrorMessage(e));
+    }
+  }
+
+  private void closeDB(Database db)
+  {
+    allDbs.remove(db);
+    try
+    {
+      db.close();
+    }
+    catch (DatabaseException e)
+    {
+      logError(newErrorMessage(e));
+    }
+  }
+
+  private Message newErrorMessage(DatabaseException e)
+  {
+    if (!isShuttingDown.get())
+    {
+      return NOTE_EXCEPTION_CLOSING_DATABASE
+          .get(stackTraceToSingleLineString(e));
+    }
+    MessageBuilder mb = new MessageBuilder();
+    mb.append(ERR_ERROR_CLOSING_CHANGELOG_ENV.get());
+    mb.append(" ");
+    mb.append(stackTraceToSingleLineString(e));
+    return mb.toMessage();
+  }
+
   /**
    * Clears the provided generationId associated to the provided baseDN from the
    * state Db.
    *
    * @param baseDN
    *          The baseDN for which the generationID must be cleared.
+   * @throws ChangelogException
+   *           If a database problem happened
    */
-  public void clearGenerationId(DN baseDN)
+  public void clearGenerationId(DN baseDN) throws ChangelogException
   {
     deleteFromChangelogStateDB(buildGenIdKey(baseDN),
         "clearGenerationId(baseDN=" + baseDN + ")");
@@ -446,15 +491,17 @@
    *          The baseDN for which the serverId must be cleared.
    * @param serverId
    *          The serverId to remove from the Db.
+   * @throws ChangelogException
+   *           If a database problem happened
    */
-  public void clearServerId(DN baseDN, int serverId)
+  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
   {
     deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
         "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
   }
 
   private void deleteFromChangelogStateDB(String keyString,
-      String methodInvocation)
+      String methodInvocation) throws ChangelogException
   {
     if (debugEnabled())
       debug(methodInvocation + " starting");
@@ -490,18 +537,23 @@
     }
     catch (RuntimeException dbe)
     {
-      // FIXME can actually happen (see catch above)
-      // what should we do about it?
+      throw new ChangelogException(dbe);
     }
   }
 
     /**
      * Clears the database.
      *
-     * @param databaseName The name of the database to clear.
+     * @param db
+     *          The database to clear.
      */
-    public final void clearDb(String databaseName)
+    public final void clearDb(Database db)
     {
+      String databaseName = db.getDatabaseName();
+
+      // Closing is requested by Berkeley JE before truncate
+      db.close();
+
       Transaction txn = null;
       try
       {

--
Gitblit v1.10.0