From 14e5a18a4ba3146470fd4d82152b6f061b442dce Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 31 Mar 2014 09:40:11 +0000
Subject: [PATCH] OPENDJ-1177 Re-implement changelog purging logic
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 117 ++++++++++++++++++++++++++++------------------------------
1 files changed, 56 insertions(+), 61 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 4af7853..e7309b0 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -301,11 +301,6 @@
public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
throws ChangelogException
{
- if (startAfterCSN == null)
- {
- // flush any potential changes before opening the cursor
- flush();
- }
return new JEReplicaDBCursor(db, startAfterCSN, this);
}
@@ -427,7 +422,6 @@
trimDate = lastBeforeTrimDate;
}
- final int queueLowMarkBytes = queueMaxBytes / 5;
for (int i = 0; i < 100; i++)
{
/*
@@ -436,69 +430,73 @@
* lowmark). Once the flush backlog increases, stop trimming and start
* flushing more eagerly.
*/
- if (i > 20 && msgQueue.size() < queueLowMarkBytes)
+ if (i > 20 && isQueueAboveLowMark())
{
break;
}
- synchronized (flushLock)
+ /*
+ * the trim is done by group in order to save some CPU, IO bandwidth and
+ * DB caches: start the transaction then do a bunch of remove then
+ * commit.
+ */
+ /*
+ * Matt wrote: The record removal is done as a DB transaction and the
+ * deleted records are only "deleted" on commit. While the txn/cursor is
+ * open the records to be deleted will, I think, be pinned in the DB
+ * cache. In other words, the larger the transaction (the more records
+ * deleted during a single batch) the more DB cache will be used to
+ * process the transaction.
+ */
+ final ReplServerDBCursor cursor = db.openDeleteCursor();
+ try
{
- /*
- * the trim is done by group in order to save some CPU, IO bandwidth and
- * DB caches: start the transaction then do a bunch of remove then
- * commit.
- */
- /*
- * Matt wrote: The record removal is done as a DB transaction and the
- * deleted records are only "deleted" on commit. While the txn/cursor is
- * open the records to be deleted will, I think, be pinned in the DB
- * cache. In other words, the larger the transaction (the more records
- * deleted during a single batch) the more DB cache will be used to
- * process the transaction.
- */
- final ReplServerDBCursor cursor = db.openDeleteCursor();
- try
+ for (int j = 0; j < 50; j++)
{
- for (int j = 0; j < 50; j++)
+ if (thread.isShutdownInitiated())
{
- if (thread.isShutdownInitiated())
- {
- return;
- }
+ return;
+ }
- CSN csn = cursor.nextCSN();
- if (csn == null)
- {
- return;
- }
+ CSN csn = cursor.nextCSN();
+ if (csn == null)
+ {
+ return;
+ }
- if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
- {
- cursor.delete();
- }
- else
- {
- csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
- return;
- }
+ if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
+ {
+ cursor.delete();
+ }
+ else
+ {
+ csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
+ return;
}
}
- catch (ChangelogException e)
- {
- // mark shutdown for this db so that we don't try again to
- // stop it from cursor.close() or methods called by cursor.close()
- cursor.abort();
- thread.initiateShutdown();
- throw e;
- }
- finally
- {
- cursor.close();
- }
+ }
+ catch (ChangelogException e)
+ {
+ // mark shutdown for this db so that we don't try again to
+ // stop it from cursor.close() or methods called by cursor.close()
+ cursor.abort();
+ thread.initiateShutdown();
+ throw e;
+ }
+ finally
+ {
+ cursor.close();
}
}
}
+ private boolean isQueueAboveLowMark()
+ {
+ final int lowMarkBytes = queueMaxBytes / 5;
+ final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
+ return bytesUsed > lowMarkBytes;
+ }
+
/**
* Flush a number of updates from the memory list to the stable storage.
* <p>
@@ -621,13 +619,10 @@
*/
public void clear() throws ChangelogException
{
- synchronized(flushLock)
- {
- collectAllPermits();
- msgQueue.clear();
- db.clear();
- csnLimits = new CSNLimits(null, null);
- }
+ collectAllPermits();
+ msgQueue.clear(); // this call should not do anything at all
+ db.clear();
+ csnLimits = new CSNLimits(null, null);
}
/**
--
Gitblit v1.10.0