mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
10.50.2013 1c3a3e354aeb853680cd722767e62a51ef85a2a6
Stabilizing JEChangeNumberIndexDBTest.testClear().



Here is the failure scenario for this test (with one CPU):

t1 (test thread)        t2 (CNIndexDB purge thread)
----------------        ---------------------------
       |                            |
       |                            v
       |                JEChangeNumberIndexDB
       |                  run()
       |                    trim()
       |                      clear(null) <- entered, but not completed
       v
ChangeNumberIndexDB
  setPurgeDelay(0);
  addRecord(1)
  addRecord(2)
  addRecord(3)
       |                            |
       |                            +--> executes the clear()
       v
getFirstRecord() <- BOOM NullPointerException!



Ensured the CNIndexDB is not trimmed until we explicitly test it:
to do this, ensured the thread is only started when needed by the test.
Also took the opportunity to ensure a faster shutdown of the CNIndexDB trimming thread.



JEChangeNumberIndexDB.java:
Made the trimDone field volatile and changed the shutdown field from a boolean to an AtomicBoolean.
Renamed thread field to trimmingThread.
Extracted method startTrimmingThread().
In trim(), added an AtomicBoolean parameter.
Added methods clear(DN, AtomicBoolean shutdown) and mustShutdown() to ensure faster shutdown.



JEChangelogDB.java:
Called JEChangeNumberIndexDB.startTrimmingThread() after creating the JEChangeNumberIndexDB instance.

JEChangeNumberIndexDBTest.java:
Extracted constants.
In testTrim(), called JEChangeNumberIndexDB.startTrimmingThread().
In testClear(), removed the call to cnIndexDB.shutdown() from the finally block.
3 files modified
79 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java 55 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 1 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java 23 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -86,15 +87,15 @@
  /** The last generated value for the change number. */
  private final AtomicLong lastGeneratedChangeNumber;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private boolean shutdown = false;
  private boolean trimDone = false;
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private volatile boolean trimDone = false;
  /**
   * A dedicated thread loops trim().
   * <p>
   * trim() : deletes from the DB a number of changes that are older than a
   * certain date.
   */
  private DirectoryThread thread;
  private DirectoryThread trimmingThread;
  /**
   * The trim age in milliseconds. Changes record in the change DB that are
   * older than this age are removed.
@@ -132,16 +133,21 @@
    long newestCN = (newestRecord != null) ? newestRecord.getChangeNumber() : 0;
    lastGeneratedChangeNumber = new AtomicLong(newestCN);
    // Trimming thread
    thread =
        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
    thread.start();
    // Monitoring registration
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }
  /**
   * Creates and starts the thread trimming the CNIndexDB.
   */
  public void startTrimmingThread()
  {
    trimmingThread =
        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
    trimmingThread.start();
  }
  private long getChangeNumber(CNIndexRecord record) throws ChangelogException
  {
    if (record != null)
@@ -245,12 +251,12 @@
   */
  public void shutdown()
  {
    if (shutdown)
    if (shutdown.get())
    {
      return;
    }
    shutdown = true;
    shutdown.set(true);
    synchronized (this)
    {
      notifyAll();
@@ -280,10 +286,10 @@
  @Override
  public void run()
  {
    while (!shutdown)
    while (!shutdown.get())
    {
      try {
        trim();
        trim(shutdown);
        synchronized (this)
        {
@@ -319,12 +325,12 @@
   * Trim old changes from this database.
   * @throws ChangelogException In case of database problem.
   */
  public void trim() throws ChangelogException
  private void trim(AtomicBoolean shutdown) throws ChangelogException
  {
    if (trimAge == 0)
      return;
    clear(null);
    clear(null, shutdown);
  }
  /**
@@ -339,6 +345,12 @@
   */
  public void clear(DN baseDNToClear) throws ChangelogException
  {
    clear(baseDNToClear, null);
  }
  private void clear(DN baseDNToClear, AtomicBoolean shutdown)
      throws ChangelogException
  {
    if (isEmpty())
    {
      return;
@@ -346,13 +358,17 @@
    for (int i = 0; i < 100; i++)
    {
      if (mustShutdown(shutdown))
      {
        return;
      }
      final DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        for (int j = 0; j < 50; j++)
        {
          // let's traverse the CNIndexDB
          if (!cursor.next())
          if (mustShutdown(shutdown) || !cursor.next())
          {
            cursor.close();
            return;
@@ -431,7 +447,7 @@
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        shutdown = true;
        shutdown.set(true);
        throw e;
      }
      catch (Exception e)
@@ -439,12 +455,17 @@
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        shutdown = true;
        shutdown.set(true);
        throw new ChangelogException(e);
      }
    }
  }
  private boolean mustShutdown(AtomicBoolean shutdown)
  {
    return shutdown != null && shutdown.get();
  }
  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * JEChangeNumberIndexDB.
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -650,6 +650,7 @@
        try
        {
          cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
          cnIndexDB.startTrimmingThread();
        }
        catch (Exception e)
        {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -53,6 +53,10 @@
@SuppressWarnings("javadoc")
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
{
  private static final String value1 = "value1";
  private static final String value2 = "value2";
  private static final String value3 = "value3";
  /**
   * This test makes basic operations of a JEChangeNumberIndexDB:
   * <ol>
@@ -76,10 +80,6 @@
      cnIndexDB.setPurgeDelay(0);
      // Prepare data to be stored in the db
      String value1 = "value1";
      String value2 = "value2";
      String value3 = "value3";
      DN baseDN1 = DN.decode("o=baseDN1");
      DN baseDN2 = DN.decode("o=baseDN2");
      DN baseDN3 = DN.decode("o=baseDN3");
@@ -115,7 +115,9 @@
        StaticUtils.close(dbc);
      }
      // Now test that the trimming thread does its job => start it
      cnIndexDB.setPurgeDelay(100);
      cnIndexDB.startTrimmingThread();
      // Check the db is cleared.
      while (!cnIndexDB.isEmpty())
@@ -145,7 +147,9 @@
  {
    File testRoot = createCleanDir();
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(testRoot.getPath(), rs);
    return new JEChangeNumberIndexDB(rs, dbEnv);
    JEChangeNumberIndexDB result = new JEChangeNumberIndexDB(rs, dbEnv);
    assertTrue(result.isEmpty());
    return result;
  }
  private File createCleanDir() throws IOException
@@ -182,12 +186,7 @@
      cnIndexDB = newCNIndexDB(replicationServer);
      cnIndexDB.setPurgeDelay(0);
      assertTrue(cnIndexDB.isEmpty());
      // Prepare data to be stored in the db
      String value1 = "value1";
      String value2 = "value2";
      String value3 = "value3";
      DN baseDN1 = DN.decode("o=baseDN1");
      DN baseDN2 = DN.decode("o=baseDN2");
@@ -230,8 +229,6 @@
    }
    finally
    {
      if (cnIndexDB != null)
        cnIndexDB.shutdown();
      remove(replicationServer);
    }
  }
@@ -241,7 +238,7 @@
    TestCaseUtils.startServer();
    final int port = TestCaseUtils.findFreePort();
    return new ReplicationServer(
        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null)) ;
        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null));
  }
  private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,