From 5a0befe36d7b7e93a65bb19df19d667d413c7c89 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 15:07:20 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology
---
opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java | 108 ++++++++++++++++++++++++++++++++++++------------------
1 files changed, 72 insertions(+), 36 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index b31a5cc..291e1bf 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -29,9 +29,9 @@
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.CSN;
-import org.forgerock.opendj.ldap.ByteSequenceReader;
-import org.forgerock.opendj.ldap.ByteString;
-import org.forgerock.opendj.ldap.ByteStringBuilder;
+
+import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
/**
* Class that define messages sent by a replication domain (DS) to the
@@ -39,21 +39,48 @@
*/
public class ChangeTimeHeartbeatMsg extends ReplicationMsg
{
+ private static final byte NORMAL_HEARTBEAT = 0;
+ private static final byte REPLICA_OFFLINE_HEARTBEAT = 1;
+
/**
* The CSN containing the change time.
*/
private final CSN csn;
+ /**
+ * The CSN containing the change time.
+ */
+ private final byte eventType;
+
+ private ChangeTimeHeartbeatMsg(CSN csn, byte eventType)
+ {
+ this.csn = csn;
+ this.eventType = eventType;
+ }
/**
- * Constructor of a Change Time Heartbeat message providing the change time
- * value in a CSN.
+ * Factory method that builds a change time heartbeat message providing the
+ * change time value in a CSN.
*
* @param csn
* The provided CSN.
+ * @return a new ChangeTimeHeartbeatMsg
*/
- public ChangeTimeHeartbeatMsg(CSN csn)
+ public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn)
{
- this.csn = csn;
+ return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT);
+ }
+
+ /**
+ * Factory method that builds a change time heartbeat message for a replica
+ * going offline.
+ *
+ * @param offlineCSN
+ * the serverId and timestamp of the replica going offline
+ * @return a new ChangeTimeHeartbeatMsg
+ */
+ public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN)
+ {
+ return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT);
}
/**
@@ -67,6 +94,17 @@
}
/**
+ * Returns whether this is a replica offline message.
+ *
+ * @return true if this is a replica offline message, false if this is a
+ * regular heartbeat message.
+ */
+ public boolean isReplicaOfflineMsg()
+ {
+ return eventType == REPLICA_OFFLINE_HEARTBEAT;
+ }
+
+ /**
* Creates a message from a provided byte array.
*
* @param in
@@ -79,32 +117,31 @@
public ChangeTimeHeartbeatMsg(byte[] in, short version)
throws DataFormatException
{
- final ByteSequenceReader reader = ByteString.wrap(in).asReader();
try
{
- if (reader.get() != MSG_TYPE_CT_HEARTBEAT)
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_CT_HEARTBEAT)
{
- // Throw better exception below.
- throw new IllegalArgumentException();
+ throw new DataFormatException("input is not a valid "
+ + getClass().getSimpleName() + " message: " + msgType);
}
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
- {
- csn = CSN.valueOf(reader.getByteSequence(CSN.BYTE_ENCODING_LENGTH));
- }
- else
- {
- csn = CSN.valueOf(reader.getString(CSN.STRING_ENCODING_LENGTH));
- reader.get(); // Read trailing 0 byte.
- }
+ csn = version >= REPLICATION_PROTOCOL_V7
+ ? scanner.nextCSN()
+ : scanner.nextCSNUTF8();
+ eventType = version >= REPLICATION_PROTOCOL_V8
+ ? scanner.nextByte()
+ : NORMAL_HEARTBEAT;
- if (reader.remaining() > 0)
+ if (!scanner.isEmpty())
{
- // Throw better exception below.
- throw new IllegalArgumentException();
+ throw new DataFormatException(
+ "Did not expect to find more bytes to read for "
+ + getClass().getSimpleName() + " message.");
}
}
- catch (Exception e)
+ catch (RuntimeException e)
{
// Index out of bounds, bad format, etc.
throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
@@ -115,24 +152,22 @@
@Override
public byte[] getBytes(short protocolVersion)
{
- if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V7)
{
- final ByteStringBuilder builder = new ByteStringBuilder(
- CSN.BYTE_ENCODING_LENGTH + 1 /* type + csn */);
+ ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csnsUTF8(1));
builder.append(MSG_TYPE_CT_HEARTBEAT);
- csn.toByteString(builder);
+ builder.appendUTF8(csn);
return builder.toByteArray();
}
- else
+
+ final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1));
+ builder.append(MSG_TYPE_CT_HEARTBEAT);
+ builder.append(csn);
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8)
{
- final ByteStringBuilder builder = new ByteStringBuilder(
- CSN.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */);
- builder.append(MSG_TYPE_CT_HEARTBEAT);
- builder.append(csn.toString());
- builder.append((byte) 0); // For compatibility with earlier protocol
- // versions.
- return builder.toByteArray();
+ builder.append(eventType);
}
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@@ -141,4 +176,5 @@
{
return getClass().getSimpleName() + ", csn=" + csn.toStringUI();
}
+
}
--
Gitblit v1.10.0