From de36fa06856d8d04652401bb24e49c3259aef154 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 10:26:42 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java |   39 +++++++++++++++++++++++++++++----------
 1 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index faeab2b..6b090d9 100644
--- a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
@@ -35,7 +35,6 @@
 import org.opends.server.replication.protocol.Session;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
-import org.opends.server.util.TimeThread;
 
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
@@ -76,15 +75,15 @@
    * @param session The session on which heartbeats are to be sent.
    * @param heartbeatInterval The interval between heartbeats sent
    *                          (in milliseconds).
-   * @param serverId2 The serverId of the sender domain.
+   * @param serverId The serverId of the sender domain.
    */
   public CTHeartbeatPublisherThread(String threadName, Session session,
-                  long heartbeatInterval, int serverId2)
+                  long heartbeatInterval, int serverId)
   {
     super(threadName);
     this.session = session;
     this.heartbeatInterval = heartbeatInterval;
-    this.serverId = serverId2;
+    this.serverId = serverId;
   }
 
   /**
@@ -93,6 +92,7 @@
   @Override
   public void run()
   {
+    long lastHeartbeatTime = 0;
     try
     {
       if (debugEnabled())
@@ -103,13 +103,12 @@
 
       while (!shutdown)
       {
-        long now = System.currentTimeMillis();
-        final CSN csn = new CSN(TimeThread.getTime(), 0, serverId);
-        ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
-
+        final long now = System.currentTimeMillis();
         if (now > session.getLastPublishTime() + heartbeatInterval)
         {
-          session.publish(ctHeartbeatMsg);
+          final CSN csn = new CSN(now, 0, serverId);
+          session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
+          lastHeartbeatTime = csn.getTime();
         }
 
         long sleepTime = session.getLastPublishTime() + heartbeatInterval - now;
@@ -138,6 +137,26 @@
           }
         }
       }
+
+      if (shutdown)
+      {
+        /*
+         * Shortcoming: this thread is restarted each time the DS reconnects,
+         * e.g. during load balancing. This is not that much of a problem
+         * because the ChangeNumberIndexer tolerates receiving replica offline
+         * heartbeats and then receiving messages back again.
+         */
+        /*
+         * However, during shutdown we need to be sure that all pending client
+         * operations have either completed or have been aborted before shutting
+         * down replication. Otherwise, the medium consistency will move forward
+         * without knowing about these changes.
+         */
+        final long now = System.currentTimeMillis();
+        final int seqNum = lastHeartbeatTime == now ? 1 : 0;
+        final CSN offlineCSN = new CSN(now, seqNum, serverId);
+        session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
+      }
     }
     catch (IOException e)
     {

--
Gitblit v1.10.0