From 836356c02790df187234bc6f42ed4ddd2dfb2d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 29 Aug 2013 10:56:29 +0000
Subject: [PATCH] ReplicationDbEnv.java: Renamed: - field stateDb to changelogStateDb. - start() to initializeFromChangelogStateDB(). - putInStateDBIfNotExist() to putInChangelogStateDBIfNotExist() - deleteFromStateDB() to deleteFromChangelogStateDB() + changed the signature. In getOrAddDb(), reduced code clutter and removed useless comments. Extracted methods kb(), mb(), toBytes(), buildGenIdKey(), buildServerIdKey(), buildGenIdData() for better readability. Extracted method debug() to reduce code clutter. Used static imports.

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java |  246 +++++++++++++++++++++++++-----------------------
 1 files changed, 129 insertions(+), 117 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index 707667a..a47c099 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -39,6 +39,9 @@
 
 import com.sleepycat.je.*;
 
+import static com.sleepycat.je.LockMode.*;
+import static com.sleepycat.je.OperationStatus.*;
+
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -51,7 +54,7 @@
 public class ReplicationDbEnv
 {
   private Environment dbEnvironment;
-  private Database stateDb;
+  private Database changelogStateDb;
   private ReplicationServer replicationServer;
   private static final String GENERATION_ID_TAG = "GENID";
   private static final String FIELD_SEPARATOR = " ";
@@ -100,19 +103,15 @@
          * read buffers. This will result in more scalable checkpointer and
          * cleaner performance.
          */
-        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize",
-            String.valueOf(2 * 1024 * 1024));
-        envConfig.setConfigParam("je.log.iteratorReadSize",
-            String.valueOf(2 * 1024 * 1024));
-        envConfig.setConfigParam("je.log.faultReadSize",
-            String.valueOf(4 * 1024));
+        envConfig.setConfigParam("je.cleaner.lookAheadCacheSize", mb(2));
+        envConfig.setConfigParam("je.log.iteratorReadSize", mb(2));
+        envConfig.setConfigParam("je.log.faultReadSize", kb(4));
 
         /*
          * The cache size must be bigger in order to accommodate the larger
          * buffers - see OPENDJ-943.
          */
-        envConfig.setConfigParam("je.maxMemory",
-            String.valueOf(16 * 1024 * 1024));
+        envConfig.setConfigParam("je.maxMemory", mb(16));
       }
       else
       {
@@ -120,8 +119,7 @@
          * Use 5M so that the replication can be used with 64M total for the
          * JVM.
          */
-        envConfig.setConfigParam("je.maxMemory",
-            String.valueOf(5 * 1024 * 1024));
+        envConfig.setConfigParam("je.maxMemory", mb(5));
       }
 
       // Since records are always added at the end of the Replication log and
@@ -141,7 +139,7 @@
        * the topology. The database "changelogstate" is used to store the list
        * of all the servers that have been seen in the past.
        */
-      stateDb = openDatabase("changelogstate");
+      changelogStateDb = openDatabase("changelogstate");
     }
     catch (RuntimeException e)
     {
@@ -149,6 +147,16 @@
     }
   }
 
+  private String kb(int sizeInKb)
+  {
+    return String.valueOf(sizeInKb * 1024);
+  }
+
+  private String mb(int sizeInMb)
+  {
+    return String.valueOf(sizeInMb * 1024 * 1024);
+  }
+
   private Database openDatabase(String databaseName) throws RuntimeException
   {
     final DatabaseConfig dbConfig = new DatabaseConfig();
@@ -163,34 +171,31 @@
    *
    * @throws ChangelogException in case of underlying Exception
    */
-  public void start() throws ChangelogException
+  public void initializeFromChangelogStateDB() throws ChangelogException
   {
     DatabaseEntry key = new DatabaseEntry();
     DatabaseEntry data = new DatabaseEntry();
-    Cursor cursor = stateDb.openCursor(null, null);
+    Cursor cursor = changelogStateDb.openCursor(null, null);
 
     try
     {
       OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
       while (status == OperationStatus.SUCCESS)
       {
-        String stringData = toString(data.getData());
+        final String stringData = toString(data.getData());
 
         if (debugEnabled())
-          TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
-              + " Read (tag/baseDn/generationId) OR (serverId/baseDN): "
-              + stringData);
+          debug("read (" + GENERATION_ID_TAG + " generationId baseDn) OR "
+              + "(serverId baseDN): " + stringData);
 
-        String[] str = stringData.split(FIELD_SEPARATOR, 3);
+        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
         if (str[0].equals(GENERATION_ID_TAG))
         {
           long generationId = toLong(str[1]);
           String baseDn = str[2];
 
           if (debugEnabled())
-            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
-                + " Has read baseDn=" + baseDn + " generationId="
-                + generationId);
+            debug("has read baseDn=" + baseDn + " generationId=" +generationId);
 
           replicationServer.initDomainGenerationID(baseDn, generationId);
         }
@@ -200,8 +205,7 @@
           String baseDn = str[1];
 
           if (debugEnabled())
-            TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName()
-                + " Has read: baseDn=" + baseDn + " serverId=" + serverId);
+            debug("has read: baseDn=" + baseDn + " serverId=" + serverId);
 
           replicationServer.addServerIdToDomain(serverId, baseDn);
         }
@@ -264,73 +268,91 @@
     }
   }
 
-    /**
-     * Finds or creates the database used to store changes from the server
-     * with the given serverId and the given baseDn.
-     *
-     * @param serverId     The server id that identifies the server.
-     * @param baseDn       The baseDn that identifies the domain.
-     * @param generationId The generationId associated to this domain.
-     * @return the Database.
-     * @throws ChangelogException in case of underlying Exception.
-     */
-    public Database getOrAddDb(int serverId, String baseDn, long generationId)
-        throws ChangelogException
-    {
-      if (debugEnabled())
-        TRACER.debugInfo("ReplicationDbEnv.getOrAddDb() " +
-          serverId + " " + baseDn + " " + generationId);
-      try
-      {
-        final String serverIdKey = serverId + FIELD_SEPARATOR + baseDn;
-
-        // Opens the database for the changes received from this server
-        // on this domain. Create it if it does not already exist.
-        Database db = openDatabase(serverIdKey);
-
-        // Creates the record serverId/domain base Dn in the stateDb
-        // if it does not already exist.
-        putInStateDBIfNotExist(serverIdKey, serverIdKey);
-
-        // Creates the record domain base Dn/ generationId in the stateDb
-        // if it does not already exist.
-        final String genIdKey = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
-        final String genIdData = GENERATION_ID_TAG
-            + FIELD_SEPARATOR + generationId
-            + FIELD_SEPARATOR + baseDn;
-        putInStateDBIfNotExist(genIdKey, genIdData);
-        return db;
-      }
-      catch (RuntimeException e)
-      {
-        throw new ChangelogException(e);
-      }
-      catch (UnsupportedEncodingException e)
-      {
-        // can't happen
-        return null;
-      }
-    }
-
-  private void putInStateDBIfNotExist(String keyString, String dataString)
-      throws UnsupportedEncodingException, RuntimeException
+  private byte[] toBytes(String s)
   {
-    byte[] byteId = keyString.getBytes("UTF-8");
-    byte[] dataByteId = dataString.getBytes("UTF-8");
-    DatabaseEntry key = new DatabaseEntry();
-    key.setData(byteId);
+    try
+    {
+      return s.getBytes("UTF-8");
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      // can't happen
+      return null;
+    }
+  }
+
+  /**
+   * Finds or creates the database used to store changes from the server with
+   * the given serverId and the given baseDn.
+   *
+   * @param serverId
+   *          The server id that identifies the server.
+   * @param baseDn
+   *          The baseDn that identifies the domain.
+   * @param generationId
+   *          The generationId associated to this domain.
+   * @return the Database.
+   * @throws ChangelogException
+   *           in case of underlying Exception.
+   */
+  public Database getOrAddDb(int serverId, String baseDn, long generationId)
+      throws ChangelogException
+  {
+    if (debugEnabled())
+      debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDn + ", "
+          + generationId + ")");
+    try
+    {
+      // JNR: redundant info is stored between the key and data down below.
+      // It is probably ok since "changelogstate" DB does not receive a high
+      // volume of inserts.
+      final String serverIdToBaseDn = buildServerIdKey(baseDn, serverId);
+
+      // Opens the DB for the changes received from this server on this domain.
+      Database db = openDatabase(serverIdToBaseDn);
+
+      putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn);
+      putInChangelogStateDBIfNotExist(buildGenIdKey(baseDn),
+                                      buildGenIdData(baseDn, generationId));
+      return db;
+    }
+    catch (RuntimeException e)
+    {
+      throw new ChangelogException(e);
+    }
+  }
+
+  private String buildGenIdKey(String baseDn)
+  {
+    return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
+  }
+
+  private String buildServerIdKey(String baseDn, int serverId)
+  {
+    return serverId + FIELD_SEPARATOR + baseDn;
+  }
+
+  private String buildGenIdData(String baseDn, long generationId)
+  {
+    return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR
+        + baseDn;
+  }
+
+  private void putInChangelogStateDBIfNotExist(String keyString,
+      String dataString) throws RuntimeException
+  {
+    DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
     DatabaseEntry data = new DatabaseEntry();
-    OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
-    if (status == OperationStatus.NOTFOUND)
+    if (changelogStateDb.get(null, key, data, DEFAULT) == NOTFOUND)
     {
       Transaction txn = dbEnvironment.beginTransaction(null, null);
       try
       {
-        data.setData(dataByteId);
+        data.setData(toBytes(dataString));
         if (debugEnabled())
-          TRACER.debugInfo("Created in the state Db record key=[" + keyString
+          debug("putting record in the changelogstate Db key=[" + keyString
               + "] value=[" + dataString + "]");
-        stateDb.put(txn, key, data);
+        changelogStateDb.put(txn, key, data);
         txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
       }
       catch (DatabaseException dbe)
@@ -367,7 +389,7 @@
     {
       try
       {
-        stateDb.close();
+        changelogStateDb.close();
       } catch (DatabaseException e)
       {
         MessageBuilder mb = new MessageBuilder();
@@ -397,18 +419,8 @@
    */
   public void clearGenerationId(String baseDn)
   {
-    String methodInvocation = "clearGenerationId(baseDN=" + baseDn + ")";
-
-    String key = GENERATION_ID_TAG + FIELD_SEPARATOR + baseDn;
-    OperationStatus status = deleteFromStateDB(key, methodInvocation);
-    if (status == OperationStatus.SUCCESS || status == OperationStatus.KEYEXIST)
-    {
-      // TODO : should have a better error logging
-      if (debugEnabled())
-        TRACER.debugInfo("In "
-            + this.replicationServer.getMonitorInstanceName()
-            + methodInvocation + " failed " + status);
-    }
+    deleteFromChangelogStateDB(buildGenIdKey(baseDn),
+        "clearGenerationId(baseDN=" + baseDn + ")");
   }
 
   /**
@@ -422,36 +434,30 @@
    */
   public void clearServerId(String baseDn, int serverId)
   {
-    String key = serverId + FIELD_SEPARATOR + baseDn;
-    deleteFromStateDB(key, "clearServerId(baseDN=" + baseDn + " , serverId="
-        + serverId + ")");
+    deleteFromChangelogStateDB(buildServerIdKey(baseDn, serverId),
+        "clearServerId(baseDN=" + baseDn + " , serverId=" + serverId + ")");
   }
 
-  private OperationStatus deleteFromStateDB(String keyString,
+  private void deleteFromChangelogStateDB(String keyString,
       String methodInvocation)
   {
     if (debugEnabled())
-      TRACER.debugInfo("In " + this.replicationServer.getMonitorInstanceName()
-          + " " + methodInvocation);
+      debug(methodInvocation + " starting");
 
     try
     {
-      final byte[] byteId = keyString.getBytes("UTF-8");
-      final DatabaseEntry key = new DatabaseEntry();
-      key.setData(byteId);
+      final DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
       final DatabaseEntry data = new DatabaseEntry();
-      OperationStatus status = stateDb.get(null, key, data, LockMode.DEFAULT);
-      if (status != OperationStatus.NOTFOUND)
+      OperationStatus status = changelogStateDb.get(null, key, data, DEFAULT);
+      if (status == SUCCESS)
       {
         Transaction txn = dbEnvironment.beginTransaction(null, null);
         try
         {
-          stateDb.delete(txn, key);
+          changelogStateDb.delete(txn, key);
           txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
           if (debugEnabled())
-            TRACER.debugInfo(" In "
-                + this.replicationServer.getMonitorInstanceName() + " "
-                + methodInvocation + " succeeded " + status);
+            debug(methodInvocation + " succeeded");
         }
         catch (RuntimeException dbe)
         {
@@ -460,18 +466,18 @@
           throw dbe;
         }
       }
-      return status;
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      // can't happen
+      else
+      {
+        if (debugEnabled())
+          debug(methodInvocation + " failed: key=[ " + keyString
+              + "] not found");
+      }
     }
     catch (RuntimeException dbe)
     {
       // FIXME can actually happen (see catch above)
       // what should we do about it?
     }
-    return null;
   }
 
     /**
@@ -569,4 +575,10 @@
     replicationServer.shutdown();
   }
 
+  private void debug(String message)
+  {
+    TRACER.debugInfo("In " + replicationServer.getMonitorInstanceName() + ", "
+        + message);
+  }
+
 }

--
Gitblit v1.10.0