mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.19.2014 7645fcf6334c7c78655a12a08b4a8f3351be1ba4
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,16 +28,11 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.CSN;
@@ -53,7 +48,6 @@
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This class is used for managing the replicationServer database for each
@@ -67,9 +61,8 @@
 * <p>
 * This class publish some monitoring information below cn=monitor.
 */
public class JEReplicaDB implements Runnable
public class JEReplicaDB
{
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /**
   * Class that allows atomically setting oldest and newest CSNs without
@@ -90,29 +83,7 @@
  }
  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
   * <p>
   * This blocking queue is only used as a temporary placeholder so that the
   * write in the stable storage can be grouped for efficiency reason. Adding an
   * update synchronously add the update to this list. A dedicated thread
   * flushes this blocking queue.
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
   */
  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
    new LinkedBlockingQueue<UpdateMsg>();
  /**
   * Semaphore used to limit the number of bytes used in memory by the queue.
   * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
   * size of msgQueue becomes larger than the available permits and will resume
   * only when the number of available permits allow it.
   */
  private final Semaphore queueSizeBytes;
  private final int queueMaxBytes;
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private ReplicationDB db;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -123,12 +94,6 @@
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #flush()}.
   * This can happen with the thread flushing the queue, or else on shutdown.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  /**
@@ -148,15 +113,8 @@
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    thread = new DirectoryThread(this, "Replication server RS("
            + replicationServer.getServerId()
            + ") flusher thread for Replica DS(" + serverId
            + ") for domain \"" + baseDN + "\"");
    thread.start();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -175,32 +133,13 @@
   */
  public void add(UpdateMsg updateMsg) throws ChangelogException
  {
    if (thread.isShutdownInitiated())
    if (shutdown.get())
    {
      throw new ChangelogException(
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg, baseDN, serverId));
    }
    final int msgSize = updateMsg.size();
    if (msgSize < queueMaxBytes)
    {
      try
      {
        queueSizeBytes.acquire(msgSize);
      }
      catch (InterruptedException e)
      {
        throw new ChangelogException(
            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg, baseDN, serverId,
                stackTraceToSingleLineString(e)));
      }
    }
    else
    {
      // edge case with a very large message
      collectAllPermits();
    }
    msgQueue.add(updateMsg);
    db.addEntry(updateMsg);
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null
@@ -214,22 +153,6 @@
    }
  }
  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
  private void collectAllPermits()
  {
    int collectedPermits = queueSizeBytes.drainPermits();
    while (collectedPermits != queueMaxBytes)
    {
      // Do not use Thread.sleep() because:
      // 1) it is expected the permits will be released very soon
      // 2) we want to collect all the permits, so do not leave a chance to
      // other threads to steal them from us.
      // 3) we want to keep low latency
      Thread.yield();
      collectedPermits += queueSizeBytes.drainPermits();
    }
  }
  /**
   * Get the oldest CSN that has not been purged yet.
   *
@@ -288,79 +211,10 @@
   */
  public void shutdown()
  {
    if (thread.isShutdownInitiated())
    if (shutdown.compareAndSet(false, true))
    {
      return;
    }
    thread.initiateShutdown();
    while (msgQueue.size() != 0)
    {
      try
      {
        flush();
      }
      catch (ChangelogException e)
      {
        // We are already shutting down
        logger.error(e.getMessageObject());
      }
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }
  /**
   * Flushes the replicaDB queue from memory to stable storage.
   */
  @Override
  public void run()
  {
    thread.startWork();
    try
    {
      while (!thread.isShutdownInitiated())
      {
        try
        {
          flush();
        }
        catch (ChangelogException end)
        {
          stop(end);
          break;
        }
      }
      try
      {
        // call flush a last time before exiting to make sure that
        // no change was forgotten in the msgQueue
        flush();
      }
      catch (ChangelogException e)
      {
        stop(e);
      }
    }
    finally
    {
      thread.stopWork();
    }
  }
  private void stop(Exception e)
  {
    logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
    thread.initiateShutdown();
    if (replicationServer != null)
    {
      replicationServer.shutdown();
      db.shutdown();
      DirectoryServer.deregisterMonitorProvider(dbMonitor);
    }
  }
@@ -399,7 +253,7 @@
      {
        for (int j = 0; j < 50; j++)
        {
          if (thread.isShutdownInitiated())
          if (shutdown.get())
          {
            return;
          }
@@ -426,7 +280,7 @@
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        thread.initiateShutdown();
        shutdown.set(true);
        throw e;
      }
      finally
@@ -437,47 +291,6 @@
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * <p>
   * Flush is done by chunk sized to 500 messages, starting from the beginning
   * of the list.
   * <p>
   * @GuardedBy("flushLock")
   * @throws ChangelogException
   *           If a database problem happened
   */
  private void flush() throws ChangelogException
  {
    try
    {
      synchronized (flushLock)
      {
        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, check if shutdown was invoked
          return;
        }
        // Try to see if there are more changes and persist them all.
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        changes.add(change);
        msgQueue.drainTo(changes);
        int totalSize = db.addEntries(changes);
        // do not release more than queue max size permits
        // (be careful of the edge case with the very large message)
        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
      }
    }
    catch (InterruptedException e)
    {
      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
          .get(stackTraceToSingleLineString(e)));
    }
  }
  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * ReplicaDB.
   */
@@ -499,9 +312,6 @@
      {
        create(attributes, "last-change", encode(limits.newestCSN));
      }
      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
      create(attributes, "queue-size-bytes",
          String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
      return attributes;
    }
@@ -550,8 +360,6 @@
   */
  public void clear() throws ChangelogException
  {
    collectAllPermits();
    msgQueue.clear(); // this call should not do anything at all
    db.clear();
    csnLimits = new CSNLimits(null, null);
  }