From 5a0befe36d7b7e93a65bb19df19d667d413c7c89 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 15:07:20 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology
---
opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java | 37 ++++++++++++++++++++++++++++---------
1 files changed, 28 insertions(+), 9 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index 15a784b..f3f74c5 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -34,7 +34,6 @@
import org.opends.server.replication.protocol.ChangeTimeHeartbeatMsg;
import org.opends.server.replication.protocol.Session;
import org.opends.server.util.StaticUtils;
-import org.opends.server.util.TimeThread;
/**
* This thread publishes a {@link ChangeTimeHeartbeatMsg} on a given protocol
@@ -70,15 +69,15 @@
* @param session The session on which heartbeats are to be sent.
* @param heartbeatInterval The interval between heartbeats sent
* (in milliseconds).
- * @param serverId2 The serverId of the sender domain.
+ * @param serverId The serverId of the sender domain.
*/
public CTHeartbeatPublisherThread(String threadName, Session session,
- long heartbeatInterval, int serverId2)
+ long heartbeatInterval, int serverId)
{
super(threadName);
this.session = session;
this.heartbeatInterval = heartbeatInterval;
- this.serverId = serverId2;
+ this.serverId = serverId;
}
/**
@@ -87,6 +86,7 @@
@Override
public void run()
{
+ long lastHeartbeatTime = 0;
try
{
if (logger.isTraceEnabled())
@@ -97,13 +97,12 @@
while (!shutdown)
{
- long now = System.currentTimeMillis();
- final CSN csn = new CSN(TimeThread.getTime(), 0, serverId);
- ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
-
+ final long now = System.currentTimeMillis();
if (now > session.getLastPublishTime() + heartbeatInterval)
{
- session.publish(ctHeartbeatMsg);
+ final CSN csn = new CSN(now, 0, serverId);
+ session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
+ lastHeartbeatTime = csn.getTime();
}
long sleepTime = session.getLastPublishTime() + heartbeatInterval - now;
@@ -129,6 +128,26 @@
}
}
}
+
+ if (shutdown)
+ {
+ /*
+ * Shortcoming: this thread is restarted each time the DS reconnects,
+ * e.g. during load balancing. This is not that much of a problem
+ * because the ChangeNumberIndexer tolerates receiving replica offline
+ * heartbeats and then receiving messages back again.
+ */
+ /*
+ * However, during shutdown we need to be sure that all pending client
+ * operations have either completed or have been aborted before shutting
+ * down replication. Otherwise, the medium consistency will move forward
+ * without knowing about these changes.
+ */
+ final long now = System.currentTimeMillis();
+ final int seqNum = lastHeartbeatTime == now ? 1 : 0;
+ final CSN offlineCSN = new CSN(now, seqNum, serverId);
+ session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
+ }
}
catch (IOException e)
{
--
Gitblit v1.10.0