mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.50.2014 08908175782573c536cb485092e473c7f1729281
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2011-2013 ForgeRock AS
 *      Portions copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.server.changelog.je;
@@ -30,6 +30,9 @@
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
@@ -67,13 +70,33 @@
 */
public class JEReplicaDB implements Runnable
{
  /**
   * Class that allows atomically setting oldest and newest CSNs without
   * synchronization.
   *
   * @Immutable
   */
  private static final class CSNLimits
  {
    private final CSN oldestCSN;
    private final CSN newestCSN;
    public CSNLimits(CSN oldestCSN, CSN newestCSN)
    {
      this.oldestCSN = oldestCSN;
      this.newestCSN = newestCSN;
    }
  }
  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
   * <p>
   * This list is only used as a temporary placeholder so that the write in the
   * stable storage can be grouped for efficiency reason. Adding an update
   * synchronously add the update to this list. A dedicated thread loops on
   * flush() and trim().
   * This blocking queue is only used as a temporary placeholder so that the
   * write in the stable storage can be grouped for efficiency reason. Adding an
   * update synchronously add the update to this list. A dedicated thread loops
   * on {@link #flush()} and {@link #trim()}.
   * <dl>
   * <dt>flush()</dt>
   * <dd>get a number of changes from the in memory list by block and write them
@@ -86,37 +109,35 @@
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
   */
  private final LinkedList<UpdateMsg> msgQueue =
    new LinkedList<UpdateMsg>();
  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
    new LinkedBlockingQueue<UpdateMsg>();
  /**
   * The High and low water mark for the max size of the msgQueue. The threads
   * calling add() method will be blocked if the size of msgQueue becomes larger
   * than the queueHimark and will resume only when the size of the msgQueue
   * goes below queueLowmark.
   * Semaphore used to limit the number of bytes used in memory by the queue.
   * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
   * size of msgQueue becomes larger than the available permits and will resume
   * only when the number of available permits allow it.
   */
  private int queueMaxSize = 5000;
  private int queueLowmark = 1000;
  private int queueHimark = 4000;
  /**
   * The queue himark and lowmark in bytes, this is set to 100 times the himark
   * and lowmark in number of updates.
   */
  private int queueMaxBytes = 100 * queueMaxSize;
  private int queueLowmarkBytes = 100 * queueLowmark;
  private int queueHimarkBytes = 100 * queueHimark;
  /** The number of bytes currently in the queue. */
  private int queueByteSize = 0;
  private final Semaphore queueSizeBytes;
  private final int queueMaxBytes;
  private ReplicationDB db;
  private CSN oldestCSN;
  private CSN newestCSN;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
   *
   * @NonNull
   */
  private volatile CSNLimits csnLimits;
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #clear()}
   * {@link #flush()} or {@link #trim()}. This can happen with the thread
   * flushing the queue, on shutdown or on cursor opening, a thread calling
   * clear(), etc.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
@@ -146,16 +167,10 @@
    this.serverId = serverId;
    this.baseDN = baseDN;
    trimAge = replicationServer.getTrimAge();
    final int queueSize = replicationServer.getQueueSize();
    queueMaxSize = queueSize;
    queueLowmark = queueSize / 5;
    queueHimark = queueSize * 4 / 5;
    queueMaxBytes = 200 * queueMaxSize;
    queueLowmarkBytes = 200 * queueLowmark;
    queueHimarkBytes = 200 * queueLowmark;
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    oldestCSN = db.readOldestCSN();
    newestCSN = db.readNewestCSN();
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + serverId
@@ -167,61 +182,72 @@
  }
  /**
   * Add an update to the list of messages that must be saved to the db
   * managed by this db handler.
   * This method is blocking if the size of the list of message is larger
   * than its maximum.
   * Add an update to the list of messages that must be saved to the db managed
   * by this db handler. This method is blocking if the size of the list of
   * message is larger than its maximum.
   *
   * @param update The update that must be saved to the db managed by this db
   *               handler.
   * @param updateMsg
   *          The update message that must be saved to the db managed by this db
   *          handler.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void add(UpdateMsg update)
  public void add(UpdateMsg updateMsg) throws ChangelogException
  {
    synchronized (msgQueue)
    if (thread.isShutdownInitiated())
    {
      int size = msgQueue.size();
      if (size > queueHimark || queueByteSize > queueHimarkBytes)
      {
        msgQueue.notify();
      }
      throw new ChangelogException(
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }
      while (size > queueMaxSize || queueByteSize > queueMaxBytes)
    final int msgSize = updateMsg.size();
    if (msgSize < queueMaxBytes)
    {
      try
      {
        try
        {
          msgQueue.wait(500);
        } catch (InterruptedException e)
        {
          // simply loop to try again.
        }
        size = msgQueue.size();
        queueSizeBytes.acquire(msgSize);
      }
      catch (InterruptedException e)
      {
        throw new ChangelogException(
            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
                .toString(), String.valueOf(baseDN), String.valueOf(serverId),
                stackTraceToSingleLineString(e)));
      }
    }
    else
    {
      // edge case with a very large message
      collectAllPermits();
    }
    msgQueue.add(updateMsg);
      queueByteSize += update.size();
      msgQueue.add(update);
      if (newestCSN == null || newestCSN.isOlderThan(update.getCSN()))
      {
        newestCSN = update.getCSN();
      }
      if (oldestCSN == null)
      {
        oldestCSN = update.getCSN();
      }
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null
        || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
    if (updateOld || updateNew)
    {
      csnLimits = new CSNLimits(
          updateOld ? updateMsg.getCSN() : limits.oldestCSN,
          updateNew ? updateMsg.getCSN() : limits.newestCSN);
    }
  }
  /**
   * Get some changes out of the message queue of the LDAP server.
   * (from the beginning of the queue)
   * @param number the maximum number of messages to extract.
   * @return a List containing number changes extracted from the queue.
   */
  private List<UpdateMsg> getChanges(int number)
  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
  private void collectAllPermits()
  {
    synchronized (msgQueue)
    int collectedPermits = queueSizeBytes.drainPermits();
    while (collectedPermits != queueMaxBytes)
    {
      final int minAvailableNb = Math.min(number, msgQueue.size());
      return new LinkedList<UpdateMsg>(msgQueue.subList(0, minAvailableNb));
      // Do not use Thread.sleep() because:
      // 1) it is expected the permits will be released very soon
      // 2) we want to collect all the permits, so do not leave a chance to
      // other threads to steal them from us.
      // 3) we want to keep low latency
      Thread.yield();
      collectedPermits += queueSizeBytes.drainPermits();
    }
  }
@@ -232,7 +258,7 @@
   */
  public CSN getOldestCSN()
  {
    return oldestCSN;
    return csnLimits.oldestCSN;
  }
  /**
@@ -242,7 +268,7 @@
   */
  public CSN getNewestCSN()
  {
    return newestCSN;
    return csnLimits.newestCSN;
  }
  /**
@@ -252,9 +278,10 @@
   */
  public long getChangesCount()
  {
    if (newestCSN != null && oldestCSN != null)
    final CSNLimits limits = csnLimits;
    if (limits.newestCSN != null && limits.oldestCSN != null)
    {
      return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1;
      return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
    }
    return 0;
  }
@@ -276,36 +303,13 @@
  {
    if (startAfterCSN == null)
    {
      // flush any potential changes before opening the cursor
      flush();
    }
    return new JEReplicaDBCursor(db, startAfterCSN, this);
  }
  /**
   * Removes the provided number of messages from the beginning of the msgQueue.
   *
   * @param number the number of changes to be removed.
   */
  private void clearQueue(int number)
  {
    synchronized (msgQueue)
    {
      int current = 0;
      while (current < number && !msgQueue.isEmpty())
      {
        UpdateMsg msg = msgQueue.remove(); // remove first
        queueByteSize -= msg.size();
        current++;
      }
      if (msgQueue.size() < queueLowmark
          && queueByteSize < queueLowmarkBytes)
      {
        msgQueue.notifyAll();
      }
    }
  }
  /**
   * Shutdown this ReplicaDB.
   */
  public void shutdown()
@@ -317,11 +321,6 @@
    thread.initiateShutdown();
    synchronized (msgQueue)
    {
      msgQueue.notifyAll();
    }
    while (msgQueue.size() != 0)
    {
      try
@@ -331,8 +330,7 @@
      catch (ChangelogException e)
      {
        // We are already shutting down
        logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
            .get(stackTraceToSingleLineString(e)));
        logError(e.getMessageObject());
      }
    }
@@ -358,27 +356,8 @@
        {
          flush();
          trim();
          synchronized (msgQueue)
          {
            if (msgQueue.size() < queueLowmark
                && queueByteSize < queueLowmarkBytes)
            {
              try
              {
                msgQueue.wait(1000);
              }
              catch (InterruptedException e)
              {
                // Do not reset the interrupt flag here,
                // because otherwise JE will barf next time flush() is called:
                // JE 5.0.97 refuses to persist changes to the DB when invoked
                // from a Thread with the interrupt flag set to true.
              }
            }
          }
        }
        catch (Exception end)
        catch (ChangelogException end)
        {
          stop(end);
          break;
@@ -400,11 +379,6 @@
    {
      thread.stopWork();
    }
    synchronized (this)
    {
      notifyAll();
    }
  }
  private void stop(Exception e)
@@ -453,13 +427,34 @@
      trimDate = lastBeforeTrimDate;
    }
    final int queueLowMarkBytes = queueMaxBytes / 5;
    for (int i = 0; i < 100; i++)
    {
      /*
       * Perform at least some trimming regardless of the flush backlog. Then
       * continue trim iterations while the flush backlog is low (below the
       * lowmark). Once the flush backlog increases, stop trimming and start
       * flushing more eagerly.
       */
      if (i > 20 && msgQueue.size() < queueLowMarkBytes)
      {
        break;
      }
      synchronized (flushLock)
      {
        /*
         * the trim is done by group in order to save some CPU and IO bandwidth
         * start the transaction then do a bunch of remove then commit
         * the trim is done by group in order to save some CPU, IO bandwidth and
         * DB caches: start the transaction then do a bunch of remove then
         * commit.
         */
        /*
         * Matt wrote: The record removal is done as a DB transaction and the
         * deleted records are only "deleted" on commit. While the txn/cursor is
         * open the records to be deleted will, I think, be pinned in the DB
         * cache. In other words, the larger the transaction (the more records
         * deleted during a single batch) the more DB cache will be used to
         * process the transaction.
         */
        final ReplServerDBCursor cursor = db.openDeleteCursor();
        try
@@ -477,13 +472,13 @@
              return;
            }
            if (!csn.equals(newestCSN) && csn.isOlderThan(trimDate))
            if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
            {
              cursor.delete();
            }
            else
            {
              oldestCSN = csn;
              csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
              return;
            }
          }
@@ -515,32 +510,33 @@
   */
  public void flush() throws ChangelogException
  {
    int size;
    int chunksize = Math.min(queueMaxSize, 500);
    do
    try
    {
      synchronized(flushLock)
      synchronized (flushLock)
      {
        // get N (or less) messages from the queue to save to the DB
        // (from the beginning of the queue)
        List<UpdateMsg> changes = getChanges(chunksize);
        // if no more changes to save exit immediately.
        if (changes == null || (size = changes.size()) == 0)
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, move on to the trim phase
          return;
        }
        // save the change to the stable storage.
        db.addEntries(changes);
        // Try to see if there are more changes and persist them all.
        changes.add(change);
        msgQueue.drainTo(changes);
        // remove the changes from the list of changes to be saved
        // (remove from the beginning of the queue)
        clearQueue(changes.size());
        int totalSize = db.addEntries(changes);
        // do not release more than queue max size permits
        // (be careful of the edge case with the very large message)
        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
      }
      // loop while there are more changes in the queue
    } while (size == chunksize);
    }
    catch (InterruptedException e)
    {
      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
          .get(stackTraceToSingleLineString(e)));
    }
  }
  /**
@@ -556,25 +552,28 @@
    public List<Attribute> getMonitorData()
    {
      List<Attribute> attributes = new ArrayList<Attribute>();
      attributes.add(Attributes.create("replicationServer-database",
          String.valueOf(serverId)));
      attributes.add(Attributes.create("domain-name",
          baseDN.toNormalizedString()));
      if (oldestCSN != null)
      create(attributes, "replicationServer-database", String.valueOf(serverId));
      create(attributes, "domain-name", baseDN.toNormalizedString());
      final CSNLimits limits = csnLimits;
      if (limits.oldestCSN != null)
      {
        attributes.add(Attributes.create("first-change", encode(oldestCSN)));
        create(attributes, "first-change", encode(limits.oldestCSN));
      }
      if (newestCSN != null)
      if (limits.newestCSN != null)
      {
        attributes.add(Attributes.create("last-change", encode(newestCSN)));
        create(attributes, "last-change", encode(limits.newestCSN));
      }
      attributes.add(
          Attributes.create("queue-size", String.valueOf(msgQueue.size())));
      attributes.add(
          Attributes.create("queue-size-bytes", String.valueOf(queueByteSize)));
      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
      create(attributes, "queue-size-bytes",
          String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
      return attributes;
    }
    private void create(List<Attribute> attributes, String name, String value)
    {
      attributes.add(Attributes.create(name, value));
    }
    private String encode(CSN csn)
    {
      return csn + " " + new Date(csn.getTime());
@@ -609,8 +608,9 @@
  @Override
  public String toString()
  {
    final CSNLimits limits = csnLimits;
    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
        + oldestCSN + " " + newestCSN;
        + limits.oldestCSN + " " + limits.newestCSN;
  }
  /**
@@ -631,12 +631,10 @@
  {
    synchronized(flushLock)
    {
      collectAllPermits();
      msgQueue.clear();
      queueByteSize = 0;
      db.clear();
      oldestCSN = db.readOldestCSN();
      newestCSN = db.readNewestCSN();
      csnLimits = new CSNLimits(null, null);
    }
  }
@@ -655,7 +653,7 @@
   * For test purpose.
   * @return The memory queue size.
   */
  public int getQueueSize()
  int getQueueSize()
  {
    return this.msgQueue.size();
  }