From 08908175782573c536cb485092e473c7f1729281 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 28 Mar 2014 11:50:04 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3278) Re-implement changelog purging logic
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 354 +++++++++++++++++++++++++++++-----------------------------
1 files changed, 176 insertions(+), 178 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index b85d132..4426b0f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions copyright 2011-2013 ForgeRock AS
+ * Portions copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -30,6 +30,9 @@
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import org.opends.server.admin.std.server.MonitorProviderCfg;
import org.opends.server.api.DirectoryThread;
@@ -67,13 +70,33 @@
*/
public class JEReplicaDB implements Runnable
{
+
+ /**
+ * Immutable pair holding the oldest and newest CSNs: replacing the single
+ * reference to it sets both values atomically, without synchronization.
+ *
+ * @Immutable
+ */
+ private static final class CSNLimits
+ {
+ private final CSN oldestCSN;
+ private final CSN newestCSN;
+
+ public CSNLimits(CSN oldestCSN, CSN newestCSN)
+ {
+ this.oldestCSN = oldestCSN;
+ this.newestCSN = newestCSN;
+ }
+
+ }
+
/**
* The msgQueue holds all the updates not yet saved to stable storage.
* <p>
- * This list is only used as a temporary placeholder so that the write in the
- * stable storage can be grouped for efficiency reason. Adding an update
- * synchronously add the update to this list. A dedicated thread loops on
- * flush() and trim().
+ * This blocking queue is only used as a temporary placeholder so that the
+ * writes to the stable storage can be grouped for efficiency reasons. Adding
+ * an update synchronously adds the update to this queue. A dedicated thread
+ * loops on {@link #flush()} and {@link #trim()}.
* <dl>
* <dt>flush()</dt>
* <dd>get a number of changes from the in memory list by block and write them
@@ -86,37 +109,35 @@
* Changes are not read back by replicationServer threads that are responsible
* for pushing the changes to other replication server or to LDAP server
*/
- private final LinkedList<UpdateMsg> msgQueue =
- new LinkedList<UpdateMsg>();
+ private final LinkedBlockingQueue<UpdateMsg> msgQueue =
+ new LinkedBlockingQueue<UpdateMsg>();
/**
- * The High and low water mark for the max size of the msgQueue. The threads
- * calling add() method will be blocked if the size of msgQueue becomes larger
- * than the queueHimark and will resume only when the size of the msgQueue
- * goes below queueLowmark.
+ * Semaphore used to limit the number of bytes used in memory by the queue.
+ * Threads calling the {@link #add(UpdateMsg)} method are blocked when the
+ * size in bytes of the update exceeds the number of available permits, and
+ * resume only once enough permits have been released.
*/
- private int queueMaxSize = 5000;
- private int queueLowmark = 1000;
- private int queueHimark = 4000;
-
- /**
- * The queue himark and lowmark in bytes, this is set to 100 times the himark
- * and lowmark in number of updates.
- */
- private int queueMaxBytes = 100 * queueMaxSize;
- private int queueLowmarkBytes = 100 * queueLowmark;
- private int queueHimarkBytes = 100 * queueHimark;
-
- /** The number of bytes currently in the queue. */
- private int queueByteSize = 0;
+ private final Semaphore queueSizeBytes;
+ private final int queueMaxBytes;
private ReplicationDB db;
- private CSN oldestCSN;
- private CSN newestCSN;
+ /**
+ * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
+ *
+ * @NonNull
+ */
+ private volatile CSNLimits csnLimits;
private int serverId;
private DN baseDN;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private DirectoryThread thread;
+ /**
+ * Used to prevent race conditions between threads calling {@link #clear()},
+ * {@link #flush()} or {@link #trim()}. This can happen with the thread
+ * flushing the queue, on shutdown or on cursor opening, with a thread calling
+ * clear(), etc.
+ */
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
@@ -146,16 +167,10 @@
this.serverId = serverId;
this.baseDN = baseDN;
trimAge = replicationServer.getTrimAge();
- final int queueSize = replicationServer.getQueueSize();
- queueMaxSize = queueSize;
- queueLowmark = queueSize / 5;
- queueHimark = queueSize * 4 / 5;
- queueMaxBytes = 200 * queueMaxSize;
- queueLowmarkBytes = 200 * queueLowmark;
- queueHimarkBytes = 200 * queueLowmark;
+ queueMaxBytes = replicationServer.getQueueSize() * 200;
+ queueSizeBytes = new Semaphore(queueMaxBytes);
db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
- oldestCSN = db.readOldestCSN();
- newestCSN = db.readNewestCSN();
+ csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
thread = new DirectoryThread(this, "Replication server RS("
+ replicationServer.getServerId()
+ ") changelog checkpointer for Replica DS(" + serverId
@@ -167,61 +182,72 @@
}
/**
- * Add an update to the list of messages that must be saved to the db
- * managed by this db handler.
- * This method is blocking if the size of the list of message is larger
- * than its maximum.
+ * Adds an update to the queue of messages that must be saved to the db
+ * managed by this db handler. This method blocks while the queue does not
+ * have enough free capacity, in bytes, to hold the update.
*
- * @param update The update that must be saved to the db managed by this db
- * handler.
+ * @param updateMsg
+ * The update message that must be saved to the db managed by this db
+ * handler.
+ * @throws ChangelogException
+ * If the replicaDB is shutting down or the thread gets interrupted
*/
- public void add(UpdateMsg update)
+ public void add(UpdateMsg updateMsg) throws ChangelogException
{
- synchronized (msgQueue)
+ if (thread.isShutdownInitiated())
{
- int size = msgQueue.size();
- if (size > queueHimark || queueByteSize > queueHimarkBytes)
- {
- msgQueue.notify();
- }
+ throw new ChangelogException(
+ ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
+ .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
+ }
- while (size > queueMaxSize || queueByteSize > queueMaxBytes)
+ final int msgSize = updateMsg.size();
+ if (msgSize < queueMaxBytes)
+ {
+ try
{
- try
- {
- msgQueue.wait(500);
- } catch (InterruptedException e)
- {
- // simply loop to try again.
- }
- size = msgQueue.size();
+ queueSizeBytes.acquire(msgSize);
}
+ catch (InterruptedException e)
+ {
+ throw new ChangelogException(
+ ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
+ .toString(), String.valueOf(baseDN), String.valueOf(serverId),
+ stackTraceToSingleLineString(e)));
+ }
+ }
+ else
+ {
+ // edge case with a very large message
+ collectAllPermits();
+ }
+ msgQueue.add(updateMsg);
- queueByteSize += update.size();
- msgQueue.add(update);
- if (newestCSN == null || newestCSN.isOlderThan(update.getCSN()))
- {
- newestCSN = update.getCSN();
- }
- if (oldestCSN == null)
- {
- oldestCSN = update.getCSN();
- }
+ final CSNLimits limits = csnLimits;
+ final boolean updateNew = limits.newestCSN == null
+ || limits.newestCSN.isOlderThan(updateMsg.getCSN());
+ final boolean updateOld = limits.oldestCSN == null;
+ if (updateOld || updateNew)
+ {
+ csnLimits = new CSNLimits(
+ updateOld ? updateMsg.getCSN() : limits.oldestCSN,
+ updateNew ? updateMsg.getCSN() : limits.newestCSN);
}
}
- /**
- * Get some changes out of the message queue of the LDAP server.
- * (from the beginning of the queue)
- * @param number the maximum number of messages to extract.
- * @return a List containing number changes extracted from the queue.
- */
- private List<UpdateMsg> getChanges(int number)
+ /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
+ private void collectAllPermits()
{
- synchronized (msgQueue)
+ int collectedPermits = queueSizeBytes.drainPermits();
+ while (collectedPermits != queueMaxBytes)
{
- final int minAvailableNb = Math.min(number, msgQueue.size());
- return new LinkedList<UpdateMsg>(msgQueue.subList(0, minAvailableNb));
+ // Do not use Thread.sleep() because:
+ // 1) it is expected the permits will be released very soon
+ // 2) we want to collect all the permits, so do not leave a chance to
+ // other threads to steal them from us.
+ // 3) we want to keep low latency
+ Thread.yield();
+ collectedPermits += queueSizeBytes.drainPermits();
}
}
@@ -232,7 +258,7 @@
*/
public CSN getOldestCSN()
{
- return oldestCSN;
+ return csnLimits.oldestCSN;
}
/**
@@ -242,7 +268,7 @@
*/
public CSN getNewestCSN()
{
- return newestCSN;
+ return csnLimits.newestCSN;
}
/**
@@ -252,9 +278,10 @@
*/
public long getChangesCount()
{
- if (newestCSN != null && oldestCSN != null)
+ final CSNLimits limits = csnLimits;
+ if (limits.newestCSN != null && limits.oldestCSN != null)
{
- return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1;
+ return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
}
return 0;
}
@@ -276,36 +303,13 @@
{
if (startAfterCSN == null)
{
+ // flush any potential changes before opening the cursor
flush();
}
return new JEReplicaDBCursor(db, startAfterCSN, this);
}
/**
- * Removes the provided number of messages from the beginning of the msgQueue.
- *
- * @param number the number of changes to be removed.
- */
- private void clearQueue(int number)
- {
- synchronized (msgQueue)
- {
- int current = 0;
- while (current < number && !msgQueue.isEmpty())
- {
- UpdateMsg msg = msgQueue.remove(); // remove first
- queueByteSize -= msg.size();
- current++;
- }
- if (msgQueue.size() < queueLowmark
- && queueByteSize < queueLowmarkBytes)
- {
- msgQueue.notifyAll();
- }
- }
- }
-
- /**
* Shutdown this ReplicaDB.
*/
public void shutdown()
@@ -317,11 +321,6 @@
thread.initiateShutdown();
- synchronized (msgQueue)
- {
- msgQueue.notifyAll();
- }
-
while (msgQueue.size() != 0)
{
try
@@ -331,8 +330,7 @@
catch (ChangelogException e)
{
// We are already shutting down
- logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(e)));
+ logError(e.getMessageObject());
}
}
@@ -358,27 +356,8 @@
{
flush();
trim();
-
- synchronized (msgQueue)
- {
- if (msgQueue.size() < queueLowmark
- && queueByteSize < queueLowmarkBytes)
- {
- try
- {
- msgQueue.wait(1000);
- }
- catch (InterruptedException e)
- {
- // Do not reset the interrupt flag here,
- // because otherwise JE will barf next time flush() is called:
- // JE 5.0.97 refuses to persist changes to the DB when invoked
- // from a Thread with the interrupt flag set to true.
- }
- }
- }
}
- catch (Exception end)
+ catch (ChangelogException end)
{
stop(end);
break;
@@ -400,11 +379,6 @@
{
thread.stopWork();
}
-
- synchronized (this)
- {
- notifyAll();
- }
}
private void stop(Exception e)
@@ -453,13 +427,34 @@
trimDate = lastBeforeTrimDate;
}
+ final int permitsAtLowMark = queueMaxBytes - queueMaxBytes / 5;
for (int i = 0; i < 100; i++)
{
+ /*
+ * Perform at least some trimming regardless of the flush backlog. Then
+ * continue trim iterations while the flush backlog is low (queued bytes
+ * at or below the lowmark, a fifth of the queue capacity). Once the
+ * backlog grows past it, stop trimming and start flushing more eagerly.
+ */
+ if (i > 20 && queueSizeBytes.availablePermits() < permitsAtLowMark)
+ {
+ break;
+ }
+
synchronized (flushLock)
{
/*
- * the trim is done by group in order to save some CPU and IO bandwidth
- * start the transaction then do a bunch of remove then commit
+ * the trim is done by group in order to save some CPU, IO bandwidth and
+ * DB caches: start the transaction then do a bunch of remove then
+ * commit.
+ */
+ /*
+ * Matt wrote: The record removal is done as a DB transaction and the
+ * deleted records are only "deleted" on commit. While the txn/cursor is
+ * open the records to be deleted will, I think, be pinned in the DB
+ * cache. In other words, the larger the transaction (the more records
+ * deleted during a single batch) the more DB cache will be used to
+ * process the transaction.
*/
final ReplServerDBCursor cursor = db.openDeleteCursor();
try
@@ -477,13 +472,13 @@
return;
}
- if (!csn.equals(newestCSN) && csn.isOlderThan(trimDate))
+ if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
{
cursor.delete();
}
else
{
- oldestCSN = csn;
+ csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
return;
}
}
@@ -515,32 +510,33 @@
*/
public void flush() throws ChangelogException
{
- int size;
- int chunksize = Math.min(queueMaxSize, 500);
-
- do
+ try
{
- synchronized(flushLock)
+ synchronized (flushLock)
{
- // get N (or less) messages from the queue to save to the DB
- // (from the beginning of the queue)
- List<UpdateMsg> changes = getChanges(chunksize);
-
- // if no more changes to save exit immediately.
- if (changes == null || (size = changes.size()) == 0)
+ final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
+ final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
+ if (change == null)
{
+ // nothing to persist, move on to the trim phase
return;
}
- // save the change to the stable storage.
- db.addEntries(changes);
+ // Try to see if there are more changes and persist them all.
+ changes.add(change);
+ msgQueue.drainTo(changes);
- // remove the changes from the list of changes to be saved
- // (remove from the beginning of the queue)
- clearQueue(changes.size());
+ int totalSize = db.addEntries(changes);
+ // do not release more than queue max size permits
+ // (be careful of the edge case with the very large message)
+ queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
}
- // loop while there are more changes in the queue
- } while (size == chunksize);
+ }
+ catch (InterruptedException e)
+ {
+ throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+ .get(stackTraceToSingleLineString(e)));
+ }
}
/**
@@ -556,25 +552,28 @@
public List<Attribute> getMonitorData()
{
List<Attribute> attributes = new ArrayList<Attribute>();
- attributes.add(Attributes.create("replicationServer-database",
- String.valueOf(serverId)));
- attributes.add(Attributes.create("domain-name",
- baseDN.toNormalizedString()));
- if (oldestCSN != null)
+ create(attributes, "replicationServer-database", String.valueOf(serverId));
+ create(attributes, "domain-name", baseDN.toNormalizedString());
+ final CSNLimits limits = csnLimits;
+ if (limits.oldestCSN != null)
{
- attributes.add(Attributes.create("first-change", encode(oldestCSN)));
+ create(attributes, "first-change", encode(limits.oldestCSN));
}
- if (newestCSN != null)
+ if (limits.newestCSN != null)
{
- attributes.add(Attributes.create("last-change", encode(newestCSN)));
+ create(attributes, "last-change", encode(limits.newestCSN));
}
- attributes.add(
- Attributes.create("queue-size", String.valueOf(msgQueue.size())));
- attributes.add(
- Attributes.create("queue-size-bytes", String.valueOf(queueByteSize)));
+ create(attributes, "queue-size", String.valueOf(msgQueue.size()));
+ create(attributes, "queue-size-bytes",
+ String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
return attributes;
}
+ private void create(List<Attribute> attributes, String name, String value)
+ {
+ attributes.add(Attributes.create(name, value));
+ }
+
private String encode(CSN csn)
{
return csn + " " + new Date(csn.getTime());
@@ -609,8 +608,9 @@
@Override
public String toString()
{
+ final CSNLimits limits = csnLimits;
return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
- + oldestCSN + " " + newestCSN;
+ + limits.oldestCSN + " " + limits.newestCSN;
}
/**
@@ -631,12 +631,21 @@
{
+ // Wait for the pending changes to be flushed and block further add() calls.
+ // The permits must be collected before taking flushLock: flush() needs that
+ // lock to persist the queue and give the permits back.
+ collectAllPermits();
+ try
+ {
synchronized(flushLock)
{
msgQueue.clear();
- queueByteSize = 0;
-
db.clear();
- oldestCSN = db.readOldestCSN();
- newestCSN = db.readNewestCSN();
+ csnLimits = new CSNLimits(null, null);
}
+ }
+ finally
+ {
+ // hand the permits back, otherwise subsequent add() calls never resume
+ queueSizeBytes.release(queueMaxBytes);
+ }
}
@@ -655,7 +653,7 @@
* For test purpose.
* @return The memory queue size.
*/
- public int getQueueSize()
+ int getQueueSize()
{
return this.msgQueue.size();
}
--
Gitblit v1.10.0