mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
31.40.2014 90506e9b23401358f100dafdd684938d9b274393
OPENDJ-1177 Re-implement changelog purging logic

Code review: Matthew Swift

Code fix after r10601.
Several problems shown by continuous integration:
* Deadlocks,
* Long waits

JEReplicaDB.java:
In clear(), calling collectAllPermits() while holding the flushLock prevented flushing from happening, because flush() also needs the flushLock. Calling collectAllPermits() does not require holding the flushLock, and once it returns, the msgQueue should be empty anyway and all the changes should have been pushed to the DB (flush() first removes messages from msgQueue, then adds them to the DB, then releases all permits). In effect, there is no need to synchronize on flushLock anymore.
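The locking change in clear() can be illustrated with a minimal Java sketch. This is not the OpenDJ code: the class name ClearWithoutFlushLockSketch, the integer "messages", the helper methods, and the use of Semaphore.acquireUninterruptibly() as a stand-in for collectAllPermits() are all assumptions made for illustration. It only models the ordering described above: flush() drains the queue under flushLock and then releases permits, while clear() waits for the permits without holding flushLock.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

/** Hypothetical stand-in for JEReplicaDB; only the locking pattern is modeled. */
class ClearWithoutFlushLockSketch {
    static final int QUEUE_MAX_BYTES = 100;

    private final Object flushLock = new Object();
    private final Semaphore queueSizeBytes = new Semaphore(QUEUE_MAX_BYTES);
    private final ConcurrentLinkedQueue<Integer> msgQueue = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    /** Producer side: reserve the message's bytes, then enqueue it. */
    void add(int sizeBytes) {
        queueSizeBytes.acquireUninterruptibly(sizeBytes);
        msgQueue.add(sizeBytes);
    }

    /** Flusher side: drain the queue under flushLock, then release the permits. */
    void flush() {
        synchronized (flushLock) {
            int drainedBytes = 0;
            Integer sizeBytes;
            while ((sizeBytes = msgQueue.poll()) != null) {
                drainedBytes += sizeBytes; // a real flush would write to the DB here
            }
            queueSizeBytes.release(drainedBytes);
        }
    }

    /**
     * Waits for every permit WITHOUT holding flushLock, so flush() can still run.
     * Taking flushLock first would block the flusher, and the permits this method
     * waits for would never be released.
     */
    void clear() {
        // Assumed stand-in for collectAllPermits().
        queueSizeBytes.acquireUninterruptibly(QUEUE_MAX_BYTES);
        try {
            msgQueue.clear(); // expected to be a no-op: everything was already flushed
        } finally {
            queueSizeBytes.release(QUEUE_MAX_BYTES);
        }
    }

    /** Starts a daemon thread that flushes repeatedly until stop() is called. */
    Thread startFlusher() {
        Thread flusher = new Thread(() -> {
            while (running) {
                flush();
                try {
                    Thread.sleep(1);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        flusher.setDaemon(true);
        flusher.start();
        return flusher;
    }

    void stop() {
        running = false;
    }

    boolean queueIsEmpty() {
        return msgQueue.isEmpty();
    }

    int availableBytes() {
        return queueSizeBytes.availablePermits();
    }

    public static void main(String[] args) {
        ClearWithoutFlushLockSketch db = new ClearWithoutFlushLockSketch();
        db.startFlusher();
        db.add(30);
        db.add(40);
        db.add(20);
        db.clear(); // returns only after the flusher has drained the queue
        System.out.println("queue empty: " + db.queueIsEmpty());
        db.stop();
    }
}
```

In this sketch a message is enqueued only after its permits are acquired, and its permits are released only after it has left the queue, so holding all permits implies an empty queue. That is why the msgQueue.clear() call inside clear() is expected to do nothing.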
In trim(), the check for the queue being below the low mark was wrong (it compared the queue size, a number of messages, with a number of bytes): extracted isQueueAboveLowMark() and fixed its definition, and removed synchronized (flushLock) because it is now deemed unnecessary.
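A byte-based low-mark check of this kind can be sketched in isolation as follows. The body of isQueueAboveLowMark() follows the method added by this patch; the surrounding class QueueLowMarkSketch and its tryAdd() and flushed() helpers are hypothetical and exist only to make the example runnable. The sketch assumes, as in the patch, that queueSizeBytes is a Semaphore initialised with queueMaxBytes permits, one permit per queued byte.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical holder for the byte accounting; not the OpenDJ class. */
class QueueLowMarkSketch {
    private final int queueMaxBytes;
    private final Semaphore queueSizeBytes;

    QueueLowMarkSketch(int queueMaxBytes) {
        this.queueMaxBytes = queueMaxBytes;
        this.queueSizeBytes = new Semaphore(queueMaxBytes);
    }

    /** Reserves the bytes of one queued message; returns false if the queue is full. */
    boolean tryAdd(int msgSizeBytes) {
        return queueSizeBytes.tryAcquire(msgSizeBytes);
    }

    /** Gives the bytes back once a message has been flushed. */
    void flushed(int msgSizeBytes) {
        queueSizeBytes.release(msgSizeBytes);
    }

    /** Compares bytes with bytes, instead of a message count with bytes. */
    boolean isQueueAboveLowMark() {
        final int lowMarkBytes = queueMaxBytes / 5;
        final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
        return bytesUsed > lowMarkBytes;
    }

    public static void main(String[] args) {
        QueueLowMarkSketch queue = new QueueLowMarkSketch(1000); // low mark is 200 bytes
        queue.tryAdd(150);
        System.out.println(queue.isQueueAboveLowMark()); // false: 150 bytes used
        queue.tryAdd(100);
        System.out.println(queue.isQueueAboveLowMark()); // true: 250 bytes used
        queue.flushed(100);
        System.out.println(queue.isQueueAboveLowMark()); // false: back to 150 bytes
    }
}
```

Deriving the bytes in use from the semaphore's available permits avoids keeping a second counter that would have to be kept in step with the queue.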
In generateCursorFrom(), removed the unnecessary call to flush() because that method is also called from the JEReplicaDBCursor constructor.
1 file modified
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -301,11 +301,6 @@
   public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
       throws ChangelogException
   {
-    if (startAfterCSN == null)
-    {
-      // flush any potential changes before opening the cursor
-      flush();
-    }
     return new JEReplicaDBCursor(db, startAfterCSN, this);
   }
@@ -427,7 +422,6 @@
       trimDate = lastBeforeTrimDate;
     }
-    final int queueLowMarkBytes = queueMaxBytes / 5;
     for (int i = 0; i < 100; i++)
     {
       /*
@@ -436,13 +430,11 @@
        * lowmark). Once the flush backlog increases, stop trimming and start
        * flushing more eagerly.
        */
-      if (i > 20 && msgQueue.size() < queueLowMarkBytes)
+      if (i > 20 && isQueueAboveLowMark())
       {
         break;
       }
-      synchronized (flushLock)
-      {
         /*
          * the trim is done by group in order to save some CPU, IO bandwidth and
          * DB caches: start the transaction then do a bunch of remove then
@@ -497,6 +489,12 @@
         }
       }
     }
+  private boolean isQueueAboveLowMark()
+  {
+    final int lowMarkBytes = queueMaxBytes / 5;
+    final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
+    return bytesUsed > lowMarkBytes;
+  }
   /**
@@ -621,14 +619,11 @@
    */
   public void clear() throws ChangelogException
   {
-    synchronized(flushLock)
-    {
       collectAllPermits();
-      msgQueue.clear();
+    msgQueue.clear(); // this call should not do anything at all
       db.clear();
       csnLimits = new CSNLimits(null, null);
-    }
   }
   /**
    * Getter for the serverID of the server for which this database is managed.