From 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 23 May 2014 15:17:15 +0000
Subject: [PATCH] (CR-3599) Convert all protocols message to use ByteArrayBuilder + ByteArrayScanner

---
 opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java |  388 ++++++++++++++++++-------------------------------------
 1 files changed, 128 insertions(+), 260 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index 61c793f..56a1bbf 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -26,9 +26,6 @@
  */
 package org.opends.server.replication.protocol;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
 import java.util.*;
 import java.util.zip.DataFormatException;
 
@@ -37,6 +34,8 @@
 import org.opends.server.replication.common.RSInfo;
 import org.opends.server.replication.common.ServerStatus;
 
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
+
 /**
  * This class defines a message that is sent:
  * - By a RS to the other RSs in the topology, containing:
@@ -68,173 +67,102 @@
    * @throws java.util.zip.DataFormatException If the byte array does not
    * contain a valid encoded form of the message.
    */
-  public TopologyMsg(byte[] in, short version) throws DataFormatException
+  TopologyMsg(byte[] in, short version) throws DataFormatException
   {
-    try
+    final ByteArrayScanner scanner = new ByteArrayScanner(in);
+    final byte msgType = scanner.nextByte();
+    if (msgType != MSG_TYPE_TOPOLOGY)
     {
-      /* First byte is the type */
-      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
-      {
-        throw new DataFormatException(
-          "Input is not a valid " + getClass().getCanonicalName());
-      }
-
-      int pos = 1;
-
-      /* Read number of following DS info entries */
-      byte nDsInfo = in[pos++];
-
-      /* Read the DS info entries */
-      Map<Integer, DSInfo> replicaInfos =
-          new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
-      while (nDsInfo > 0 && pos < in.length)
-      {
-        /* Read DS id */
-        int length = getNextLength(in, pos);
-        String serverIdString = new String(in, pos, length, "UTF-8");
-        int dsId = Integer.valueOf(serverIdString);
-        pos += length + 1;
-
-        /* Read DS URL */
-        String dsUrl;
-        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
-        {
-          length = getNextLength(in, pos);
-          dsUrl = new String(in, pos, length, "UTF-8");
-          pos += length + 1;
-        }
-        else
-        {
-          dsUrl = "";
-        }
-
-        /* Read RS id */
-        length = getNextLength(in, pos);
-        serverIdString = new String(in, pos, length, "UTF-8");
-        int rsId = Integer.valueOf(serverIdString);
-        pos += length + 1;
-
-        /* Read the generation id */
-        length = getNextLength(in, pos);
-        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
-        pos += length + 1;
-
-        /* Read DS status */
-        ServerStatus status = ServerStatus.valueOf(in[pos++]);
-
-        /* Read DS assured flag */
-        boolean assuredFlag = in[pos++] == 1;
-
-        /* Read DS assured mode */
-        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
-
-        /* Read DS safe data level */
-        byte safeDataLevel = in[pos++];
-
-        /* Read DS group id */
-        byte groupId = in[pos++];
-
-        /* Read number of referrals URLs */
-        List<String> refUrls = new ArrayList<String>();
-        pos = readStrings(in, pos, refUrls);
-
-        Set<String> attrs = new HashSet<String>();
-        Set<String> delattrs = new HashSet<String>();
-        short protocolVersion = -1;
-        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-        {
-          pos = readStrings(in, pos, attrs);
-
-          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
-          {
-            pos = readStrings(in, pos, delattrs);
-          }
-          else
-          {
-            // Default to using the same set of attributes for deletes.
-            delattrs.addAll(attrs);
-          }
-
-          /* Read Protocol version */
-          protocolVersion = in[pos++];
-        }
-
-        /* Now create DSInfo and store it */
-        replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
-            status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
-            attrs, delattrs, protocolVersion));
-
-        nDsInfo--;
-      }
-
-      /* Read number of following RS info entries */
-      byte nRsInfo = in[pos++];
-
-      /* Read the RS info entries */
-      List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
-      while (nRsInfo > 0 && pos < in.length)
-      {
-        /* Read RS id */
-        int length = getNextLength(in, pos);
-        String serverIdString = new String(in, pos, length, "UTF-8");
-        int id = Integer.valueOf(serverIdString);
-        pos += length + 1;
-
-        /* Read the generation id */
-        length = getNextLength(in, pos);
-        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
-        pos += length + 1;
-
-        /* Read RS group id */
-        byte groupId = in[pos++];
-
-        int weight = 1;
-        String serverUrl = null;
-        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-        {
-          length = getNextLength(in, pos);
-          serverUrl = new String(in, pos, length, "UTF-8");
-          pos += length + 1;
-
-          /* Read RS weight */
-          length = getNextLength(in, pos);
-          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
-          pos += length + 1;
-        }
-
-        /* Now create RSInfo and store it */
-        rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
-
-        nRsInfo--;
-      }
-
-      this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
-      this.rsInfos = Collections.unmodifiableList(rsInfos);
+      throw new DataFormatException("Input is not a valid "
+          + getClass().getCanonicalName());
     }
-    catch (UnsupportedEncodingException e)
+
+    // Read the DS info entries, first read number of them
+    int nDsInfo = scanner.nextByte();
+    final Map<Integer, DSInfo> replicaInfos =
+        new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
+    while (nDsInfo > 0 && !scanner.isEmpty())
     {
-      throw new DataFormatException("UTF-8 is not supported by this jvm.");
+      final DSInfo dsInfo = nextDSInfo(scanner, version);
+      replicaInfos.put(dsInfo.getDsId(), dsInfo);
+      nDsInfo--;
     }
+
+    // Read the RS info entries
+    int nRsInfo = scanner.nextByte();
+    final List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+    while (nRsInfo > 0 && !scanner.isEmpty())
+    {
+      rsInfos.add(nextRSInfo(scanner, version));
+      nRsInfo--;
+    }
+
+    this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
+    this.rsInfos = Collections.unmodifiableList(rsInfos);
   }
 
-  private int readStrings(byte[] in, int pos, Collection<String> outputCol)
-      throws DataFormatException, UnsupportedEncodingException
+  private DSInfo nextDSInfo(ByteArrayScanner scanner, short version)
+      throws DataFormatException
   {
-    byte nAttrs = in[pos++];
-    byte nRead = 0;
-    // Read all elements until expected number read
-    while (nRead != nAttrs && pos < in.length)
+    final int dsId = scanner.nextIntUTF8();
+    final String dsUrl =
+        version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString();
+    final int rsId = scanner.nextIntUTF8();
+    final long generationId = scanner.nextLongUTF8();
+    final ServerStatus status = ServerStatus.valueOf(scanner.nextByte());
+    final boolean assuredFlag = scanner.nextBoolean();
+    final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte());
+    final byte safeDataLevel = scanner.nextByte();
+    final byte groupId = scanner.nextByte();
+
+    final List<String> refUrls = new ArrayList<String>();
+    scanner.nextStrings(refUrls);
+
+    final Set<String> attrs = new HashSet<String>();
+    final Set<String> delattrs = new HashSet<String>();
+    short protocolVersion = -1;
+    if (version >= REPLICATION_PROTOCOL_V4)
     {
-      int length = getNextLength(in, pos);
-      outputCol.add(new String(in, pos, length, "UTF-8"));
-      pos += length + 1;
-      nRead++;
+      scanner.nextStrings(attrs);
+
+      if (version >= REPLICATION_PROTOCOL_V5)
+      {
+        scanner.nextStrings(delattrs);
+      }
+      else
+      {
+        // Default to using the same set of attributes for deletes.
+        delattrs.addAll(attrs);
+      }
+
+      protocolVersion = scanner.nextByte();
     }
-    return pos;
+
+    return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag,
+        assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs,
+        protocolVersion);
+  }
+
+  private RSInfo nextRSInfo(ByteArrayScanner scanner, short version)
+      throws DataFormatException
+  {
+    final int rsId = scanner.nextIntUTF8();
+    final long generationId = scanner.nextLongUTF8();
+    final byte groupId = scanner.nextByte();
+
+    int weight = 1;
+    String serverUrl = null;
+    if (version >= REPLICATION_PROTOCOL_V4)
+    {
+      serverUrl = scanner.nextString();
+      weight = scanner.nextIntUTF8();
+    }
+
+    return new RSInfo(rsId, serverUrl, generationId, groupId, weight);
   }
 
   /**
-   * Creates a new  message of the currently connected servers.
+   * Creates a new message of the currently connected servers.
    *
    * @param dsInfos The collection of currently connected DS servers ID.
    * @param rsInfos The list of currently connected RS servers ID.
@@ -272,122 +200,62 @@
 
   /** {@inheritDoc} */
   @Override
-  public byte[] getBytes(short version) throws UnsupportedEncodingException
+  public byte[] getBytes(short version)
   {
-    try
+    /**
+     * Message has the following form:
+     * <pdu type><number of following DSInfo entries>[<DSInfo>]*
+     * <number of following RSInfo entries>[<RSInfo>]*
+     */
+    final ByteArrayBuilder builder = new ByteArrayBuilder();
+    builder.append(MSG_TYPE_TOPOLOGY);
+
+    // Put DS infos
+    builder.append((byte) replicaInfos.size());
+    for (DSInfo dsInfo : replicaInfos.values())
     {
-      /**
-       * Message has the following form:
-       * <pdu type><number of following DSInfo entries>[<DSInfo>]*
-       * <number of following RSInfo entries>[<RSInfo>]*
-       */
-      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
-
-      /* Put the message type */
-      oStream.write(MSG_TYPE_TOPOLOGY);
-
-      // Put number of following DS info entries
-      oStream.write((byte) replicaInfos.size());
-
-      // Put DS info
-      for (DSInfo dsInfo : replicaInfos.values())
+      builder.appendUTF8(dsInfo.getDsId());
+      if (version >= REPLICATION_PROTOCOL_V6)
       {
-        // Put DS id
-        byte[] byteServerId =
-          String.valueOf(dsInfo.getDsId()).getBytes("UTF-8");
-        oStream.write(byteServerId);
-        oStream.write(0);
-        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
-        {
-          // Put DS URL
-          oStream.write(dsInfo.getDsUrl().getBytes("UTF-8"));
-          oStream.write(0);
-        }
-        // Put RS id
-        byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
-        oStream.write(byteServerId);
-        oStream.write(0);
-        // Put the generation id
-        oStream.write(String.valueOf(dsInfo.getGenerationId()).
-            getBytes("UTF-8"));
-        oStream.write(0);
-        // Put DS status
-        oStream.write(dsInfo.getStatus().getValue());
-        // Put DS assured flag
-        oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0);
-        // Put DS assured mode
-        oStream.write(dsInfo.getAssuredMode().getValue());
-        // Put DS safe data level
-        oStream.write(dsInfo.getSafeDataLevel());
-        // Put DS group id
-        oStream.write(dsInfo.getGroupId());
-
-        writeStrings(oStream, dsInfo.getRefUrls());
-
-        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
-        {
-          // Put ECL includes
-          writeStrings(oStream, dsInfo.getEclIncludes());
-
-          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
-          {
-            writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
-          }
-
-          oStream.write(dsInfo.getProtocolVersion());
-        }
+        builder.append(dsInfo.getDsUrl());
       }
+      builder.appendUTF8(dsInfo.getRsId());
+      builder.appendUTF8(dsInfo.getGenerationId());
+      builder.append(dsInfo.getStatus().getValue());
+      builder.append(dsInfo.isAssured());
+      builder.append(dsInfo.getAssuredMode().getValue());
+      builder.append(dsInfo.getSafeDataLevel());
+      builder.append(dsInfo.getGroupId());
 
-      // Put number of following RS info entries
-      oStream.write((byte) rsInfos.size());
+      builder.appendStrings(dsInfo.getRefUrls());
 
-      // Put RS info
-      for (RSInfo rsInfo : rsInfos)
+      if (version >= REPLICATION_PROTOCOL_V4)
       {
-        // Put RS id
-        byte[] byteServerId =
-          String.valueOf(rsInfo.getId()).getBytes("UTF-8");
-        oStream.write(byteServerId);
-        oStream.write(0);
-        // Put the generation id
-        oStream.write(String.valueOf(rsInfo.getGenerationId()).
-          getBytes("UTF-8"));
-        oStream.write(0);
-        // Put RS group id
-        oStream.write(rsInfo.getGroupId());
-
-        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+        builder.appendStrings(dsInfo.getEclIncludes());
+        if (version >= REPLICATION_PROTOCOL_V5)
         {
-          // Put server URL
-          oStream.write(rsInfo.getServerUrl().getBytes("UTF-8"));
-          oStream.write(0);
-
-          // Put RS weight
-          oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
-          oStream.write(0);
+          builder.appendStrings(dsInfo.getEclIncludesForDeletes());
         }
+        builder.append((byte) dsInfo.getProtocolVersion());
       }
+    }
 
-      return oStream.toByteArray();
-    }
-    catch (IOException e)
+    // Put RS infos
+    builder.append((byte) rsInfos.size());
+    for (RSInfo rsInfo : rsInfos)
     {
-      // never happens
-      throw new RuntimeException(e);
-    }
-  }
+      builder.appendUTF8(rsInfo.getId());
+      builder.appendUTF8(rsInfo.getGenerationId());
+      builder.append(rsInfo.getGroupId());
 
-  private void writeStrings(ByteArrayOutputStream oStream,
-      Collection<String> col) throws IOException, UnsupportedEncodingException
-  {
-    // Put collection length as a byte
-    oStream.write(col.size());
-    for (String elem : col)
-    {
-      // Write the element and a 0 terminating byte
-      oStream.write(elem.getBytes("UTF-8"));
-      oStream.write(0);
+      if (version >= REPLICATION_PROTOCOL_V4)
+      {
+        builder.append(rsInfo.getServerUrl());
+        builder.appendUTF8(rsInfo.getWeight());
+      }
     }
+
+    return builder.toByteArray();
   }
 
   /** {@inheritDoc} */
@@ -414,7 +282,7 @@
       + "CONNECTED RS SERVERS:"
       + "\n--------------------\n"
       + rsStr
-      + (rsStr.equals("") ? "----------------------------\n" : "");
+      + ("".equals(rsStr) ? "----------------------------\n" : "");
   }
 
   /**

--
Gitblit v1.10.0