mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
10.50.2013 52df1e0c040cb7f4af2f849e617ce94b61a22fa8
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
@@ -86,15 +87,15 @@
  /** The last generated value for the change number. */
  private final AtomicLong lastGeneratedChangeNumber;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private boolean shutdown = false;
  private boolean trimDone = false;
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private volatile boolean trimDone = false;
  /**
   * A dedicated thread loops trim().
   * <p>
   * trim() : deletes from the DB a number of changes that are older than a
   * certain date.
   */
  private DirectoryThread thread;
  private DirectoryThread trimmingThread;
  /**
   * The trim age in milliseconds. Changes record in the change DB that are
   * older than this age are removed.
@@ -132,16 +133,21 @@
    long newestCN = (newestRecord != null) ? newestRecord.getChangeNumber() : 0;
    lastGeneratedChangeNumber = new AtomicLong(newestCN);
    // Trimming thread
    thread =
        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
    thread.start();
    // Monitoring registration
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }
  /**
   * Creates and starts the thread trimming the CNIndexDB.
   */
  public void startTrimmingThread()
  {
    trimmingThread =
        new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
    trimmingThread.start();
  }
  private long getChangeNumber(CNIndexRecord record) throws ChangelogException
  {
    if (record != null)
@@ -245,12 +251,12 @@
   */
  public void shutdown()
  {
    if (shutdown)
    if (shutdown.get())
    {
      return;
    }
    shutdown = true;
    shutdown.set(true);
    synchronized (this)
    {
      notifyAll();
@@ -280,10 +286,10 @@
  @Override
  public void run()
  {
    while (!shutdown)
    while (!shutdown.get())
    {
      try {
        trim();
        trim(shutdown);
        synchronized (this)
        {
@@ -319,12 +325,12 @@
   * Trim old changes from this database.
   * @throws ChangelogException In case of database problem.
   */
  public void trim() throws ChangelogException
  private void trim(AtomicBoolean shutdown) throws ChangelogException
  {
    if (trimAge == 0)
      return;
    clear(null);
    clear(null, shutdown);
  }
  /**
@@ -339,6 +345,12 @@
   */
  public void clear(DN baseDNToClear) throws ChangelogException
  {
    clear(baseDNToClear, null);
  }
  private void clear(DN baseDNToClear, AtomicBoolean shutdown)
      throws ChangelogException
  {
    if (isEmpty())
    {
      return;
@@ -346,13 +358,17 @@
    for (int i = 0; i < 100; i++)
    {
      if (mustShutdown(shutdown))
      {
        return;
      }
      final DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        for (int j = 0; j < 50; j++)
        {
          // let's traverse the CNIndexDB
          if (!cursor.next())
          if (mustShutdown(shutdown) || !cursor.next())
          {
            cursor.close();
            return;
@@ -431,7 +447,7 @@
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        shutdown = true;
        shutdown.set(true);
        throw e;
      }
      catch (Exception e)
@@ -439,12 +455,17 @@
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        shutdown = true;
        shutdown.set(true);
        throw new ChangelogException(e);
      }
    }
  }
  private boolean mustShutdown(AtomicBoolean shutdown)
  {
    return shutdown != null && shutdown.get();
  }
  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * JEChangeNumberIndexDB.
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -650,6 +650,7 @@
        try
        {
          cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
          cnIndexDB.startTrimmingThread();
        }
        catch (Exception e)
        {
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -53,6 +53,10 @@
@SuppressWarnings("javadoc")
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
{
  private static final String value1 = "value1";
  private static final String value2 = "value2";
  private static final String value3 = "value3";
  /**
   * This test makes basic operations of a JEChangeNumberIndexDB:
   * <ol>
@@ -76,10 +80,6 @@
      cnIndexDB.setPurgeDelay(0);
      // Prepare data to be stored in the db
      String value1 = "value1";
      String value2 = "value2";
      String value3 = "value3";
      DN baseDN1 = DN.decode("o=baseDN1");
      DN baseDN2 = DN.decode("o=baseDN2");
      DN baseDN3 = DN.decode("o=baseDN3");
@@ -115,7 +115,9 @@
        StaticUtils.close(dbc);
      }
      // Now test that the trimming thread does its job => start it
      cnIndexDB.setPurgeDelay(100);
      cnIndexDB.startTrimmingThread();
      // Check the db is cleared.
      while (!cnIndexDB.isEmpty())
@@ -145,7 +147,9 @@
  {
    File testRoot = createCleanDir();
    ReplicationDbEnv dbEnv = new ReplicationDbEnv(testRoot.getPath(), rs);
    return new JEChangeNumberIndexDB(rs, dbEnv);
    JEChangeNumberIndexDB result = new JEChangeNumberIndexDB(rs, dbEnv);
    assertTrue(result.isEmpty());
    return result;
  }
  private File createCleanDir() throws IOException
@@ -182,12 +186,7 @@
      cnIndexDB = newCNIndexDB(replicationServer);
      cnIndexDB.setPurgeDelay(0);
      assertTrue(cnIndexDB.isEmpty());
      // Prepare data to be stored in the db
      String value1 = "value1";
      String value2 = "value2";
      String value3 = "value3";
      DN baseDN1 = DN.decode("o=baseDN1");
      DN baseDN2 = DN.decode("o=baseDN2");
@@ -230,8 +229,6 @@
    }
    finally
    {
      if (cnIndexDB != null)
        cnIndexDB.shutdown();
      remove(replicationServer);
    }
  }
@@ -241,7 +238,7 @@
    TestCaseUtils.startServer();
    final int port = TestCaseUtils.findFreePort();
    return new ReplicationServer(
        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null)) ;
        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null));
  }
  private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,