mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
27.43.2013 443a7bb2635912474bb086501b53553528e6e09f
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -26,29 +26,29 @@
 *      Portions copyright 2011-2013 ForgeRock AS
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.LinkedList;
import java.util.List;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.ChangeNumber;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.InitializationException;
import org.opends.server.util.TimeThread;
import com.sleepycat.je.DatabaseException;
@@ -58,45 +58,54 @@
 * It is responsible for efficiently saving the updates that is received from
 * each master server into stable storage.
 * This class is also able to generate a ReplicationIterator that can be
 * used to read all changes from a given ChangeNUmber.
 * used to read all changes from a given ChangeNumber.
 *
 * This class publish some monitoring information below cn=monitor.
 *
 */
public class DbHandler implements Runnable
{
  // The msgQueue holds all the updates not yet saved to stable storage.
  // This list is only used as a temporary placeholder so that the write
  // in the stable storage can be grouped for efficiency reason.
  // Adding an update synchronously add the update to this list.
  // A dedicated thread loops on flush() and trim().
  // flush() : get a number of changes from the in memory list by block
  //           and write them to the db.
  // trim()  : deletes from the DB a number of changes that are older than a
  //           certain date.
  //
  // Changes are not read back by replicationServer threads that are responsible
  // for pushing the changes to other replication server or to LDAP server
  //
  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
   * <p>
   * This list is only used as a temporary placeholder so that the write in the
   * stable storage can be grouped for efficiency reason. Adding an update
   * synchronously add the update to this list. A dedicated thread loops on
   * flush() and trim().
   * <dl>
   * <dt>flush()</dt>
   * <dd>get a number of changes from the in memory list by block and write them
   * to the db.</dd>
   * <dt>trim()</dt>
   * <dd>deletes from the DB a number of changes that are older than a certain
   * date.</dd>
   * </dl>
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication server or to LDAP server
   */
  private final LinkedList<UpdateMsg> msgQueue =
    new LinkedList<UpdateMsg>();
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  queueHimark and will resume
  // only when the size of the msgQueue goes below queueLowmark.
  int queueMaxSize = 5000;
  int queueLowmark = 1000;
  int queueHimark = 4000;
  /**
   * The High and low water mark for the max size of the msgQueue. The threads
   * calling add() method will be blocked if the size of msgQueue becomes larger
   * than the queueHimark and will resume only when the size of the msgQueue
   * goes below queueLowmark.
   */
  private int queueMaxSize = 5000;
  private int queueLowmark = 1000;
  private int queueHimark = 4000;
  // The queue himark and lowmark in bytes, this is set to 100 times the
  // himark and lowmark in number of updates.
  int queueMaxBytes = 100 * queueMaxSize;
  int queueLowmarkBytes = 100 * queueLowmark;
  int queueHimarkBytes = 100 * queueHimark;
  /**
   * The queue himark and lowmark in bytes, this is set to 100 times the himark
   * and lowmark in number of updates.
   */
  private int queueMaxBytes = 100 * queueMaxSize;
  private int queueLowmarkBytes = 100 * queueLowmark;
  private int queueHimarkBytes = 100 * queueHimark;
  // The number of bytes currently in the queue
  int queueByteSize = 0;
  /** The number of bytes currently in the queue */
  private int queueByteSize = 0;
  private ReplicationDB db;
  private ChangeNumber firstChange = null;
@@ -113,10 +122,8 @@
  private long latestTrimDate = 0;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   *
   */
  private long trimAge;
@@ -195,7 +202,9 @@
        lastChange = update.getChangeNumber();
      }
      if (firstChange == null)
      {
        firstChange = update.getChangeNumber();
      }
    }
  }
@@ -299,9 +308,11 @@
        queueByteSize -= msg.size();
        current++;
      }
      if ((msgQueue.size() < queueLowmark) &&
          (queueByteSize < queueLowmarkBytes))
      if ((msgQueue.size() < queueLowmark)
          && (queueByteSize < queueLowmarkBytes))
      {
        msgQueue.notifyAll();
      }
    }
  }
@@ -334,7 +345,9 @@
    }
    while (msgQueue.size() != 0)
    {
      flush();
    }
    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -345,6 +358,7 @@
   * Periodically Flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   */
  @Override
  public void run()
  {
    while (!shutdown)
@@ -650,16 +664,15 @@
   */
  public int getCount(ChangeNumber from, ChangeNumber to)
  {
    int c=0;
    // Now that we always keep the last ChangeNumber in the DB to avoid
    // expiring cookies to quickly, we need to check if the "to"
    // expiring cookies too quickly, we need to check if the "to"
    // is older than the trim date.
    if ((to == null) || !to.older(new ChangeNumber(latestTrimDate, 0, 0)))
    {
      flush();
      c = db.count(from, to);
      return db.count(from, to);
    }
    return c;
    return 0;
  }
}