From 029beee63ef0fa2eb3915241c3b3c866d0355b6c Mon Sep 17 00:00:00 2001
From: Jan-Peter Nilsson <notifications@github.com>
Date: Wed, 09 Sep 2020 14:31:56 +0000
Subject: [PATCH] FIX Replication not catching up properly if there are many pending changes https://github.com/OpenIdentityPlatform/OpenDJ/issues/141

---
 opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java |   32 ++++++++++++++++++++++++++++----
 1 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
index 9cfb03d..6b3f1e8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -258,7 +258,7 @@
            *           restart as usual
            *   load this change on the delayList
            */
-          fillLateQueue();
+          boolean queueContributesToDomainState = fillLateQueue();
           if (lateQueue.isEmpty())
           {
             // we could not find any messages in the changelog
@@ -283,6 +283,15 @@
             UpdateMsg msg = lateQueue.first();
             synchronized (msgQueue)
             {
+              if (!queueContributesToDomainState)
+              {
+               // If no message in the late queue contributes to the domain state, move them all to msgQueue so we can get out of here
+               while(!lateQueue.isEmpty())
+               {
+                 msgQueue.add(lateQueue.removeFirst());
+               }
+              }
+
               if (msgQueue.contains(msg))
               {
                 /* we finally catch up with the regular queue */
@@ -307,9 +316,14 @@
             // By default a server is always not following. A weird case where messages not representing
             // an operation may happen, making the late queue repeatedly fill and be emptied without ever
             // getting the server out of state "not following".
+
             if (lateQueue.isEmpty() && msgQueue.isEmpty())
             {
-              following = true;
+              CSN nextChange = findOldestCSNFromReplicaDBs();
+              if (nextChange == null)
+              {
+                following = true;
+              }
             }
           }
           if (updateServerState(msg))
@@ -363,15 +377,25 @@
    * Fills the late queue with the most recent changes, accepting only the
    * messages from provided replica ids.
    */
-  private void fillLateQueue() throws ChangelogException
+  private boolean fillLateQueue() throws ChangelogException
   {
+    boolean contributesToDomainState = false;
     try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
     {
       while (cursor.next() && isLateQueueBelowThreshold())
       {
-        lateQueue.add(cursor.getRecord());
+        UpdateMsg msg = cursor.getRecord();
+        lateQueue.add(msg);
+        contributesToDomainState |= msg.contributesToDomainState();
       }
     }
+
+    // If we stopped because we filled the queue, set contributesToDomainState to true
+    if (!contributesToDomainState && !isLateQueueBelowThreshold())
+    {
+      contributesToDomainState = true;
+    }
+    return contributesToDomainState;
   }
 
   private boolean isLateQueueBelowThreshold()

--
Gitblit v1.10.0