mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
31.40.2014 90506e9b23401358f100dafdd684938d9b274393
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -301,11 +301,6 @@
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
      throws ChangelogException
  {
    if (startAfterCSN == null)
    {
      // flush any potential changes before opening the cursor
      flush();
    }
    return new JEReplicaDBCursor(db, startAfterCSN, this);
  }
@@ -427,7 +422,6 @@
      trimDate = lastBeforeTrimDate;
    }
    final int queueLowMarkBytes = queueMaxBytes / 5;
    for (int i = 0; i < 100; i++)
    {
      /*
@@ -436,69 +430,73 @@
       * lowmark). Once the flush backlog increases, stop trimming and start
       * flushing more eagerly.
       */
      if (i > 20 && msgQueue.size() < queueLowMarkBytes)
      if (i > 20 && isQueueAboveLowMark())
      {
        break;
      }
      synchronized (flushLock)
      /*
       * the trim is done by group in order to save some CPU, IO bandwidth and
       * DB caches: start the transaction then do a bunch of remove then
       * commit.
       */
      /*
       * Matt wrote: The record removal is done as a DB transaction and the
       * deleted records are only "deleted" on commit. While the txn/cursor is
       * open the records to be deleted will, I think, be pinned in the DB
       * cache. In other words, the larger the transaction (the more records
       * deleted during a single batch) the more DB cache will be used to
       * process the transaction.
       */
      final ReplServerDBCursor cursor = db.openDeleteCursor();
      try
      {
        /*
         * the trim is done by group in order to save some CPU, IO bandwidth and
         * DB caches: start the transaction then do a bunch of remove then
         * commit.
         */
        /*
         * Matt wrote: The record removal is done as a DB transaction and the
         * deleted records are only "deleted" on commit. While the txn/cursor is
         * open the records to be deleted will, I think, be pinned in the DB
         * cache. In other words, the larger the transaction (the more records
         * deleted during a single batch) the more DB cache will be used to
         * process the transaction.
         */
        final ReplServerDBCursor cursor = db.openDeleteCursor();
        try
        for (int j = 0; j < 50; j++)
        {
          for (int j = 0; j < 50; j++)
          if (thread.isShutdownInitiated())
          {
            if (thread.isShutdownInitiated())
            {
              return;
            }
            return;
          }
            CSN csn = cursor.nextCSN();
            if (csn == null)
            {
              return;
            }
          CSN csn = cursor.nextCSN();
          if (csn == null)
          {
            return;
          }
            if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
            {
              cursor.delete();
            }
            else
            {
              csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
              return;
            }
          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
          {
            cursor.delete();
          }
          else
          {
            csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
            return;
          }
        }
        catch (ChangelogException e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          cursor.abort();
          thread.initiateShutdown();
          throw e;
        }
        finally
        {
          cursor.close();
        }
      }
      catch (ChangelogException e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        thread.initiateShutdown();
        throw e;
      }
      finally
      {
        cursor.close();
      }
    }
  }
  private boolean isQueueAboveLowMark()
  {
    final int lowMarkBytes = queueMaxBytes / 5;
    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
    return bytesUsed > lowMarkBytes;
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * <p>
@@ -621,13 +619,10 @@
   */
  public void clear() throws ChangelogException
  {
    synchronized(flushLock)
    {
      collectAllPermits();
      msgQueue.clear();
      db.clear();
      csnLimits = new CSNLimits(null, null);
    }
    collectAllPermits();
    msgQueue.clear(); // this call should not do anything at all
    db.clear();
    csnLimits = new CSNLimits(null, null);
  }
  /**