mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
02.51.2014 819f74758a1c464bbf578e70ca8592cc8d101d75
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -50,7 +50,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -95,16 +94,8 @@
   * <p>
   * This blocking queue is only used as a temporary placeholder so that the
   * write in the stable storage can be grouped for efficiency reason. Adding an
   * update synchronously add the update to this list. A dedicated thread loops
   * on {@link #flush()} and {@link #trim()}.
   * <dl>
   * <dt>flush()</dt>
   * <dd>get a number of changes from the in memory list by block and write them
   * to the db.</dd>
   * <dt>trim()</dt>
   * <dd>deletes from the DB a number of changes that are older than a certain
   * date.</dd>
   * </dl>
   * update synchronously add the update to this list. A dedicated thread
   * flushes this blocking queue.
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
@@ -133,22 +124,12 @@
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #clear()}
   * {@link #flush()} or {@link #trim()}. This can happen with the thread
   * flushing the queue, on shutdown or on cursor opening, a thread calling
   * clear(), etc.
   * Used to prevent race conditions between threads calling {@link #flush()}.
   * This can happen with the thread flushing the queue, or else on shutdown.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  private long latestTrimDate = 0;
  /**
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   */
  private long trimAge;
  /**
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
@@ -166,15 +147,14 @@
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    trimAge = replicationServer.getTrimAge();
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + serverId
        + ") for domain \"" + baseDN + "\"");
            + replicationServer.getServerId()
            + ") flusher thread for Replica DS(" + serverId
            + ") for domain \"" + baseDN + "\"");
    thread.start();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -334,9 +314,7 @@
  }
  /**
   * Run method for this class.
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   * Flushes the replicaDB queue from memory to stable storage.
   */
  @Override
  public void run()
@@ -350,7 +328,6 @@
        try
        {
          flush();
          trim();
        }
        catch (ChangelogException end)
        {
@@ -390,55 +367,26 @@
  }
  /**
   * Retrieves the latest trim date.
   * @return the latest trim date.
   * Synchronously purge changes older than purgeCSN from this replicaDB.
   *
   * @param purgeCSN
   *          The CSN up to which changes can be purged. No purging happens when
   *          it is null.
   * @throws ChangelogException
   *           In case of database problem.
   */
  public long getLatestTrimDate()
  void purgeUpTo(final CSN purgeCSN) throws ChangelogException
  {
    return latestTrimDate;
  }
  /**
   * Trim old changes from this replicationServer database.
   * @throws ChangelogException In case of database problem.
   */
  private void trim() throws ChangelogException
  {
    if (trimAge == 0)
    if (purgeCSN == null)
    {
      return;
    }
    latestTrimDate = TimeThread.getTime() - trimAge;
    CSN trimDate = new CSN(latestTrimDate, 0, 0);
    // Find the last CSN before the trimDate, in the Database.
    CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate);
    if (lastBeforeTrimDate != null)
    {
      // If we found it, we want to stop trimming when reaching it.
      trimDate = lastBeforeTrimDate;
    }
    for (int i = 0; i < 100; i++)
    {
      /*
       * Perform at least some trimming regardless of the flush backlog. Then
       * continue trim iterations while the flush backlog is low (below the
       * lowmark). Once the flush backlog increases, stop trimming and start
       * flushing more eagerly.
       */
      if (i > 20 && isQueueAboveLowMark())
      {
        break;
      }
      /*
       * the trim is done by group in order to save some CPU, IO bandwidth and
       * DB caches: start the transaction then do a bunch of remove then
       * commit.
       * the purge is done by group in order to save some CPU, IO bandwidth and
       * DB caches: start the transaction then do a bunch of remove then commit.
       */
      /*
       * Matt wrote: The record removal is done as a DB transaction and the
@@ -464,7 +412,7 @@
            return;
          }
          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN))
          {
            cursor.delete();
          }
@@ -490,37 +438,31 @@
    }
  }
  private boolean isQueueAboveLowMark()
  {
    final int lowMarkBytes = queueMaxBytes / 5;
    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
    return bytesUsed > lowMarkBytes;
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * <p>
   * Flush is done by chunk sized to 500 messages, starting from the beginning
   * of the list.
   *
   * <p>
   * @GuardedBy("flushLock")
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void flush() throws ChangelogException
  private void flush() throws ChangelogException
  {
    try
    {
      synchronized (flushLock)
      {
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
        final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, move on to the trim phase
          // nothing to persist, check if shutdown was invoked
          return;
        }
        // Try to see if there are more changes and persist them all.
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        changes.add(change);
        msgQueue.drainTo(changes);
@@ -604,15 +546,6 @@
  }
  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds.
   */
  public void setPurgeDelay(long delay)
  {
    trimAge = delay;
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   * @throws ChangelogException When an exception occurs while removing the
   * changes from the DB.
@@ -636,13 +569,15 @@
  }
  /**
   * Return the size of the msgQueue (the memory cache of the ReplicaDB).
   * Return the number of records of this replicaDB.
   * <p>
   * For test purpose.
   * @return The memory queue size.
   *
   * @return The number of records of this replicaDB.
   */
  int getQueueSize()
  long getNumberRecords()
  {
    return this.msgQueue.size();
    return db.getNumberRecords();
  }
  /**