From 7160f59040db3b159e1d73d9ba58055e8806163d Mon Sep 17 00:00:00 2001
From: Fabio Pistolesi <fabio.pistolesi@forgerock.com>
Date: Wed, 12 Oct 2016 10:30:03 +0000
Subject: [PATCH] OPENDJ-3337 Make sure ReplicationOffline messages are not repeatedly returned by cursors.
---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicaCursor.java | 18 +++++++++++++++---
opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java | 7 +++++++
opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java | 4 +++-
3 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
index 61ac4b4..9cfb03d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/MessageHandler.java
@@ -304,6 +304,13 @@
synchronized (msgQueue) // TODO JNR why synchronize(msgQueue) here?
{
msg = lateQueue.removeFirst();
+ // By default a server is always not following. A weird case where messages not representing
+ // an operation may happen, making the late queue repeatedly fill and be emptied without ever
+ // getting the server out of state "not following".
+ if (lateQueue.isEmpty() && msgQueue.isEmpty())
+ {
+ following = true;
+ }
}
if (updateServerState(msg))
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
index 645e403..b53ca6f 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -423,7 +423,9 @@
}
return true;
}
- return false;
+ // Replica offline messages should not get to a connected DS, they are meant to be
+ // exchanged only between RSes
+ return updateMsg instanceof ReplicaOfflineMsg;
}
private PreparedAssuredInfo getPreparedAssuredInfo(UpdateMsg updateMsg,
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicaCursor.java b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicaCursor.java
index 18de378..046bcd5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicaCursor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/replication/server/changelog/file/ReplicaCursor.java
@@ -11,7 +11,7 @@
* Header, with the fields enclosed by brackets [] replaced by your own identifying
* information: "Portions Copyright [year] [name of copyright owner]".
*
- * Copyright 2014-2015 ForgeRock AS.
+ * Copyright 2014-2016 ForgeRock AS.
*/
package org.opends.server.replication.server.changelog.file;
@@ -74,8 +74,20 @@
*/
public void setOfflineCSN(CSN offlineCSN)
{
- this.replicaOfflineMsg.set(
- offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null);
+ if (offlineCSN != null)
+ {
+ ReplicaOfflineMsg prevOfflineMsg = this.replicaOfflineMsg.get();
+ if (prevOfflineMsg == null || prevOfflineMsg.getCSN().isOlderThan(offlineCSN))
+ {
+ // Do not spin if the the message for this replica has been changed. Either a newer
+ // message has arrived or the next cursor iteration will pick it up.
+ this.replicaOfflineMsg.compareAndSet(prevOfflineMsg, new ReplicaOfflineMsg(offlineCSN));
+ }
+ }
+ else
+ {
+ this.replicaOfflineMsg.set(null);
+ }
}
/** {@inheritDoc} */
--
Gitblit v1.10.0