From 4fd152ec8ba98ac9a70202dbac2b3a579df1033a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 10 Jun 2013 14:25:54 +0000
Subject: [PATCH] Fix OPENDJ-951: Reduce size and frequency of replication MonitorMsg

---
 opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java |  148 +++++++++++++++++++++++++++---------------------
 1 files changed, 83 insertions(+), 65 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index 09b985a..091b23c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -23,20 +23,24 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+
+
 import java.util.zip.DataFormatException;
 
 import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
+
 
 /**
- * Class that define messages sent by a replication domain (DS)
- * to the replication server to let the RS know the DS current
- * change time.
+ * Class that define messages sent by a replication domain (DS) to the
+ * replication server to let the RS know the DS current change time.
  */
 public class ChangeTimeHeartbeatMsg extends ReplicationMsg
 {
@@ -45,26 +49,25 @@
    */
   private final ChangeNumber changeNumber;
 
-  /**
-   * Constructor of a Change Time Heartbeat message.
-   */
-  public ChangeTimeHeartbeatMsg()
-  {
-    this.changeNumber = new ChangeNumber((long)0,0,0);
-  }
+
 
   /**
-   * Constructor of a Change Time Heartbeat message providing
-   * the change time value in a change number.
-   * @param cn The provided change number.
+   * Constructor of a Change Time Heartbeat message providing the change time
+   * value in a change number.
+   *
+   * @param cn
+   *          The provided change number.
    */
   public ChangeTimeHeartbeatMsg(ChangeNumber cn)
   {
     this.changeNumber = cn;
   }
 
+
+
   /**
    * Get a change number with the transmitted change time.
+   *
    * @return the ChangeNumber
    */
   public ChangeNumber getChangeNumber()
@@ -72,77 +75,92 @@
     return changeNumber;
   }
 
-  /**
-   * Encode a change time message.
-   * @return The encoded message.
-   * @throws UnsupportedEncodingException When an error occurs.
-   */
-  public byte[] encode() throws UnsupportedEncodingException
-  {
-    byte[] changeNumberByte =
-      this.getChangeNumber().toString().getBytes("UTF-8");
-    int length = changeNumberByte.length;
-    byte[] encodedMsg = new byte[length];
 
-    /* Put the ChangeNumber */
-    addByteArray(changeNumberByte, encodedMsg, 0);
-
-    return encodedMsg;
-  }
 
   /**
    * Creates a message from a provided byte array.
-   * @param in The provided byte array.
-   * @throws DataFormatException When an error occurs.
+   *
+   * @param in
+   *          The provided byte array.
+   * @param version
+   *          The version of the protocol to use to decode the msg.
+   * @throws DataFormatException
+   *           When an error occurs.
    */
-  public ChangeTimeHeartbeatMsg(byte[] in) throws DataFormatException
+  public ChangeTimeHeartbeatMsg(byte[] in, short version)
+      throws DataFormatException
   {
+    final ByteSequenceReader reader = ByteString.wrap(in).asReader();
     try
     {
-      /* Read the changeNumber */
-      /* First byte is the type */
-      if (in[0] != MSG_TYPE_CT_HEARTBEAT)
+      if (reader.get() != MSG_TYPE_CT_HEARTBEAT)
       {
-        throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
+        // Throw better exception below.
+        throw new IllegalArgumentException();
       }
-      int pos = 1;
-      int length = getNextLength(in, pos);
-      String changenumberStr = new String(in, pos, length, "UTF-8");
-      changeNumber = new ChangeNumber(changenumberStr);
+
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+      {
+        changeNumber = ChangeNumber.valueOf(reader
+            .getByteSequence(ChangeNumber.BYTE_ENCODING_LENGTH));
+      }
+      else
+      {
+        changeNumber = ChangeNumber.valueOf(reader
+            .getString(ChangeNumber.STRING_ENCODING_LENGTH));
+        reader.get(); // Read trailing 0 byte.
+      }
+
+      if (reader.remaining() > 0)
+      {
+        // Throw better exception below.
+        throw new IllegalArgumentException();
+      }
     }
-    catch (UnsupportedEncodingException e)
+    catch (Exception e)
     {
-      throw new DataFormatException("UTF-8 is not supported by this jvm.");
-    }
-    catch (IllegalArgumentException e)
-    {
-      throw new DataFormatException(e.getMessage());
+      // Index out of bounds, bad format, etc.
+      throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
     }
   }
 
+
+
   /**
-   * Get a byte array from the message.
-   * @return The byte array containing the PDU of the message.
-   * @throws UnsupportedEncodingException When an error occurs.
+   * {@inheritDoc}
    */
-  public byte[] getBytes() throws UnsupportedEncodingException
+  @Override
+  public byte[] getBytes()
   {
-    try {
-      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+    return getBytes(ProtocolVersion.getCurrentVersion());
+  }
 
-      /* Put the type of the operation */
-      oStream.write(MSG_TYPE_CT_HEARTBEAT);
 
-      /* Put the ChangeNumber */
-      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
-      oStream.write(changeNumberByte);
-      oStream.write(0);
 
-      return oStream.toByteArray();
-    } catch (IOException e)
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes(short protocolVersion)
+  {
+    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
     {
-      // never happens
-      return null;
+      final ByteStringBuilder builder = new ByteStringBuilder(
+          ChangeNumber.BYTE_ENCODING_LENGTH + 1 /* type + csn */);
+      builder.append(MSG_TYPE_CT_HEARTBEAT);
+      changeNumber.toByteString(builder);
+      return builder.toByteArray();
+    }
+    else
+    {
+      final ByteStringBuilder builder = new ByteStringBuilder(
+          ChangeNumber.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */);
+      builder.append(MSG_TYPE_CT_HEARTBEAT);
+      builder.append(changeNumber.toString());
+      builder.append((byte) 0); // For compatibility with earlier protocol
+                                // versions.
+      return builder.toByteArray();
     }
   }
+
 }

--
Gitblit v1.10.0