From 7ed8164f91194d46c58290cb0887fcfa7d585da7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 11 Jun 2015 10:20:09 +0000
Subject: [PATCH] OPENDJ-2106 (CR-7228) Entry is not replicated on second instance in 2 RS topology

---
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java          |   28 ++++---------
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java |   16 --------
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java            |    4 +-
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java           |   67 ++++++++++++++-------------------
 4 files changed, 40 insertions(+), 75 deletions(-)

diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
index 18c955f..a13585c 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -31,7 +31,6 @@
 
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.forgerock.i18n.LocalizableMessage;
@@ -234,19 +233,14 @@
   }
 
   /**
-   * Get the next update that must be sent to the consumer
-   * from the message queue or from the database.
+   * Get the next update that must be sent to the consumer from the message queue or from the
+   * database.
    *
-   * @param connectedReplicaIds
-   *            Ids of replicas to accept when returning a message from
-   *            the late queue.
-   * @param synchronous specifies what to do when the queue is empty.
-   *         when true, the method blocks; when false the method return null.
-   *
-   * @return The next update that must be sent to the consumer.
-   *         null when synchronous is false and queue is empty.
+   * @param sendToServerId
+   *          serverId of the replica where changes will be sent
+   * @return The next update that must be sent to the consumer, or {@code null} when queue is empty
    */
-  protected UpdateMsg getNextMessage(Set<Integer> connectedReplicaIds, boolean synchronous)
+  protected UpdateMsg getNextMessage(int sendToServerId)
   {
     while (activeConsumer)
     {
@@ -279,7 +273,7 @@
            *           restart as usual
            *   load this change on the delayList
            */
-          fillLateQueue(connectedReplicaIds);
+          fillLateQueue(sendToServerId);
           if (lateQueue.isEmpty())
           {
             // we could not find any messages in the changelog
@@ -342,10 +336,6 @@
           {
             while (msgQueue.isEmpty() && following)
             {
-              if (!synchronous)
-              {
-                return null;
-              }
               msgQueue.wait(500);
               if (!activeConsumer)
               {
@@ -381,14 +371,14 @@
    * Fills the late queue with the most recent changes, accepting only the
    * messages from provided replica ids.
    */
-  private void fillLateQueue(Set<Integer> connectedReplicaIds)
+  private void fillLateQueue(int sendToServerId)
   {
     try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
     {
       while (cursor.next() && isLateQueueBelowThreshold())
       {
         final UpdateMsg record = cursor.getRecord();
-        if (connectedReplicaIds.contains(record.getCSN().getServerId()))
+        if (record.getCSN().getServerId() != sendToServerId)
         {
           lateQueue.add(record);
         }
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
index 4a9e1d6..0ab175d 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -31,7 +31,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -1265,21 +1264,6 @@
   }
 
   /**
-   * Get the next update that need to be sent to a given LDAP server.
-   * This call is blocking when no update is available or when dependencies
-   * do not allow to send the next available change
-   *
-   * @param sHandler The server handler for the target directory server.
-   *
-   * @return the update that must be forwarded
-   */
-  public UpdateMsg take(ServerHandler sHandler)
-  {
-    // Next message can only be taken from connected DSs
-    return sHandler.take(new HashSet<Integer>(getConnectedDSs().keySet()));
-  }
-
-  /**
    * Creates and returns a cursor across this replication domain.
    * <p>
    * Client code must call {@link DBCursor#next()} to advance the cursor to the
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
index 21eb5fb..6656852 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerHandler.java
@@ -31,7 +31,6 @@
 import java.io.IOException;
 import java.util.List;
 import java.util.Random;
-import java.util.Set;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -603,15 +602,6 @@
 
   /**
    * Increment the number of updates sent to the server in assured safe data
-   * mode.
-   */
-  private void incrementAssuredSdSentUpdates()
-  {
-    assuredSdSentUpdates++;
-  }
-
-  /**
-   * Increment the number of updates sent to the server in assured safe data
    * mode that timed out.
    */
   public void incrementAssuredSdSentUpdatesTimeout()
@@ -639,15 +629,6 @@
 
   /**
    * Increment the number of updates sent to the server in assured safe read
-   * mode.
-   */
-  private void incrementAssuredSrSentUpdates()
-  {
-    assuredSrSentUpdates++;
-  }
-
-  /**
-   * Increment the number of updates sent to the server in assured safe read
    * mode that timed out.
    */
   public void incrementAssuredSrSentUpdatesTimeout()
@@ -942,17 +923,31 @@
    * Select the next update that must be sent to the server managed by this
    * ServerHandler.
    *
-   * @param connectedReplicaIds
-   *          Ids of replicas to accept when returning a message.
    * @return the next update that must be sent to the server managed by this
    *         ServerHandler.
    */
-  public UpdateMsg take(Set<Integer> connectedReplicaIds)
+  public UpdateMsg take()
   {
-    boolean interrupted = true;
-    UpdateMsg msg = getNextMessage(connectedReplicaIds, true); // synchronous:block until msg
+    final UpdateMsg msg = getNextMessage(serverId);
 
+    acquirePermitInSendWindow();
+
+    if (msg != null)
+    {
+      incrementOutCount();
+      if (msg.isAssured())
+      {
+        incrementAssuredStats(msg);
+      }
+      return msg;
+    }
+    return null;
+  }
+
+  private void acquirePermitInSendWindow()
+  {
     boolean acquired = false;
+    boolean interrupted = true;
     do
     {
       try
@@ -964,22 +959,18 @@
         // loop until not interrupted
       }
     } while ((interrupted || !acquired) && !shutdownWriter);
-    if (msg != null)
-    {
-      incrementOutCount();
+  }
 
-      if (msg.isAssured())
-      {
-        if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
-        {
-          incrementAssuredSrSentUpdates();
-        } else if (!isDataServer())
-        {
-          incrementAssuredSdSentUpdates();
-        }
-      }
+  private void incrementAssuredStats(final UpdateMsg msg)
+  {
+    if (msg.getAssuredMode() == AssuredMode.SAFE_READ_MODE)
+    {
+      assuredSrSentUpdates++;
     }
-    return msg;
+    else if (!isDataServer())
+    {
+      assuredSdSentUpdates++;
+    }
   }
 
   /**
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
index 343e2d5..f4fbe2a 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ServerWriter.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2014 ForgeRock AS
+ *      Portions Copyright 2011-2015 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -101,7 +101,7 @@
       while (!shutdown
           || !dsrsShutdownSync.canShutdown(replicationServerDomain.getBaseDN()))
       {
-        final UpdateMsg updateMsg = replicationServerDomain.take(this.handler);
+        final UpdateMsg updateMsg = this.handler.take();
         if (updateMsg == null)
         {
           // this connection is closing

--
Gitblit v1.10.0