From 030cd9b2dbc0d7016114f75e8b93190e34d1e213 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Fri, 28 Nov 2014 14:11:01 +0000
Subject: [PATCH] OPENDJ-1611 CR-5492 Filter replicaIds when filling late queue in MessageHandler

---
 opends/src/server/org/opends/server/replication/server/MessageHandler.java          |   23 ++++++++++++++++++-----
 opends/src/server/org/opends/server/replication/server/ServerHandler.java           |    7 +++++--
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java |   19 ++++++++++---------
 3 files changed, 33 insertions(+), 16 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/MessageHandler.java b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
index f3366b2..ac7ce11 100644
--- a/opends/src/server/org/opends/server/replication/server/MessageHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/MessageHandler.java
@@ -28,6 +28,7 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.opends.messages.Message;
@@ -255,12 +256,16 @@
    * Get the next update that must be sent to the consumer
    * from the message queue or from the database.
    *
-   * @param  synchronous specifies what to do when the queue is empty.
+   * @param connectedReplicaIds
+   *            Ids of replicas to accept when returning a message from
+   *            the late queue.
+   * @param synchronous specifies what to do when the queue is empty.
    *         when true, the method blocks; when false the method return null.
+   *
    * @return The next update that must be sent to the consumer.
    *         null when synchronous is false and queue is empty.
    */
-  protected UpdateMsg getNextMessage(boolean synchronous)
+  protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous)
   {
     while (activeConsumer)
     {
@@ -293,7 +298,7 @@
            *           restart as usual
            *   load this change on the delayList
            */
-          fillLateQueue();
+          fillLateQueue(connectedReplicaIds);
           if (lateQueue.isEmpty())
           {
             // we could not find any messages in the changelog
@@ -387,7 +392,11 @@
     return null;
   }
 
-  private void fillLateQueue()
+  /**
+   * Fills the late queue with the most recent changes, accepting only the
+   * messages from provided replica ids.
+   */
+  private void fillLateQueue(Set<Integer> connectedReplicaIds)
   {
     DBCursor<UpdateMsg> cursor = null;
     try
@@ -395,7 +404,11 @@
       cursor = replicationServerDomain.getCursorFrom(serverState);
       while (cursor.next() && isLateQueueBelowThreshold())
       {
-        lateQueue.add(cursor.getRecord());
+        final UpdateMsg record = cursor.getRecord();
+        if (connectedReplicaIds.contains(record.getCSN().getServerId()))
+        {
+          lateQueue.add(record);
+        }
       }
     }
     catch (ChangelogException e)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 675853c..34727a4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -31,9 +31,11 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
@@ -1291,15 +1293,14 @@
    */
   public UpdateMsg take(ServerHandler sHandler)
   {
-    /*
-     * Get the balanced tree that we use to sort the changes to be
-     * sent to the replica from the cookie
-     *
-     * The next change to send is always the first one in the tree
-     * So this methods simply need to check that dependencies are OK
-     * and update this replicaId RUV
-     */
-    return sHandler.take();
+    // Next message can only be taken from connected DSs
+    final Set<Integer> connectedReplicaIds = new HashSet<Integer>(getConnectedDSs().keySet());
+    if (sHandler.isDataServer())
+    {
+      // Prevents sending to a DS its own messages
+      connectedReplicaIds.remove(sHandler.getServerId());
+    }
+    return sHandler.take(connectedReplicaIds);
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index 58009ad..5e38204 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -29,6 +29,7 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -960,13 +961,15 @@
    * Select the next update that must be sent to the server managed by this
    * ServerHandler.
    *
+   * @param connectedReplicaIds
+   *          Ids of replicas to accept when returning a message.
    * @return the next update that must be sent to the server managed by this
    *         ServerHandler.
    */
-  public UpdateMsg take()
+  public UpdateMsg take(Set<Integer> connectedReplicaIds)
   {
     boolean interrupted = true;
-    UpdateMsg msg = getNextMessage(true); // synchronous:block until msg
+    UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg
 
     boolean acquired = false;
     do

--
Gitblit v1.10.0