From e4ae0bdaa33ad0568be2e3b7f1e6a8d5695513e8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 23 Jun 2014 09:29:29 +0000
Subject: [PATCH] OPENDJ-1453 Make the Medium Consistency Point support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java                         |   52 ++------------------------
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java |   24 +++--------
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                          |    9 ----
 opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java                      |   22 ----------
 4 files changed, 13 insertions(+), 94 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index 073d2eb..e0a5dd5 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -39,48 +39,22 @@
  */
 public class ChangeTimeHeartbeatMsg extends ReplicationMsg
 {
-  private static final byte NORMAL_HEARTBEAT = 0;
-  private static final byte REPLICA_OFFLINE_HEARTBEAT = 1;
 
   /**
    * The CSN containing the change time.
    */
   private final CSN csn;
-  /**
-   * The CSN containing the change time.
-   */
-  private final byte eventType;
-
-  private ChangeTimeHeartbeatMsg(CSN csn, byte eventType)
-  {
-    this.csn = csn;
-    this.eventType = eventType;
-  }
 
   /**
-   * Factory method that builds a change time heartbeat message providing the
-   * change time value in a CSN.
+   * Constructor of a Change Time Heartbeat message providing the change time
+   * value in a CSN.
    *
    * @param csn
    *          The provided CSN.
-   * @return a new ChangeTimeHeartbeatMsg
    */
-  public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn)
+  public ChangeTimeHeartbeatMsg(CSN csn)
   {
-    return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT);
-  }
-
-  /**
-   * Factory method that builds a change time heartbeat message for a replica
-   * going offline.
-   *
-   * @param offlineCSN
-   *          the serverId and timestamp of the replica going offline
-   * @return a new ChangeTimeHeartbeatMsg
-   */
-  public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN)
-  {
-    return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT);
+    this.csn = csn;
   }
 
   /**
@@ -94,17 +68,6 @@
   }
 
   /**
-   * Returns whether this is a replica offline message.
-   *
-   * @return true if this is a replica offline message, false if this is a
-   *         regular heartbeat message.
-   */
-  public boolean isReplicaOfflineMsg()
-  {
-    return eventType == REPLICA_OFFLINE_HEARTBEAT;
-  }
-
-  /**
    * Creates a message from a provided byte array.
    *
    * @param in
@@ -130,9 +93,6 @@
       csn = version >= REPLICATION_PROTOCOL_V7
           ? scanner.nextCSN()
           : scanner.nextCSNUTF8();
-      eventType = version >= REPLICATION_PROTOCOL_V8
-          ? scanner.nextByte()
-          : NORMAL_HEARTBEAT;
 
       if (!scanner.isEmpty())
       {
@@ -163,10 +123,6 @@
     final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1));
     builder.appendByte(MSG_TYPE_CT_HEARTBEAT);
     builder.appendCSN(csn);
-    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8)
-    {
-      builder.appendByte(eventType);
-    }
     return builder.toByteArray();
   }
 
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4b2944c..7f3ad97 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2508,14 +2508,7 @@
   {
     try
     {
-      if (msg.isReplicaOfflineMsg())
-      {
-        domainDB.notifyReplicaOffline(baseDN, msg.getCSN());
-      }
-      else
-      {
-        domainDB.replicaHeartbeat(baseDN, msg.getCSN());
-      }
+      domainDB.replicaHeartbeat(baseDN, msg.getCSN());
     }
     catch (ChangelogException e)
     {
diff --git a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index f509938..51425a2 100644
--- a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -107,7 +107,7 @@
         if (now > session.getLastPublishTime() + heartbeatInterval)
         {
           final CSN csn = new CSN(now, 0, serverId);
-          session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
+          session.publish(new ChangeTimeHeartbeatMsg(csn));
           lastHeartbeatTime = csn.getTime();
         }
 
@@ -137,26 +137,6 @@
           }
         }
       }
-
-      if (shutdown)
-      {
-        /*
-         * Shortcoming: this thread is restarted each time the DS reconnects,
-         * e.g. during load balancing. This is not that much of a problem
-         * because the ChangeNumberIndexer tolerates receiving replica offline
-         * heartbeats and then receiving messages back again.
-         */
-        /*
-         * However, during shutdown we need to be sure that all pending client
-         * operations have either completed or have been aborted before shutting
-         * down replication. Otherwise, the medium consistency will move forward
-         * without knowing about these changes.
-         */
-        final long now = System.currentTimeMillis();
-        final int seqNum = lastHeartbeatTime == now ? 1 : 0;
-        final CSN offlineCSN = new CSN(now, seqNum, serverId);
-        session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
-      }
     }
     catch (IOException e)
     {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index cae1372..2a90a7d 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -829,28 +829,18 @@
   @Test
   public void changeTimeHeartbeatMsgTest() throws Exception
   {
-    final short v1 = REPLICATION_PROTOCOL_V1;
-    final short v7 = REPLICATION_PROTOCOL_V7;
-    final short v8 = REPLICATION_PROTOCOL_V8;
-
     final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
-    final ChangeTimeHeartbeatMsg heartbeatMsg = ChangeTimeHeartbeatMsg.heartbeatMsg(csn);
-    assertCTHearbeatMsg(heartbeatMsg, v1, false);
-    assertCTHearbeatMsg(heartbeatMsg, v7, false);
-    assertCTHearbeatMsg(heartbeatMsg, v8, false);
-
-    final ChangeTimeHeartbeatMsg offlineMsg = ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn);
-    assertCTHearbeatMsg(offlineMsg, v1, false);
-    assertCTHearbeatMsg(offlineMsg, v7, false);
-    assertCTHearbeatMsg(offlineMsg, v8, true);
+    final ChangeTimeHeartbeatMsg heartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
+    assertCTHearbeatMsg(heartbeatMsg, REPLICATION_PROTOCOL_V1);
+    assertCTHearbeatMsg(heartbeatMsg, REPLICATION_PROTOCOL_V7);
   }
 
-  private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg,
-      short version, boolean expected) throws DataFormatException
+  private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg expectedMsg,
+      short version) throws DataFormatException
   {
-    final byte[] bytes = heartbeatMsg.getBytes(version);
+    final byte[] bytes = expectedMsg.getBytes(version);
     ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version);
-    assertEquals(decodedMsg.isReplicaOfflineMsg(), expected);
+    assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
   }
 
   /**

--
Gitblit v1.10.0