mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
31.40.2014 90506e9b23401358f100dafdd684938d9b274393
OPENDJ-1177 Re-implement changelog purging logic

Code review: Matthew Swift

Code fix after r10601.
Several problems shown by continuous integration:
* Deadlocks,
* Long waits

JEReplicaDB.java:
In clear(), calling collectAllPermits() while holding the flushLock actually prevented flushing to happen because it also needed the flushLock. Calling collectAllPermits() does not require to hold the flushLock, and once this method returns the msgQueue should be empty anyway and all the changes should have been pushed to the DB (flush() first removes messages from msgQueue, then add to DB, then releases all permits). In effect, there is no need to synchronize on flushLock anymore.
In trim(), the check for the queue being below the low mark was wrong (it compared the queue size in messages against a threshold expressed in bytes): extracted isQueueAboveLowMark() and fixed its definition, and removed the synchronized (flushLock) block because it is now deemed unnecessary.
In generateCursorFrom(), removed an unnecessary call to flush(), because flush() is also called from the JEReplicaDBCursor constructor.
1 file modified
117 lines changed
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 117 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -301,11 +301,6 @@
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
      throws ChangelogException
  {
    if (startAfterCSN == null)
    {
      // flush any potential changes before opening the cursor
      flush();
    }
    return new JEReplicaDBCursor(db, startAfterCSN, this);
  }
@@ -427,7 +422,6 @@
      trimDate = lastBeforeTrimDate;
    }
    final int queueLowMarkBytes = queueMaxBytes / 5;
    for (int i = 0; i < 100; i++)
    {
      /*
@@ -436,69 +430,73 @@
       * lowmark). Once the flush backlog increases, stop trimming and start
       * flushing more eagerly.
       */
      if (i > 20 && msgQueue.size() < queueLowMarkBytes)
      if (i > 20 && isQueueAboveLowMark())
      {
        break;
      }
      synchronized (flushLock)
      /*
       * the trim is done by group in order to save some CPU, IO bandwidth and
       * DB caches: start the transaction then do a bunch of remove then
       * commit.
       */
      /*
       * Matt wrote: The record removal is done as a DB transaction and the
       * deleted records are only "deleted" on commit. While the txn/cursor is
       * open the records to be deleted will, I think, be pinned in the DB
       * cache. In other words, the larger the transaction (the more records
       * deleted during a single batch) the more DB cache will be used to
       * process the transaction.
       */
      final ReplServerDBCursor cursor = db.openDeleteCursor();
      try
      {
        /*
         * the trim is done by group in order to save some CPU, IO bandwidth and
         * DB caches: start the transaction then do a bunch of remove then
         * commit.
         */
        /*
         * Matt wrote: The record removal is done as a DB transaction and the
         * deleted records are only "deleted" on commit. While the txn/cursor is
         * open the records to be deleted will, I think, be pinned in the DB
         * cache. In other words, the larger the transaction (the more records
         * deleted during a single batch) the more DB cache will be used to
         * process the transaction.
         */
        final ReplServerDBCursor cursor = db.openDeleteCursor();
        try
        for (int j = 0; j < 50; j++)
        {
          for (int j = 0; j < 50; j++)
          if (thread.isShutdownInitiated())
          {
            if (thread.isShutdownInitiated())
            {
              return;
            }
            return;
          }
            CSN csn = cursor.nextCSN();
            if (csn == null)
            {
              return;
            }
          CSN csn = cursor.nextCSN();
          if (csn == null)
          {
            return;
          }
            if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
            {
              cursor.delete();
            }
            else
            {
              csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
              return;
            }
          if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
          {
            cursor.delete();
          }
          else
          {
            csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
            return;
          }
        }
        catch (ChangelogException e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          cursor.abort();
          thread.initiateShutdown();
          throw e;
        }
        finally
        {
          cursor.close();
        }
      }
      catch (ChangelogException e)
      {
        // mark shutdown for this db so that we don't try again to
        // stop it from cursor.close() or methods called by cursor.close()
        cursor.abort();
        thread.initiateShutdown();
        throw e;
      }
      finally
      {
        cursor.close();
      }
    }
  }
  private boolean isQueueAboveLowMark()
  {
    final int lowMarkBytes = queueMaxBytes / 5;
    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
    return bytesUsed > lowMarkBytes;
  }
  /**
   * Flush a number of updates from the memory list to the stable storage.
   * <p>
@@ -621,13 +619,10 @@
   */
  public void clear() throws ChangelogException
  {
    synchronized(flushLock)
    {
      collectAllPermits();
      msgQueue.clear();
      db.clear();
      csnLimits = new CSNLimits(null, null);
    }
    collectAllPermits();
    msgQueue.clear(); // this call should not do anything at all
    db.clear();
    csnLimits = new CSNLimits(null, null);
  }
  /**