From 029beee63ef0fa2eb3915241c3b3c866d0355b6c Mon Sep 17 00:00:00 2001
From: Jan-Peter Nilsson <notifications@github.com>
Date: Wed, 09 Sep 2020 14:31:56 +0000
Subject: [PATCH] FIX Replication not catching up properly if there is many pending changes https://github.com/OpenIdentityPlatform/OpenDJ/issues/141
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java | 32 ++++++++++++++++++++++++++++----
1 files changed, 28 insertions(+), 4 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
index 9cfb03d..6b3f1e8 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -258,7 +258,7 @@
* restart as usual
* load this change on the delayList
*/
- fillLateQueue();
+ boolean queueContributesToDomainState = fillLateQueue();
if (lateQueue.isEmpty())
{
// we could not find any messages in the changelog
@@ -283,6 +283,15 @@
UpdateMsg msg = lateQueue.first();
synchronized (msgQueue)
{
+ if (!queueContributesToDomainState)
+ {
+ // If nothing in the queue contributesToDomainState, add it all to msgQueue so we can get out of here
+ while(!lateQueue.isEmpty())
+ {
+ msgQueue.add(lateQueue.removeFirst());
+ }
+ }
+
if (msgQueue.contains(msg))
{
/* we finally catch up with the regular queue */
@@ -307,9 +316,14 @@
// By default a server is always not following. A weird case where messages not representing
// an operation may happen, making the late queue repeatedly fill and be emptied without ever
// getting the server out of state "not following".
+
if (lateQueue.isEmpty() && msgQueue.isEmpty())
{
- following = true;
+ CSN nextChange = findOldestCSNFromReplicaDBs();
+ if (nextChange == null)
+ {
+ following = true;
+ }
}
}
if (updateServerState(msg))
@@ -363,15 +377,25 @@
* Fills the late queue with the most recent changes, accepting only the
* messages from provided replica ids.
*/
- private void fillLateQueue() throws ChangelogException
+ private boolean fillLateQueue() throws ChangelogException
{
+ boolean contributesToDomainState = false;
try (DBCursor<UpdateMsg> cursor = replicationServerDomain.getCursorFrom(serverState);)
{
while (cursor.next() && isLateQueueBelowThreshold())
{
- lateQueue.add(cursor.getRecord());
+ UpdateMsg msg = cursor.getRecord();
+ lateQueue.add(msg);
+ contributesToDomainState |= msg.contributesToDomainState();
}
}
+
+ // If we stopped because we filled the queue, set contributesToDomainState to true
+ if (!contributesToDomainState && !isLateQueueBelowThreshold())
+ {
+ contributesToDomainState = true;
+ }
+ return contributesToDomainState;
}
private boolean isLateQueueBelowThreshold()
--
Gitblit v1.10.0