From 7645fcf6334c7c78655a12a08b4a8f3351be1ba4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 13:19:59 +0000
Subject: [PATCH] OPENDJ-1448 Remove JReplicaDB flushing thread and msg queue
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 212 ++--------------------------------------------------
1 files changed, 10 insertions(+), 202 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index c6b17e7..13ddac4 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -28,16 +28,11 @@
import java.util.ArrayList;
import java.util.Date;
-import java.util.LinkedList;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.core.DirectoryServer;
import org.opends.server.replication.common.CSN;
@@ -53,7 +48,6 @@
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.util.StaticUtils.*;
/**
* This class is used for managing the replicationServer database for each
@@ -67,9 +61,8 @@
* <p>
* This class publish some monitoring information below cn=monitor.
*/
-public class JEReplicaDB implements Runnable
+public class JEReplicaDB
{
- private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
/**
* Class that allows atomically setting oldest and newest CSNs without
@@ -90,29 +83,7 @@
}
- /**
- * The msgQueue holds all the updates not yet saved to stable storage.
- * <p>
- * This blocking queue is only used as a temporary placeholder so that the
- * write in the stable storage can be grouped for efficiency reason. Adding an
- * update synchronously add the update to this list. A dedicated thread
- * flushes this blocking queue.
- * <p>
- * Changes are not read back by replicationServer threads that are responsible
- * for pushing the changes to other replication server or to LDAP server
- */
- private final LinkedBlockingQueue<UpdateMsg> msgQueue =
- new LinkedBlockingQueue<UpdateMsg>();
-
- /**
- * Semaphore used to limit the number of bytes used in memory by the queue.
- * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
- * size of msgQueue becomes larger than the available permits and will resume
- * only when the number of available permits allow it.
- */
- private final Semaphore queueSizeBytes;
- private final int queueMaxBytes;
-
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
private ReplicationDB db;
/**
* Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
@@ -123,12 +94,6 @@
private int serverId;
private DN baseDN;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
- private DirectoryThread thread;
- /**
- * Used to prevent race conditions between threads calling {@link #flush()}.
- * This can happen with the thread flushing the queue, or else on shutdown.
- */
- private final Object flushLock = new Object();
private ReplicationServer replicationServer;
/**
@@ -148,15 +113,8 @@
this.replicationServer = replicationServer;
this.serverId = serverId;
this.baseDN = baseDN;
- queueMaxBytes = replicationServer.getQueueSize() * 200;
- queueSizeBytes = new Semaphore(queueMaxBytes);
db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
- thread = new DirectoryThread(this, "Replication server RS("
- + replicationServer.getServerId()
- + ") flusher thread for Replica DS(" + serverId
- + ") for domain \"" + baseDN + "\"");
- thread.start();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -175,32 +133,13 @@
*/
public void add(UpdateMsg updateMsg) throws ChangelogException
{
- if (thread.isShutdownInitiated())
+ if (shutdown.get())
{
throw new ChangelogException(
ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg, baseDN, serverId));
}
- final int msgSize = updateMsg.size();
- if (msgSize < queueMaxBytes)
- {
- try
- {
- queueSizeBytes.acquire(msgSize);
- }
- catch (InterruptedException e)
- {
- throw new ChangelogException(
- ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg, baseDN, serverId,
- stackTraceToSingleLineString(e)));
- }
- }
- else
- {
- // edge case with a very large message
- collectAllPermits();
- }
- msgQueue.add(updateMsg);
+ db.addEntry(updateMsg);
final CSNLimits limits = csnLimits;
final boolean updateNew = limits.newestCSN == null
@@ -214,22 +153,6 @@
}
}
- /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
- private void collectAllPermits()
- {
- int collectedPermits = queueSizeBytes.drainPermits();
- while (collectedPermits != queueMaxBytes)
- {
- // Do not use Thread.sleep() because:
- // 1) it is expected the permits will be released very soon
- // 2) we want to collect all the permits, so do not leave a chance to
- // other threads to steal them from us.
- // 3) we want to keep low latency
- Thread.yield();
- collectedPermits += queueSizeBytes.drainPermits();
- }
- }
-
/**
* Get the oldest CSN that has not been purged yet.
*
@@ -288,79 +211,10 @@
*/
public void shutdown()
{
- if (thread.isShutdownInitiated())
+ if (shutdown.compareAndSet(false, true))
{
- return;
- }
-
- thread.initiateShutdown();
-
- while (msgQueue.size() != 0)
- {
- try
- {
- flush();
- }
- catch (ChangelogException e)
- {
- // We are already shutting down
- logger.error(e.getMessageObject());
- }
- }
-
- db.shutdown();
- DirectoryServer.deregisterMonitorProvider(dbMonitor);
- }
-
- /**
- * Flushes the replicaDB queue from memory to stable storage.
- */
- @Override
- public void run()
- {
- thread.startWork();
-
- try
- {
- while (!thread.isShutdownInitiated())
- {
- try
- {
- flush();
- }
- catch (ChangelogException end)
- {
- stop(end);
- break;
- }
- }
-
- try
- {
- // call flush a last time before exiting to make sure that
- // no change was forgotten in the msgQueue
- flush();
- }
- catch (ChangelogException e)
- {
- stop(e);
- }
- }
- finally
- {
- thread.stopWork();
- }
- }
-
- private void stop(Exception e)
- {
- logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
-
- thread.initiateShutdown();
-
- if (replicationServer != null)
- {
- replicationServer.shutdown();
+ db.shutdown();
+ DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
}
@@ -399,7 +253,7 @@
{
for (int j = 0; j < 50; j++)
{
- if (thread.isShutdownInitiated())
+ if (shutdown.get())
{
return;
}
@@ -426,7 +280,7 @@
// mark shutdown for this db so that we don't try again to
// stop it from cursor.close() or methods called by cursor.close()
cursor.abort();
- thread.initiateShutdown();
+ shutdown.set(true);
throw e;
}
finally
@@ -437,47 +291,6 @@
}
/**
- * Flush a number of updates from the memory list to the stable storage.
- * <p>
- * Flush is done by chunk sized to 500 messages, starting from the beginning
- * of the list.
- * <p>
- * @GuardedBy("flushLock")
- * @throws ChangelogException
- * If a database problem happened
- */
- private void flush() throws ChangelogException
- {
- try
- {
- synchronized (flushLock)
- {
- final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
- if (change == null)
- {
- // nothing to persist, check if shutdown was invoked
- return;
- }
-
- // Try to see if there are more changes and persist them all.
- final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
- changes.add(change);
- msgQueue.drainTo(changes);
-
- int totalSize = db.addEntries(changes);
- // do not release more than queue max size permits
- // (be careful of the edge case with the very large message)
- queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
- }
- }
- catch (InterruptedException e)
- {
- throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(e)));
- }
- }
-
- /**
* This internal class is used to implement the Monitoring capabilities of the
* ReplicaDB.
*/
@@ -499,9 +312,6 @@
{
create(attributes, "last-change", encode(limits.newestCSN));
}
- create(attributes, "queue-size", String.valueOf(msgQueue.size()));
- create(attributes, "queue-size-bytes",
- String.valueOf(queueMaxBytes - queueSizeBytes.availablePermits()));
return attributes;
}
@@ -550,8 +360,6 @@
*/
public void clear() throws ChangelogException
{
- collectAllPermits();
- msgQueue.clear(); // this call should not do anything at all
db.clear();
csnLimits = new CSNLimits(null, null);
}
--
Gitblit v1.10.0