From 137d1b4ba1992acdd880b61b1a03dc31f0cc1839 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 23 Apr 2014 14:19:01 +0000
Subject: [PATCH] OPENDJ-1448 Remove JReplicaDB flushing thread and msg queue
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 213 ++--------------------------------------------------
1 files changed, 10 insertions(+), 203 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index bbde1d1..19406bd 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,14 +28,10 @@
import java.util.ArrayList;
import java.util.Date;
-import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
@@ -52,8 +48,6 @@
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
-import static org.opends.server.util.StaticUtils.*;
/**
* This class is used for managing the replicationServer database for each
@@ -67,7 +61,7 @@
* <p>
* This class publish some monitoring information below cn=monitor.
*/
-public class JEReplicaDB implements Runnable
+public class JEReplicaDB
{
/**
@@ -89,29 +83,7 @@
}
- /**
- * The msgQueue holds all the updates not yet saved to stable storage.
- * <p>
- * This blocking queue is only used as a temporary placeholder so that the
- * write in the stable storage can be grouped for efficiency reason. Adding an
- * update synchronously add the update to this list. A dedicated thread
- * flushes this blocking queue.
- * <p>
- * Changes are not read back by replicationServer threads that are responsible
- * for pushing the changes to other replication server or to LDAP server
- */
- private final LinkedBlockingQueue<UpdateMsg> msgQueue =
- new LinkedBlockingQueue<UpdateMsg>();
-
- /**
- * Semaphore used to limit the number of bytes used in memory by the queue.
- * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
- * size of msgQueue becomes larger than the available permits and will resume
- * only when the number of available permits allow it.
- */
- private final Semaphore queueSizeBytes;
- private final int queueMaxBytes;
-
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
private ReplicationDB db;
/**
* Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -122,12 +94,6 @@
private int serverId;
private DN baseDN;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
- private DirectoryThread thread;
- /**
- * Used to prevent race conditions between threads calling {@link #flush()}.
- * This can happen with the thread flushing the queue, or else on shutdown.
- */
- private final Object flushLock = new Object();
private ReplicationServer replicationServer;
/**
@@ -147,15 +113,8 @@
this.replicationServer = replicationServer;
this.serverId = serverId;
this.baseDN = baseDN;
- queueMaxBytes = replicationServer.getQueueSize() * 200;
- queueSizeBytes = new Semaphore(queueMaxBytes);
db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
- thread = new DirectoryThread(this, "Replication server RS("
- + replicationServer.getServerId()
- + ") flusher thread for Replica DS(" + serverId
- + ") for domain \"" + baseDN + "\"");
- thread.start();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -174,34 +133,14 @@
*/
public void add(UpdateMsg updateMsg) throws ChangelogException
{
- if (thread.isShutdownInitiated())
+ if (shutdown.get())
{
throw new ChangelogException(
ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
.toString(), String.valueOf(baseDN), String.valueOf(serverId)));
}
- final int msgSize = updateMsg.size();
- if (msgSize < queueMaxBytes)
- {
- try
- {
- queueSizeBytes.acquire(msgSize);
- }
- catch (InterruptedException e)
- {
- throw new ChangelogException(
- ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
- .toString(), String.valueOf(baseDN), String.valueOf(serverId),
- stackTraceToSingleLineString(e)));
- }
- }
- else
- {
- // edge case with a very large message
- collectAllPermits();
- }
- msgQueue.add(updateMsg);
+ db.addEntry(updateMsg);
final CSNLimits limits = csnLimits;
final boolean updateNew = limits.newestCSN == null
@@ -215,22 +154,6 @@
}
}
- /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
- private void collectAllPermits()
- {
- int collectedPermits = queueSizeBytes.drainPermits();
- while (collectedPermits != queueMaxBytes)
- {
- // Do not use Thread.sleep() because:
- // 1) it is expected the permits will be released very soon
- // 2) we want to collect all the permits, so do not leave a chance to
- // other threads to steal them from us.
- // 3) we want to keep low latency
- Thread.yield();
- collectedPermits += queueSizeBytes.drainPermits();
- }
- }
-
/**
* Get the oldest CSN that has not been purged yet.
*
@@ -289,80 +212,10 @@
*/
public void shutdown()
{
- if (thread.isShutdownInitiated())
+ if (shutdown.compareAndSet(false, true))
{
- return;
- }
-
- thread.initiateShutdown();
-
- while (msgQueue.size() != 0)
- {
- try
- {
- flush();
- }
- catch (ChangelogException e)
- {
- // We are already shutting down
- logError(e.getMessageObject());
- }
- }
-
- db.shutdown();
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- }
-
- /**
- * Flushes the replicaDB queue from memory to stable storage.
- */
- @Override
- public void run()
- {
- thread.startWork();
-
- try
- {
- while (!thread.isShutdownInitiated())
- {
- try
- {
- flush();
- }
- catch (ChangelogException end)
- {
- stop(end);
- break;
- }
- }
-
- try
- {
- // call flush a last time before exiting to make sure that
- // no change was forgotten in the msgQueue
- flush();
- }
- catch (ChangelogException e)
- {
- stop(e);
- }
- }
- finally
- {
- thread.stopWork();
- }
- }
-
- private void stop(Exception e)
- {
- logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(e)));
-
- thread.initiateShutdown();
-
- if (replicationServer != null)
- {
- replicationServer.shutdown();
+ db.shutdown();
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
}
@@ -401,7 +254,7 @@
{
for (int j = 0; j < 50; j++)
{
- if (thread.isShutdownInitiated())
+ if (shutdown.get())
{
return;
}
@@ -428,7 +281,7 @@
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
cursor.abort();
- thread.initiateShutdown();
+ shutdown.set(true);
throw e;
}
finally
@@ -439,47 +292,6 @@
}
/**
- * Flush a number of updates from the memory list to the stable storage.
- * <p>
- * Flush is done by chunk sized to 500 messages, starting from the beginning
- * of the list.
- * <p>
- * @GuardedBy("flushLock")
- * @throws ChangelogException
- * If a database problem happened
- */
- private void flush() throws ChangelogException
- {
- try
- {
- synchronized (flushLock)
- {
- final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
- if (change == null)
- {
- // nothing to persist, check if shutdown was invoked
- return;
- }
-
- // Try to see if there are more changes and persist them all.
- final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
- changes.add(change);
- msgQueue.drainTo(changes);
-
- int totalSize = db.addEntries(changes);
- // do not release more than queue max size permits
- // (be careful of the edge case with the very large message)
- queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
- }
- }
- catch (InterruptedException e)
- {
- throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(e)));
- }
- }
-
- /**
* This internal class is used to implement the Monitoring capabilities of the
* ReplicaDB.
*/
@@ -501,9 +313,6 @@
{
create(attributes, "last-change", encode(limits.newestCSN));
}
- create(attributes, "queue-size", String.valueOf(msgQueue.size()));
- create(attributes, "queue-size-bytes",
- String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
return attributes;
}
@@ -552,8 +361,6 @@
*/
public void clear() throws ChangelogException
{
- collectAllPermits();
- msgQueue.clear(); // this call should not do anything at all
db.clear();
csnLimits = new CSNLimits(null, null);
}
--
Gitblit v1.10.0