From 819f74758a1c464bbf578e70ca8592cc8d101d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Apr 2014 09:51:11 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 129 ++++++++++--------------------------------
1 files changed, 32 insertions(+), 97 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index e7309b0..bbde1d1 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -50,7 +50,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
-import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -95,16 +94,8 @@
* <p>
* This blocking queue is only used as a temporary placeholder so that the
* write in the stable storage can be grouped for efficiency reason. Adding an
- * update synchronously add the update to this list. A dedicated thread loops
- * on {@link #flush()} and {@link #trim()}.
- * <dl>
- * <dt>flush()</dt>
- * <dd>get a number of changes from the in memory list by block and write them
- * to the db.</dd>
- * <dt>trim()</dt>
- * <dd>deletes from the DB a number of changes that are older than a certain
- * date.</dd>
- * </dl>
+ * update synchronously add the update to this list. A dedicated thread
+ * flushes this blocking queue.
* <p>
* Changes are not read back by replicationServer threads that are responsible
* for pushing the changes to other replication server or to LDAP server
@@ -133,22 +124,12 @@
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private DirectoryThread thread;
/**
- * Used to prevent race conditions between threads calling {@link #clear()}
- * {@link #flush()} or {@link #trim()}. This can happen with the thread
- * flushing the queue, on shutdown or on cursor opening, a thread calling
- * clear(), etc.
+ * Used to prevent race conditions between threads calling {@link #flush()}.
+ * This can happen with the thread flushing the queue, or else on shutdown.
*/
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
- private long latestTrimDate = 0;
-
- /**
- * The trim age in milliseconds. Changes record in the change DB that
- * are older than this age are removed.
- */
- private long trimAge;
-
/**
* Creates a new ReplicaDB associated to a given LDAP server.
*
@@ -166,15 +147,14 @@
this.replicationServer = replicationServer;
this.serverId = serverId;
this.baseDN = baseDN;
- trimAge = replicationServer.getTrimAge();
queueMaxBytes = replicationServer.getQueueSize() * 200;
queueSizeBytes = new Semaphore(queueMaxBytes);
db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
thread = new DirectoryThread(this, "Replication server RS("
- + replicationServer.getServerId()
- + ") changelog checkpointer for Replica DS(" + serverId
- + ") for domain \"" + baseDN + "\"");
+ + replicationServer.getServerId()
+ + ") flusher thread for Replica DS(" + serverId
+ + ") for domain \"" + baseDN + "\"");
thread.start();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -334,9 +314,7 @@
}
/**
- * Run method for this class.
- * Periodically Flushes the ReplicationServerDomain cache from memory to the
- * stable storage and trims the old updates.
+ * Flushes the replicaDB queue from memory to stable storage.
*/
@Override
public void run()
@@ -350,7 +328,6 @@
try
{
flush();
- trim();
}
catch (ChangelogException end)
{
@@ -390,55 +367,26 @@
}
/**
- * Retrieves the latest trim date.
- * @return the latest trim date.
+ * Synchronously purge changes older than purgeCSN from this replicaDB.
+ *
+ * @param purgeCSN
+ * The CSN up to which changes can be purged. No purging happens when
+ * it is null.
+ * @throws ChangelogException
+ * In case of database problem.
*/
- public long getLatestTrimDate()
+ void purgeUpTo(final CSN purgeCSN) throws ChangelogException
{
- return latestTrimDate;
- }
-
-
- /**
- * Trim old changes from this replicationServer database.
- * @throws ChangelogException In case of database problem.
- */
- private void trim() throws ChangelogException
- {
- if (trimAge == 0)
+ if (purgeCSN == null)
{
return;
}
- latestTrimDate = TimeThread.getTime() - trimAge;
-
- CSN trimDate = new CSN(latestTrimDate, 0, 0);
-
- // Find the last CSN before the trimDate, in the Database.
- CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate);
- if (lastBeforeTrimDate != null)
- {
- // If we found it, we want to stop trimming when reaching it.
- trimDate = lastBeforeTrimDate;
- }
-
for (int i = 0; i < 100; i++)
{
/*
- * Perform at least some trimming regardless of the flush backlog. Then
- * continue trim iterations while the flush backlog is low (below the
- * lowmark). Once the flush backlog increases, stop trimming and start
- * flushing more eagerly.
- */
- if (i > 20 && isQueueAboveLowMark())
- {
- break;
- }
-
- /*
- * the trim is done by group in order to save some CPU, IO bandwidth and
- * DB caches: start the transaction then do a bunch of remove then
- * commit.
+ * the purge is done by group in order to save some CPU, IO bandwidth and
+ * DB caches: start the transaction then do a bunch of remove then commit.
*/
/*
* Matt wrote: The record removal is done as a DB transaction and the
@@ -464,7 +412,7 @@
return;
}
- if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
+ if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN))
{
cursor.delete();
}
@@ -490,37 +438,31 @@
}
}
- private boolean isQueueAboveLowMark()
- {
- final int lowMarkBytes = queueMaxBytes / 5;
- final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
- return bytesUsed > lowMarkBytes;
- }
-
/**
* Flush a number of updates from the memory list to the stable storage.
* <p>
* Flush is done by chunk sized to 500 messages, starting from the beginning
* of the list.
- *
+ * <p>
+ * @GuardedBy("flushLock")
* @throws ChangelogException
* If a database problem happened
*/
- public void flush() throws ChangelogException
+ private void flush() throws ChangelogException
{
try
{
synchronized (flushLock)
{
- final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
- final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
+ final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
if (change == null)
{
- // nothing to persist, move on to the trim phase
+ // nothing to persist, check if shutdown was invoked
return;
}
// Try to see if there are more changes and persist them all.
+ final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
changes.add(change);
msgQueue.drainTo(changes);
@@ -604,15 +546,6 @@
}
/**
- * Set the Purge delay for this db Handler.
- * @param delay The purge delay in Milliseconds.
- */
- public void setPurgeDelay(long delay)
- {
- trimAge = delay;
- }
-
- /**
* Clear the changes from this DB (from both memory cache and DB storage).
* @throws ChangelogException When an exception occurs while removing the
* changes from the DB.
@@ -636,13 +569,15 @@
}
/**
- * Return the size of the msgQueue (the memory cache of the ReplicaDB).
+ * Return the number of records of this replicaDB.
+ * <p>
* For test purpose.
- * @return The memory queue size.
+ *
+ * @return The number of records of this replicaDB.
*/
- int getQueueSize()
+ long getNumberRecords()
{
- return this.msgQueue.size();
+ return db.getNumberRecords();
}
/**
--
Gitblit v1.10.0