From 19298042f99ef0a659f9271d022f58884a18704b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 08:59:16 +0000
Subject: [PATCH] MessageHandler.java: Extracted methods isMsgQueueAboveThreshold() and isMsgQueueBelowThreshold(). Extracted MsgQueue.consumeUpTo(UpdateMsg). Updated javadocs.
---
opends/src/server/org/opends/server/replication/server/MessageHandler.java | 45 +++++++++++++++++++++++++++------------------
1 files changed, 27 insertions(+), 18 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index af01fa2..ed243c4 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -49,6 +49,9 @@
/**
* This class implements a buffering/producer/consumer mechanism of
* replication changes (UpdateMsg) used inside the replication server.
+ * It logically represents another RS than the current one, and is used when the
+ * other RS is brought online and needs to catch up with the changes in the
+ * current RS.
*
* MessageHandlers are registered into Replication server domains.
* When an update message is received by a domain, the domain forwards
@@ -156,21 +159,31 @@
/* TODO : size should be configurable
* and larger than max-receive-queue-size
*/
- while ((msgQueue.count() > maxQueueSize) ||
- (msgQueue.bytesCount() > maxQueueBytesSize))
+ while (isMsgQueueAboveThreshold())
{
following = false;
msgQueue.removeFirst();
}
}
}
+
+ private boolean isMsgQueueAboveThreshold()
+ {
+ return msgQueue.count() > maxQueueSize
+ || msgQueue.bytesCount() > maxQueueBytesSize;
+ }
+
+ private boolean isMsgQueueBelowThreshold()
+ {
+ return !isMsgQueueAboveThreshold();
+ }
+
/**
* Set the shut down flag to true and returns the previous value of the flag.
* @return The previous value of the shut down flag
*/
public boolean engageShutdown()
{
- // Use thread safe boolean
return shuttingDown.getAndSet(true);
}
@@ -300,16 +313,16 @@
}
/*
- * If the late queue is empty then we could not find any
- * messages in the replication log so the remote serevr is not
- * late anymore.
+ * If the late queue is empty then we could not find any messages in
+ * the replication log so the remote server is not late anymore.
*/
if (lateQueue.isEmpty())
{
synchronized (msgQueue)
{
- if ((msgQueue.count() < maxQueueSize) &&
- (msgQueue.bytesCount() < maxQueueBytesSize))
+ // Ensure we are below threshold so this server will follow the
+ // msgQueue without fearing the msgQueue gets trimmed
+ if (isMsgQueueBelowThreshold())
{
following = true;
}
@@ -330,13 +343,9 @@
/* we finally catch up with the regular queue */
following = true;
lateQueue.clear();
- UpdateMsg msg1;
- do
- {
- msg1 = msgQueue.removeFirst();
- } while (!msg.getCSN().equals(msg1.getCSN()));
+ msgQueue.consumeUpTo(msg);
updateServerState(msg);
- return msg1;
+ return msg;
}
}
}
@@ -386,9 +395,9 @@
}
}
/*
- * Need to loop because following flag may have gone to false between
- * the first check at the beginning of this method
- * and the second check just above.
+ * Need to loop because following flag may have gone to false between the
+ * first check at the beginning of this method and the second check just
+ * above.
*/
}
return null;
@@ -497,7 +506,7 @@
new TreeSet<ReplicaDBCursor>();
for (int serverId : replicationServerDomain.getServerIds())
{
- // get the last already sent CN from that server to get a cursor
+ // get the last already sent CSN from that server to get a cursor
final CSN lastCsn = serverState.getCSN(serverId);
addCursorIfNotEmpty(results,
replicationServerDomain.getCursorFrom(serverId, lastCsn));
--
Gitblit v1.10.0