From 7e6c6657bced35f4a3aba723c2add20923450ad6 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 01 Feb 2008 09:40:56 +0000
Subject: [PATCH] The Replication Server thread that is reading changes from the database to propagate them to the Directory Servers was sometimes leaving some Database cursors open.

---
 opends/src/server/org/opends/server/replication/server/ReplicationIterator.java     |   11 +
 opends/src/server/org/opends/server/replication/server/ServerHandler.java           |   21 +--
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java           |  158 ++++++++++++++++++++++++-------
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |    3 
 opends/src/server/org/opends/server/replication/server/DbHandler.java               |   86 +++++++++++-----
 5 files changed, 197 insertions(+), 82 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/DbHandler.java b/opends/src/server/org/opends/server/replication/server/DbHandler.java
index f3aabe9..f6c2c11 100644
--- a/opends/src/server/org/opends/server/replication/server/DbHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,6 +51,7 @@
 import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
 
 import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.DeadlockException;
 
 /**
  * This class is used for managing the replicationServer database for each
@@ -99,6 +100,9 @@
   final static int MSG_QUEUE_HIMARK = 5000;
   final static int MSG_QUEUE_LOWMARK = 4000;
 
+  // The maximum number of retries in case of a DeadlockException.
+  private static final int DEADLOCK_RETRIES = 10;
+
   /**
    *
    * The trim age in milliseconds. Changes record in the change DB that
@@ -285,7 +289,10 @@
       }
     }
 
-    return new ReplicationIterator(serverId, db, changeNumber);
+    ReplicationIterator it =
+      new ReplicationIterator(serverId, db, changeNumber);
+
+    return it;
   }
 
   /**
@@ -397,46 +404,67 @@
       return;
     int size = 0;
     boolean finished = false;
+    boolean done = false;
     ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
         (short) 0, (short)0);
 
-    /* the trim is done by group in order to save some CPU and IO bandwidth
-     * start the transaction then do a bunch of remove then commit
-     */
-    ReplServerDBCursor cursor;
+    // In case of deadlock detection by the Database, this thread can
+    // be aborted by a DeadlockException. This is a transient error and
+    // the transaction should be attempted again.
+    // We will try DEADLOCK_RETRIES times before failing.
+    int tries = 0;
+    while ((tries++ < DEADLOCK_RETRIES) && (!done))
+    {
+      /* the trim is done by group in order to save some CPU and IO bandwidth
+       * start the transaction then do a bunch of remove then commit
+       */
+      ReplServerDBCursor cursor;
+      cursor = db.openDeleteCursor();
 
-    cursor = db.openDeleteCursor();
-
-    try {
-      while ((size < 5000 ) &&  (!finished))
+      try
       {
-        ChangeNumber changeNumber = cursor.nextChangeNumber();
-        if (changeNumber != null)
+        while ((size < 5000 ) &&  (!finished))
         {
-          if ((!changeNumber.equals(lastChange))
-              && (changeNumber.older(trimDate)))
+          ChangeNumber changeNumber = cursor.nextChangeNumber();
+          if (changeNumber != null)
           {
-            size++;
-            cursor.delete();
+            if ((!changeNumber.equals(lastChange))
+                && (changeNumber.older(trimDate)))
+            {
+              size++;
+              cursor.delete();
+            }
+            else
+            {
+              firstChange = changeNumber;
+              finished = true;
+            }
           }
           else
-          {
-            firstChange = changeNumber;
             finished = true;
-          }
         }
-        else
-          finished = true;
+        cursor.close();
+        done = true;
       }
-
-      cursor.close();
-    } catch (DatabaseException e)
-    {
-      // mark shutdown for this db so that we don't try again to
-      // stop it from cursor.close() or methods called by cursor.close()
-      shutdown = true;
-      cursor.close();
-      throw (e);
+      catch (DeadlockException e)
+      {
+        cursor.abort();
+        if (tries == DEADLOCK_RETRIES)
+        {
+          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
+          // shutdown the ReplicationServer.
+          shutdown = true;
+          throw (e);
+        }
+      }
+      catch (DatabaseException e)
+      {
+        // mark shutdown for this db so that we don't try again to
+        // stop it from cursor.close() or methods called by cursor.close()
+        shutdown = true;
+        cursor.abort();
+        throw (e);
+      }
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index e003d16..63c5c4d 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.MessageBuilder;
@@ -42,6 +42,7 @@
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Database;
+import com.sleepycat.je.DeadlockException;
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.Transaction;
@@ -59,6 +60,9 @@
   private Short serverId;
   private DN baseDn;
 
+  // The maximum number of retries in case of a DeadlockException.
+  private static final int DEADLOCK_RETRIES = 10;
+
   /**
    * Creates a new database or open existing database that will be used
    * to store and retrieve changes from an LDAP server.
@@ -95,28 +99,49 @@
 
     try
     {
-      txn = dbenv.beginTransaction();
+      int tries = 0;
+      boolean done = false;
 
-      for (UpdateMessage change : changes)
+      // The database can return a Deadlock Exception if several threads are
+      // accessing the database at the same time. This Exception is a
+      // transient state, when it happens the transaction is aborted and
+      // the operation is attempted again up to DEADLOCK_RETRIES times.
+      while ((tries++ < DEADLOCK_RETRIES) && (!done))
       {
-        DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
-        DatabaseEntry data = new ReplicationData(change);
-
         try
         {
-          db.put(txn, key, data);
-        } catch (DatabaseException e)
+          txn = dbenv.beginTransaction();
+
+          for (UpdateMessage change : changes)
+          {
+            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
+            DatabaseEntry data = new ReplicationData(change);
+            db.put(txn, key, data);
+          }
+
+          txn.commitWriteNoSync();
+          txn = null;
+          done = true;
+        }
+        catch (DeadlockException e)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          replicationServer.shutdown();
+          txn.abort();
+          txn = null;
         }
       }
-
-      txn.commitWriteNoSync();
-      txn = null;
+      if (!done)
+      {
+        // Could not write to the DB after DEADLOCK_RETRIES tries.
+        // This ReplicationServer is not reliable and will be shutdown.
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+        logError(mb.toMessage());
+        if (txn != null)
+        {
+           txn.abort();
+        }
+        replicationServer.shutdown();
+      }
     }
     catch (DatabaseException e)
     {
@@ -332,31 +357,40 @@
     {
       cursor = db.openCursor(txn, null);
 
-      if (startingChangeNumber != null)
+      try
       {
-        key = new ReplicationKey(startingChangeNumber);
-        data = new DatabaseEntry();
-
-        if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
-          OperationStatus.SUCCESS)
+        if (startingChangeNumber != null)
         {
-          if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+          key = new ReplicationKey(startingChangeNumber);
+          data = new DatabaseEntry();
+
+          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
             OperationStatus.SUCCESS)
           {
+            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+              OperationStatus.SUCCESS)
+            {
               throw new Exception("ChangeNumber not available");
-          }
-          else
-          {
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry data = new DatabaseEntry();
-            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
-             OperationStatus.SUCCESS)
-           {
-             cursor = db.openCursor(txn, null);
-           }
+            }
+            else
+            {
+              DatabaseEntry key = new DatabaseEntry();
+              DatabaseEntry data = new DatabaseEntry();
+              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+                OperationStatus.SUCCESS)
+              {
+                cursor.close();
+                cursor = db.openCursor(txn, null);
+              }
+            }
           }
         }
       }
+      catch (Exception e)
+      {
+        cursor.close();
+        throw (e);
+      }
     }
 
     private ReplServerDBCursor() throws DatabaseException
@@ -370,13 +404,15 @@
      */
     public void close()
     {
-      if (cursor == null)
-        return;
       try
       {
-        cursor.close();
-        cursor = null;
-      } catch (DatabaseException e)
+        if (cursor != null)
+        {
+          cursor.close();
+          cursor = null;
+        }
+      }
+      catch (DatabaseException e)
       {
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
@@ -401,6 +437,52 @@
     }
 
     /**
+     * Abort the Cursor after a Deadlock Exception.
+     * This method catches and ignores the DeadlockException because
+     * this must be done when aborting a cursor after a DeadlockException
+     * (per the Cursor documentation).
+     * This should not be used in any other case.
+     */
+    public void abort()
+    {
+      if (cursor == null)
+        return; // nothing to close; note that txn.abort() below is skipped too
+      try
+      {
+        cursor.close();
+        cursor = null;
+      }
+      catch (DeadlockException e1)
+      {
+        // Deliberately ignored: the DB documentation states that a
+        // DeadlockException from close() on a cursor being aborted should be
+        // ignored. The cursor field is not reset to null on this path.
+      }
+      catch (DatabaseException e)
+      {
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+        mb.append(stackTraceToSingleLineString(e));
+        logError(mb.toMessage());
+        replicationServer.shutdown();
+      }
+      if (txn != null)
+      {
+        try
+        {
+          txn.abort();
+        } catch (DatabaseException e)
+        {
+          MessageBuilder mb = new MessageBuilder();
+          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+          mb.append(stackTraceToSingleLineString(e));
+          logError(mb.toMessage());
+          replicationServer.shutdown();
+        }
+      }
+    }
+
+    /**
      * Get the next ChangeNumber in the database from this Cursor.
      *
      * @return The next ChangeNumber in the database from this cursor.
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
index a65f61c..6c078d4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationIterator.java
@@ -22,16 +22,16 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 
-import com.sleepycat.je.DatabaseException;
-
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMessage;
 import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
 
+import com.sleepycat.je.DatabaseException;
+
 /**
  * This class allows to iterate through the changes received from a given
  * LDAP Server Identifier.
@@ -43,6 +43,9 @@
 
   /**
    * Creates a new ReplicationIterator.
+   * Every created iterator must be released by the caller using the
+   * releaseCursor() method.
+   *
    * @param id the Identifier of the server on which the iterator applies.
    * @param db The db where the iterator must be created.
    * @param changeNumber The ChangeNumber after which the iterator must start.
@@ -56,7 +59,9 @@
   {
     cursor = db.openReadCursor(changeNumber);
     if (cursor == null)
+    {
       throw new Exception("no new change");
+    }
      if (this.next() == false)
      {
        cursor.close();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4d9cf4c..89726d4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -572,6 +572,9 @@
 
   /**
    * Creates and returns an iterator.
+   * When the iterator is not used anymore, the caller MUST call the
+   * ReplicationIterator.releaseCursor() method to free the resources
+   * and locks used by the ReplicationIterator.
    *
    * @param serverId Identifier of the server for which the iterator is created.
    * @param changeNumber Starting point for the iterator.
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index d624741..3b85cb1 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -857,16 +857,6 @@
     if (olderUpdateCN == null)
       return null;
 
-    ReplicationIterator ri =
-      replicationServerDomain.getChangelogIterator(serverId, olderUpdateCN);
-    if (ri != null)
-    {
-      if (ri.next())
-      {
-        ChangeNumber firstMissingChange = ri.getChange().getChangeNumber();
-        return firstMissingChange.getTime();
-      }
-    }
     return olderUpdateCN.getTime();
   }
 
@@ -1081,9 +1071,16 @@
             ChangeNumber lastCsn = serverState.getMaxChangeNumber(serverId);
             ReplicationIterator iterator =
               replicationServerDomain.getChangelogIterator(serverId, lastCsn);
-            if ((iterator != null) && (iterator.getChange() != null))
+            if (iterator != null)
             {
-              iteratorSortedSet.add(iterator);
+              if (iterator.getChange() != null)
+              {
+                iteratorSortedSet.add(iterator);
+              }
+              else
+              {
+                iterator.releaseCursor();
+              }
             }
           }
           while (!iteratorSortedSet.isEmpty() && (lateQueue.size()<100))

--
Gitblit v1.10.0