From c210356ee7f6b0037e2d672a4a11e83bbd160b0e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 08:59:16 +0000
Subject: [PATCH] MessageHandler.java: Extracted methods isMsgQueueAboveThreshold() and isMsgQueueBelowThreshold(). Extracted MsgQueue.consumeUpTo(UpdateMsg). Updated javadocs.
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java | 45 +++++++++++++---------
opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java | 24 +++++++++++-
2 files changed, 49 insertions(+), 20 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index af01fa2..ed243c4 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -49,6 +49,9 @@
/**
* This class implements a buffering/producer/consumer mechanism of
* replication changes (UpdateMsg) used inside the replication server.
+ * It logically represents another RS than the current one, and is used when the
+ * other RS is brought online and needs to catch up with the changes in the
+ * current RS.
*
* MessageHandlers are registered into Replication server domains.
* When an update message is received by a domain, the domain forwards
@@ -156,21 +159,31 @@
/* TODO : size should be configurable
* and larger than max-receive-queue-size
*/
- while ((msgQueue.count() > maxQueueSize) ||
- (msgQueue.bytesCount() > maxQueueBytesSize))
+ while (isMsgQueueAboveThreshold())
{
following = false;
msgQueue.removeFirst();
}
}
}
+
+ private boolean isMsgQueueAboveThreshold()
+ {
+ return msgQueue.count() > maxQueueSize
+ || msgQueue.bytesCount() > maxQueueBytesSize;
+ }
+
+ private boolean isMsgQueueBelowThreshold()
+ {
+ return !isMsgQueueAboveThreshold();
+ }
+
/**
* Set the shut down flag to true and returns the previous value of the flag.
* @return The previous value of the shut down flag
*/
public boolean engageShutdown()
{
- // Use thread safe boolean
return shuttingDown.getAndSet(true);
}
@@ -300,16 +313,16 @@
}
/*
- * If the late queue is empty then we could not find any
- * messages in the replication log so the remote serevr is not
- * late anymore.
+ * If the late queue is empty then we could not find any messages in
+ * the replication log so the remote server is not late anymore.
*/
if (lateQueue.isEmpty())
{
synchronized (msgQueue)
{
- if ((msgQueue.count() < maxQueueSize) &&
- (msgQueue.bytesCount() < maxQueueBytesSize))
+ // Ensure we are below threshold so this server will follow the
+ // msgQueue without fearing the msgQueue gets trimmed
+ if (isMsgQueueBelowThreshold())
{
following = true;
}
@@ -330,13 +343,9 @@
/* we finally catch up with the regular queue */
following = true;
lateQueue.clear();
- UpdateMsg msg1;
- do
- {
- msg1 = msgQueue.removeFirst();
- } while (!msg.getCSN().equals(msg1.getCSN()));
+ msgQueue.consumeUpTo(msg);
updateServerState(msg);
- return msg1;
+ return msg;
}
}
}
@@ -386,9 +395,9 @@
}
}
/*
- * Need to loop because following flag may have gone to false between
- * the first check at the beginning of this method
- * and the second check just above.
+ * Need to loop because following flag may have gone to false between the
+ * first check at the beginning of this method and the second check just
+ * above.
*/
}
return null;
@@ -497,7 +506,7 @@
new TreeSet<ReplicaDBCursor>();
for (int serverId : replicationServerDomain.getServerIds())
{
- // get the last already sent CN from that server to get a cursor
+ // get the last already sent CSN from that server to get a cursor
final CSN lastCsn = serverState.getCSN(serverId);
addCursorIfNotEmpty(results,
replicationServerDomain.getCursorFrom(serverId, lastCsn));
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
index 80b8255..97c391c 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/MsgQueue.java
@@ -27,7 +27,7 @@
*/
package org.opends.server.replication.server;
-import java.util.SortedMap;
+import java.util.NavigableMap;
import java.util.TreeMap;
import org.opends.messages.Message;
@@ -43,7 +43,7 @@
*/
public class MsgQueue
{
- private SortedMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
+ private NavigableMap<CSN, UpdateMsg> map = new TreeMap<CSN, UpdateMsg>();
private final Object lock = new Object();
/** The total number of bytes for all the message in the queue. */
@@ -192,4 +192,24 @@
bytesCount = 0;
}
}
+
+ /**
+ * Consumes all the messages in this queue up to and including the passed in
+ * message. If the passed in message is not contained in the current queue,
+ * then all messages will be removed from it.
+ *
+ * @param msg
+ * the final message to reach when consuming messages from this queue
+ */
+ public void consumeUpTo(UpdateMsg msg)
+ {
+ UpdateMsg msg1;
+ do
+ {
+ // FIXME this code could be more efficient if the msgQueue could call the
+ // following code (to be tested):
+ // map.headMap(msg.getCSN(), true).clear()
+ msg1 = removeFirst();
+ } while (!msg.getCSN().equals(msg1.getCSN()));
+ }
}
--
Gitblit v1.10.0