From 45a8024fe68e7bc451a5a22afcaf31e7edb745a1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 12 Aug 2013 15:22:03 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/DraftCNDB.java |  224 +++++++++++++++++++++++++++++--------------------------
 1 files changed, 117 insertions(+), 107 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
index 2d2e04c..5d7db0d 100644
--- a/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
+++ b/opends/src/server/org/opends/server/replication/server/DraftCNDB.java
@@ -27,6 +27,19 @@
  */
 package org.opends.server.replication.server;
 
+import java.io.Closeable;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.opends.messages.Message;
+import org.opends.messages.MessageBuilder;
+import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.DebugLogLevel;
+
+import com.sleepycat.je.*;
+
 import static com.sleepycat.je.LockMode.*;
 import static com.sleepycat.je.OperationStatus.*;
 
@@ -35,16 +48,6 @@
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.util.StaticUtils.*;
 
-import java.io.Closeable;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.opends.messages.MessageBuilder;
-import org.opends.server.loggers.debug.DebugTracer;
-import org.opends.server.replication.common.ChangeNumber;
-import org.opends.server.types.DebugLogLevel;
-
-import com.sleepycat.je.*;
-
 /**
  * This class implements the interface between the underlying database
  * and the dbHandler class.
@@ -53,6 +56,8 @@
 public class DraftCNDB
 {
   private static final DebugTracer TRACER = getTracer();
+  private static final int DATABASE_EMPTY = 0;
+
   private Database db = null;
   private ReplicationDbEnv dbenv = null;
   private ReplicationServer replicationServer;
@@ -61,27 +66,23 @@
    * The lock used to provide exclusive access to the thread that close the db
    * (shutdown or clear).
    */
-  private ReentrantReadWriteLock dbCloseLock;
+  private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
 
   /**
    * Creates a new database or open existing database that will be used
    * to store and retrieve changes from an LDAP server.
    * @param replicationServer The ReplicationServer that needs to be shutdown.
    * @param dbenv The Db environment to use to create the db.
-   * @throws DatabaseException If a database problem happened.
+   * @throws ChangelogException If a database problem happened.
    */
-  public DraftCNDB(
-      ReplicationServer replicationServer,
-      ReplicationDbEnv dbenv)
-  throws DatabaseException
+  public DraftCNDB(ReplicationServer replicationServer, ReplicationDbEnv dbenv)
+      throws ChangelogException
   {
     this.dbenv = dbenv;
     this.replicationServer = replicationServer;
 
     // Get or create the associated ReplicationServerDomain and Db.
     db = dbenv.getOrCreateDraftCNDb();
-
-    dbCloseLock = new ReentrantReadWriteLock(true);
   }
 
   /**
@@ -101,8 +102,7 @@
     try
     {
       DatabaseEntry key = new ReplicationDraftCNKey(draftCN);
-      DatabaseEntry data = new DraftCNData(
-          value, domainBaseDN, changeNumber);
+      DatabaseEntry data = new DraftCNData(value, domainBaseDN, changeNumber);
 
       // Use a transaction so that we can override durability.
       Transaction txn = null;
@@ -121,24 +121,40 @@
       }
       finally
       {
-        if (txn != null)
-        {
-          // No effect if txn has committed.
-          try
-          {
-            txn.abort();
-          }
-          catch (Exception e)
-          {
-            // Ignored.
-          }
-        }
+        abort(txn);
         dbCloseLock.readLock().unlock();
       }
     }
     catch (DatabaseException e)
     {
-      replicationServer.handleUnexpectedDatabaseException(e);
+      handleUnexpectedDatabaseException(e);
+    }
+    catch (ChangelogException e)
+    {
+      replicationServer.handleUnexpectedChangelogException(e);
+    }
+  }
+
+  /**
+   * Aborts the current transaction. It has no effect if the transaction has
+   * committed.
+   *
+   * @param txn
+   *          the transaction to abort
+   */
+  private static void abort(Transaction txn)
+  {
+    if (txn != null)
+    {
+      try
+      {
+        txn.abort();
+      }
+      catch (DatabaseException ignored)
+      {
+        // Ignore.
+        TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
+      }
     }
   }
 
@@ -147,18 +163,11 @@
    */
   public void shutdown()
   {
+    dbCloseLock.writeLock().lock();
     try
     {
-      dbCloseLock.writeLock().lock();
-      try
-      {
-        db.close();
-        db = null;
-      }
-      finally
-      {
-        dbCloseLock.writeLock().unlock();
-      }
+      db.close();
+      db = null;
     }
     catch (DatabaseException e)
     {
@@ -167,19 +176,21 @@
       mb.append(stackTraceToSingleLineString(e));
       logError(mb.toMessage());
     }
+    finally
+    {
+      dbCloseLock.writeLock().unlock();
+    }
   }
 
   /**
    * Create a cursor that can be used to search or iterate on this DB.
    *
    * @param draftCN The draftCN from which the cursor must start.
-   * @throws DatabaseException If a database error prevented the cursor
+   * @throws ChangelogException If a database error prevented the cursor
    *                           creation.
-   * @throws Exception if the ReplServerDBCursor creation failed.
    * @return The ReplServerDBCursor.
    */
-  public DraftCNDBCursor openReadCursor(int draftCN)
-  throws DatabaseException, Exception
+  public DraftCNDBCursor openReadCursor(int draftCN) throws ChangelogException
   {
     return new DraftCNDBCursor(draftCN);
   }
@@ -188,14 +199,11 @@
    * Create a cursor that can be used to delete some record from this
    * ReplicationServer database.
    *
-   * @throws DatabaseException If a database error prevented the cursor
+   * @throws ChangelogException If a database error prevented the cursor
    *                           creation.
-   * @throws Exception if the ReplServerDBCursor creation failed.
-   *
    * @return The ReplServerDBCursor.
    */
-  public DraftCNDBCursor openDeleteCursor()
-  throws DatabaseException, Exception
+  public DraftCNDBCursor openDeleteCursor() throws ChangelogException
   {
     return new DraftCNDBCursor();
   }
@@ -235,11 +243,10 @@
         DatabaseEntry entry = new DatabaseEntry();
         if (cursor.getFirst(key, entry, LockMode.DEFAULT) != SUCCESS)
         {
-          /* database is empty */
-          return 0;
+          return DATABASE_EMPTY;
         }
 
-        return new Integer(decodeUTF8(key.getData()));
+        return Integer.parseInt(decodeUTF8(key.getData()));
       }
       finally
       {
@@ -248,8 +255,7 @@
     }
     catch (DatabaseException e)
     {
-      /* database is faulty */
-      replicationServer.handleUnexpectedDatabaseException(e);
+      handleUnexpectedDatabaseException(e);
       return 0;
     }
   }
@@ -305,11 +311,10 @@
         DatabaseEntry entry = new DatabaseEntry();
         if (cursor.getLast(key, entry, LockMode.DEFAULT) != SUCCESS)
         {
-          /* database is empty */
-          return 0;
+          return DATABASE_EMPTY;
         }
 
-        return new Integer(decodeUTF8(key.getData()));
+        return Integer.parseInt(decodeUTF8(key.getData()));
       }
       finally
       {
@@ -318,11 +323,17 @@
     }
     catch (DatabaseException e)
     {
-      replicationServer.handleUnexpectedDatabaseException(e);
+      handleUnexpectedDatabaseException(e);
       return 0;
     }
   }
 
+  private void handleUnexpectedDatabaseException(DatabaseException e)
+  {
+    ChangelogException ex = new ChangelogException(e);
+    replicationServer.handleUnexpectedChangelogException(ex);
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -357,10 +368,10 @@
      *
      * @param startingDraftCN
      *          the draftCN from which the cursor must start.
-     * @throws Exception
+     * @throws ChangelogException
      *           when the startingDraftCN does not exist.
      */
-    private DraftCNDBCursor(int startingDraftCN) throws Exception
+    private DraftCNDBCursor(int startingDraftCN) throws ChangelogException
     {
       this.key = new ReplicationDraftCNKey(startingDraftCN);
       this.entry = new DatabaseEntry();
@@ -391,8 +402,9 @@
             if (localCursor.getSearchKeyRange(key, entry, DEFAULT) != SUCCESS)
             {
               // We could not even move the cursor closed to it => failure
-              throw new Exception("ChangeLog Draft Change Number "
-                  + startingDraftCN + " is not available");
+              throw new ChangelogException(
+                  Message.raw("ChangeLog Draft Change Number " + startingDraftCN
+                      + " is not available"));
             }
 
             if (localCursor.getPrev(key, entry, LockMode.DEFAULT) != SUCCESS)
@@ -414,7 +426,13 @@
         this.txn = null;
         this.cursor = localCursor;
       }
-      catch (Exception e)
+      catch (DatabaseException e)
+      {
+        // Unlocking is required before throwing any exception
+        closeLockedCursor(localCursor);
+        throw new ChangelogException(e);
+      }
+      catch (ChangelogException e)
       {
         // Unlocking is required before throwing any exception
         closeLockedCursor(localCursor);
@@ -424,7 +442,7 @@
 
 
 
-    private DraftCNDBCursor() throws Exception
+    private DraftCNDBCursor() throws ChangelogException
     {
       Transaction localTxn = null;
       Cursor localCursor = null;
@@ -453,32 +471,20 @@
         this.txn = localTxn;
         this.cursor = localCursor;
       }
-      catch (Exception e)
+      catch (DatabaseException e)
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
 
-        try
-        {
-          closeLockedCursor(localCursor);
-        }
-        catch (DatabaseException ignored)
-        {
-          // Ignore.
-          TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
-        }
+        closeLockedCursor(localCursor);
+        DraftCNDB.abort(localTxn);
+        throw new ChangelogException(e);
+      }
+      catch (ChangelogException e)
+      {
+        TRACER.debugCaught(DebugLogLevel.ERROR, e);
 
-        if (localTxn != null)
-        {
-          try
-          {
-            localTxn.abort();
-          }
-          catch (DatabaseException ignored)
-          {
-            // Ignore.
-            TRACER.debugCaught(DebugLogLevel.ERROR, ignored);
-          }
-        }
+        closeLockedCursor(localCursor);
+        DraftCNDB.abort(localTxn);
         throw e;
       }
     }
@@ -508,7 +514,7 @@
         }
         catch (DatabaseException e)
         {
-          replicationServer.handleUnexpectedDatabaseException(e);
+          handleUnexpectedDatabaseException(e);
         }
       }
     }
@@ -541,7 +547,7 @@
         }
         catch (DatabaseException e)
         {
-          replicationServer.handleUnexpectedDatabaseException(e);
+          handleUnexpectedDatabaseException(e);
         }
       }
     }
@@ -593,7 +599,6 @@
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
       }
-
       return null;
     }
 
@@ -619,7 +624,6 @@
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
       }
-
       return -1;
     }
 
@@ -651,22 +655,22 @@
     /**
      * Go to the next record on the cursor.
      * @return the next record on this cursor.
-     * @throws DatabaseException a.
+     * @throws ChangelogException a.
      */
-    public boolean next() throws DatabaseException
+    public boolean next() throws ChangelogException
     {
       if (isClosed)
       {
         return false;
       }
 
-      OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
-      if (status != OperationStatus.SUCCESS)
-      {
-        seqnumData = null;
-        return false;
-      }
       try {
+        OperationStatus status = cursor.getNext(key, entry, LockMode.DEFAULT);
+        if (status != OperationStatus.SUCCESS)
+        {
+          seqnumData = null;
+          return false;
+        }
         seqnumData = new DraftCNData(entry.getData());
       }
       catch(Exception e)
@@ -679,16 +683,23 @@
     /**
      * Delete the record at the current cursor position.
      *
-     * @throws DatabaseException In case of database problem.
+     * @throws ChangelogException In case of database problem.
      */
-    public void delete() throws DatabaseException
+    public void delete() throws ChangelogException
     {
       if (isClosed)
       {
         throw new IllegalStateException("DraftCNDB already closed");
       }
 
-      cursor.delete();
+      try
+      {
+        cursor.delete();
+      }
+      catch (DatabaseException e)
+      {
+        throw new ChangelogException(e);
+      }
     }
 
     /**
@@ -710,10 +721,9 @@
   /**
    * Clears this change DB from the changes it contains.
    *
-   * @throws Exception Throws an exception it occurs.
-   * @throws DatabaseException Throws a DatabaseException when it occurs.
+   * @throws ChangelogException Throws a DatabaseException when it occurs.
    */
-  public void clear() throws Exception, DatabaseException
+  public void clear() throws ChangelogException
   {
     // The coming users will be blocked until the clear is done
     dbCloseLock.writeLock().lock();

--
Gitblit v1.10.0