From 7e6c6657bced35f4a3aba723c2add20923450ad6 Mon Sep 17 00:00:00 2001
From: gbellato <gbellato@localhost>
Date: Fri, 01 Feb 2008 09:40:56 +0000
Subject: [PATCH] The Replication Server thread that is reading changes from the database to propagate them to the Directory Servers was sometimes leaving some Database cursors open.

---
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java |  158 ++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 120 insertions(+), 38 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index e003d16..63c5c4d 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -22,7 +22,7 @@
  * CDDL HEADER END
  *
  *
- *      Portions Copyright 2006-2007 Sun Microsystems, Inc.
+ *      Portions Copyright 2006-2008 Sun Microsystems, Inc.
  */
 package org.opends.server.replication.server;
 import org.opends.messages.MessageBuilder;
@@ -42,6 +42,7 @@
 import com.sleepycat.je.DatabaseEntry;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Database;
+import com.sleepycat.je.DeadlockException;
 import com.sleepycat.je.LockMode;
 import com.sleepycat.je.OperationStatus;
 import com.sleepycat.je.Transaction;
@@ -59,6 +60,9 @@
   private Short serverId;
   private DN baseDn;
 
+  // The maximum number of retries in case of DatabaseDeadlock Exception.
+  private static final int DEADLOCK_RETRIES = 10;
+
   /**
    * Creates a new database or open existing database that will be used
    * to store and retrieve changes from an LDAP server.
@@ -95,28 +99,49 @@
 
     try
     {
-      txn = dbenv.beginTransaction();
+      int tries = 0;
+      boolean done = false;
 
-      for (UpdateMessage change : changes)
+      // The database can return a Deadlock Exception if several threads are
+      // accessing the database at the same time. This Exception is a
+      // transient state, when it happens the transaction is aborted and
+      // the operation is attempted again up to DEADLOCK_RETRIES times.
+      while ((tries++ < DEADLOCK_RETRIES) && (!done))
       {
-        DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
-        DatabaseEntry data = new ReplicationData(change);
-
         try
         {
-          db.put(txn, key, data);
-        } catch (DatabaseException e)
+          txn = dbenv.beginTransaction();
+
+          for (UpdateMessage change : changes)
+          {
+            DatabaseEntry key = new ReplicationKey(change.getChangeNumber());
+            DatabaseEntry data = new ReplicationData(change);
+            db.put(txn, key, data);
+          }
+
+          txn.commitWriteNoSync();
+          txn = null;
+          done = true;
+        }
+        catch (DeadlockException e)
         {
-          MessageBuilder mb = new MessageBuilder();
-          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
-          mb.append(stackTraceToSingleLineString(e));
-          logError(mb.toMessage());
-          replicationServer.shutdown();
+          txn.abort();
+          txn = null;
         }
       }
-
-      txn.commitWriteNoSync();
-      txn = null;
+      if (!done)
+      {
+        // Could not write to the DB after DEADLOCK_RETRIES tries.
+        // This ReplicationServer is not reliable and will be shutdown.
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+        logError(mb.toMessage());
+        if (txn != null)
+        {
+           txn.abort();
+        }
+        replicationServer.shutdown();
+      }
     }
     catch (DatabaseException e)
     {
@@ -332,31 +357,40 @@
     {
       cursor = db.openCursor(txn, null);
 
-      if (startingChangeNumber != null)
+      try
       {
-        key = new ReplicationKey(startingChangeNumber);
-        data = new DatabaseEntry();
-
-        if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
-          OperationStatus.SUCCESS)
+        if (startingChangeNumber != null)
         {
-          if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+          key = new ReplicationKey(startingChangeNumber);
+          data = new DatabaseEntry();
+
+          if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
             OperationStatus.SUCCESS)
           {
+            if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
+              OperationStatus.SUCCESS)
+            {
               throw new Exception("ChangeNumber not available");
-          }
-          else
-          {
-            DatabaseEntry key = new DatabaseEntry();
-            DatabaseEntry data = new DatabaseEntry();
-            if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
-             OperationStatus.SUCCESS)
-           {
-             cursor = db.openCursor(txn, null);
-           }
+            }
+            else
+            {
+              DatabaseEntry key = new DatabaseEntry();
+              DatabaseEntry data = new DatabaseEntry();
+              if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
+                OperationStatus.SUCCESS)
+              {
+                cursor.close();
+                cursor = db.openCursor(txn, null);
+              }
+            }
           }
         }
       }
+      catch (Exception e)
+      {
+        cursor.close();
+        throw (e);
+      }
     }
 
     private ReplServerDBCursor() throws DatabaseException
@@ -370,13 +404,15 @@
      */
     public void close()
     {
-      if (cursor == null)
-        return;
       try
       {
-        cursor.close();
-        cursor = null;
-      } catch (DatabaseException e)
+        if (cursor != null)
+        {
+          cursor.close();
+          cursor = null;
+        }
+      }
+      catch (DatabaseException e)
       {
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
@@ -401,6 +437,52 @@
     }
 
     /**
+     * Abort the Cursor after a Deadlock Exception.
+     * This method catch and ignore the DeadlockException because
+     * this must be done when aborting a cursor after a DeadlockException
+     * (per the Cursor documentation).
+     * This should not be used in any other case.
+     */
+    public void abort()
+    {
+      if (cursor == null)
+        return;
+      try
+      {
+        cursor.close();
+        cursor = null;
+      }
+      catch (DeadlockException e1)
+      {
+        // The DB documentation states that a DeadlockException
+        // on the close method of a cursor that is aborting should
+        // be ignored.
+      }
+      catch (DatabaseException e)
+      {
+        MessageBuilder mb = new MessageBuilder();
+        mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+        mb.append(stackTraceToSingleLineString(e));
+        logError(mb.toMessage());
+        replicationServer.shutdown();
+      }
+      if (txn != null)
+      {
+        try
+        {
+          txn.abort();
+        } catch (DatabaseException e)
+        {
+          MessageBuilder mb = new MessageBuilder();
+          mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
+          mb.append(stackTraceToSingleLineString(e));
+          logError(mb.toMessage());
+          replicationServer.shutdown();
+        }
+      }
+    }
+
+    /**
      * Get the next ChangeNumber in the database from this Cursor.
      *
      * @return The next ChangeNumber in the database from this cursor.

--
Gitblit v1.10.0