From 52df1e0c040cb7f4af2f849e617ce94b61a22fa8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 10 Oct 2013 09:50:19 +0000
Subject: [PATCH] Stabilizing JEChangeNumberIndexDBTest.testClear().

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                     |    1 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java                             |   55 +++++++++++++++++++--------
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java |   23 +++++------
 3 files changed, 49 insertions(+), 30 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index f246160..25de88f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -86,15 +87,15 @@
   /** The last generated value for the change number. */
   private final AtomicLong lastGeneratedChangeNumber;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
-  private boolean shutdown = false;
-  private boolean trimDone = false;
+  private final AtomicBoolean shutdown = new AtomicBoolean(false);
+  private volatile boolean trimDone = false;
   /**
    * A dedicated thread loops trim().
    * <p>
    * trim() : deletes from the DB a number of changes that are older than a
    * certain date.
    */
-  private DirectoryThread thread;
+  private DirectoryThread trimmingThread;
   /**
    * The trim age in milliseconds. Changes record in the change DB that are
    * older than this age are removed.
@@ -132,16 +133,21 @@
     long newestCN = (newestRecord != null) ? newestRecord.getChangeNumber() : 0;
     lastGeneratedChangeNumber = new AtomicLong(newestCN);
 
-    // Trimming thread
-    thread =
-        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
-    thread.start();
-
     // Monitoring registration
     DirectoryServer.deregisterMonitorProvider(dbMonitor);
     DirectoryServer.registerMonitorProvider(dbMonitor);
   }
 
+  /**
+   * Creates and starts the thread trimming the CNIndexDB.
+   */
+  public void startTrimmingThread()
+  {
+    trimmingThread =
+        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
+    trimmingThread.start();
+  }
+
   private long getChangeNumber(CNIndexRecord record) throws ChangelogException
   {
     if (record != null)
@@ -245,12 +251,12 @@
    */
   public void shutdown()
   {
-    if (shutdown)
+    if (shutdown.get())
     {
       return;
     }
 
-    shutdown = true;
+    shutdown.set(true);
     synchronized (this)
     {
       notifyAll();
@@ -280,10 +286,10 @@
   @Override
   public void run()
   {
-    while (!shutdown)
+    while (!shutdown.get())
     {
       try {
-        trim();
+        trim(shutdown);
 
         synchronized (this)
         {
@@ -319,12 +325,12 @@
    * Trim old changes from this database.
    * @throws ChangelogException In case of database problem.
    */
-  public void trim() throws ChangelogException
+  private void trim(AtomicBoolean shutdown) throws ChangelogException
   {
     if (trimAge == 0)
       return;
 
-    clear(null);
+    clear(null, shutdown);
   }
 
   /**
@@ -339,6 +345,12 @@
    */
   public void clear(DN baseDNToClear) throws ChangelogException
   {
+    clear(baseDNToClear, null);
+  }
+
+  private void clear(DN baseDNToClear, AtomicBoolean shutdown)
+      throws ChangelogException
+  {
     if (isEmpty())
     {
       return;
@@ -346,13 +358,17 @@
 
     for (int i = 0; i < 100; i++)
     {
+      if (mustShutdown(shutdown))
+      {
+        return;
+      }
       final DraftCNDBCursor cursor = db.openDeleteCursor();
       try
       {
         for (int j = 0; j < 50; j++)
         {
           // let's traverse the CNIndexDB
-          if (!cursor.next())
+          if (mustShutdown(shutdown) || !cursor.next())
           {
             cursor.close();
             return;
@@ -431,7 +447,7 @@
         // mark shutdown for this db so that we don't try again to
         // stop it from cursor.close() or methods called by cursor.close()
         cursor.abort();
-        shutdown = true;
+        shutdown.set(true);
         throw e;
       }
       catch (Exception e)
@@ -439,12 +455,17 @@
         // mark shutdown for this db so that we don't try again to
         // stop it from cursor.close() or methods called by cursor.close()
         cursor.abort();
-        shutdown = true;
+        shutdown.set(true);
         throw new ChangelogException(e);
       }
     }
   }
 
+  private boolean mustShutdown(AtomicBoolean shutdown)
+  {
+    return shutdown != null && shutdown.get();
+  }
+
   /**
    * This internal class is used to implement the Monitoring capabilities of the
    * JEChangeNumberIndexDB.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 5ecd3c1..ac4aea8 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -650,6 +650,7 @@
         try
         {
           cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
+          cnIndexDB.startTrimmingThread();
         }
         catch (Exception e)
         {
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 4b92f77..0e78d2f 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -53,6 +53,10 @@
 @SuppressWarnings("javadoc")
 public class JEChangeNumberIndexDBTest extends ReplicationTestCase
 {
+  private static final String value1 = "value1";
+  private static final String value2 = "value2";
+  private static final String value3 = "value3";
+
   /**
    * This test makes basic operations of a JEChangeNumberIndexDB:
    * <ol>
@@ -76,10 +80,6 @@
       cnIndexDB.setPurgeDelay(0);
 
       // Prepare data to be stored in the db
-      String value1 = "value1";
-      String value2 = "value2";
-      String value3 = "value3";
-
       DN baseDN1 = DN.decode("o=baseDN1");
       DN baseDN2 = DN.decode("o=baseDN2");
       DN baseDN3 = DN.decode("o=baseDN3");
@@ -115,7 +115,9 @@
         StaticUtils.close(dbc);
       }
 
+      // Now test that the trimming thread does its job => start it
       cnIndexDB.setPurgeDelay(100);
+      cnIndexDB.startTrimmingThread();
 
       // Check the db is cleared.
       while (!cnIndexDB.isEmpty())
@@ -145,7 +147,9 @@
   {
     File testRoot = createCleanDir();
     ReplicationDbEnv dbEnv = new ReplicationDbEnv(testRoot.getPath(), rs);
-    return new JEChangeNumberIndexDB(rs, dbEnv);
+    JEChangeNumberIndexDB result = new JEChangeNumberIndexDB(rs, dbEnv);
+    assertTrue(result.isEmpty());
+    return result;
   }
 
   private File createCleanDir() throws IOException
@@ -182,12 +186,7 @@
       cnIndexDB = newCNIndexDB(replicationServer);
       cnIndexDB.setPurgeDelay(0);
 
-      assertTrue(cnIndexDB.isEmpty());
-
       // Prepare data to be stored in the db
-      String value1 = "value1";
-      String value2 = "value2";
-      String value3 = "value3";
 
       DN baseDN1 = DN.decode("o=baseDN1");
       DN baseDN2 = DN.decode("o=baseDN2");
@@ -230,8 +229,6 @@
     }
     finally
     {
-      if (cnIndexDB != null)
-        cnIndexDB.shutdown();
       remove(replicationServer);
     }
   }
@@ -241,7 +238,7 @@
     TestCaseUtils.startServer();
     final int port = TestCaseUtils.findFreePort();
     return new ReplicationServer(
-        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null)) ;
+        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null));
   }
 
   private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,

--
Gitblit v1.10.0