mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Matthew Swift
08.33.2011 7a34cefa2a5bbdf339f1a50b856e3d7441006b8d
opends/src/server/org/opends/server/replication/server/DraftCNDbHandler.java
@@ -50,7 +50,6 @@
import org.opends.server.types.InitializationException;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LockConflictException;
/**
 * This class is used for managing the replicationServer database for each
@@ -84,16 +83,13 @@
  private DirectoryThread thread = null;
  private ReplicationServer replicationServer;
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
   * are older than this age are removed.
   *
   */
  private long trimage;
  private long trimAge;
  /**
   * Creates a new dbHandler associated to a given LDAP server.
@@ -108,7 +104,7 @@
         throws DatabaseException
  {
    this.replicationServer = replicationServer;
    this.trimage = replicationServer.getTrimage();
    this.trimAge = replicationServer.getTrimAge();
    // DB initialization
    db = new DraftCNDB(replicationServer, dbenv);
@@ -312,7 +308,7 @@
   */
  public void trim() throws DatabaseException, Exception
  {
    if (trimage == 0)
    if (trimAge == 0)
      return;
    clear(null);
@@ -330,106 +326,86 @@
   *
   */
  public void clear(String serviceIDToClear)
  throws DatabaseException, Exception
      throws DatabaseException, Exception
  {
    if (this.count()==0)
    if (this.count() == 0)
    {
      return;
    }
    int size = 0;
    int tries = 0;
    boolean finished = false;
    boolean done = false;
    ChangeNumber crossDomainEligibleCN = replicationServer
        .getEligibleCN();
    ChangeNumber crossDomainEligibleCN = replicationServer.getEligibleCN();
    // In case of deadlock detection by the Database, this thread can
    // by aborted by a DeadlockException. This is a transient error and
    // the transaction should be attempted again.
    // We will try DEADLOCK_RETRIES times before failing.
    while ((tries++ < DEADLOCK_RETRIES) && (!done))
    for (int i = 0; i < 100; i++)
    {
      DraftCNDBCursor cursor = db.openDeleteCursor();
      try
      {
        while ((size < 5000 ) &&  (!finished))
        for (int j = 0; j < 50; j++)
        {
          // let's traverse the DraftCNDb
          if (!cursor.next())
          {
            finished=true;
            cursor.close();
            return;
          }
          else
          ChangeNumber cn = cursor.currentChangeNumber();
          // From the draftCNDb change record, get the domain and changeNumber
          String serviceID = cursor.currentServiceID();
          if ((serviceIDToClear != null)
              && (serviceIDToClear.equalsIgnoreCase(serviceID)))
          {
            ChangeNumber cn = cursor.currentChangeNumber();
            // From the draftCNDb change record, get the domain and changeNumber
            String serviceID = cursor.currentServiceID();
            if ((serviceIDToClear!=null) &&
                (serviceIDToClear.equalsIgnoreCase(serviceID)))
            {
              size++;
              cursor.delete();
              continue;
            }
            ReplicationServerDomain domain =
              replicationServer.getReplicationServerDomain(serviceID, false);
            if (domain==null)
            {
              // the domain has been removed since the record was written in the
              // draftCNDb, thus it makes no sense to keep the record in the
              // draftCNDb.
              size++;
              cursor.delete();
            }
            else
            {
              ServerState startState = domain.getStartState();
              // We don't use the returned endState but it's updating CN as
              // reading
              domain.getEligibleState(crossDomainEligibleCN);
              ChangeNumber fcn = startState.getMaxChangeNumber(
                  cn.getServerId());
              int currentKey = cursor.currentKey();
              // Do not delete the lastKey. This should allow us to
              // preserve last change number over time.
              if ((currentKey != lastkey) && (cn.older(fcn)))
              {
                size++;
                cursor.delete();
              }
              else
              {
                firstkey = currentKey;
                finished = true;
              }
            }
            cursor.delete();
            continue;
          }
          ReplicationServerDomain domain = replicationServer
              .getReplicationServerDomain(serviceID, false);
          if (domain == null)
          {
            // the domain has been removed since the record was written in the
            // draftCNDb, thus it makes no sense to keep the record in the
            // draftCNDb.
            cursor.delete();
            continue;
          }
          ServerState startState = domain.getStartState();
          // We don't use the returned endState but it's updating CN as
          // reading
          domain.getEligibleState(crossDomainEligibleCN);
          ChangeNumber fcn = startState.getMaxChangeNumber(cn
              .getServerId());
          int currentKey = cursor.currentKey();
          // Do not delete the lastKey. This should allow us to
          // preserve last change number over time.
          if ((currentKey != lastkey) && (cn.older(fcn)))
          {
            cursor.delete();
            continue;
          }
          firstkey = currentKey;
          cursor.close();
          return;
        }
        cursor.close();
        done = true;
      }
      catch (LockConflictException e)
      {
        cursor.abort();
        if (tries == DEADLOCK_RETRIES)
        {
          // could not handle the Deadlock after DEADLOCK_RETRIES tries.
          // shutdown the ReplicationServer.
          shutdown = true;
          throw e;
        }
      }
      catch (Exception e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        shutdown = true;
        cursor.abort();
        shutdown = true;
        throw e;
      }
    }
@@ -492,7 +468,7 @@
   */
  public void setPurgeDelay(long delay)
  {
    trimage = delay;
    trimAge = delay;
  }
  /**