From e4ae0bdaa33ad0568be2e3b7f1e6a8d5695513e8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 23 Jun 2014 09:29:29 +0000
Subject: [PATCH] OPENDJ-1453 Make the Medium Consistency Point support replicas temporarily leaving the topology
---
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java | 52 ++------------------------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 24 +++--------
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 9 ----
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java | 22 ----------
4 files changed, 13 insertions(+), 94 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index 073d2eb..e0a5dd5 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -39,48 +39,22 @@
*/
public class ChangeTimeHeartbeatMsg extends ReplicationMsg
{
- private static final byte NORMAL_HEARTBEAT = 0;
- private static final byte REPLICA_OFFLINE_HEARTBEAT = 1;
/**
* The CSN containing the change time.
*/
private final CSN csn;
- /**
- * The CSN containing the change time.
- */
- private final byte eventType;
-
- private ChangeTimeHeartbeatMsg(CSN csn, byte eventType)
- {
- this.csn = csn;
- this.eventType = eventType;
- }
/**
- * Factory method that builds a change time heartbeat message providing the
- * change time value in a CSN.
+ * Constructor of a Change Time Heartbeat message providing the change time
+ * value in a CSN.
*
* @param csn
* The provided CSN.
- * @return a new ChangeTimeHeartbeatMsg
*/
- public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn)
+ public ChangeTimeHeartbeatMsg(CSN csn)
{
- return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT);
- }
-
- /**
- * Factory method that builds a change time heartbeat message for a replica
- * going offline.
- *
- * @param offlineCSN
- * the serverId and timestamp of the replica going offline
- * @return a new ChangeTimeHeartbeatMsg
- */
- public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN)
- {
- return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT);
+ this.csn = csn;
}
/**
@@ -94,17 +68,6 @@
}
/**
- * Returns whether this is a replica offline message.
- *
- * @return true if this is a replica offline message, false if this is a
- * regular heartbeat message.
- */
- public boolean isReplicaOfflineMsg()
- {
- return eventType == REPLICA_OFFLINE_HEARTBEAT;
- }
-
- /**
* Creates a message from a provided byte array.
*
* @param in
@@ -130,9 +93,6 @@
csn = version >= REPLICATION_PROTOCOL_V7
? scanner.nextCSN()
: scanner.nextCSNUTF8();
- eventType = version >= REPLICATION_PROTOCOL_V8
- ? scanner.nextByte()
- : NORMAL_HEARTBEAT;
if (!scanner.isEmpty())
{
@@ -163,10 +123,6 @@
final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1));
builder.appendByte(MSG_TYPE_CT_HEARTBEAT);
builder.appendCSN(csn);
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8)
- {
- builder.appendByte(eventType);
- }
return builder.toByteArray();
}
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 4b2944c..7f3ad97 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2508,14 +2508,7 @@
{
try
{
- if (msg.isReplicaOfflineMsg())
- {
- domainDB.notifyReplicaOffline(baseDN, msg.getCSN());
- }
- else
- {
- domainDB.replicaHeartbeat(baseDN, msg.getCSN());
- }
+ domainDB.replicaHeartbeat(baseDN, msg.getCSN());
}
catch (ChangelogException e)
{
diff --git a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index f509938..51425a2 100644
--- a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -107,7 +107,7 @@
if (now > session.getLastPublishTime() + heartbeatInterval)
{
final CSN csn = new CSN(now, 0, serverId);
- session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
+ session.publish(new ChangeTimeHeartbeatMsg(csn));
lastHeartbeatTime = csn.getTime();
}
@@ -137,26 +137,6 @@
}
}
}
-
- if (shutdown)
- {
- /*
- * Shortcoming: this thread is restarted each time the DS reconnects,
- * e.g. during load balancing. This is not that much of a problem
- * because the ChangeNumberIndexer tolerates receiving replica offline
- * heartbeats and then receiving messages back again.
- */
- /*
- * However, during shutdown we need to be sure that all pending client
- * operations have either completed or have been aborted before shutting
- * down replication. Otherwise, the medium consistency will move forward
- * without knowing about these changes.
- */
- final long now = System.currentTimeMillis();
- final int seqNum = lastHeartbeatTime == now ? 1 : 0;
- final CSN offlineCSN = new CSN(now, seqNum, serverId);
- session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
- }
}
catch (IOException e)
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index cae1372..2a90a7d 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -829,28 +829,18 @@
@Test
public void changeTimeHeartbeatMsgTest() throws Exception
{
- final short v1 = REPLICATION_PROTOCOL_V1;
- final short v7 = REPLICATION_PROTOCOL_V7;
- final short v8 = REPLICATION_PROTOCOL_V8;
-
final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
- final ChangeTimeHeartbeatMsg heartbeatMsg = ChangeTimeHeartbeatMsg.heartbeatMsg(csn);
- assertCTHearbeatMsg(heartbeatMsg, v1, false);
- assertCTHearbeatMsg(heartbeatMsg, v7, false);
- assertCTHearbeatMsg(heartbeatMsg, v8, false);
-
- final ChangeTimeHeartbeatMsg offlineMsg = ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn);
- assertCTHearbeatMsg(offlineMsg, v1, false);
- assertCTHearbeatMsg(offlineMsg, v7, false);
- assertCTHearbeatMsg(offlineMsg, v8, true);
+ final ChangeTimeHeartbeatMsg heartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
+ assertCTHearbeatMsg(heartbeatMsg, REPLICATION_PROTOCOL_V1);
+ assertCTHearbeatMsg(heartbeatMsg, REPLICATION_PROTOCOL_V7);
}
- private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg,
- short version, boolean expected) throws DataFormatException
+ private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg expectedMsg,
+ short version) throws DataFormatException
{
- final byte[] bytes = heartbeatMsg.getBytes(version);
+ final byte[] bytes = expectedMsg.getBytes(version);
ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version);
- assertEquals(decodedMsg.isReplicaOfflineMsg(), expected);
+ assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
}
/**
--
Gitblit v1.10.0