From 0bee4dc38f118fd9941f085f5194a8b1d4c66d5a Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Tue, 02 Dec 2014 16:29:06 +0000
Subject: [PATCH] OPENDJ-1611 CR-5492 Filter replicaIds when filling late queue in MessageHandler
---
opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java | 23 ++++++++++++++++++-----
1 files changed, 18 insertions(+), 5 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
index ab64477..1b227cf 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import org.forgerock.i18n.LocalizableMessage;
@@ -256,12 +257,16 @@
* Get the next update that must be sent to the consumer
* from the message queue or from the database.
*
- * @param synchronous specifies what to do when the queue is empty.
+ * @param connectedReplicaIds
+ * Ids of replicas to accept when returning a message from
+ * the late queue.
+ * @param synchronous specifies what to do when the queue is empty.
* when true, the method blocks; when false the method return null.
+ *
* @return The next update that must be sent to the consumer.
* null when synchronous is false and queue is empty.
*/
- protected UpdateMsg getNextMessage(boolean synchronous)
+ protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous)
{
while (activeConsumer)
{
@@ -294,7 +299,7 @@
* restart as usual
* load this change on the delayList
*/
- fillLateQueue();
+ fillLateQueue(connectedReplicaIds);
if (lateQueue.isEmpty())
{
// we could not find any messages in the changelog
@@ -388,7 +393,11 @@
return null;
}
- private void fillLateQueue()
+ /**
+ * Fills the late queue with the most recent changes, accepting only the
+ * messages from provided replica ids.
+ */
+ private void fillLateQueue(Set<Integer> connectedReplicaIds)
{
DBCursor<UpdateMsg> cursor = null;
try
@@ -396,7 +405,11 @@
cursor = replicationServerDomain.getCursorFrom(serverState);
while (cursor.next() && isLateQueueBelowThreshold())
{
- lateQueue.add(cursor.getRecord());
+ final UpdateMsg record = cursor.getRecord();
+ if (connectedReplicaIds.contains(record.getCSN().getServerId()))
+ {
+ lateQueue.add(record);
+ }
}
}
catch (ChangelogException e)
--
Gitblit v1.10.0