From 6b1e3bf06de1327d05b8cbefcd930e5974f556d3 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 04 Apr 2011 22:33:36 +0000
Subject: [PATCH] OpenDJ-107: Potential for leaking DB cursors in replication databases

---
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java |  422 +++++++++++++++++++++++++++-------------------------
 1 files changed, 216 insertions(+), 206 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 7330fac..b512b7e 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -30,6 +30,8 @@
 
 import static org.opends.server.loggers.ErrorLogger.logError;
 import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.util.StaticUtils.decodeUTF8;
+import static org.opends.server.util.StaticUtils.getBytes;
 import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
 
 import java.util.List;
@@ -37,8 +39,8 @@
 
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMsg;
+
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.zip.DataFormatException;
 
 import com.sleepycat.je.Cursor;
 import com.sleepycat.je.DatabaseEntry;
@@ -144,12 +146,12 @@
     // Initialize counter
     this.counterCurrValue = 1;
     cursor = db.openCursor(txn, null);
-    status = cursor.getLast(key, data, LockMode.DEFAULT);
-    while (status == OperationStatus.SUCCESS)
+    try
     {
-      try
+      status = cursor.getLast(key, data, LockMode.DEFAULT);
+      while (status == OperationStatus.SUCCESS)
       {
-        ChangeNumber cn =new ChangeNumber(new String(key.getData(), "UTF-8"));
+        ChangeNumber cn = new ChangeNumber(decodeUTF8(key.getData()));
         if (!ReplicationDB.isaCounter(cn))
         {
           status = cursor.getPrev(key, data, LockMode.DEFAULT);
@@ -158,38 +160,17 @@
         else
         {
           // counter record
-          counterCurrValue = decodeCounterValue(data.getData())+1;
+          counterCurrValue = decodeCounterValue(data.getData()) + 1;
           counterTsLimit = cn.getTime();
           break;
         }
       }
-      catch (UnsupportedEncodingException e)
-      {
-        MessageBuilder mb = new MessageBuilder();
-        mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
-        mb.append(stackTraceToSingleLineString(e));
-        logError(mb.toMessage());
-        replicationServer.shutdown();
-        if (txn != null)
-        {
-          try
-          {
-            txn.abort();
-          } catch (DatabaseException e1)
-          {
-            // can't do much more. The ReplicationServer is shuting down.
-          }
-        }
-        replicationServer.shutdown();
-      }
-      catch (DataFormatException e)
-      {
-        // Should never happen
-      }
+      counterCurrValue += distBackToCounterRecord;
     }
-    counterCurrValue += distBackToCounterRecord;
-    cursor.close();
-
+    finally
+    {
+      cursor.close();
+    }
   }
 
   /**
@@ -377,55 +358,38 @@
     String str = null;
     ChangeNumber cn = null;
 
+    dbCloseLock.readLock().lock();
     try
     {
-      dbCloseLock.readLock().lock();
+      DatabaseEntry key = new DatabaseEntry();
+      DatabaseEntry data = new DatabaseEntry();
+
       cursor = db.openCursor(null, null);
-    }
-    catch (DatabaseException e1)
-    {
-      dbCloseLock.readLock().unlock();
-      return null;
-    }
-    try
-    {
-      try
+
+      OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
+
+      if (status != OperationStatus.SUCCESS)
       {
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
-        OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
+        /* database is empty */
+        return null;
+      }
+
+      str = decodeUTF8(key.getData());
+      cn = new ChangeNumber(str);
+      if (ReplicationDB.isaCounter(cn))
+      {
+        // First record is a counter record .. go next
+        status = cursor.getNext(key, data, LockMode.DEFAULT);
         if (status != OperationStatus.SUCCESS)
         {
-          /* database is empty */
+          // DB contains only a counter record
           return null;
         }
-        try
+        else
         {
-          str = new String(key.getData(), "UTF-8");
-          cn = new ChangeNumber(str);
-          if (ReplicationDB.isaCounter(cn))
-          {
-            // First record is a counter record .. go next
-            status = cursor.getNext(key, data, LockMode.DEFAULT);
-            if (status != OperationStatus.SUCCESS)
-            {
-              // DB contains only a counter record
-              return null;
-            }
-            else
-            {
-              cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
-            }
-          }
-        } catch (UnsupportedEncodingException e)
-        {
-          // never happens
+          cn = new ChangeNumber(decodeUTF8(key.getData()));
         }
       }
-      finally
-      {
-        closeLockedCursor(cursor);
-      }
     }
     catch (DatabaseException e)
     {
@@ -437,11 +401,18 @@
       replicationServer.shutdown();
       cn = null;
     }
+    finally
+    {
+      closeLockedCursor(cursor);
+    }
     return cn;
   }
 
+
+
   /**
    * Read the last Change from the database.
+   *
    * @return the last ChangeNumber.
    */
   public ChangeNumber readLastChange()
@@ -449,43 +420,36 @@
     Cursor cursor = null;
     ChangeNumber cn = null;
 
+    dbCloseLock.readLock().lock();
     try
     {
-      dbCloseLock.readLock().lock();
-      try
+      DatabaseEntry key = new DatabaseEntry();
+      DatabaseEntry data = new DatabaseEntry();
+
+      cursor = db.openCursor(null, null);
+
+      OperationStatus status = cursor.getLast(key, data,
+          LockMode.DEFAULT);
+
+      if (status != OperationStatus.SUCCESS)
       {
-        cursor = db.openCursor(null, null);
-        DatabaseEntry key = new DatabaseEntry();
-        DatabaseEntry data = new DatabaseEntry();
-        OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
-        if (status != OperationStatus.SUCCESS)
-        {
-          /* database is empty */
-          return null;
-        }
-        try
-        {
-          String str = new String(key.getData(), "UTF-8");
-          cn = new ChangeNumber(str);
-          if (ReplicationDB.isaCounter(cn))
-          {
-            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
-              OperationStatus.SUCCESS)
-            {
-              /* database only contain a counter record - don't know
-               * how much it can be possible but ... */
-              cn = null;
-            }
-          }
-        }
-        catch (UnsupportedEncodingException e)
-        {
-          // never happens
-        }
+        /* database is empty */
+        return null;
       }
-      finally
+
+      String str = decodeUTF8(key.getData());
+      cn = new ChangeNumber(str);
+      if (ReplicationDB.isaCounter(cn))
       {
-        closeLockedCursor(cursor);
+        if (cursor.getPrev(key, data,
+            LockMode.DEFAULT) != OperationStatus.SUCCESS)
+        {
+          /*
+           * database only contain a counter record - don't know how much it can
+           * be possible but ...
+           */
+          cn = null;
+        }
       }
     }
     catch (DatabaseException e)
@@ -497,6 +461,11 @@
       replicationServer.shutdown();
       cn = null;
     }
+    finally
+    {
+      closeLockedCursor(cursor);
+    }
+
     return cn;
   }
 
@@ -515,44 +484,56 @@
    */
   public class ReplServerDBCursor
   {
-    private Cursor cursor = null;
-
-    // The transaction that will protect the actions done with the cursor
+     // The transaction that will protect the actions done with the cursor
     // Will be let null for a read cursor
     // Will be set non null for a write cursor
-    private Transaction txn = null;
-    DatabaseEntry key = new DatabaseEntry();
-    DatabaseEntry data = new DatabaseEntry();
+    private final Transaction txn;
+    private final Cursor cursor;
+    private final DatabaseEntry key;
+    private final DatabaseEntry data;
+
+    private boolean isClosed = false;
 
     /**
      * Creates a ReplServerDBCursor that can be used for browsing a
      * replicationServer db.
      *
-     * @param startingChangeNumber The ChangeNumber from which the cursor must
-     *        start.
-     * @throws Exception When the startingChangeNumber does not exist.
+     * @param startingChangeNumber
+     *          The ChangeNumber from which the cursor must start.
+     * @throws Exception
+     *           When the startingChangeNumber does not exist.
      */
     private ReplServerDBCursor(ChangeNumber startingChangeNumber)
-            throws Exception
+        throws Exception
     {
+      if (startingChangeNumber != null)
+      {
+        key = new ReplicationKey(startingChangeNumber);
+      }
+      else
+      {
+        key = new DatabaseEntry();
+      }
+      data = new DatabaseEntry();
+
+      txn = null;
+
+      // Take the lock. From now on, whatever error that happen in the life
+      // of this cursor should end by unlocking that lock. We must also
+      // unlock it when throwing an exception.
+      dbCloseLock.readLock().lock();
+
+      Cursor localCursor = null;
       try
       {
-        // Take the lock. From now on, whatever error that happen in the life
-        // of this cursor should end by unlocking that lock. We must also
-        // unlock it when throwing an exception.
-        dbCloseLock.readLock().lock();
-
-        cursor = db.openCursor(txn, null);
+        localCursor = db.openCursor(txn, null);
         if (startingChangeNumber != null)
         {
-          key = new ReplicationKey(startingChangeNumber);
-          data = new DatabaseEntry();
-
-          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
+          if (localCursor.getSearchKey(key, data, LockMode.DEFAULT) !=
             OperationStatus.SUCCESS)
           {
             // We could not move the cursor to the expected startingChangeNumber
-            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+            if (localCursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
               OperationStatus.SUCCESS)
             {
               // We could not even move the cursor closed to it => failure
@@ -564,51 +545,75 @@
               // Let's create a cursor from that point.
               DatabaseEntry key = new DatabaseEntry();
               DatabaseEntry data = new DatabaseEntry();
-              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+              if (localCursor.getPrev(key, data, LockMode.DEFAULT) !=
                 OperationStatus.SUCCESS)
               {
-                closeLockedCursor(cursor);
-                dbCloseLock.readLock().lock();
-                cursor = db.openCursor(txn, null);
+                localCursor.close();
+                localCursor = db.openCursor(txn, null);
               }
             }
           }
         }
+        cursor = localCursor;
       }
       catch (Exception e)
       {
-       // Unlocking is required before throwing any exception
-        closeLockedCursor(cursor);
-        throw (e);
+        // Unlocking is required before throwing any exception
+        try
+        {
+          closeLockedCursor(localCursor);
+        }
+        catch (Exception ignore)
+        {
+          // Ignore.
+        }
+        throw e;
       }
     }
 
-    private ReplServerDBCursor() throws DatabaseException
+    private ReplServerDBCursor() throws Exception
     {
+      key = new DatabaseEntry();
+      data = new DatabaseEntry();
+
+      // We'll go on only if no close or no clear is running
+      dbCloseLock.readLock().lock();
+
+      Transaction localTxn = null;
+      Cursor localCursor = null;
       try
       {
-        // We'll go on only if no close or no clear is running
-        dbCloseLock.readLock().lock();
-
         // Create the transaction that will protect whatever done with this
         // write cursor.
-        txn = dbenv.beginTransaction();
+        localTxn = dbenv.beginTransaction();
+        localCursor = db.openCursor(localTxn, null);
 
-        cursor = db.openCursor(txn, null);
+        txn = localTxn;
+        cursor = localCursor;
       }
-      catch(DatabaseException e)
+      catch (Exception e)
       {
-        if (txn != null)
+        try
+        {
+          closeLockedCursor(localCursor);
+        }
+        catch (Exception ignore)
+        {
+          // Ignore.
+        }
+
+        if (localTxn != null)
         {
           try
           {
-            txn.abort();
+            localTxn.abort();
           }
-          catch (DatabaseException dbe)
-          {}
+          catch (DatabaseException ignore)
+          {
+            // Ignore.
+          }
         }
-        closeLockedCursor(cursor);
-        throw (e);
+        throw e;
       }
     }
 
@@ -617,10 +622,20 @@
      */
     public void close()
     {
+      synchronized (this)
+      {
+        if (isClosed)
+        {
+          return;
+        }
+        isClosed = true;
+      }
+
+      boolean closeHasFailed = false;
+
       try
       {
         closeLockedCursor(cursor);
-        cursor = null;
       }
       catch (DatabaseException e)
       {
@@ -628,22 +643,29 @@
         mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
         mb.append(stackTraceToSingleLineString(e));
         logError(mb.toMessage());
-        replicationServer.shutdown();
+        closeHasFailed = true;
       }
+
       if (txn != null)
       {
         try
         {
           txn.commit();
-        } catch (DatabaseException e)
+        }
+        catch (DatabaseException e)
         {
           MessageBuilder mb = new MessageBuilder();
           mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
           mb.append(stackTraceToSingleLineString(e));
           logError(mb.toMessage());
-          replicationServer.shutdown();
+          closeHasFailed = true;
         }
       }
+
+      if (closeHasFailed)
+      {
+        replicationServer.shutdown();
+      }
     }
 
     /**
@@ -655,14 +677,22 @@
      */
     public void abort()
     {
-      if (cursor == null)
-        return;
+      synchronized (this)
+      {
+        if (isClosed)
+        {
+          return;
+        }
+        isClosed = true;
+      }
+
+      boolean closeHasFailed = false;
+
       try
       {
         closeLockedCursor(cursor);
-        cursor = null;
       }
-      catch (LockConflictException e1)
+      catch (LockConflictException e)
       {
         // The DB documentation states that a DeadlockException
         // on the close method of a cursor that is aborting should
@@ -674,22 +704,29 @@
         mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
         mb.append(stackTraceToSingleLineString(e));
         logError(mb.toMessage());
-        replicationServer.shutdown();
+        closeHasFailed = true;
       }
+
       if (txn != null)
       {
         try
         {
           txn.abort();
-        } catch (DatabaseException e)
+        }
+        catch (DatabaseException e)
         {
           MessageBuilder mb = new MessageBuilder();
           mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
           mb.append(stackTraceToSingleLineString(e));
           logError(mb.toMessage());
-          replicationServer.shutdown();
+          closeHasFailed = true;
         }
       }
+
+      if (closeHasFailed)
+      {
+        replicationServer.shutdown();
+      }
     }
 
     /**
@@ -706,15 +743,8 @@
       {
         return null;
       }
-      try
-      {
-        String csnString = new String(key.getData(), "UTF-8");
-        return new ChangeNumber(csnString);
-      } catch (UnsupportedEncodingException e)
-      {
-        // can't happen
-        return null;
-      }
+      String csnString = decodeUTF8(key.getData());
+      return new ChangeNumber(csnString);
     }
 
     /**
@@ -738,26 +768,29 @@
         {
           return null;
         }
+
         try
         {
-          ChangeNumber cn=new ChangeNumber(new String(key.getData(), "UTF-8"));
-          if(ReplicationDB.isaCounter(cn))
+          ChangeNumber cn = new ChangeNumber(
+              decodeUTF8(key.getData()));
+          if (ReplicationDB.isaCounter(cn))
           {
             // counter record
             continue;
           }
-          currentChange = ReplicationData.generateChange(data.getData());
-        } catch (Exception e) {
+          currentChange = ReplicationData.generateChange(data
+              .getData());
+        }
+        catch (Exception e)
+        {
           /*
            * An error happening trying to convert the data from the
-           * replicationServer database to an Update Message.
-           * This can only happen if the database is corrupted.
-           * There is not much more that we can do at this point except trying
-           * to continue with the next record.
-           * In such case, it is therefore possible that we miss some changes.
-           * TODO. log an error message.
-           * TODO : REPAIR : Such problem should be handled by the
-           *        repair functionality.
+           * replicationServer database to an Update Message. This can only
+           * happen if the database is corrupted. There is not much more that we
+           * can do at this point except trying to continue with the next
+           * record. In such case, it is therefore possible that we miss some
+           * changes. TODO. log an error message. TODO : REPAIR : Such problem
+           * should be handled by the repair functionality.
            */
         }
       }
@@ -859,7 +892,7 @@
       while (status == OperationStatus.SUCCESS)
       {
         // test whether the record is a regular change or a counter
-        String csnString = new String(key.getData(), "UTF-8");
+        String csnString = decodeUTF8(key.getData());
         cn = new ChangeNumber(csnString);
         if (cn.getServerId() != 0)
         {
@@ -900,7 +933,7 @@
       status = cursor.getSearchKey(key, data, LockMode.DEFAULT);
       if (status == OperationStatus.SUCCESS)
       {
-        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+        cn = new ChangeNumber(decodeUTF8(key.getData()));
       }
       else
       {
@@ -915,7 +948,7 @@
       }
       while (status == OperationStatus.SUCCESS)
       {
-        cn = new ChangeNumber(new String(key.getData(), "UTF-8"));
+        cn = new ChangeNumber(decodeUTF8(key.getData()));
         if (!ReplicationDB.isaCounter(cn))
         {
           // regular change record
@@ -952,18 +985,6 @@
         }
       }
     }
-    catch (UnsupportedEncodingException e)
-    {
-      MessageBuilder mb = new MessageBuilder();
-      mb.append(ERR_CHANGELOG_UNSUPPORTED_UTF8_ENCODING.get());
-      mb.append(stackTraceToSingleLineString(e));
-      logError(mb.toMessage());
-      replicationServer.shutdown();
-    }
-    catch (DataFormatException e)
-    {
-      // Should never happen
-    }
     finally
     {
       if (cursor != null)
@@ -975,7 +996,7 @@
           txn.abort();
         } catch (DatabaseException e1)
         {
-          // can't do much more. The ReplicationServer is shuting down.
+          // can't do much more. The ReplicationServer is shutting down.
         }
       }
     }
@@ -996,33 +1017,22 @@
    * Decode the provided database entry as a the value of a counter.
    * @param entry The provided entry.
    * @return The counter value.
-   * @throws DataFormatException
    */
   private static int decodeCounterValue(byte[] entry)
-  throws DataFormatException
   {
-    try
-    {
-      String numAckStr = new String(entry, 0, entry.length, "UTF-8");
-      return Integer.parseInt(numAckStr);
-
-    } catch (UnsupportedEncodingException e)
-    {
-      throw new DataFormatException("UTF-8 is not supported by this jvm.");
-    }
+    String numAckStr = decodeUTF8(entry);
+    return Integer.parseInt(numAckStr);
   }
 
   /**
    * Encode the provided counter value in a database entry.
    * @param entry The provided entry.
-   * @return The databse entry with the counter value encoded inside..
-   * @throws UnsupportedEncodingException
+   * @return The database entry with the counter value encoded inside.
    */
   static private DatabaseEntry encodeCounterValue(int value)
-  throws UnsupportedEncodingException
   {
     DatabaseEntry entry = new DatabaseEntry();
-    entry.setData(String.valueOf(value).getBytes("UTF-8"));
+    entry.setData(getBytes(String.valueOf(value)));
     return entry;
   }
 

--
Gitblit v1.10.0