From 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 23 May 2014 15:17:15 +0000
Subject: [PATCH] (CR-3599) Convert all protocols message to use ByteArrayBuilder + ByteArrayScanner
---
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 388 ++++++++++++++++++-------------------------------------
1 files changed, 128 insertions(+), 260 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index 61c793f..56a1bbf 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -26,9 +26,6 @@
*/
package org.opends.server.replication.protocol;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.zip.DataFormatException;
@@ -37,6 +34,8 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
+
/**
* This class defines a message that is sent:
* - By a RS to the other RSs in the topology, containing:
@@ -68,173 +67,102 @@
* @throws java.util.zip.DataFormatException If the byte array does not
* contain a valid encoded form of the message.
*/
- public TopologyMsg(byte[] in, short version) throws DataFormatException
+ TopologyMsg(byte[] in, short version) throws DataFormatException
{
- try
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_TOPOLOGY)
{
- /* First byte is the type */
- if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
- {
- throw new DataFormatException(
- "Input is not a valid " + getClass().getCanonicalName());
- }
-
- int pos = 1;
-
- /* Read number of following DS info entries */
- byte nDsInfo = in[pos++];
-
- /* Read the DS info entries */
- Map<Integer, DSInfo> replicaInfos =
- new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
- while (nDsInfo > 0 && pos < in.length)
- {
- /* Read DS id */
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int dsId = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read DS URL */
- String dsUrl;
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
- {
- length = getNextLength(in, pos);
- dsUrl = new String(in, pos, length, "UTF-8");
- pos += length + 1;
- }
- else
- {
- dsUrl = "";
- }
-
- /* Read RS id */
- length = getNextLength(in, pos);
- serverIdString = new String(in, pos, length, "UTF-8");
- int rsId = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read the generation id */
- length = getNextLength(in, pos);
- long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
-
- /* Read DS status */
- ServerStatus status = ServerStatus.valueOf(in[pos++]);
-
- /* Read DS assured flag */
- boolean assuredFlag = in[pos++] == 1;
-
- /* Read DS assured mode */
- AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
-
- /* Read DS safe data level */
- byte safeDataLevel = in[pos++];
-
- /* Read DS group id */
- byte groupId = in[pos++];
-
- /* Read number of referrals URLs */
- List<String> refUrls = new ArrayList<String>();
- pos = readStrings(in, pos, refUrls);
-
- Set<String> attrs = new HashSet<String>();
- Set<String> delattrs = new HashSet<String>();
- short protocolVersion = -1;
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- pos = readStrings(in, pos, attrs);
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
- {
- pos = readStrings(in, pos, delattrs);
- }
- else
- {
- // Default to using the same set of attributes for deletes.
- delattrs.addAll(attrs);
- }
-
- /* Read Protocol version */
- protocolVersion = in[pos++];
- }
-
- /* Now create DSInfo and store it */
- replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
- status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
- attrs, delattrs, protocolVersion));
-
- nDsInfo--;
- }
-
- /* Read number of following RS info entries */
- byte nRsInfo = in[pos++];
-
- /* Read the RS info entries */
- List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
- while (nRsInfo > 0 && pos < in.length)
- {
- /* Read RS id */
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int id = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read the generation id */
- length = getNextLength(in, pos);
- long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
-
- /* Read RS group id */
- byte groupId = in[pos++];
-
- int weight = 1;
- String serverUrl = null;
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- length = getNextLength(in, pos);
- serverUrl = new String(in, pos, length, "UTF-8");
- pos += length + 1;
-
- /* Read RS weight */
- length = getNextLength(in, pos);
- weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
- }
-
- /* Now create RSInfo and store it */
- rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
-
- nRsInfo--;
- }
-
- this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
- this.rsInfos = Collections.unmodifiableList(rsInfos);
+ throw new DataFormatException("Input is not a valid "
+ + getClass().getCanonicalName());
}
- catch (UnsupportedEncodingException e)
+
+ // Read the DS info entries, first read number of them
+ int nDsInfo = scanner.nextByte();
+ final Map<Integer, DSInfo> replicaInfos =
+ new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
+ while (nDsInfo > 0 && !scanner.isEmpty())
{
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ final DSInfo dsInfo = nextDSInfo(scanner, version);
+ replicaInfos.put(dsInfo.getDsId(), dsInfo);
+ nDsInfo--;
}
+
+ // Read the RS info entries
+ int nRsInfo = scanner.nextByte();
+ final List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+ while (nRsInfo > 0 && !scanner.isEmpty())
+ {
+ rsInfos.add(nextRSInfo(scanner, version));
+ nRsInfo--;
+ }
+
+ this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
+ this.rsInfos = Collections.unmodifiableList(rsInfos);
}
- private int readStrings(byte[] in, int pos, Collection<String> outputCol)
- throws DataFormatException, UnsupportedEncodingException
+ private DSInfo nextDSInfo(ByteArrayScanner scanner, short version)
+ throws DataFormatException
{
- byte nAttrs = in[pos++];
- byte nRead = 0;
- // Read all elements until expected number read
- while (nRead != nAttrs && pos < in.length)
+ final int dsId = scanner.nextIntUTF8();
+ final String dsUrl =
+ version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString();
+ final int rsId = scanner.nextIntUTF8();
+ final long generationId = scanner.nextLongUTF8();
+ final ServerStatus status = ServerStatus.valueOf(scanner.nextByte());
+ final boolean assuredFlag = scanner.nextBoolean();
+ final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte());
+ final byte safeDataLevel = scanner.nextByte();
+ final byte groupId = scanner.nextByte();
+
+ final List<String> refUrls = new ArrayList<String>();
+ scanner.nextStrings(refUrls);
+
+ final Set<String> attrs = new HashSet<String>();
+ final Set<String> delattrs = new HashSet<String>();
+ short protocolVersion = -1;
+ if (version >= REPLICATION_PROTOCOL_V4)
{
- int length = getNextLength(in, pos);
- outputCol.add(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
- nRead++;
+ scanner.nextStrings(attrs);
+
+ if (version >= REPLICATION_PROTOCOL_V5)
+ {
+ scanner.nextStrings(delattrs);
+ }
+ else
+ {
+ // Default to using the same set of attributes for deletes.
+ delattrs.addAll(attrs);
+ }
+
+ protocolVersion = scanner.nextByte();
}
- return pos;
+
+ return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag,
+ assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs,
+ protocolVersion);
+ }
+
+ private RSInfo nextRSInfo(ByteArrayScanner scanner, short version)
+ throws DataFormatException
+ {
+ final int rsId = scanner.nextIntUTF8();
+ final long generationId = scanner.nextLongUTF8();
+ final byte groupId = scanner.nextByte();
+
+ int weight = 1;
+ String serverUrl = null;
+ if (version >= REPLICATION_PROTOCOL_V4)
+ {
+ serverUrl = scanner.nextString();
+ weight = scanner.nextIntUTF8();
+ }
+
+ return new RSInfo(rsId, serverUrl, generationId, groupId, weight);
}
/**
- * Creates a new message of the currently connected servers.
+ * Creates a new message of the currently connected servers.
*
* @param dsInfos The collection of currently connected DS servers ID.
* @param rsInfos The list of currently connected RS servers ID.
@@ -272,122 +200,62 @@
/** {@inheritDoc} */
@Override
- public byte[] getBytes(short version) throws UnsupportedEncodingException
+ public byte[] getBytes(short version)
{
- try
+ /**
+ * Message has the following form:
+ * <pdu type><number of following DSInfo entries>[<DSInfo>]*
+ * <number of following RSInfo entries>[<RSInfo>]*
+ */
+ final ByteArrayBuilder builder = new ByteArrayBuilder();
+ builder.append(MSG_TYPE_TOPOLOGY);
+
+ // Put DS infos
+ builder.append((byte) replicaInfos.size());
+ for (DSInfo dsInfo : replicaInfos.values())
{
- /**
- * Message has the following form:
- * <pdu type><number of following DSInfo entries>[<DSInfo>]*
- * <number of following RSInfo entries>[<RSInfo>]*
- */
- ByteArrayOutputStream oStream = new ByteArrayOutputStream();
-
- /* Put the message type */
- oStream.write(MSG_TYPE_TOPOLOGY);
-
- // Put number of following DS info entries
- oStream.write((byte) replicaInfos.size());
-
- // Put DS info
- for (DSInfo dsInfo : replicaInfos.values())
+ builder.appendUTF8(dsInfo.getDsId());
+ if (version >= REPLICATION_PROTOCOL_V6)
{
- // Put DS id
- byte[] byteServerId =
- String.valueOf(dsInfo.getDsId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
- {
- // Put DS URL
- oStream.write(dsInfo.getDsUrl().getBytes("UTF-8"));
- oStream.write(0);
- }
- // Put RS id
- byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- // Put the generation id
- oStream.write(String.valueOf(dsInfo.getGenerationId()).
- getBytes("UTF-8"));
- oStream.write(0);
- // Put DS status
- oStream.write(dsInfo.getStatus().getValue());
- // Put DS assured flag
- oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0);
- // Put DS assured mode
- oStream.write(dsInfo.getAssuredMode().getValue());
- // Put DS safe data level
- oStream.write(dsInfo.getSafeDataLevel());
- // Put DS group id
- oStream.write(dsInfo.getGroupId());
-
- writeStrings(oStream, dsInfo.getRefUrls());
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- // Put ECL includes
- writeStrings(oStream, dsInfo.getEclIncludes());
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
- {
- writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
- }
-
- oStream.write(dsInfo.getProtocolVersion());
- }
+ builder.append(dsInfo.getDsUrl());
}
+ builder.appendUTF8(dsInfo.getRsId());
+ builder.appendUTF8(dsInfo.getGenerationId());
+ builder.append(dsInfo.getStatus().getValue());
+ builder.append(dsInfo.isAssured());
+ builder.append(dsInfo.getAssuredMode().getValue());
+ builder.append(dsInfo.getSafeDataLevel());
+ builder.append(dsInfo.getGroupId());
- // Put number of following RS info entries
- oStream.write((byte) rsInfos.size());
+ builder.appendStrings(dsInfo.getRefUrls());
- // Put RS info
- for (RSInfo rsInfo : rsInfos)
+ if (version >= REPLICATION_PROTOCOL_V4)
{
- // Put RS id
- byte[] byteServerId =
- String.valueOf(rsInfo.getId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- // Put the generation id
- oStream.write(String.valueOf(rsInfo.getGenerationId()).
- getBytes("UTF-8"));
- oStream.write(0);
- // Put RS group id
- oStream.write(rsInfo.getGroupId());
-
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ builder.appendStrings(dsInfo.getEclIncludes());
+ if (version >= REPLICATION_PROTOCOL_V5)
{
- // Put server URL
- oStream.write(rsInfo.getServerUrl().getBytes("UTF-8"));
- oStream.write(0);
-
- // Put RS weight
- oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
- oStream.write(0);
+ builder.appendStrings(dsInfo.getEclIncludesForDeletes());
}
+ builder.append((byte) dsInfo.getProtocolVersion());
}
+ }
- return oStream.toByteArray();
- }
- catch (IOException e)
+ // Put RS infos
+ builder.append((byte) rsInfos.size());
+ for (RSInfo rsInfo : rsInfos)
{
- // never happens
- throw new RuntimeException(e);
- }
- }
+ builder.appendUTF8(rsInfo.getId());
+ builder.appendUTF8(rsInfo.getGenerationId());
+ builder.append(rsInfo.getGroupId());
- private void writeStrings(ByteArrayOutputStream oStream,
- Collection<String> col) throws IOException, UnsupportedEncodingException
- {
- // Put collection length as a byte
- oStream.write(col.size());
- for (String elem : col)
- {
- // Write the element and a 0 terminating byte
- oStream.write(elem.getBytes("UTF-8"));
- oStream.write(0);
+ if (version >= REPLICATION_PROTOCOL_V4)
+ {
+ builder.append(rsInfo.getServerUrl());
+ builder.appendUTF8(rsInfo.getWeight());
+ }
}
+
+ return builder.toByteArray();
}
/** {@inheritDoc} */
@@ -414,7 +282,7 @@
+ "CONNECTED RS SERVERS:"
+ "\n--------------------\n"
+ rsStr
- + (rsStr.equals("") ? "----------------------------\n" : "");
+ + ("".equals(rsStr) ? "----------------------------\n" : "");
}
/**
--
Gitblit v1.10.0