From 4fd152ec8ba98ac9a70202dbac2b3a579df1033a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 10 Jun 2013 14:25:54 +0000
Subject: [PATCH] Fix OPENDJ-951: Reduce size and frequency of replication MonitorMsg
---
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java | 8
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java | 148 ++++++++++--------
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 114 ++++++-------
opends/src/server/org/opends/server/replication/common/ChangeNumber.java | 109 +++++++++++--
opends/src/server/org/opends/server/replication/protocol/AckMsg.java | 2
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 2
opends/src/server/org/opends/server/replication/common/ServerState.java | 38 ++++
7 files changed, 274 insertions(+), 147 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/common/ChangeNumber.java b/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
index 27347b7..56dd1fb 100644
--- a/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
+++ b/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -29,20 +29,71 @@
import java.util.Date;
+import org.opends.server.types.ByteSequence;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
/**
* Class used to represent Change Numbers.
*/
public class ChangeNumber implements java.io.Serializable,
java.lang.Comparable<ChangeNumber>
{
+ /**
+ * The number of bytes used by the byte string representation of a change
+ * number.
+ *
+ * @see #valueOf(ByteSequence)
+ * @see #toByteString()
+ * @see #toByteString(ByteStringBuilder)
+ */
+ public static final int BYTE_ENCODING_LENGTH = 14;
+
+ /**
+ * The number of characters used by the string representation of a change
+ * number.
+ *
+ * @see #valueOf(String)
+ * @see #toString()
+ */
+ public static final int STRING_ENCODING_LENGTH = 28;
+
private static final long serialVersionUID = -8802722277749190740L;
private final long timeStamp;
private final int seqnum;
private final int serverId;
- // A String representation of the ChangeNumber suitable for network
- // transmission.
- private String formatedString = null;
+ /**
+ * Parses the provided {@link #toString()} representation of a change number.
+ *
+ * @param s
+ * The string to be parsed.
+ * @return The parsed change number.
+ * @see #toString()
+ */
+ public static ChangeNumber valueOf(String s)
+ {
+ return new ChangeNumber(s);
+ }
+
+ /**
+ * Decodes the provided {@link #toByteString()} representation of a change
+ * number.
+ *
+ * @param bs
+ * The byte sequence to be parsed.
+ * @return The decoded change number.
+ * @see #toByteString()
+ */
+ public static ChangeNumber valueOf(ByteSequence bs)
+ {
+ ByteSequenceReader reader = bs.asReader();
+ long timeStamp = reader.getLong();
+ int serverId = reader.getShort() & 0xffff;
+ int seqnum = reader.getInt();
+ return new ChangeNumber(timeStamp, seqnum, serverId);
+ }
/**
* Create a new ChangeNumber from a String.
@@ -59,8 +110,6 @@
temp = str.substring(20, 28);
seqnum = Integer.parseInt(temp, 16);
-
- formatedString = str;
}
/**
@@ -140,25 +189,47 @@
}
/**
+ * Encodes this change number as a byte string.
+ * <p>
+ * NOTE: this representation must not be modified otherwise interop with
+ * earlier protocol versions will be broken.
+ *
+ * @return The encoded representation of this change number.
+ * @see #valueOf(ByteSequence)
+ */
+ public ByteString toByteString()
+ {
+ return toByteString(new ByteStringBuilder(BYTE_ENCODING_LENGTH))
+ .toByteString();
+ }
+
+ /**
+ * Encodes this change number into the provided byte string builder.
+ * <p>
+ * NOTE: this representation must not be modified otherwise interop with
+ * earlier protocol versions will be broken.
+ *
+ * @param builder
+ * The byte string builder.
+ * @return The byte string builder containing the encoded change number.
+ * @see #valueOf(ByteSequence)
+ */
+ public ByteStringBuilder toByteString(ByteStringBuilder builder)
+ {
+ return builder.append(timeStamp).append((short) (serverId & 0xffff))
+ .append(seqnum);
+ }
+
+ /**
* Convert the ChangeNumber to a printable String.
+ * <p>
+ * NOTE: this representation must not be modified otherwise interop with
+ * earlier protocol versions will be broken.
+ *
* @return the string
*/
public String toString()
{
- return format();
- }
-
- /**
- * Convert the ChangeNumber to a String that is suitable for network
- * transmission.
- *
- * @return the string
- */
- public String format()
- {
- if (formatedString != null)
- return formatedString;
-
return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
}
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 92bf8d1..e9963fa 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -27,6 +27,7 @@
*/
package org.opends.server.replication.common;
+import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
@@ -37,6 +38,8 @@
import java.util.Set;
import java.util.zip.DataFormatException;
+import org.opends.server.protocols.asn1.ASN1Writer;
+import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.types.ByteString;
@@ -278,6 +281,41 @@
}
return values;
}
+
+
+
+ /**
+ * Encodes this server state to the provided ASN1 writer.
+ *
+ * @param writer
+ * The ASN1 writer.
+ * @param protocolVersion
+ * The replication protocol version.
+ * @throws IOException
+ * If an error occurred during encoding.
+ */
+ public void writeTo(ASN1Writer writer, short protocolVersion)
+ throws IOException
+ {
+ synchronized (list)
+ {
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ {
+ for (ChangeNumber cn : list.values())
+ {
+ writer.writeOctetString(cn.toByteString());
+ }
+ }
+ else
+ {
+ for (ChangeNumber cn : list.values())
+ {
+ writer.writeOctetString(cn.toString());
+ }
+ }
+ }
+ }
+
/**
* Return the text representation of ServerState.
* @return the text representation of ServerState
diff --git a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
index 544e680..3862a38 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -230,7 +230,7 @@
oStream.write(MSG_TYPE_ACK);
/* Put the ChangeNumber */
- byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8");
+ byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
oStream.write(changeNumberByte);
oStream.write(0);
diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index 09b985a..091b23c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -23,20 +23,24 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
+ * Portions copyright 2013 ForgeRock AS.
*/
package org.opends.server.replication.protocol;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+
+
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
+
/**
- * Class that define messages sent by a replication domain (DS)
- * to the replication server to let the RS know the DS current
- * change time.
+ * Class that define messages sent by a replication domain (DS) to the
+ * replication server to let the RS know the DS current change time.
*/
public class ChangeTimeHeartbeatMsg extends ReplicationMsg
{
@@ -45,26 +49,25 @@
*/
private final ChangeNumber changeNumber;
- /**
- * Constructor of a Change Time Heartbeat message.
- */
- public ChangeTimeHeartbeatMsg()
- {
- this.changeNumber = new ChangeNumber((long)0,0,0);
- }
+
/**
- * Constructor of a Change Time Heartbeat message providing
- * the change time value in a change number.
- * @param cn The provided change number.
+ * Constructor of a Change Time Heartbeat message providing the change time
+ * value in a change number.
+ *
+ * @param cn
+ * The provided change number.
*/
public ChangeTimeHeartbeatMsg(ChangeNumber cn)
{
this.changeNumber = cn;
}
+
+
/**
* Get a change number with the transmitted change time.
+ *
* @return the ChangeNumber
*/
public ChangeNumber getChangeNumber()
@@ -72,77 +75,92 @@
return changeNumber;
}
- /**
- * Encode a change time message.
- * @return The encoded message.
- * @throws UnsupportedEncodingException When an error occurs.
- */
- public byte[] encode() throws UnsupportedEncodingException
- {
- byte[] changeNumberByte =
- this.getChangeNumber().toString().getBytes("UTF-8");
- int length = changeNumberByte.length;
- byte[] encodedMsg = new byte[length];
- /* Put the ChangeNumber */
- addByteArray(changeNumberByte, encodedMsg, 0);
-
- return encodedMsg;
- }
/**
* Creates a message from a provided byte array.
- * @param in The provided byte array.
- * @throws DataFormatException When an error occurs.
+ *
+ * @param in
+ * The provided byte array.
+ * @param version
+ * The version of the protocol to use to decode the msg.
+ * @throws DataFormatException
+ * When an error occurs.
*/
- public ChangeTimeHeartbeatMsg(byte[] in) throws DataFormatException
+ public ChangeTimeHeartbeatMsg(byte[] in, short version)
+ throws DataFormatException
{
+ final ByteSequenceReader reader = ByteString.wrap(in).asReader();
try
{
- /* Read the changeNumber */
- /* First byte is the type */
- if (in[0] != MSG_TYPE_CT_HEARTBEAT)
+ if (reader.get() != MSG_TYPE_CT_HEARTBEAT)
{
- throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
+ // Throw better exception below.
+ throw new IllegalArgumentException();
}
- int pos = 1;
- int length = getNextLength(in, pos);
- String changenumberStr = new String(in, pos, length, "UTF-8");
- changeNumber = new ChangeNumber(changenumberStr);
+
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ {
+ changeNumber = ChangeNumber.valueOf(reader
+ .getByteSequence(ChangeNumber.BYTE_ENCODING_LENGTH));
+ }
+ else
+ {
+ changeNumber = ChangeNumber.valueOf(reader
+ .getString(ChangeNumber.STRING_ENCODING_LENGTH));
+ reader.get(); // Read trailing 0 byte.
+ }
+
+ if (reader.remaining() > 0)
+ {
+ // Throw better exception below.
+ throw new IllegalArgumentException();
+ }
}
- catch (UnsupportedEncodingException e)
+ catch (Exception e)
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- catch (IllegalArgumentException e)
- {
- throw new DataFormatException(e.getMessage());
+ // Index out of bounds, bad format, etc.
+ throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
}
}
+
+
/**
- * Get a byte array from the message.
- * @return The byte array containing the PDU of the message.
- * @throws UnsupportedEncodingException When an error occurs.
+ * {@inheritDoc}
*/
- public byte[] getBytes() throws UnsupportedEncodingException
+ @Override
+ public byte[] getBytes()
{
- try {
- ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+ return getBytes(ProtocolVersion.getCurrentVersion());
+ }
- /* Put the type of the operation */
- oStream.write(MSG_TYPE_CT_HEARTBEAT);
- /* Put the ChangeNumber */
- byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
- oStream.write(changeNumberByte);
- oStream.write(0);
- return oStream.toByteArray();
- } catch (IOException e)
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes(short protocolVersion)
+ {
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
{
- // never happens
- return null;
+ final ByteStringBuilder builder = new ByteStringBuilder(
+ ChangeNumber.BYTE_ENCODING_LENGTH + 1 /* type + csn */);
+ builder.append(MSG_TYPE_CT_HEARTBEAT);
+ changeNumber.toByteString(builder);
+ return builder.toByteArray();
+ }
+ else
+ {
+ final ByteStringBuilder builder = new ByteStringBuilder(
+ ChangeNumber.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */);
+ builder.append(MSG_TYPE_CT_HEARTBEAT);
+ builder.append(changeNumber.toString());
+ builder.append((byte) 0); // For compatibility with earlier protocol
+ // versions.
+ return builder.toByteArray();
}
}
+
}
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index c20d33a..c616ff4 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,12 +27,12 @@
*/
package org.opends.server.replication.protocol;
+import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.Set;
+import java.util.Map;
import org.opends.server.replication.common.ServerState;
import org.opends.server.protocols.asn1.ASN1Reader;
@@ -277,8 +277,16 @@
// loop on the list of CN of the state
while(asn1Reader.hasNextElement())
{
- String s = asn1Reader.readOctetStringAsString();
- ChangeNumber cn = new ChangeNumber(s);
+ ChangeNumber cn;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ {
+ cn = ChangeNumber.valueOf(asn1Reader.readOctetString());
+ }
+ else
+ {
+ cn = ChangeNumber.valueOf(asn1Reader.readOctetStringAsString());
+ }
+
if ((data.replServerDbState != null) && (serverId == 0))
{
// we are on the first CN that is a fake CN to store the serverId
@@ -323,7 +331,6 @@
*/
@Override
public byte[] getBytes()
- throws UnsupportedEncodingException
{
return getBytes(ProtocolVersion.getCurrentVersion());
}
@@ -333,12 +340,10 @@
*/
@Override
public byte[] getBytes(short protocolVersion)
- throws UnsupportedEncodingException
{
try
{
ByteStringBuilder byteBuilder = new ByteStringBuilder();
- ASN1Writer writer = ASN1.getWriter(byteBuilder);
if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
@@ -362,66 +367,22 @@
}
/* Put the serverStates ... */
+ ASN1Writer writer = ASN1.getWriter(byteBuilder);
writer.writeStartSequence();
-
- /* first put the Replication Server state */
- writer.writeStartSequence();
- ArrayList<ByteString> cnOctetList =
- data.replServerDbState.toASN1ArrayList();
- for (ByteString soci : cnOctetList)
{
- writer.writeOctetString(soci);
- }
- writer.writeEndSequence();
-
- // then the LDAP server data
- Set<Integer> servers = data.ldapStates.keySet();
- for (Integer sid : servers)
- {
- ServerState statei = data.ldapStates.get(sid).state;
- Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
-
- // retrieves the change numbers as an arrayList of ANSN1OctetString
- cnOctetList = statei.toASN1ArrayList();
-
+ /* first put the Replication Server state */
writer.writeStartSequence();
- // a fake changenumber helps storing the LDAP server ID
- ChangeNumber cn = new ChangeNumber(outime,1,sid);
- writer.writeOctetString(cn.toString());
-
- // the changenumbers that make the state
- for (ByteString soci : cnOctetList)
{
- writer.writeOctetString(soci);
+ data.replServerDbState.writeTo(writer, protocolVersion);
}
-
writer.writeEndSequence();
+
+ // then the LDAP server data
+ writeServerStates(protocolVersion, writer, false /* DS */);
+
+ // then the RS server datas
+ writeServerStates(protocolVersion, writer, true /* RS */);
}
-
- // then the RS server datas
- servers = data.rsStates.keySet();
- for (Integer sid : servers)
- {
- ServerState statei = data.rsStates.get(sid).state;
- Long outime = data.rsStates.get(sid).approxFirstMissingDate;
-
- // retrieves the change numbers as an arrayList of ANSN1OctetString
- cnOctetList = statei.toASN1ArrayList();
-
- writer.writeStartSequence();
- // a fake changenumber helps storing the LDAP server ID
- ChangeNumber cn = new ChangeNumber(outime,0,sid);
- writer.writeOctetString(cn.toString());
-
- // the changenumbers that make the state
- for (ByteString soci : cnOctetList)
- {
- writer.writeOctetString(soci);
- }
-
- writer.writeEndSequence();
- }
-
writer.writeEndSequence();
if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -456,6 +417,39 @@
}
}
+ private void writeServerStates(short protocolVersion, ASN1Writer writer,
+ boolean writeRSStates) throws IOException
+ {
+ Map<Integer, ServerData> servers = writeRSStates ? data.rsStates
+ : data.ldapStates;
+ for (Map.Entry<Integer, ServerData> server : servers.entrySet())
+ {
+ writer.writeStartSequence();
+ {
+ /*
+ * A fake change number helps storing the LDAP server ID. The sequence
+ * number will be used to differentiate between an LDAP server (1) or an
+ * RS (0).
+ */
+ ChangeNumber cn = new ChangeNumber(
+ server.getValue().approxFirstMissingDate, writeRSStates ? 0 : 1,
+ server.getKey());
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ {
+ writer.writeOctetString(cn.toByteString());
+ }
+ else
+ {
+ writer.writeOctetString(cn.toString());
+ }
+
+ // the changenumbers that make the state
+ server.getValue().state.writeTo(writer, protocolVersion);
+ }
+ writer.writeEndSequence();
+ }
+ }
+
/**
* Get the state of the replication server that sent this message.
* @return The state.
diff --git a/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 377fea2..b8b834b 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -81,9 +81,15 @@
public static final short REPLICATION_PROTOCOL_V6 = 6;
/**
+ * The constant for the 7th version of the replication protocol.
+ * - compact encoding for length, CSNs, and server IDs.
+ */
+ public static final short REPLICATION_PROTOCOL_V7 = 7;
+
+ /**
* The replication protocol version used by the instance of RS/DS in this VM.
*/
- private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V6;
+ private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7;
/**
* Gets the current version of the replication protocol.
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 56a0215..7c7cbb5 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -246,7 +246,7 @@
msg = new ECLUpdateMsg(buffer);
break;
case MSG_TYPE_CT_HEARTBEAT:
- msg = new ChangeTimeHeartbeatMsg(buffer);
+ msg = new ChangeTimeHeartbeatMsg(buffer, version);
break;
case MSG_TYPE_REPL_SERVER_START_DS:
msg = new ReplServerStartDSMsg(buffer);
--
Gitblit v1.10.0