mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
18.40.2008 66f48672bc7953e77364a9a7ae41f1e70d83534f
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -81,6 +81,22 @@
  //
  private final LinkedList<UpdateMessage> msgQueue =
    new LinkedList<UpdateMessage>();
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  queueHimark and will resume
  // only when the size of the msgQueue goes below queueLowmark.
  int queueHimark = 5000;
  int queueLowmark = 4000;
  // The queue himark and lowmark in bytes, this is set to 100 times the
  // himark and lowmark in number of updates.
  int queueHimarkBytes = 100 * queueHimark;
  int queueLowmarkBytes = 100 * queueLowmark;
  // The number of bytes currently in the queue
  int queueByteSize = 0;
  private ReplicationDB db;
  private ChangeNumber firstChange = null;
  private ChangeNumber lastChange = null;
@@ -93,14 +109,6 @@
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  // The High and low water mark for the max size of the msgQueue.
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  MSG_QUEUE_HIMARK and will resume
  // only when the size of the msgQueue goes below MSG_QUEUE_LOWMARK.
  final static int MSG_QUEUE_HIMARK = 5000;
  final static int MSG_QUEUE_LOWMARK = 4000;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
@@ -120,16 +128,22 @@
   * @param replicationServer The ReplicationServer that creates this dbHandler.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @param queueSize The queueSize to use when creating the dbHandler.
   * @throws DatabaseException If a database problem happened
   */
  public DbHandler(short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv)
  public DbHandler(
      short id, DN baseDn, ReplicationServer replicationServer,
      ReplicationDbEnv dbenv, int queueSize)
         throws DatabaseException
  {
    this.replicationServer = replicationServer;
    this.serverId = id;
    serverId = id;
    this.baseDn = baseDn;
    this.trimage = replicationServer.getTrimage();
    trimage = replicationServer.getTrimage();
    queueHimark = queueSize;
    queueLowmark = queueSize * 4 / 5;
    queueHimarkBytes = 100 * queueHimark;
    queueLowmarkBytes = 100 * queueLowmark;
    db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
@@ -156,7 +170,7 @@
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      while (size > MSG_QUEUE_HIMARK)
      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
      {
        try
        {
@@ -168,6 +182,7 @@
        size = msgQueue.size();
      }
      queueByteSize += update.size();
      msgQueue.add(update);
      if (lastChange == null || lastChange.older(update.getChangeNumber()))
      {
@@ -308,10 +323,12 @@
      int current = 0;
      while ((current < number) && (!msgQueue.isEmpty()))
      {
        msgQueue.remove();
        UpdateMessage msg = msgQueue.remove();
        queueByteSize -= msg.size();
        current++;
      }
      if (msgQueue.size() < MSG_QUEUE_LOWMARK)
      if ((msgQueue.size() < queueLowmark) &&
          (queueByteSize < queueLowmarkBytes))
        msgQueue.notify();
    }
  }
@@ -475,13 +492,14 @@
  private void flush()
  {
    int size;
    int chunksize = (500 < queueHimark ? 500 : queueHimark);
    do
    {
      synchronized(flushLock)
      {
        // get N messages to save in the DB
        List<UpdateMessage> changes = getChanges(500);
        List<UpdateMessage> changes = getChanges(chunksize);
        // if no more changes to save exit immediately.
        if ((changes == null) || ((size = changes.size()) == 0))
@@ -493,7 +511,7 @@
        // remove the changes from the list of changes to be saved.
        clearQueue(changes.size());
      }
    } while (size >=500);
    } while (size >= chunksize);
  }
  /**
@@ -529,6 +547,10 @@
        attributes.add(new Attribute("last-change",
            lastChange.toString() + " " + lastTime.toString()));
      }
      attributes.add(
          new Attribute("queue-size", String.valueOf(msgQueue.size())));
      attributes.add(
          new Attribute("queue-size-bytes", String.valueOf(queueByteSize)));
      return attributes;
    }
@@ -604,6 +626,7 @@
    synchronized(flushLock)
    {
      msgQueue.clear();
      queueByteSize = 0;
    }
    db.clear();
    firstChange = db.readFirstChange();