From 45a8024fe68e7bc451a5a22afcaf31e7edb745a1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 12 Aug 2013 15:22:03 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java |  159 +++++++++++++++++++++++++++++++++-------------------
 1 files changed, 101 insertions(+), 58 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 867d23d..b3d89ba 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -27,26 +27,28 @@
  */
 package org.opends.server.replication.server;
 
-import static com.sleepycat.je.LockMode.*;
-import static com.sleepycat.je.OperationStatus.*;
-
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.util.StaticUtils.*;
-
 import java.io.Closeable;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
+import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.util.StaticUtils;
 
 import com.sleepycat.je.*;
 
+import static com.sleepycat.je.LockMode.*;
+import static com.sleepycat.je.OperationStatus.*;
+
+import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.loggers.ErrorLogger.*;
+import static org.opends.server.util.StaticUtils.*;
+
 /**
  * This class implements the interface between the underlying database
  * and the dbHandler class.
@@ -67,8 +69,7 @@
    * The lock used to provide exclusive access to the thread that close the db
    * (shutdown or clear).
    */
-  private final ReentrantReadWriteLock dbCloseLock =
-      new ReentrantReadWriteLock(true);
+  private final ReadWriteLock dbCloseLock = new ReentrantReadWriteLock(true);
 
   // Change counter management
   // The Db itself does not allow to count records between a start and an end
@@ -117,12 +118,12 @@
    * @param baseDn The baseDn of the replication domain.
    * @param replicationServer The ReplicationServer that needs to be shutdown.
    * @param dbenv The Db environment to use to create the db.
-   * @throws DatabaseException If a database problem happened.
+   * @throws ChangelogException If a database problem happened.
    */
   public ReplicationDB(int serverId, String baseDn,
                      ReplicationServer replicationServer,
                      ReplicationDbEnv dbenv)
-                     throws DatabaseException
+                     throws ChangelogException
   {
     this.serverId = serverId;
     this.baseDn = baseDn;
@@ -138,13 +139,15 @@
     intializeCounters();
   }
 
-  private void intializeCounters()
+  private void intializeCounters() throws ChangelogException
   {
     this.counterCurrValue = 1;
 
-    Cursor cursor = db.openCursor(null, null);
+    Cursor cursor = null;
     try
     {
+      cursor = db.openCursor(null, null);
+
       int distBackToCounterRecord = 0;
       DatabaseEntry key = new DatabaseEntry();
       DatabaseEntry data = new DatabaseEntry();
@@ -164,9 +167,13 @@
       }
       counterCurrValue += distBackToCounterRecord;
     }
+    catch (DatabaseException e)
+    {
+      throw new ChangelogException(e);
+    }
     finally
     {
-      cursor.close();
+      close(cursor);
     }
   }
 
@@ -205,7 +212,7 @@
     }
     catch (DatabaseException e)
     {
-      replicationServer.handleUnexpectedDatabaseException(e);
+      handleUnexpectedDatabaseException(e);
     }
     finally
     {
@@ -213,7 +220,14 @@
     }
   }
 
+  private void handleUnexpectedDatabaseException(DatabaseException e)
+  {
+    ChangelogException ex = new ChangelogException(e);
+    replicationServer.handleUnexpectedChangelogException(ex);
+  }
+
   private void insertCounterRecordIfNeeded(ChangeNumber changeNumber)
+      throws DatabaseException
   {
     if (counterCurrValue != 0 && (counterCurrValue % counterWindowSize == 0))
     {
@@ -276,13 +290,12 @@
    * ReplicationServer DB.
    *
    * @param changeNumber The ChangeNumber from which the cursor must start.
-   * @throws DatabaseException If a database error prevented the cursor
-   *                           creation.
-   * @throws Exception if the ReplServerDBCursor creation failed.
+   * @throws ChangelogException
+   *           When a problem occurs or the startingChangeNumber does not exist.
    * @return The ReplServerDBCursor.
    */
   public ReplServerDBCursor openReadCursor(ChangeNumber changeNumber)
-                throws DatabaseException, Exception
+      throws ChangelogException
   {
     return new ReplServerDBCursor(changeNumber);
   }
@@ -291,21 +304,19 @@
    * Create a cursor that can be used to delete some record from this
    * ReplicationServer database.
    *
-   * @throws DatabaseException If a database error prevented the cursor
+   * @throws ChangelogException If a database error prevented the cursor
    *                           creation.
-   * @throws Exception if the ReplServerDBCursor creation failed.
    *
    * @return The ReplServerDBCursor.
    */
-  public ReplServerDBCursor openDeleteCursor()
-                throws DatabaseException, Exception
+  public ReplServerDBCursor openDeleteCursor() throws ChangelogException
   {
     return new ReplServerDBCursor();
   }
 
 
 
-  private void closeAndReleaseReadLock(Cursor cursor) throws DatabaseException
+  private void closeAndReleaseReadLock(Cursor cursor)
   {
     try
     {
@@ -362,7 +373,7 @@
     }
     catch (DatabaseException e)
     {
-      replicationServer.handleUnexpectedDatabaseException(e);
+      handleUnexpectedDatabaseException(e);
       return null;
     }
     finally
@@ -421,7 +432,7 @@
     }
     catch (DatabaseException e)
     {
-      replicationServer.handleUnexpectedDatabaseException(e);
+      handleUnexpectedDatabaseException(e);
       return null;
     }
     finally
@@ -482,7 +493,7 @@
     }
     catch (DatabaseException e)
     {
-      replicationServer.handleUnexpectedDatabaseException(e);
+      handleUnexpectedDatabaseException(e);
     }
     finally
     {
@@ -492,7 +503,7 @@
   }
 
   private ChangeNumber getRegularRecord(Cursor cursor, DatabaseEntry key,
-      DatabaseEntry data)
+      DatabaseEntry data) throws DatabaseException
   {
     final ChangeNumber cn = toChangeNumber(key.getData());
     if (!isACounterRecord(cn))
@@ -548,11 +559,11 @@
      *
      * @param startingChangeNumber
      *          The ChangeNumber from which the cursor must start.
-     * @throws Exception
+     * @throws ChangelogException
      *           When the startingChangeNumber does not exist.
      */
     private ReplServerDBCursor(ChangeNumber startingChangeNumber)
-        throws Exception
+        throws ChangelogException
     {
       if (startingChangeNumber != null)
       {
@@ -591,7 +602,8 @@
             if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
             {
               // We could not even move the cursor closed to it => failure
-              throw new Exception("ChangeNumber not available");
+              throw new ChangelogException(
+                  Message.raw("ChangeNumber not available"));
             }
 
             // We can move close to the startingChangeNumber.
@@ -607,15 +619,21 @@
         }
         cursor = localCursor;
       }
-      catch (Exception e)
+      catch (ChangelogException e)
       {
         // Unlocking is required before throwing any exception
         closeAndReleaseReadLock(localCursor);
         throw e;
       }
+      catch (DatabaseException e)
+      {
+        // Unlocking is required before throwing any exception
+        closeAndReleaseReadLock(localCursor);
+        throw new ChangelogException(e);
+      }
     }
 
-    private ReplServerDBCursor() throws Exception
+    private ReplServerDBCursor() throws ChangelogException
     {
       key = new DatabaseEntry();
       data = new DatabaseEntry();
@@ -644,22 +662,32 @@
         txn = localTxn;
         cursor = localCursor;
       }
+      catch (ChangelogException e)
+      {
+        closeAndReleaseReadLock(localCursor);
+        abort(localTxn);
+        throw e;
+      }
       catch (Exception e)
       {
         closeAndReleaseReadLock(localCursor);
+        abort(localTxn);
+        throw new ChangelogException(e);
+      }
+    }
 
-        if (localTxn != null)
+    private void abort(Transaction localTxn)
+    {
+      if (localTxn != null)
+      {
+        try
         {
-          try
-          {
-            localTxn.abort();
-          }
-          catch (DatabaseException ignore)
-          {
-            // Ignore.
-          }
+          localTxn.abort();
         }
-        throw e;
+        catch (DatabaseException ignore)
+        {
+          // Ignore.
+        }
       }
     }
 
@@ -689,7 +717,7 @@
         }
         catch (DatabaseException e)
         {
-          replicationServer.handleUnexpectedDatabaseException(e);
+          handleUnexpectedDatabaseException(e);
         }
       }
     }
@@ -722,7 +750,7 @@
         }
         catch (DatabaseException e)
         {
-          replicationServer.handleUnexpectedDatabaseException(e);
+          handleUnexpectedDatabaseException(e);
         }
       }
     }
@@ -731,20 +759,27 @@
      * Get the next ChangeNumber in the database from this Cursor.
      *
      * @return The next ChangeNumber in the database from this cursor.
-     * @throws DatabaseException In case of underlying database problem.
+     * @throws ChangelogException In case of underlying database problem.
      */
-    public ChangeNumber nextChangeNumber() throws DatabaseException
+    public ChangeNumber nextChangeNumber() throws ChangelogException
     {
       if (isClosed)
       {
         return null;
       }
 
-      if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
+      try
       {
-        return null;
+        if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
+        {
+          return null;
+        }
+        return toChangeNumber(key.getData());
       }
-      return toChangeNumber(key.getData());
+      catch (DatabaseException e)
+      {
+        throw new ChangelogException(e);
+      }
     }
 
     /**
@@ -807,26 +842,32 @@
     /**
      * Delete the record at the current cursor position.
      *
-     * @throws DatabaseException In case of database problem.
+     * @throws ChangelogException In case of database problem.
      */
-    public void delete() throws DatabaseException
+    public void delete() throws ChangelogException
     {
       if (isClosed)
       {
         throw new IllegalStateException("ReplServerDBCursor already closed");
       }
 
-      cursor.delete();
+      try
+      {
+        cursor.delete();
+      }
+      catch (DatabaseException e)
+      {
+        throw new ChangelogException(e);
+      }
     }
-  } // ReplServerDBCursor
+  }
 
   /**
    * Clears this change DB from the changes it contains.
    *
-   * @throws Exception Throws an exception it occurs.
-   * @throws DatabaseException Throws a DatabaseException when it occurs.
+   * @throws ChangelogException In case of database problem.
    */
-  public void clear() throws Exception, DatabaseException
+  public void clear() throws ChangelogException
   {
     // The coming users will be blocked until the clear is done
     dbCloseLock.writeLock().lock();
@@ -915,7 +956,7 @@
     }
     catch (DatabaseException e)
     {
-      replicationServer.handleUnexpectedDatabaseException(e);
+      handleUnexpectedDatabaseException(e);
     }
     finally
     {
@@ -927,6 +968,7 @@
 
   private void findFirstCounterRecordAfterStartPoint(ChangeNumber start,
       ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
+      throws DatabaseException
   {
     Cursor cursor = db.openCursor(null, null);
     try
@@ -981,6 +1023,7 @@
 
   private boolean findFirstCounterRecordBeforeStopPoint(ChangeNumber start,
       ChangeNumber stop, int[] counterValues, int[] distanceToCounterRecords)
+      throws DatabaseException
   {
     Cursor cursor = db.openCursor(null, null);
     try

--
Gitblit v1.10.0