From 7a34cefa2a5bbdf339f1a50b856e3d7441006b8d Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Wed, 08 Jun 2011 14:33:10 +0000
Subject: [PATCH] Fix OPENDJ-184: Transient errors when accessing cn=changelog DraftCN DB result in complete shutdown of the replication service

---
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java |  675 +++++++++++++++++++++++---------------------------------
 1 files changed, 276 insertions(+), 399 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 2a6c676..411c776 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -35,21 +35,13 @@
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.util.List;
-import java.io.UnsupportedEncodingException;
 
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMsg;
 
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
+import com.sleepycat.je.*;
 
 /**
  * This class implements the interface between the underlying database
@@ -64,9 +56,6 @@
   private int serverId;
   private String baseDn;
 
-  // The maximum number of retries in case of DatabaseDeadlock Exception.
-  private static final int DEADLOCK_RETRIES = 10;
-
   // The lock used to provide exclusive access to the thread that
   // close the db (shutdown or clear).
   private ReentrantReadWriteLock dbCloseLock;
@@ -180,98 +169,48 @@
    */
   public void addEntries(List<UpdateMsg> changes)
   {
-    Transaction txn = null;
-
     try
     {
-      int tries = 0;
-      boolean done = false;
-
-      // The database can return a Deadlock Exception if several threads are
-      // accessing the database at the same time. This Exception is a
-      // transient state, when it happens the transaction is aborted and
-      // the operation is attempted again up to DEADLOCK_RETRIES times.
-      while ((tries++ < DEADLOCK_RETRIES) && (!done))
+      dbCloseLock.readLock().lock();
+      try
       {
-        dbCloseLock.readLock().lock();
-        try
+        for (UpdateMsg change : changes)
         {
-          txn = dbenv.beginTransaction();
+          DatabaseEntry key = new ReplicationKey(
+              change.getChangeNumber());
+          DatabaseEntry data = new ReplicationData(change);
 
-          for (UpdateMsg change : changes)
+          if ((counterCurrValue != 0)
+              && (counterCurrValue % counterWindowSize == 0))
           {
-            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
-            DatabaseEntry data = new ReplicationData(change);
-
-            if ((counterCurrValue!=0) &&
-                (counterCurrValue%counterWindowSize == 0))
-            {
-              // enough changes to generate a counter record - wait for the next
-              // change fo time
-              counterTsLimit = change.getChangeNumber().getTime();
-            }
-            if ((counterTsLimit!=0)
-                && (change.getChangeNumber().getTime() != counterTsLimit))
-            {
-              // Write the counter record
-              DatabaseEntry counterKey = new ReplicationKey(
-                  new ChangeNumber(
-                  change.getChangeNumber().getTime(),
-                  0, 0));
-              DatabaseEntry counterValue =
-                encodeCounterValue(counterCurrValue-1);
-              db.put(txn, counterKey, counterValue);
-              counterTsLimit=0;
-            }
-            db.put(txn, key, data);
-            counterCurrValue++;
-
+            // enough changes to generate a counter record - wait for the next
+            // change of time
+            counterTsLimit = change.getChangeNumber().getTime();
           }
-          txn.commitWriteNoSync();
-          txn = null;
-          done = true;
-        }
-        catch (LockConflictException e)
-        {
-          // Try again.
-        }
-        finally
-        {
-          if (txn != null)
+          if ((counterTsLimit != 0)
+              && (change.getChangeNumber().getTime() != counterTsLimit))
           {
-            // No effect if txn has committed.
-            txn.abort();
-            txn = null;
+            // Write the counter record
+            DatabaseEntry counterKey = new ReplicationKey(
+                new ChangeNumber(change.getChangeNumber().getTime(),
+                    0, 0));
+            DatabaseEntry counterValue =
+              encodeCounterValue(counterCurrValue - 1);
+            db.put(null, counterKey, counterValue);
+            counterTsLimit = 0;
           }
-
-          dbCloseLock.readLock().unlock();
+          db.put(null, key, data);
+          counterCurrValue++;
         }
       }
-      if (!done)
+      finally
       {
-        // Could not write to the DB after DEADLOCK_RETRIES tries.
-        // This ReplicationServer is not reliable and will be shutdown.
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        logError(mb.toMessage());
-        replicationServer.shutdown();
+        dbCloseLock.readLock().unlock();
       }
     }
     catch (DatabaseException e)
     {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
-    }
-    catch (UnsupportedEncodingException e)
-    {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
     }
   }
 
@@ -334,13 +273,24 @@
     return new ReplServerDBCursor();
   }
 
+
+
   private void closeLockedCursor(Cursor cursor)
-    throws DatabaseException
+      throws DatabaseException
   {
     try
     {
       if (cursor != null)
-        cursor.close();
+      {
+        try
+        {
+          cursor.close();
+        }
+        catch (DatabaseException e)
+        {
+          // Ignore.
+        }
+      }
     }
     finally
     {
@@ -358,53 +308,53 @@
     String str = null;
     ChangeNumber cn = null;
 
-    dbCloseLock.readLock().lock();
     try
     {
-      DatabaseEntry key = new DatabaseEntry();
-      DatabaseEntry data = new DatabaseEntry();
-
-      cursor = db.openCursor(null, null);
-
-      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
-
-      if (status != OperationStatus.SUCCESS)
+      dbCloseLock.readLock().lock();
+      try
       {
-        /* database is empty */
-        return null;
-      }
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
 
-      str = decodeUTF8(key.getData());
-      cn = new ChangeNumber(str);
-      if (ReplicationDB.isaCounter(cn))
-      {
-        // First record is a counter record .. go next
-        status = cursor.getNext(key, data, LockMode.DEFAULT);
+        cursor = db.openCursor(null, null);
+
+        OperationStatus status = cursor.getFirst(key, data,
+            LockMode.DEFAULT);
+
         if (status != OperationStatus.SUCCESS)
         {
-          // DB contains only a counter record
+          /* database is empty */
           return null;
         }
-        else
+
+        str = decodeUTF8(key.getData());
+        cn = new ChangeNumber(str);
+        if (ReplicationDB.isaCounter(cn))
         {
-          cn = new ChangeNumber(decodeUTF8(key.getData()));
+          // First record is a counter record .. go next
+          status = cursor.getNext(key, data, LockMode.DEFAULT);
+          if (status != OperationStatus.SUCCESS)
+          {
+            // DB contains only a counter record
+            return null;
+          }
+          else
+          {
+            cn = new ChangeNumber(decodeUTF8(key.getData()));
+          }
         }
       }
+      finally
+      {
+        closeLockedCursor(cursor);
+      }
     }
     catch (DatabaseException e)
     {
       /* database is faulty */
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
       cn = null;
     }
-    finally
-    {
-      closeLockedCursor(cursor);
-    }
     return cn;
   }
 
@@ -420,57 +370,56 @@
     Cursor cursor = null;
     ChangeNumber cn = null;
 
-    dbCloseLock.readLock().lock();
     try
     {
-      DatabaseEntry key = new DatabaseEntry();
-      DatabaseEntry data = new DatabaseEntry();
-
-      cursor = db.openCursor(null, null);
-
-      OperationStatus status = cursor.getLast(key, data,
-          LockMode.DEFAULT);
-
-      if (status != OperationStatus.SUCCESS)
+      dbCloseLock.readLock().lock();
+      try
       {
-        /* database is empty */
-        return null;
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
+
+        cursor = db.openCursor(null, null);
+
+        OperationStatus status = cursor.getLast(key, data,
+            LockMode.DEFAULT);
+
+        if (status != OperationStatus.SUCCESS)
+        {
+          /* database is empty */
+          return null;
+        }
+
+        String str = decodeUTF8(key.getData());
+        cn = new ChangeNumber(str);
+        if (ReplicationDB.isaCounter(cn))
+        {
+          if (cursor.getPrev(key, data, LockMode.DEFAULT)
+              != OperationStatus.SUCCESS)
+          {
+            /*
+             * database only contain a counter record - don't know how much it
+             * can be possible but ...
+             */
+            cn = null;
+          }
+          else
+          {
+            str = decodeUTF8(key.getData());
+            cn = new ChangeNumber(str);
+            // There can't be 2 counter record next to each other
+          }
+        }
       }
-
-      String str = decodeUTF8(key.getData());
-      cn = new ChangeNumber(str);
-      if (ReplicationDB.isaCounter(cn))
+      finally
       {
-        if (cursor.getPrev(key, data,
-            LockMode.DEFAULT) != OperationStatus.SUCCESS)
-        {
-          /*
-           * database only contain a counter record - don't know how much it can
-           * be possible but ...
-           */
-          cn = null;
-        }
-        else
-        {
-          str = decodeUTF8(key.getData());
-          cn= new ChangeNumber(str);
-          // There can't be 2 counter record next to each other
-        }
+        closeLockedCursor(cursor);
       }
     }
     catch (DatabaseException e)
     {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
       cn = null;
     }
-    finally
-    {
-      closeLockedCursor(cursor);
-    }
 
     return cn;
   }
@@ -496,85 +445,80 @@
     DatabaseEntry key = new ReplicationKey(changeNumber);
     DatabaseEntry data = new DatabaseEntry();
 
-    Transaction txn = null;
-
-    dbCloseLock.readLock().lock();
     try
     {
-      cursor = db.openCursor(txn, null);
-      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) ==
-              OperationStatus.SUCCESS)
+      dbCloseLock.readLock().lock();
+      try
       {
-        // We can move close to the changeNumber.
-        // Let's move to the previous change.
-        if (cursor.getPrev(key, data, LockMode.DEFAULT) ==
-                OperationStatus.SUCCESS)
+        cursor = db.openCursor(null, null);
+        if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT)
+            == OperationStatus.SUCCESS)
         {
-          String str = decodeUTF8(key.getData());
-          cn = new ChangeNumber(str);
-          if (ReplicationDB.isaCounter(cn))
+          // We can move close to the changeNumber.
+          // Let's move to the previous change.
+          if (cursor.getPrev(key, data, LockMode.DEFAULT)
+              == OperationStatus.SUCCESS)
           {
-            if (cursor.getPrev(key, data,
-                 LockMode.DEFAULT) != OperationStatus.SUCCESS)
+            String str = decodeUTF8(key.getData());
+            cn = new ChangeNumber(str);
+            if (ReplicationDB.isaCounter(cn))
             {
-              // database starts with a counter record.
-              cn = null;
+              if (cursor.getPrev(key, data, LockMode.DEFAULT)
+                  != OperationStatus.SUCCESS)
+              {
+                // database starts with a counter record.
+                cn = null;
+              }
+              else
+              {
+                str = decodeUTF8(key.getData());
+                cn = new ChangeNumber(str);
+                // There can't be 2 counter record next to each other
+              }
             }
-            else
+          }
+          // else, there was no change previous to our changeNumber.
+        }
+        else
+        {
+          // We could not move the cursor past to the changeNumber
+          // Check if the last change is older than changeNumber
+          if (cursor.getLast(key, data, LockMode.DEFAULT)
+              == OperationStatus.SUCCESS)
+          {
+            String str = decodeUTF8(key.getData());
+            cn = new ChangeNumber(str);
+            if (ReplicationDB.isaCounter(cn))
             {
-              str = decodeUTF8(key.getData());
-              cn= new ChangeNumber(str);
-              // There can't be 2 counter record next to each other
+              if (cursor.getPrev(key, data, LockMode.DEFAULT)
+                  != OperationStatus.SUCCESS)
+              {
+                /*
+                 * database only contain a counter record, should not be
+                 * possible, but Ok, let's just say no change Number
+                 */
+                cn = null;
+              }
+              else
+              {
+                str = decodeUTF8(key.getData());
+                cn = new ChangeNumber(str);
+                // There can't be 2 counter record next to each other
+              }
             }
           }
         }
-        // else, there was no change previous to our changeNumber.
       }
-      else
+      finally
       {
-        // We could not move the cursor past to the changeNumber
-        // Check if the last change is older than changeNumber
-        if (cursor.getLast(key, data, LockMode.DEFAULT) ==
-                OperationStatus.SUCCESS)
-        {
-          String str = decodeUTF8(key.getData());
-          cn = new ChangeNumber(str);
-          if (ReplicationDB.isaCounter(cn))
-          {
-            if (cursor.getPrev(key, data,
-              LockMode.DEFAULT) != OperationStatus.SUCCESS)
-            {
-              /*
-               * database only contain a counter record, should not be
-               * possible, but Ok, let's just say no change Number
-               */
-              cn = null;
-            }
-            else
-            {
-              str = decodeUTF8(key.getData());
-              cn= new ChangeNumber(str);
-              // There can't be 2 counter record next to each other
-            }
-          }
-        }
+        closeLockedCursor(cursor);
       }
     }
     catch (DatabaseException e)
     {
-      /* database is faulty */
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      // TODO: Verify if shutting down replication is the right thing to do
-      replicationServer.shutdown();
+      replicationServer.handleUnexpectedDatabaseException(e);
       cn = null;
     }
-    finally
-    {
-      closeLockedCursor(cursor);
-    }
     return cn;
   }
 
@@ -669,14 +613,7 @@
       catch (Exception e)
       {
         // Unlocking is required before throwing any exception
-        try
-        {
-          closeLockedCursor(localCursor);
-        }
-        catch (Exception ignore)
-        {
-          // Ignore.
-        }
+        closeLockedCursor(localCursor);
         throw e;
       }
     }
@@ -703,14 +640,7 @@
       }
       catch (Exception e)
       {
-        try
-        {
-          closeLockedCursor(localCursor);
-        }
-        catch (Exception ignore)
-        {
-          // Ignore.
-        }
+        closeLockedCursor(localCursor);
 
         if (localTxn != null)
         {
@@ -741,41 +671,19 @@
         isClosed = true;
       }
 
-      boolean closeHasFailed = false;
-
-      try
-      {
-        closeLockedCursor(cursor);
-      }
-      catch (DatabaseException e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-        closeHasFailed = true;
-      }
-
+      closeLockedCursor(cursor);
       if (txn != null)
       {
         try
         {
-          txn.commit();
+          // No need for durability when purging.
+          txn.commit(Durability.COMMIT_NO_SYNC);
         }
         catch (DatabaseException e)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          closeHasFailed = true;
+          replicationServer.handleUnexpectedDatabaseException(e);
         }
       }
-
-      if (closeHasFailed)
-      {
-        replicationServer.shutdown();
-      }
     }
 
     /**
@@ -796,26 +704,7 @@
         isClosed = true;
       }
 
-      boolean closeHasFailed = false;
-
-      try
-      {
-        closeLockedCursor(cursor);
-      }
-      catch (LockConflictException e)
-      {
-        // The DB documentation states that a DeadlockException
-        // on the close method of a cursor that is aborting should
-        // be ignored.
-      }
-      catch (DatabaseException e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-        closeHasFailed = true;
-      }
+      closeLockedCursor(cursor);
 
       if (txn != null)
       {
@@ -825,18 +714,9 @@
         }
         catch (DatabaseException e)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          closeHasFailed = true;
+          replicationServer.handleUnexpectedDatabaseException(e);
         }
       }
-
-      if (closeHasFailed)
-      {
-        replicationServer.shutdown();
-      }
     }
 
     /**
@@ -972,143 +852,140 @@
     int distToCounterRecord1 = 0;
     int distBackToCounterRecord2 = 0;
     int count=0;
-    Cursor cursor = null;
-    Transaction txn = null;
     OperationStatus status;
     try
     {
-      ChangeNumber cn ;
-
-      if ((start==null)&&(stop==null))
-        return (int)db.count();
-
-      // Step 1 : from the start point, traverse db to the next counter record
-      // or to the stop point.
-      DatabaseEntry key = new DatabaseEntry();
-      DatabaseEntry data = new DatabaseEntry();
-      cursor = db.openCursor(txn, null);
-      if (start != null)
+      Cursor cursor = null;
+      try
       {
-        key = new ReplicationKey(start);
-        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
-        if (status == OperationStatus.NOTFOUND)
-          status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
-      }
-      else
-      {
-        status = cursor.getNext(key, data, LockMode.DEFAULT);
-      }
+        ChangeNumber cn ;
 
-      while (status == OperationStatus.SUCCESS)
-      {
-        // test whether the record is a regular change or a counter
-        String csnString = decodeUTF8(key.getData());
-        cn = new ChangeNumber(csnString);
-        if (cn.getServerId() != 0)
+        if ((start==null)&&(stop==null))
+          return (int)db.count();
+
+        // Step 1 : from the start point, traverse db to the next counter record
+        // or to the stop point.
+        DatabaseEntry key = new DatabaseEntry();
+        DatabaseEntry data = new DatabaseEntry();
+        cursor = db.openCursor(null, null);
+        if (start != null)
         {
-          // reached a regular change record
-          // test whether we reached the 'stop' target
-          if (!cn.newer(stop))
-          {
-            // let's loop
-            distToCounterRecord1++;
-            status = cursor.getNext(key, data, LockMode.DEFAULT);
-          }
-          else
-          {
-            // reached the end
-            break;
-          }
+          key = new ReplicationKey(start);
+          status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+          if (status == OperationStatus.NOTFOUND)
+            status = cursor.getSearchKeyRange(key, data, LockMode.DEFAULT);
         }
         else
         {
-          // counter record
-          counterRecord1 = decodeCounterValue(data.getData());
-          break;
+          status = cursor.getNext(key, data, LockMode.DEFAULT);
         }
-      }
-      cursor.close();
 
-      // cases
-      //
-      if (counterRecord1==0)
-        return distToCounterRecord1;
+        while (status == OperationStatus.SUCCESS)
+        {
+          // test whether the record is a regular change or a counter
+          String csnString = decodeUTF8(key.getData());
+          cn = new ChangeNumber(csnString);
+          if (cn.getServerId() != 0)
+          {
+            // reached a regular change record
+            // test whether we reached the 'stop' target
+            if (!cn.newer(stop))
+            {
+              // let's loop
+              distToCounterRecord1++;
+              status = cursor.getNext(key, data, LockMode.DEFAULT);
+            }
+            else
+            {
+              // reached the end
+              break;
+            }
+          }
+          else
+          {
+            // counter record
+            counterRecord1 = decodeCounterValue(data.getData());
+            break;
+          }
+        }
+        cursor.close();
 
-      // Step 2 : from the stop point, traverse db to the next counter record
-      // or to the start point.
-      txn = null;
-      data = new DatabaseEntry();
-      key = new ReplicationKey(stop);
-      cursor = db.openCursor(txn, null);
-      status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
-      if (status == OperationStatus.SUCCESS)
-      {
-        cn = new ChangeNumber(decodeUTF8(key.getData()));
-      }
-      else
-      {
-        key = new DatabaseEntry();
+        // cases
+        //
+        if (counterRecord1==0)
+          return distToCounterRecord1;
+
+        // Step 2 : from the stop point, traverse db to the next counter record
+        // or to the start point.
         data = new DatabaseEntry();
-        status = cursor.getLast(key, data, LockMode.DEFAULT);
-        if (status != OperationStatus.SUCCESS)
+        key = new ReplicationKey(stop);
+        cursor = db.openCursor(null, null);
+        status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
+        if (status == OperationStatus.SUCCESS)
         {
-          /* database is empty */
-          return 0;
+          cn = new ChangeNumber(decodeUTF8(key.getData()));
         }
-      }
-      while (status == OperationStatus.SUCCESS)
-      {
-        cn = new ChangeNumber(decodeUTF8(key.getData()));
-        if (!ReplicationDB.isaCounter(cn))
+        else
         {
-          // regular change record
-          if (!cn.older(start))
+          key = new DatabaseEntry();
+          data = new DatabaseEntry();
+          status = cursor.getLast(key, data, LockMode.DEFAULT);
+          if (status != OperationStatus.SUCCESS)
           {
-            distBackToCounterRecord2++;
-            status = cursor.getPrev(key, data, LockMode.DEFAULT);
+            /* database is empty */
+            return 0;
+          }
+        }
+        while (status == OperationStatus.SUCCESS)
+        {
+          cn = new ChangeNumber(decodeUTF8(key.getData()));
+          if (!ReplicationDB.isaCounter(cn))
+          {
+            // regular change record
+            if (!cn.older(start))
+            {
+              distBackToCounterRecord2++;
+              status = cursor.getPrev(key, data, LockMode.DEFAULT);
+            }
+            else
+              break;
           }
           else
+          {
+            // counter record
+            counterRecord2 = decodeCounterValue(data.getData());
             break;
+          }
         }
-        else
+        cursor.close();
+
+        // Step 3 : Now consolidates the result
+        if (counterRecord1!=0)
         {
-          // counter record
-          counterRecord2 = decodeCounterValue(data.getData());
-          break;
+          if (counterRecord1 == counterRecord2)
+          {
+            // only one cp between from and to - no need to use it
+            count = distToCounterRecord1 + distBackToCounterRecord2;
+          }
+          else
+          {
+            // 2 cp between from and to
+            count = distToCounterRecord1 + (counterRecord2-counterRecord1)
+            + distBackToCounterRecord2;
+          }
         }
       }
-      cursor.close();
-
-      // Step 3 : Now consolidates the result
-      if (counterRecord1!=0)
+      finally
       {
-        if (counterRecord1 == counterRecord2)
+        if (cursor != null)
         {
-          // only one cp between from and to - no need to use it
-          count = distToCounterRecord1 + distBackToCounterRecord2;
-        }
-        else
-        {
-          // 2 cp between from and to
-          count = distToCounterRecord1 + (counterRecord2-counterRecord1)
-            + distBackToCounterRecord2;
+          cursor.close();
         }
       }
     }
-    finally
+    catch (DatabaseException e)
     {
-      if (cursor != null)
-        cursor.close();
-      if (txn != null)
-      {
-        try
-        {
-          txn.abort();
-        } catch (DatabaseException e1)
-        {
-          // can't do much more. The ReplicationServer is shutting down.
-        }
-      }
+      replicationServer.handleUnexpectedDatabaseException(e);
     }
     return count;
   }

--
Gitblit v1.10.0