mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

gbellato
01.40.2008 7e6c6657bced35f4a3aba723c2add20923450ad6
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -51,6 +51,7 @@
import org.opends.server.replication.server.ReplicationDB.ReplServerDBCursor;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.DeadlockException;
/**
 * This class is used for managing the replicationServer database for each
@@ -99,6 +100,9 @@
  final static int MSG_QUEUE_HIMARK = 5000;
  final static int MSG_QUEUE_LOWMARK = 4000;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
@@ -285,7 +289,10 @@
      }
    }
    return new ReplicationIterator(serverId, db, changeNumber);
    ReplicationIterator it =
      new ReplicationIterator(serverId, db, changeNumber);
    return it;
  }
  /**
@@ -397,46 +404,67 @@
      return;
    int size = 0;
    boolean finished = false;
    boolean done = false;
    ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
        (short) 0, (short)0);
    /* the trim is done by group in order to save some CPU and IO bandwidth
     * start the transaction then do a bunch of remove then commit
     */
    ReplServerDBCursor cursor;
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    int tries = 0;
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    {
      /* the trim is done by group in order to save some CPU and IO bandwidth
       * start the transaction then do a bunch of remove then commit
       */
      ReplServerDBCursor cursor;
      cursor = db.openDeleteCursor();
    cursor = db.openDeleteCursor();
    try {
      while ((size < 5000 ) &&  (!finished))
      try
      {
        ChangeNumber changeNumber = cursor.nextChangeNumber();
        if (changeNumber != null)
        while ((size < 5000 ) &&  (!finished))
        {
          if ((!changeNumber.equals(lastChange))
              && (changeNumber.older(trimDate)))
          ChangeNumber changeNumber = cursor.nextChangeNumber();
          if (changeNumber != null)
          {
            size++;
            cursor.delete();
            if ((!changeNumber.equals(lastChange))
                && (changeNumber.older(trimDate)))
            {
              size++;
              cursor.delete();
            }
            else
            {
              firstChange = changeNumber;
              finished = true;
            }
          }
          else
          {
            firstChange = changeNumber;
            finished = true;
          }
        }
        else
          finished = true;
        cursor.close();
        done = true;
      }
      cursor.close();
    } catch (DatabaseException e)
    {
      // mark shutdown for this db so that we don't try again to
      // stop it from cursor.close() or methods called by cursor.close()
      shutdown = true;
      cursor.close();
      throw (e);
      catch (DeadlockException e)
      {
        cursor.abort();
        if (tries == DEADLOCK_RETRIES)
        {
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          shutdown = true;
          throw (e);
        }
      }
      catch (DatabaseException e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        shutdown = true;
        cursor.abort();
        throw (e);
      }
    }
  }