From 6fde75c6138d9f81009935d8d782209d8426e4de Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 29 Apr 2014 12:06:03 +0000
Subject: [PATCH] Added unit tests for encoding/decoding changelog state DB entries.

---
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java |  341 ++++++++++++++++++++++++++++++++++++--------------------
 1 files changed, 220 insertions(+), 121 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index fb11b44..ba63c39 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -28,7 +28,9 @@
 
 import java.io.File;
 import java.io.UnsupportedEncodingException;
-import java.util.List;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -87,58 +89,7 @@
 
     try
     {
-      EnvironmentConfig envConfig = new EnvironmentConfig();
-
-      /*
-       * Create the DB Environment that will be used for all the
-       * ReplicationServer activities related to the db
-       */
-      envConfig.setAllowCreate(true);
-      envConfig.setTransactional(true);
-      envConfig.setConfigParam(STATS_COLLECT, "false");
-      envConfig.setConfigParam(CLEANER_THREADS, "2");
-      envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
-      /*
-       * Tests have shown that since the parsing of the Replication log is
-       * always done sequentially, it is not necessary to use a large DB cache.
-       */
-      if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
-      {
-        /*
-         * If the JVM is reasonably large then we can safely default to bigger
-         * read buffers. This will result in more scalable checkpointer and
-         * cleaner performance.
-         */
-        envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
-        envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
-        envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
-
-        /*
-         * The cache size must be bigger in order to accommodate the larger
-         * buffers - see OPENDJ-943.
-         */
-        envConfig.setConfigParam(MAX_MEMORY, mb(16));
-      }
-      else
-      {
-        /*
-         * Use 5M so that the replication can be used with 64M total for the
-         * JVM.
-         */
-        envConfig.setConfigParam(MAX_MEMORY, mb(5));
-      }
-
-      // Since records are always added at the end of the Replication log and
-      // deleted at the beginning of the Replication log, this should never
-      // cause any deadlock.
-      envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
-      envConfig.setLockTimeout(0, TimeUnit.SECONDS);
-
-      // Since replication provides durability, we can reduce the DB durability
-      // level so that we are immune to application / JVM crashes.
-      envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
-
-      dbEnvironment = new Environment(new File(path), envConfig);
+      dbEnvironment = openJEEnvironment(path);
 
       /*
        * One database is created to store the update from each LDAP server in
@@ -153,6 +104,71 @@
     }
   }
 
+  /**
+   * Open a JE environment.
+   * <p>
+   * protected so it can be overridden by tests.
+   *
+   * @param path
+   *          the path to the JE environment in the filesystem
+   * @return the opened JE environment
+   */
+  protected Environment openJEEnvironment(String path)
+  {
+    final EnvironmentConfig envConfig = new EnvironmentConfig();
+
+    /*
+     * Create the DB Environment that will be used for all the
+     * ReplicationServer activities related to the db
+     */
+    envConfig.setAllowCreate(true);
+    envConfig.setTransactional(true);
+    envConfig.setConfigParam(STATS_COLLECT, "false");
+    envConfig.setConfigParam(CLEANER_THREADS, "2");
+    envConfig.setConfigParam(CHECKPOINTER_HIGH_PRIORITY, "true");
+    /*
+     * Tests have shown that since the parsing of the Replication log is
+     * always done sequentially, it is not necessary to use a large DB cache.
+     */
+    if (Runtime.getRuntime().maxMemory() > 256 * 1024 * 1024)
+    {
+      /*
+       * If the JVM is reasonably large then we can safely default to bigger
+       * read buffers. This will result in more scalable checkpointer and
+       * cleaner performance.
+       */
+      envConfig.setConfigParam(CLEANER_LOOK_AHEAD_CACHE_SIZE, mb(2));
+      envConfig.setConfigParam(LOG_ITERATOR_READ_SIZE, mb(2));
+      envConfig.setConfigParam(LOG_FAULT_READ_SIZE, kb(4));
+
+      /*
+       * The cache size must be bigger in order to accommodate the larger
+       * buffers - see OPENDJ-943.
+       */
+      envConfig.setConfigParam(MAX_MEMORY, mb(16));
+    }
+    else
+    {
+      /*
+       * Use 5M so that the replication can be used with 64M total for the
+       * JVM.
+       */
+      envConfig.setConfigParam(MAX_MEMORY, mb(5));
+    }
+
+    // Since records are always added at the end of the Replication log and
+    // deleted at the beginning of the Replication log, this should never
+    // cause any deadlock.
+    envConfig.setTxnTimeout(0, TimeUnit.SECONDS);
+    envConfig.setLockTimeout(0, TimeUnit.SECONDS);
+
+    // Since replication provides durability, we can reduce the DB durability
+    // level so that we are immune to application / JVM crashes.
+    envConfig.setDurability(Durability.COMMIT_WRITE_NO_SYNC);
+
+    return new Environment(new File(path), envConfig);
+  }
+
   private String kb(int sizeInKb)
   {
     return String.valueOf(sizeInKb * 1024);
@@ -163,8 +179,21 @@
     return String.valueOf(sizeInMb * 1024 * 1024);
   }
 
-  private Database openDatabase(String databaseName) throws ChangelogException,
-      RuntimeException
+  /**
+   * Open a JE database.
+   * <p>
+   * protected so it can be overridden by tests.
+   *
+   * @param databaseName
+   *          the databaseName to open
+   * @return the opened JE database
+   * @throws ChangelogException
+   *           if a problem happened opening the database
+   * @throws RuntimeException
+   *           if a problem happened with the JE database
+   */
+  protected Database openDatabase(String databaseName)
+      throws ChangelogException, RuntimeException
   {
     if (isShuttingDown.get())
     {
@@ -197,59 +226,90 @@
    */
   public ChangelogState readChangelogState() throws ChangelogException
   {
+    return decodeChangelogState(readWholeState());
+  }
+
+  /**
+   * Decode the whole changelog state DB.
+   *
+   * @param wholeState
+   *          the whole changelog state DB as a Map.
+   *          The Map is only used as a convenient collection of key => data objects 
+   * @return the decoded changelog state
+   * @throws ChangelogException
+   *           if a problem occurred while decoding
+   */
+  ChangelogState decodeChangelogState(Map<byte[], byte[]> wholeState)
+      throws ChangelogException
+  {
+    try
+    {
+      final ChangelogState result = new ChangelogState();
+      for (Entry<byte[], byte[]> entry : wholeState.entrySet())
+      {
+        final String stringKey = toString(entry.getKey());
+        final String stringData = toString(entry.getValue());
+
+        if (debugEnabled())
+        {
+          debug("read (key, value)=(" + stringKey + ", " + stringData + ")");
+        }
+
+        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
+        if (str[0].equals(GENERATION_ID_TAG))
+        {
+          final long generationId = toLong(str[1]);
+          final DN baseDN = DN.decode(str[2]);
+          if (debugEnabled())
+          {
+            debug("has read generationId: baseDN=" + baseDN + " generationId="
+                + generationId);
+          }
+          result.setDomainGenerationId(baseDN, generationId);
+        }
+        else
+        {
+          final int serverId = toInt(str[0]);
+          final DN baseDN = DN.decode(str[1]);
+          if (debugEnabled())
+          {
+            debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
+          }
+          result.addServerIdToDomain(serverId, baseDN);
+        }
+      }
+      return result;
+    }
+    catch (DirectoryException e)
+    {
+      throw new ChangelogException(e.getMessageObject(), e);
+    }
+  }
+
+  private Map<byte[], byte[]> readWholeState() throws ChangelogException
+  {
     DatabaseEntry key = new DatabaseEntry();
     DatabaseEntry data = new DatabaseEntry();
     Cursor cursor = changelogStateDb.openCursor(null, null);
 
     try
     {
-      final ChangelogState result = new ChangelogState();
+      final Map<byte[], byte[]> results = new LinkedHashMap<byte[], byte[]>();
 
       OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
       while (status == OperationStatus.SUCCESS)
       {
-        final String stringData = toString(data.getData());
-
-        if (debugEnabled())
-          debug("read (" + GENERATION_ID_TAG + " generationId baseDN) OR "
-              + "(serverId baseDN): " + stringData);
-
-        final String[] str = stringData.split(FIELD_SEPARATOR, 3);
-        if (str[0].equals(GENERATION_ID_TAG))
-        {
-          long generationId = toLong(str[1]);
-          DN baseDN = DN.decode(str[2]);
-
-          if (debugEnabled())
-            debug("has read baseDN=" + baseDN + " generationId=" +generationId);
-
-          result.setDomainGenerationId(baseDN, generationId);
-        }
-        else
-        {
-          int serverId = toInt(str[0]);
-          DN baseDN = DN.decode(str[1]);
-
-          if (debugEnabled())
-            debug("has read: baseDN=" + baseDN + " serverId=" + serverId);
-
-          result.addServerIdToDomain(serverId, baseDN);
-        }
-
+        results.put(key.getData(), data.getData());
         status = cursor.getNext(key, data, LockMode.DEFAULT);
       }
 
-      return result;
+      return results;
     }
     catch (RuntimeException e)
     {
       final Message message = ERR_JEB_DATABASE_EXCEPTION.get(e.getMessage());
       throw new ChangelogException(message, e);
     }
-    catch (DirectoryException e)
-    {
-      throw new ChangelogException(e.getMessageObject(), e);
-    }
     finally
     {
       close(cursor);
@@ -261,7 +321,8 @@
     try
     {
       return Integer.parseInt(data);
-    } catch (NumberFormatException e)
+    }
+    catch (NumberFormatException e)
     {
       // should never happen
       // TODO: i18n
@@ -301,7 +362,14 @@
     }
   }
 
-  private byte[] toBytes(String s)
+  /**
+   * Converts the string to a UTF8-encoded byte array.
+   *
+   * @param s
+   *          the string to convert
+   * @return the byte array representation of the UTF8-encoded string
+   */
+  static byte[] toBytes(String s)
   {
     try
     {
@@ -315,8 +383,8 @@
   }
 
   /**
-   * Finds or creates the database used to store changes from the server with
-   * the given serverId and the given baseDN.
+   * Finds or creates the database used to store changes for a replica with the
+   * given baseDN and serverId.
    *
    * @param serverId
    *          The server id that identifies the server.
@@ -328,26 +396,27 @@
    * @throws ChangelogException
    *           in case of underlying Exception.
    */
-  public Database getOrAddDb(int serverId, DN baseDN, long generationId)
+  public Database getOrAddReplicationDB(int serverId, DN baseDN, long generationId)
       throws ChangelogException
   {
     if (debugEnabled())
+    {
       debug("ReplicationDbEnv.getOrAddDb(" + serverId + ", " + baseDN + ", "
           + generationId + ")");
+    }
     try
     {
       // JNR: redundant info is stored between the key and data down below.
       // It is probably ok since "changelogstate" DB does not receive a high
       // volume of inserts.
-      final String serverIdToBaseDn = buildServerIdKey(baseDN, serverId);
+      Entry<String, String> replicaEntry = toReplicaEntry(baseDN, serverId);
 
       // Opens the DB for the changes received from this server on this domain.
-      Database db = openDatabase(serverIdToBaseDn);
+      final Database replicaDB = openDatabase(replicaEntry.getKey());
 
-      putInChangelogStateDBIfNotExist(serverIdToBaseDn, serverIdToBaseDn);
-      putInChangelogStateDBIfNotExist(buildGenIdKey(baseDN),
-                                      buildGenIdData(baseDN, generationId));
-      return db;
+      putInChangelogStateDBIfNotExist(replicaEntry);
+      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
+      return replicaDB;
     }
     catch (RuntimeException e)
     {
@@ -355,36 +424,57 @@
     }
   }
 
-  private String buildGenIdKey(DN baseDN)
+  /**
+   * Return an entry to store in the changelog state database representing a
+   * replica in the topology.
+   *
+   * @param baseDN
+   *          the replica's baseDN
+   * @param serverId
+   *          the replica's serverId
+   * @return a database entry for the replica
+   */
+  Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
   {
-    return GENERATION_ID_TAG + FIELD_SEPARATOR + baseDN.toNormalizedString();
+    final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
+    return new SimpleImmutableEntry<String, String>(key, key);
   }
 
-  private String buildServerIdKey(DN baseDN, int serverId)
+  /**
+   * Return an entry to store in the changelog state database representing the
+   * domain generation id.
+   *
+   * @param baseDN
+   *          the domain's baseDN
+   * @param generationId
+   *          the domain's generationId
+   * @return a database entry for the generationId
+   */
+  Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
   {
-    return serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
+    final String normDn = baseDN.toNormalizedString();
+    final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
+    final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
+        + FIELD_SEPARATOR + normDn;
+    return new SimpleImmutableEntry<String, String>(key, data);
   }
 
-  private String buildGenIdData(DN baseDN, long generationId)
+  private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
+      throws RuntimeException
   {
-    return GENERATION_ID_TAG + FIELD_SEPARATOR + generationId + FIELD_SEPARATOR
-        + baseDN.toNormalizedString();
-  }
-
-  private void putInChangelogStateDBIfNotExist(String keyString,
-      String dataString) throws RuntimeException
-  {
-    DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
+    DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
     DatabaseEntry data = new DatabaseEntry();
     if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
     {
       Transaction txn = dbEnvironment.beginTransaction(null, null);
       try
       {
-        data.setData(toBytes(dataString));
+        data.setData(toBytes(entry.getValue()));
         if (debugEnabled())
-          debug("putting record in the changelogstate Db key=[" + keyString
-              + "] value=[" + dataString + "]");
+        {
+          debug("putting record in the changelogstate Db key=["
+              + entry.getKey() + "] value=[" + entry.getValue() + "]");
+        }
         changelogStateDb.put(txn, key, data);
         txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
       }
@@ -475,7 +565,8 @@
    */
   public void clearGenerationId(DN baseDN) throws ChangelogException
   {
-    deleteFromChangelogStateDB(buildGenIdKey(baseDN),
+    final int unusedGenId = 0;
+    deleteFromChangelogStateDB(toGenIdEntry(baseDN, unusedGenId),
         "clearGenerationId(baseDN=" + baseDN + ")");
   }
 
@@ -492,19 +583,21 @@
    */
   public void clearServerId(DN baseDN, int serverId) throws ChangelogException
   {
-    deleteFromChangelogStateDB(buildServerIdKey(baseDN, serverId),
+    deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
         "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
   }
 
-  private void deleteFromChangelogStateDB(String keyString,
+  private void deleteFromChangelogStateDB(Entry<String, ?> entry,
       String methodInvocation) throws ChangelogException
   {
     if (debugEnabled())
+    {
       debug(methodInvocation + " starting");
+    }
 
     try
     {
-      final DatabaseEntry key = new DatabaseEntry(toBytes(keyString));
+      final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
       final DatabaseEntry data = new DatabaseEntry();
       if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
       {
@@ -514,7 +607,9 @@
           changelogStateDb.delete(txn, key);
           txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
           if (debugEnabled())
+          {
             debug(methodInvocation + " succeeded");
+          }
         }
         catch (RuntimeException dbe)
         {
@@ -526,8 +621,10 @@
       else
       {
         if (debugEnabled())
-          debug(methodInvocation + " failed: key=[ " + keyString
+        {
+          debug(methodInvocation + " failed: key=[ " + entry.getKey()
               + "] not found");
+        }
       }
     }
     catch (RuntimeException dbe)
@@ -570,7 +667,9 @@
         try
         {
           if (txn != null)
+          {
             txn.abort();
+          }
         }
         catch(Exception e)
         { /* do nothing */ }

--
Gitblit v1.10.0