mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
14.29.2009 bcf686add35bda4a6ac5c3d085abe151ea018e8e
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -22,7 +22,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 */
package org.opends.server.replication.server;
import org.opends.messages.MessageBuilder;
@@ -86,13 +86,15 @@
  // the threads calling add() method will be blocked if the size of
  // msgQueue becomes larger than the  queueHimark and will resume
  // only when the size of the msgQueue goes below queueLowmark.
  int queueHimark = 5000;
  int queueLowmark = 4000;
  int queueMaxSize = 5000;
  int queueLowmark = 1000;
  int queueHimark = 4000;
  // The queue himark and lowmark in bytes, this is set to 100 times the
  // himark and lowmark in number of updates.
  int queueHimarkBytes = 100 * queueHimark;
  int queueMaxBytes = 100 * queueMaxSize;
  int queueLowmarkBytes = 100 * queueLowmark;
  int queueHimarkBytes = 100 * queueHimark;
  // The number of bytes currently in the queue
  int queueByteSize = 0;
@@ -140,10 +142,12 @@
    serverId = id;
    this.baseDn = baseDn;
    trimage = replicationServer.getTrimage();
    queueHimark = queueSize;
    queueLowmark = queueSize * 4 / 5;
    queueHimarkBytes = 100 * queueHimark;
    queueLowmarkBytes = 100 * queueLowmark;
    queueMaxSize = queueSize;
    queueLowmark = queueSize * 1 / 5;
    queueHimark = queueSize * 4 / 5;
    queueMaxBytes = 200 * queueMaxSize;
    queueLowmarkBytes = 200 * queueLowmark;
    queueHimarkBytes = 200 * queueLowmark;
    db = new ReplicationDB(id, baseDn, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
@@ -171,11 +175,14 @@
    synchronized (msgQueue)
    {
      int size = msgQueue.size();
      while ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
      if ((size > queueHimark) || (queueByteSize > queueHimarkBytes))
        msgQueue.notify();
      while ((size > queueMaxSize) || (queueByteSize > queueMaxBytes))
      {
        try
        {
          msgQueue.wait(500);
          msgQueue.wait(5000);
        } catch (InterruptedException e)
        {
          // simply loop to try again.
@@ -379,17 +386,22 @@
  {
    while (shutdown == false)
    {
      try {
      try
      {
        flush();
        trim();
        synchronized (this)
        synchronized (msgQueue)
        {
          try
          if ((msgQueue.size() < queueLowmark) &&
              (queueByteSize < queueLowmarkBytes))
          {
            this.wait(1000);
          } catch (InterruptedException e)
          { }
            try
            {
              msgQueue.wait(10000);
            } catch (InterruptedException e)
            { }
          }
        }
      } catch (Exception end)
      {
@@ -434,56 +446,59 @@
    int tries = 0;
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    {
      /* the trim is done by group in order to save some CPU and IO bandwidth
       * start the transaction then do a bunch of remove then commit
       */
      ReplServerDBCursor cursor;
      cursor = db.openDeleteCursor();
      try
      synchronized (flushLock)
      {
        while ((size < 5000 ) &&  (!finished))
        /* the trim is done by group in order to save some CPU and IO bandwidth
         * start the transaction then do a bunch of remove then commit
         */
        ReplServerDBCursor cursor;
        cursor = db.openDeleteCursor();
        try
        {
          ChangeNumber changeNumber = cursor.nextChangeNumber();
          if (changeNumber != null)
          while ((size < 5000 ) &&  (!finished))
          {
            if ((!changeNumber.equals(lastChange))
                && (changeNumber.older(trimDate)))
            ChangeNumber changeNumber = cursor.nextChangeNumber();
            if (changeNumber != null)
            {
              size++;
              cursor.delete();
              if ((!changeNumber.equals(lastChange))
                  && (changeNumber.older(trimDate)))
              {
                size++;
                cursor.delete();
              }
              else
              {
                firstChange = changeNumber;
                finished = true;
              }
            }
            else
            {
              firstChange = changeNumber;
              finished = true;
            }
          }
          else
            finished = true;
          cursor.close();
          done = true;
        }
        cursor.close();
        done = true;
      }
      catch (DeadlockException e)
      {
        cursor.abort();
        if (tries == DEADLOCK_RETRIES)
        catch (DeadlockException e)
        {
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          cursor.abort();
          if (tries == DEADLOCK_RETRIES)
          {
            // could not handle the Deadlock after DEADLOCK_RETRIES tries.
            // shutdown the ReplicationServer.
            shutdown = true;
            throw (e);
          }
        }
        catch (DatabaseException e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          shutdown = true;
          cursor.abort();
          throw (e);
        }
      }
      catch (DatabaseException e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        shutdown = true;
        cursor.abort();
        throw (e);
      }
    }
  }
@@ -493,7 +508,7 @@
  private void flush()
  {
    int size;
    int chunksize = (500 < queueHimark ? 500 : queueHimark);
    int chunksize = (500 < queueMaxSize ? 500 : queueMaxSize);
    do
    {
@@ -630,9 +645,10 @@
    {
      msgQueue.clear();
      queueByteSize = 0;
      db.clear();
      firstChange = db.readFirstChange();
      lastChange = db.readLastChange();
    }
    db.clear();
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
  }
}