From 111c22a17e3ab4bdca9052891a810f5ab7cea6b6 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Wed, 19 Mar 2008 10:33:45 +0000
Subject: [PATCH] The following changes fix a bug in the clearing of the replication server db that makes the clearing sometimes fail silently. In particular, Berkeley DB requires the db to be closed and every reference to the db handle to be released before the db is truncated. That requires locking the db when it is closed/cleared, with a limited impact on performance in the other cases. A RW lock is added on the db: every thread using the db takes/releases the READ lock before/after usage. That still allows these threads to run concurrently and prevents a big impact on performance. Every thread closing the db (shutdown or clear) takes/releases the WRITE lock before/after the closure.

---
 opends/src/server/org/opends/server/replication/server/ReplicationDB.java |  135 ++++++++++++++++++++++++++++++++++++++------
 1 files changed, 116 insertions(+), 19 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
index 501d009..dba83f0 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationDB.java
@@ -37,6 +37,7 @@
 import org.opends.server.types.DN;
 import org.opends.server.replication.common.ChangeNumber;
 import org.opends.server.replication.protocol.UpdateMessage;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.sleepycat.je.Cursor;
 import com.sleepycat.je.DatabaseEntry;
@@ -63,7 +64,11 @@
   // The maximum number of retries in case of DatabaseDeadlock Exception.
   private static final int DEADLOCK_RETRIES = 10;
 
-  /**
+  // The lock used to provide exclusive access to the thread that
+  // closes the db (shutdown or clear).
+  private ReentrantReadWriteLock dbCloseLock;
+
+ /**
    * Creates a new database or open existing database that will be used
    * to store and retrieve changes from an LDAP server.
    * @param serverId The identifier of the LDAP server.
@@ -86,6 +91,8 @@
     db = dbenv.getOrAddDb(serverId, baseDn,
         replicationServer.getReplicationServerDomain(baseDn,
         true).getGenerationId());
+
+    dbCloseLock = new ReentrantReadWriteLock(true);
   }
 
   /**
@@ -108,6 +115,7 @@
       // the operation is attempted again up to DEADLOCK_RETRIES times.
       while ((tries++ < DEADLOCK_RETRIES) && (!done))
       {
+        dbCloseLock.readLock().lock();
         try
         {
           txn = dbenv.beginTransaction();
@@ -128,6 +136,10 @@
           txn.abort();
           txn = null;
         }
+        finally
+        {
+          dbCloseLock.readLock().unlock();
+        }
       }
       if (!done)
       {
@@ -190,8 +202,17 @@
   {
     try
     {
-      db.close();
-    } catch (DatabaseException e)
+      dbCloseLock.writeLock().lock();
+      try
+      {
+        db.close();
+      }
+      finally
+      {
+        dbCloseLock.writeLock().unlock();
+      }
+    }
+    catch (DatabaseException e)
     {
       MessageBuilder mb = new MessageBuilder();
       mb.append(NOTE_EXCEPTION_CLOSING_DATABASE.get(this.toString()));
@@ -223,6 +244,7 @@
    * @throws DatabaseException If a database error prevented the cursor
    *                           creation.
    * @throws Exception if the ReplServerDBCursor creation failed.
+   *
    * @return The ReplServerDBCursor.
    */
   public ReplServerDBCursor openDeleteCursor()
@@ -242,10 +264,13 @@
 
     try
     {
-        cursor = db.openCursor(null, null);
-    } catch (DatabaseException e1)
+      dbCloseLock.readLock().lock();
+      cursor = db.openCursor(null, null);
+    }
+    catch (DatabaseException e1)
     {
-        return null;
+      dbCloseLock.readLock().unlock();
+      return null;
     }
     try
     {
@@ -253,6 +278,7 @@
       DatabaseEntry data = new DatabaseEntry();
       OperationStatus status = cursor.getFirst(key, data, LockMode.DEFAULT);
       cursor.close();
+      dbCloseLock.readLock().unlock();
       if (status != OperationStatus.SUCCESS)
       {
         /* database is empty */
@@ -268,11 +294,14 @@
       return new ChangeNumber(str);
     } catch (DatabaseException e)
     {
-      try {
-      cursor.close();
+      try
+      {
+        cursor.close();
+        dbCloseLock.readLock().unlock();
       }
       catch (DatabaseException dbe)
       {
+        // The db is dead - let's only log.
       }
       /* database is faulty */
       MessageBuilder mb = new MessageBuilder();
@@ -295,11 +324,13 @@
 
     try
     {
+      dbCloseLock.readLock().lock();
       cursor = db.openCursor(null, null);
       DatabaseEntry key = new DatabaseEntry();
       DatabaseEntry data = new DatabaseEntry();
       OperationStatus status = cursor.getLast(key, data, LockMode.DEFAULT);
       cursor.close();
+      dbCloseLock.readLock().unlock();
       if (status != OperationStatus.SUCCESS)
       {
         /* database is empty */
@@ -355,10 +386,14 @@
     private ReplServerDBCursor(ChangeNumber startingChangeNumber)
             throws Exception
     {
-      cursor = db.openCursor(txn, null);
-
       try
       {
+        // Take the lock. From now on, any error that happens in the life
+        // of this cursor must end by releasing that lock. We must also
+        // unlock it when throwing an exception.
+        dbCloseLock.readLock().lock();
+
+        cursor = db.openCursor(txn, null);
         if (startingChangeNumber != null)
         {
           key = new ReplicationKey(startingChangeNumber);
@@ -367,20 +402,35 @@
           if (cursor.getSearchKey(key, data, LockMode.DEFAULT) !=
             OperationStatus.SUCCESS)
           {
+            // We could not move the cursor to the expected startingChangeNumber
             if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) !=
               OperationStatus.SUCCESS)
             {
+              // We could not even move the cursor close to it => failure
+              // Unlocking is required before throwing any exception
+              dbCloseLock.readLock().unlock();
               throw new Exception("ChangeNumber not available");
             }
             else
             {
+              // We can move close to the startingChangeNumber.
+              // Let's create a cursor from that point.
               DatabaseEntry key = new DatabaseEntry();
               DatabaseEntry data = new DatabaseEntry();
               if (cursor.getPrev(key, data, LockMode.DEFAULT) !=
                 OperationStatus.SUCCESS)
               {
-                cursor.close();
-                cursor = db.openCursor(txn, null);
+                try
+                {
+                  cursor.close();
+                  cursor = db.openCursor(txn, null);
+                }
+                catch(Exception e)
+                {
+                  // Unlocking is required before throwing any exception
+                  dbCloseLock.readLock().unlock();
+                  throw(e);
+                }
               }
             }
           }
@@ -388,6 +438,8 @@
       }
       catch (Exception e)
       {
+        // Unlocking is required before throwing any exception
+        dbCloseLock.readLock().unlock();
         cursor.close();
         throw (e);
       }
@@ -395,8 +447,18 @@
 
     private ReplServerDBCursor() throws DatabaseException
     {
-      txn = dbenv.beginTransaction();
-      cursor = db.openCursor(txn, null);
+      try
+      {
+        // We'll go on only if no close or clear is running
+        dbCloseLock.readLock().lock();
+        txn = dbenv.beginTransaction();
+        cursor = db.openCursor(txn, null);
+      }
+      catch(DatabaseException e)
+      {
+        dbCloseLock.readLock().unlock();
+        throw (e);
+      }
     }
 
     /**
@@ -414,12 +476,15 @@
       }
       catch (DatabaseException e)
       {
+        dbCloseLock.readLock().unlock();
+
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
         mb.append(stackTraceToSingleLineString(e));
         logError(mb.toMessage());
         replicationServer.shutdown();
       }
+
       if (txn != null)
       {
         try
@@ -434,6 +499,7 @@
           replicationServer.shutdown();
         }
       }
+      dbCloseLock.readLock().unlock();
     }
 
     /**
@@ -460,6 +526,8 @@
       }
       catch (DatabaseException e)
       {
+        dbCloseLock.readLock().unlock();
+
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_CHANGELOG_SHUTDOWN_DATABASE_ERROR.get());
         mb.append(stackTraceToSingleLineString(e));
@@ -480,6 +548,7 @@
           replicationServer.shutdown();
         }
       }
+      dbCloseLock.readLock().unlock();
     }
 
     /**
@@ -556,7 +625,7 @@
     {
       cursor.delete();
     }
-  }
+  } // ReplServerDBCursor
 
   /**
    * Clears this change DB from the changes it contains.
@@ -566,10 +635,38 @@
    */
   public void clear() throws Exception, DatabaseException
   {
-    // Clears the changes
-    dbenv.clearDb(this.toString());
+    // The coming users will be blocked until the clear is done
+    dbCloseLock.writeLock().lock();
+    try
+    {
+      String dbName = db.getDatabaseName();
 
-    // Clears the reference to this serverID
-    dbenv.clearServerId(baseDn, serverId);
+      // Clears the reference to this serverID
+      dbenv.clearServerId(baseDn, serverId);
+
+      // Closing is required by Berkeley DB before truncating
+      db.close();
+
+      // Clears the changes
+      dbenv.clearDb(dbName);
+
+      db = null;
+
+      // RE-create the db
+      db = dbenv.getOrAddDb(serverId, baseDn, (long)-1);
+    }
+    catch(Exception e)
+    {
+      MessageBuilder mb = new MessageBuilder();
+      mb.append(ERR_ERROR_CLEARING_DB.get(this.toString(),
+          e.getMessage() + " " +
+          stackTraceToSingleLineString(e)));
+      logError(mb.toMessage());
+    }
+    finally
+    {
+      // Release the waiting users
+      dbCloseLock.writeLock().unlock();
+    }
   }
 }

--
Gitblit v1.10.0