mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.19.2014 137d1b4ba1992acdd880b61b1a03dc31f0cc1839
OPENDJ-1448 Remove JReplicaDB flushing thread and msg queue 

It seems unnecessary to have a msgQueue on top of JE since JE's already has a builtin cache that handles the same responsibility.
This improvement removes the JEReplicaDB.msgQueue and the associated flushing thread to save on memory and resources.

Code cleanup in JEChangeNumberIndexDB after CR-3388.


JEReplicaDB.java:
Does not implement Runnable anymore.
Removed fields msgQueue, queueSizeBytes, queueMaxBytes, thread, flushLock.
Added and used shutdown field to compensate for removing the thread field.
Removed methods collectAllPermits(), flush(), run() and stop().
In shutdown(), used AtomicBoolean.compareAndSet().

ReplicationDB.java:
Renamed addEntries(List<UpdateMsg>) to addEntry(UpdateMsg).

JEReplicaDBTest.java:
Removed now unnecessary waitChangesArePersisted().


JEChangelogDB.java:
In shutdown(), enforced threads joining + called Thread.interrupt() to ensure shutdown. This prevents message about unclosed cursors in integrated unit tests.

ReplicationServer.java:
Removed getQueueSize().


JEChangeNumberIndexDB.java:
Removed unused oldestChangeNumber.
In shutdown(), used AtomicBoolean.compareAndSet() + removed useless call to notify().

JEChangeNumberIndexDBTest.java:
Fixed javadocs.


ChangeTimeHeartbeatMsg.java:
Implemented toString().
8 files modified
363 ■■■■ changed files
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java 16 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServer.java 11 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java 42 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 19 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 209 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 24 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java 13 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 29 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -66,8 +66,6 @@
    return csn;
  }
  /**
   * Creates a message from a provided byte array.
   *
@@ -113,11 +111,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
@@ -141,4 +135,10 @@
    }
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + ", csn=" + csn.toStringUI();
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -947,17 +947,6 @@
  }
  /**
   * Get the queueSize for this replication server.
   *
   * @return The maximum size of the queues for this Replication Server
   *
   */
  public int getQueueSize()
  {
    return this.config.getQueueSize();
  }
  /**
   * Creates the backend associated to this replication server.
   */
  private void createBackend() throws ConfigException
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -62,8 +62,6 @@
  private static int NO_KEY = 0;
  private DraftCNDB db;
  /** FIXME What is this field used for? */
  private volatile long oldestChangeNumber = NO_KEY;
  /**
   * The newest changenumber stored in the DB. It is used to avoid purging the
   * record with the newest changenumber. The newest record in the changenumber
@@ -96,14 +94,11 @@
  public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
  {
    db = new DraftCNDB(dbEnv);
    final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
    final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
    oldestChangeNumber = getChangeNumber(oldestRecord);
    final long newestCN = getChangeNumber(newestRecord);
    newestChangeNumber = newestCN;
    newestChangeNumber = getChangeNumber(newestRecord);
    // initialization of the lastGeneratedChangeNumber from the DB content
    // if DB is empty => last record does not exist => default to 0
    lastGeneratedChangeNumber = new AtomicLong(newestCN);
    lastGeneratedChangeNumber = new AtomicLong(newestChangeNumber);
    // Monitoring registration
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -117,7 +112,7 @@
    {
      return record.getChangeNumber();
    }
    return 0;
    return NO_KEY;
  }
  /** {@inheritDoc} */
@@ -198,20 +193,12 @@
   */
  public void shutdown()
  {
    if (shutdown.get())
    if (shutdown.compareAndSet(false, true))
    {
      return;
    }
    shutdown.set(true);
    synchronized (this)
    {
      notifyAll();
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }
  }
  /**
   * Synchronously purges the change number index DB up to and excluding the
@@ -236,11 +223,6 @@
      while (!mustShutdown(shutdown) && cursor.next())
      {
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (record.getChangeNumber() != oldestChangeNumber)
        {
          oldestChangeNumber = record.getChangeNumber();
        }
        if (record.getChangeNumber() != newestChangeNumber
            && record.getCSN().isOlderThan(purgeCSN))
        {
@@ -293,14 +275,9 @@
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
      boolean isOldestRecord = true;
      while (!mustShutdown(shutdown) && cursor.next())
      {
        final ChangeNumberIndexRecord record = cursor.currentRecord();
        if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
        {
          oldestChangeNumber = record.getChangeNumber();
        }
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
@@ -312,10 +289,6 @@
        {
          cursor.delete();
        }
        else
        {
          isOldestRecord = false;
        }
      }
    }
    catch (ChangelogException e)
@@ -398,7 +371,7 @@
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + ": " + oldestChangeNumber + " "
    return getClass().getSimpleName() + ", newestChangeNumber="
        + newestChangeNumber;
  }
@@ -411,8 +384,7 @@
  public void clear() throws ChangelogException
  {
    db.clear();
    oldestChangeNumber = getChangeNumber(db.readFirstRecord());
    newestChangeNumber = getChangeNumber(db.readLastRecord());
    newestChangeNumber = NO_KEY;
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -385,11 +385,13 @@
    if (indexer != null)
    {
      indexer.initiateShutdown();
      indexer.interrupt();
    }
    final ChangelogDBPurger purger = cnPurger.getAndSet(null);
    if (purger != null)
    {
      purger.initiateShutdown();
      purger.interrupt();
    }
    try
@@ -417,6 +419,23 @@
    if (dbEnv != null)
    {
      // wait for shutdown of the threads holding cursors
      try
      {
        if (indexer != null)
        {
          indexer.join();
        }
        if (purger != null)
        {
          purger.join();
        }
      }
      catch (InterruptedException e)
      {
        // do nothing: we are already shutting down
      }
      dbEnv.shutdown();
    }
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,14 +28,10 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
@@ -52,8 +48,6 @@
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used for managing the replicationServer database for each
@@ -67,7 +61,7 @@
 * <p>
 * This class publish some monitoring information below cn=monitor.
 */
public class JEReplicaDB implements Runnable
public class JEReplicaDB
{
  /**
@@ -89,29 +83,7 @@
  }
  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
   * <p>
   * This blocking queue is only used as a temporary placeholder so that the
   * write in the stable storage can be grouped for efficiency reason. Adding an
   * update synchronously add the update to this list. A dedicated thread
   * flushes this blocking queue.
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
   */
  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
    new LinkedBlockingQueue<UpdateMsg>();
  /**
   * Semaphore used to limit the number of bytes used in memory by the queue.
   * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
   * size of msgQueue becomes larger than the available permits and will resume
   * only when the number of available permits allow it.
   */
  private final Semaphore queueSizeBytes;
  private final int queueMaxBytes;
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private ReplicationDB db;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -122,12 +94,6 @@
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #flush()}.
   * This can happen with the thread flushing the queue, or else on shutdown.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  /**
@@ -147,15 +113,8 @@
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    thread = new DirectoryThread(this, "Replication server RS("
            + replicationServer.getServerId()
            + ") flusher thread for Replica DS(" + serverId
            + ") for domain \"" + baseDN + "\"");
    thread.start();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -174,34 +133,14 @@
   */
  public void add(UpdateMsg updateMsg) throws ChangelogException
  {
    if (thread.isShutdownInitiated())
    if (shutdown.get())
    {
      throw new ChangelogException(
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }
    final int msgSize = updateMsg.size();
    if (msgSize < queueMaxBytes)
    {
      try
      {
        queueSizeBytes.acquire(msgSize);
      }
      catch (InterruptedException e)
      {
        throw new ChangelogException(
            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
                .toString(), String.valueOf(baseDN), String.valueOf(serverId),
                stackTraceToSingleLineString(e)));
      }
    }
    else
    {
      // edge case with a very large message
      collectAllPermits();
    }
    msgQueue.add(updateMsg);
    db.addEntry(updateMsg);
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null
@@ -215,22 +154,6 @@
    }
  }
  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
  private void collectAllPermits()
  {
    int collectedPermits = queueSizeBytes.drainPermits();
    while (collectedPermits != queueMaxBytes)
    {
      // Do not use Thread.sleep() because:
      // 1) it is expected the permits will be released very soon
      // 2) we want to collect all the permits, so do not leave a chance to
      // other threads to steal them from us.
      // 3) we want to keep low latency
      Thread.yield();
      collectedPermits += queueSizeBytes.drainPermits();
    }
  }
  /**
   * Get the oldest CSN that has not been purged yet.
   *
@@ -289,81 +212,11 @@
   */
  public void shutdown()
  {
    if (thread.isShutdownInitiated())
    if (shutdown.compareAndSet(false, true))
    {
      return;
    }
    thread.initiateShutdown();
    while (msgQueue.size() != 0)
    {
      try
      {
        flush();
      }
      catch (ChangelogException e)
      {
        // We are already shutting down
        logError(e.getMessageObject());
      }
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }
  /**
   * Flushes the replicaDB queue from memory to stable storage.
   */
  @Override
  public void run()
  {
    thread.startWork();
    try
    {
      while (!thread.isShutdownInitiated())
      {
        try
        {
          flush();
        }
        catch (ChangelogException end)
        {
          stop(end);
          break;
        }
      }
      try
      {
        // call flush a last time before exiting to make sure that
        // no change was forgotten in the msgQueue
        flush();
      }
      catch (ChangelogException e)
      {
        stop(e);
      }
    }
    finally
    {
      thread.stopWork();
    }
  }
  private void stop(Exception e)
  {
    logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
        .get(stackTraceToSingleLineString(e)));
    thread.initiateShutdown();
    if (replicationServer != null)
    {
      replicationServer.shutdown();
    }
  }
  /**
@@ -401,7 +254,7 @@
      {
        for (int j = 0; j < 50; j++)
        {
          if (thread.isShutdownInitiated())
          if (shutdown.get())
          {
            return;
          }
@@ -428,7 +281,7 @@
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        thread.initiateShutdown();
        shutdown.set(true);
        throw e;
      }
      finally
@@ -439,47 +292,6 @@
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * <p>
   * Flush is done by chunk sized to 500 messages, starting from the beginning
   * of the list.
   * <p>
   * @GuardedBy("flushLock")
   * @throws ChangelogException
   *           If a database problem happened
   */
  private void flush() throws ChangelogException
  {
    try
    {
      synchronized (flushLock)
      {
        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, check if shutdown was invoked
          return;
        }
        // Try to see if there are more changes and persist them all.
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        changes.add(change);
        msgQueue.drainTo(changes);
        int totalSize = db.addEntries(changes);
        // do not release more than queue max size permits
        // (be careful of the edge case with the very large message)
        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
      }
    }
    catch (InterruptedException e)
    {
      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
          .get(stackTraceToSingleLineString(e)));
    }
  }
  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * ReplicaDB.
   */
@@ -501,9 +313,6 @@
      {
        create(attributes, "last-change", encode(limits.newestCSN));
      }
      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
      create(attributes, "queue-size-bytes",
          String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
      return attributes;
    }
@@ -552,8 +361,6 @@
   */
  public void clear() throws ChangelogException
  {
    collectAllPermits();
    msgQueue.clear(); // this call should not do anything at all
    db.clear();
    csnLimits = new CSNLimits(null, null);
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -28,7 +28,6 @@
import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -182,15 +181,14 @@
  }
  /**
   * add a list of changes to the underlying db.
   * add one change to the underlying db.
   *
   * @param changes
   *          The list of changes to add to the underlying db.
   * @return the total size of all the changes
   * @param change
   *          The change to add to the underlying db.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public int addEntries(List<UpdateMsg> changes) throws ChangelogException
  public void addEntry(UpdateMsg change) throws ChangelogException
  {
    dbCloseLock.readLock().lock();
    try
@@ -198,26 +196,22 @@
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        return 0;
        return;
      }
      int totalSize = 0;
      for (UpdateMsg change : changes)
      {
        final DatabaseEntry key = createReplicationKey(change.getCSN());
        final DatabaseEntry data = new ReplicationData(change);
        insertCounterRecordIfNeeded(change.getCSN());
        db.put(null, key, data);
        counterCurrValue++;
        totalSize += change.size();
      }
      return totalSize;
    }
    catch (DatabaseException e)
    {
      throw new ChangelogException(e);
      throw new ChangelogException(
          ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(
              change.toString(), String.valueOf(baseDN),
              String.valueOf(serverId), stackTraceToSingleLineString(e)));
    }
    finally
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -48,8 +48,7 @@
import static org.testng.Assert.*;
/**
 * Test the JEChangeNumberIndexDB class with 2 kinds of cleaning of the db : -
 * periodic trim - call to clear method()
 * Test the JEChangeNumberIndexDB class.
 */
@SuppressWarnings("javadoc")
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
@@ -71,13 +70,13 @@
   * <li>create the db</li>
   * <li>add records</li>
   * <li>read them with a cursor</li>
   * <li>set a very short trim period</li>
   * <li>wait for the db to be trimmed / here since the changes are not stored
   * <li>set a very short purge period</li>
   * <li>wait for the db to be purged / here since the changes are not stored
   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
   * </ol>
   */
  @Test
  void testPurge() throws Exception
  public void testPurge() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
@@ -172,8 +171,8 @@
   * <li>clear the db</li>
   * </ol>
   */
  @Test()
  void testClear() throws Exception
  @Test
  public void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,8 +97,6 @@
      //--
      // Iterator tests with changes persisted
      waitChangesArePersisted(replicaDB, 3);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
@@ -108,7 +106,6 @@
      //--
      // Cursor tests with changes persisted
      replicaDB.add(update4);
      waitChangesArePersisted(replicaDB, 4);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
      // Test cursor from existing CSN
@@ -141,29 +138,6 @@
    }
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB,
      int nbRecordsInserted) throws Exception
  {
    waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
  }
  private void waitChangesArePersisted(JEReplicaDB replicaDB,
      int nbRecordsInserted, int counterWindow) throws Exception
  {
    // one counter record is inserted every time "counterWindow"
    // records have been inserted
    int expectedNbRecords =
        nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
    int count = 0;
    while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
    {
      Thread.sleep(10);
      count++;
    }
    assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
  }
  static CSN[] newCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
@@ -301,7 +275,6 @@
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        }
      }
      waitChangesArePersisted(replicaDB, 4);
      cursor = replicaDB.generateCursorFrom(csns[0]);
      assertTrue(cursor.next());
@@ -378,7 +351,6 @@
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      waitChangesArePersisted(replicaDB, max, counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -402,7 +374,6 @@
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        mySeqnum+=2;
      }
      waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");