mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.33.2011 7a34cefa2a5bbdf339f1a50b856e3d7441006b8d
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,7 +51,6 @@
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
/**
 * This class is used for managing the replicationServer database for each
@@ -111,9 +110,6 @@
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  private long latestTrimDate = 0;
  /**
@@ -122,7 +118,7 @@
   * are older than this age are removed.
   *
   */
  private long trimage;
  private long trimAge;
  /**
   * Creates a new dbHandler associated to a given LDAP server.
@@ -143,7 +139,7 @@
    this.replicationServer = replicationServer;
    serverId = id;
    this.baseDn = baseDn;
    trimage = replicationServer.getTrimage();
    trimAge = replicationServer.getTrimAge();
    queueMaxSize = queueSize;
    queueLowmark = queueSize * 1 / 5;
    queueHimark = queueSize * 4 / 5;
@@ -291,43 +287,6 @@
  }
  /**
   * Return the number of changes between 2 provided change numbers.
   * @param from The lower (older) change number.
   * @param to   The upper (newer) change number.
   * @return The computed number of changes.
   */
  public int traverseAndCount(ChangeNumber from, ChangeNumber to)
  {
    int count = 0;
    flush();
    ReplServerDBCursor cursor = null;
    try
    {
      try
      {
        cursor = db.openReadCursor(from);
      }
      catch(Exception e)
      {
        return 0;
      }
      ChangeNumber curr = null;
      while ((curr = cursor.nextChangeNumber())!=null)
      {
        if (curr.newerOrEquals(to))
          break;
        count++;
      }
    }
    finally
    {
      if (cursor != null)
        cursor.abort();
    }
    return count;
  }
  /**
   * Removes the provided number of messages from the beginning of the msgQueue.
   *
   * @param number the number of changes to be removed.
@@ -456,80 +415,67 @@
   */
  private void trim() throws DatabaseException, Exception
  {
    if (trimage == 0)
    if (trimAge == 0)
    {
      return;
    int size = 0;
    boolean finished = false;
    boolean done = false;
    }
    latestTrimDate = TimeThread.getTime() - trimage;
    latestTrimDate = TimeThread.getTime() - trimAge;
    ChangeNumber trimDate = new ChangeNumber(latestTrimDate, 0, 0);
    // Find the last changeNumber before the trimDate, in the Database.
    ChangeNumber lastBeforeTrimDate = db.getPreviousChangeNumber(trimDate);
    ChangeNumber lastBeforeTrimDate = db
        .getPreviousChangeNumber(trimDate);
    if (lastBeforeTrimDate != null)
    {
      // If we found it, we want to stop trimming when reaching it.
      trimDate = lastBeforeTrimDate;
    }
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    int tries = 0;
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    for (int i = 0; i < 100; i++)
    {
      synchronized (flushLock)
      {
        /* the trim is done by group in order to save some CPU and IO bandwidth
        /*
         * the trim is done by group in order to save some CPU and IO bandwidth
         * start the transaction then do a bunch of remove then commit
         */
        ReplServerDBCursor cursor = db.openDeleteCursor();
        final ReplServerDBCursor cursor = db.openDeleteCursor();
        try
        {
          while ((size < 5000 ) &&  (!finished))
          for (int j = 0; j < 50; j++)
          {
            ChangeNumber changeNumber = cursor.nextChangeNumber();
            if (changeNumber != null)
            if (changeNumber == null)
            {
              if ((!changeNumber.equals(lastChange))
                  && (changeNumber.older(trimDate)))
              {
                size++;
                cursor.delete();
              }
              else
              {
                firstChange = changeNumber;
                finished = true;
              }
              cursor.close();
              done = true;
              return;
            }
            if ((!changeNumber.equals(lastChange))
                && (changeNumber.older(trimDate)))
            {
              cursor.delete();
            }
            else
              finished = true;
            {
              firstChange = changeNumber;
              cursor.close();
              done = true;
              return;
            }
          }
          cursor.close();
          done = true;
        }
        catch (LockConflictException e)
        {
          cursor.abort();
          if (tries == DEADLOCK_RETRIES)
          {
            // could not handle the Deadlock after DEADLOCK_RETRIES tries.
            // shutdown the ReplicationServer.
            shutdown = true;
            throw (e);
          }
        }
        catch (Exception e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          shutdown = true;
          cursor.abort();
          throw (e);
          shutdown = true;
          throw e;
        }
      }
    }
@@ -644,7 +590,7 @@
   */
  public void setPurgeDelay(long delay)
  {
    trimage = delay;
    trimAge = delay;
  }
  /**