From e4f80a3325b8ca06ad3823113b2e30054e471062 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 11 Jun 2015 10:20:09 +0000
Subject: [PATCH] OPENDJ-2106 (CR-7228) Entry is not replicated on second instance in 2 RS topology

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java |   28 +++++++++-------------------
 1 files changed, 9 insertions(+), 19 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
index 18c955f..a13585c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -31,7 +31,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.forgerock.i18n.LocalizableMessage;
@@ -234,19 +233,14 @@
   }
 
   /**
-   * Get the next update that must be sent to the consumer
-   * from the message queue or from the database.
+   * Get the next update that must be sent to the consumer from the message queue or from the
+   * database.
    *
-   * @param connectedReplicaIds
-   *            Ids of replicas to accept when returning a message from
-   *            the late queue.
-   * @param synchronous specifies what to do when the queue is empty.
-   *         when true, the method blocks; when false the method return null.
-   *
-   * @return The next update that must be sent to the consumer.
-   *         null when synchronous is false and queue is empty.
+   * @param sendToServerId
+   *          serverId of the replica where changes will be sent
+   * @return The next update that must be sent to the consumer, or {@code null} when queue is empty
    */
-  protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous)
+  protected UpdateMsg getNextMessage(int sendToServerId)
   {
     while (activeConsumer)
     {
@@ -279,7 +273,7 @@
            *           restart as usual
            *   load this change on the delayList
            */
-          fillLateQueue(connectedReplicaIds);
+          fillLateQueue(sendToServerId);
           if (lateQueue.isEmpty())
           {
             // we could not find any messages in the changelog
@@ -342,10 +336,6 @@
           {
             while (msgQueue.isEmpty() && following)
             {
-              if (!synchronous)
-              {
-                return null;
-              }
               msgQueue.wait(500);
               if (!activeConsumer)
               {
@@ -381,14 +371,14 @@
    * Fills the late queue with the most recent changes, accepting only the
    * messages from provided replica ids.
    */
-  private void fillLateQueue(Set<Integer> connectedReplicaIds)
+  private void fillLateQueue(int sendToServerId)
   {
     try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
     {
       while (cursor.next() && isLateQueueBelowThreshold())
       {
         final UpdateMsg record = cursor.getRecord();
-        if (connectedReplicaIds.contains(record.getCSN().getServerId()))
+        if (record.getCSN().getServerId() != sendToServerId)
         {
           lateQueue.add(record);
         }

--
Gitblit v1.10.0