From 4fd152ec8ba98ac9a70202dbac2b3a579df1033a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 10 Jun 2013 14:25:54 +0000
Subject: [PATCH] Fix OPENDJ-951: Reduce size and frequency of replication MonitorMsg

---
 opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java        |    8 
 opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java |  148 ++++++++++--------
 opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java             |  114 ++++++-------
 opends/src/server/org/opends/server/replication/common/ChangeNumber.java             |  109 +++++++++++--
 opends/src/server/org/opends/server/replication/protocol/AckMsg.java                 |    2 
 opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java         |    2 
 opends/src/server/org/opends/server/replication/common/ServerState.java              |   38 ++++
 7 files changed, 274 insertions(+), 147 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/common/ChangeNumber.java b/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
index 27347b7..56dd1fb 100644
--- a/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
+++ b/opends/src/server/org/opends/server/replication/common/ChangeNumber.java
@@ -29,20 +29,71 @@
 
 import java.util.Date;
 
+import org.opends.server.types.ByteSequence;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
 /**
  * Class used to represent Change Numbers.
  */
 public class ChangeNumber implements java.io.Serializable,
                                      java.lang.Comparable<ChangeNumber>
 {
+  /**
+   * The number of bytes used by the byte string representation of a change
+   * number.
+   *
+   * @see #valueOf(ByteSequence)
+   * @see #toByteString()
+   * @see #toByteString(ByteStringBuilder)
+   */
+  public static final int BYTE_ENCODING_LENGTH = 14;
+
+  /**
+   * The number of characters used by the string representation of a change
+   * number.
+   *
+   * @see #valueOf(String)
+   * @see #toString()
+   */
+  public static final int STRING_ENCODING_LENGTH = 28;
+
   private static final long serialVersionUID = -8802722277749190740L;
   private final long timeStamp;
   private final int seqnum;
   private final int serverId;
 
-  // A String representation of the ChangeNumber suitable for network
-  // transmission.
-  private String formatedString = null;
+  /**
+   * Parses the provided {@link #toString()} representation of a change number.
+   *
+   * @param s
+   *          The string to be parsed.
+   * @return The parsed change number.
+   * @see #toString()
+   */
+  public static ChangeNumber valueOf(String s)
+  {
+    return new ChangeNumber(s);
+  }
+
+  /**
+   * Decodes the provided {@link #toByteString()} representation of a change
+   * number.
+   *
+   * @param bs
+   *          The byte sequence to be parsed.
+   * @return The decoded change number.
+   * @see #toByteString()
+   */
+  public static ChangeNumber valueOf(ByteSequence bs)
+  {
+    ByteSequenceReader reader = bs.asReader();
+    long timeStamp = reader.getLong();
+    int serverId = reader.getShort() & 0xffff;
+    int seqnum = reader.getInt();
+    return new ChangeNumber(timeStamp, seqnum, serverId);
+  }
 
   /**
    * Create a new ChangeNumber from a String.
@@ -59,8 +110,6 @@
 
     temp = str.substring(20, 28);
     seqnum = Integer.parseInt(temp, 16);
-
-    formatedString = str;
   }
 
   /**
@@ -140,25 +189,47 @@
   }
 
   /**
+   * Encodes this change number as a byte string.
+   * <p>
+   * NOTE: this representation must not be modified otherwise interop with
+   * earlier protocol versions will be broken.
+   *
+   * @return The encoded representation of this change number.
+   * @see #valueOf(ByteSequence)
+   */
+  public ByteString toByteString()
+  {
+    return toByteString(new ByteStringBuilder(BYTE_ENCODING_LENGTH))
+        .toByteString();
+  }
+
+  /**
+   * Encodes this change number into the provided byte string builder.
+   * <p>
+   * NOTE: this representation must not be modified otherwise interop with
+   * earlier protocol versions will be broken.
+   *
+   * @param builder
+   *          The byte string builder.
+   * @return The byte string builder containing the encoded change number.
+   * @see #valueOf(ByteSequence)
+   */
+  public ByteStringBuilder toByteString(ByteStringBuilder builder)
+  {
+    return builder.append(timeStamp).append((short) (serverId & 0xffff))
+        .append(seqnum);
+  }
+
+  /**
    * Convert the ChangeNumber to a printable String.
+   * <p>
+   * NOTE: this representation must not be modified otherwise interop with
+   * earlier protocol versions will be broken.
+   *
    * @return the string
    */
   public String toString()
   {
-    return format();
-  }
-
-  /**
-   * Convert the ChangeNumber to a String that is suitable for network
-   * transmission.
-   *
-   * @return the string
-   */
-  public String format()
-  {
-    if (formatedString != null)
-      return formatedString;
-
     return String.format("%016x%04x%08x", timeStamp, serverId, seqnum);
   }
 
diff --git a/opends/src/server/org/opends/server/replication/common/ServerState.java b/opends/src/server/org/opends/server/replication/common/ServerState.java
index 92bf8d1..e9963fa 100644
--- a/opends/src/server/org/opends/server/replication/common/ServerState.java
+++ b/opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -27,6 +27,7 @@
  */
 package org.opends.server.replication.common;
 
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.Date;
@@ -37,6 +38,8 @@
 import java.util.Set;
 import java.util.zip.DataFormatException;
 
+import org.opends.server.protocols.asn1.ASN1Writer;
+import org.opends.server.replication.protocol.ProtocolVersion;
 import org.opends.server.types.ByteString;
 
 
@@ -278,6 +281,41 @@
     }
     return values;
   }
+
+
+
+  /**
+   * Encodes this server state to the provided ASN1 writer.
+   *
+   * @param writer
+   *          The ASN1 writer.
+   * @param protocolVersion
+   *          The replication protocol version.
+   * @throws IOException
+   *           If an error occurred during encoding.
+   */
+  public void writeTo(ASN1Writer writer, short protocolVersion)
+      throws IOException
+  {
+    synchronized (list)
+    {
+      if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+      {
+        for (ChangeNumber cn : list.values())
+        {
+          writer.writeOctetString(cn.toByteString());
+        }
+      }
+      else
+      {
+        for (ChangeNumber cn : list.values())
+        {
+          writer.writeOctetString(cn.toString());
+        }
+      }
+    }
+  }
+
   /**
    * Return the text representation of ServerState.
    * @return the text representation of ServerState
diff --git a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
index 544e680..3862a38 100644
--- a/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -230,7 +230,7 @@
       oStream.write(MSG_TYPE_ACK);
 
       /* Put the ChangeNumber */
-      byte[] changeNumberByte = changeNumber.format().getBytes("UTF-8");
+      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
       oStream.write(changeNumberByte);
       oStream.write(0);
 
diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index 09b985a..091b23c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -23,20 +23,24 @@
  *
  *
  *      Copyright 2006-2009 Sun Microsystems, Inc.
+ *      Portions copyright 2013 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
 
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
+
+
 import java.util.zip.DataFormatException;
 
 import org.opends.server.replication.common.ChangeNumber;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
+
+
 
 /**
- * Class that define messages sent by a replication domain (DS)
- * to the replication server to let the RS know the DS current
- * change time.
+ * Class that define messages sent by a replication domain (DS) to the
+ * replication server to let the RS know the DS current change time.
  */
 public class ChangeTimeHeartbeatMsg extends ReplicationMsg
 {
@@ -45,26 +49,25 @@
    */
   private final ChangeNumber changeNumber;
 
-  /**
-   * Constructor of a Change Time Heartbeat message.
-   */
-  public ChangeTimeHeartbeatMsg()
-  {
-    this.changeNumber = new ChangeNumber((long)0,0,0);
-  }
+
 
   /**
-   * Constructor of a Change Time Heartbeat message providing
-   * the change time value in a change number.
-   * @param cn The provided change number.
+   * Constructor of a Change Time Heartbeat message providing the change time
+   * value in a change number.
+   *
+   * @param cn
+   *          The provided change number.
    */
   public ChangeTimeHeartbeatMsg(ChangeNumber cn)
   {
     this.changeNumber = cn;
   }
 
+
+
   /**
    * Get a change number with the transmitted change time.
+   *
    * @return the ChangeNumber
    */
   public ChangeNumber getChangeNumber()
@@ -72,77 +75,92 @@
     return changeNumber;
   }
 
-  /**
-   * Encode a change time message.
-   * @return The encoded message.
-   * @throws UnsupportedEncodingException When an error occurs.
-   */
-  public byte[] encode() throws UnsupportedEncodingException
-  {
-    byte[] changeNumberByte =
-      this.getChangeNumber().toString().getBytes("UTF-8");
-    int length = changeNumberByte.length;
-    byte[] encodedMsg = new byte[length];
 
-    /* Put the ChangeNumber */
-    addByteArray(changeNumberByte, encodedMsg, 0);
-
-    return encodedMsg;
-  }
 
   /**
    * Creates a message from a provided byte array.
-   * @param in The provided byte array.
-   * @throws DataFormatException When an error occurs.
+   *
+   * @param in
+   *          The provided byte array.
+   * @param version
+   *          The version of the protocol to use to decode the msg.
+   * @throws DataFormatException
+   *           When an error occurs.
    */
-  public ChangeTimeHeartbeatMsg(byte[] in) throws DataFormatException
+  public ChangeTimeHeartbeatMsg(byte[] in, short version)
+      throws DataFormatException
   {
+    final ByteSequenceReader reader = ByteString.wrap(in).asReader();
     try
     {
-      /* Read the changeNumber */
-      /* First byte is the type */
-      if (in[0] != MSG_TYPE_CT_HEARTBEAT)
+      if (reader.get() != MSG_TYPE_CT_HEARTBEAT)
       {
-        throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
+        // Throw better exception below.
+        throw new IllegalArgumentException();
       }
-      int pos = 1;
-      int length = getNextLength(in, pos);
-      String changenumberStr = new String(in, pos, length, "UTF-8");
-      changeNumber = new ChangeNumber(changenumberStr);
+
+      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+      {
+        changeNumber = ChangeNumber.valueOf(reader
+            .getByteSequence(ChangeNumber.BYTE_ENCODING_LENGTH));
+      }
+      else
+      {
+        changeNumber = ChangeNumber.valueOf(reader
+            .getString(ChangeNumber.STRING_ENCODING_LENGTH));
+        reader.get(); // Read trailing 0 byte.
+      }
+
+      if (reader.remaining() > 0)
+      {
+        // Throw better exception below.
+        throw new IllegalArgumentException();
+      }
     }
-    catch (UnsupportedEncodingException e)
+    catch (Exception e)
     {
-      throw new DataFormatException("UTF-8 is not supported by this jvm.");
-    }
-    catch (IllegalArgumentException e)
-    {
-      throw new DataFormatException(e.getMessage());
+      // Index out of bounds, bad format, etc.
+      throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
     }
   }
 
+
+
   /**
-   * Get a byte array from the message.
-   * @return The byte array containing the PDU of the message.
-   * @throws UnsupportedEncodingException When an error occurs.
+   * {@inheritDoc}
    */
-  public byte[] getBytes() throws UnsupportedEncodingException
+  @Override
+  public byte[] getBytes()
   {
-    try {
-      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+    return getBytes(ProtocolVersion.getCurrentVersion());
+  }
 
-      /* Put the type of the operation */
-      oStream.write(MSG_TYPE_CT_HEARTBEAT);
 
-      /* Put the ChangeNumber */
-      byte[] changeNumberByte = changeNumber.toString().getBytes("UTF-8");
-      oStream.write(changeNumberByte);
-      oStream.write(0);
 
-      return oStream.toByteArray();
-    } catch (IOException e)
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public byte[] getBytes(short protocolVersion)
+  {
+    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
     {
-      // never happens
-      return null;
+      final ByteStringBuilder builder = new ByteStringBuilder(
+          ChangeNumber.BYTE_ENCODING_LENGTH + 1 /* type + csn */);
+      builder.append(MSG_TYPE_CT_HEARTBEAT);
+      changeNumber.toByteString(builder);
+      return builder.toByteArray();
+    }
+    else
+    {
+      final ByteStringBuilder builder = new ByteStringBuilder(
+          ChangeNumber.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */);
+      builder.append(MSG_TYPE_CT_HEARTBEAT);
+      builder.append(changeNumber.toString());
+      builder.append((byte) 0); // For compatibility with earlier protocol
+                                // versions.
+      return builder.toByteArray();
     }
   }
+
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index c20d33a..c616ff4 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,12 +27,12 @@
  */
 package org.opends.server.replication.protocol;
 
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.zip.DataFormatException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Set;
+import java.util.Map;
 
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.protocols.asn1.ASN1Reader;
@@ -277,8 +277,16 @@
         // loop on the list of CN of the state
         while(asn1Reader.hasNextElement())
         {
-          String s = asn1Reader.readOctetStringAsString();
-          ChangeNumber cn = new ChangeNumber(s);
+          ChangeNumber cn;
+          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+          {
+            cn = ChangeNumber.valueOf(asn1Reader.readOctetString());
+          }
+          else
+          {
+            cn = ChangeNumber.valueOf(asn1Reader.readOctetStringAsString());
+          }
+
           if ((data.replServerDbState != null) && (serverId == 0))
           {
             // we are on the first CN that is a fake CN to store the serverId
@@ -323,7 +331,6 @@
    */
   @Override
   public byte[] getBytes()
-  throws UnsupportedEncodingException
   {
     return getBytes(ProtocolVersion.getCurrentVersion());
   }
@@ -333,12 +340,10 @@
    */
   @Override
   public byte[] getBytes(short protocolVersion)
-     throws UnsupportedEncodingException
   {
     try
     {
       ByteStringBuilder byteBuilder = new ByteStringBuilder();
-      ASN1Writer writer = ASN1.getWriter(byteBuilder);
 
       if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
       {
@@ -362,66 +367,22 @@
       }
 
       /* Put the serverStates ... */
+      ASN1Writer writer = ASN1.getWriter(byteBuilder);
       writer.writeStartSequence();
-
-      /* first put the Replication Server state */
-      writer.writeStartSequence();
-      ArrayList<ByteString> cnOctetList =
-        data.replServerDbState.toASN1ArrayList();
-      for (ByteString soci : cnOctetList)
       {
-        writer.writeOctetString(soci);
-      }
-      writer.writeEndSequence();
-
-      // then the LDAP server data
-      Set<Integer> servers = data.ldapStates.keySet();
-      for (Integer sid : servers)
-      {
-        ServerState statei = data.ldapStates.get(sid).state;
-        Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
-
-        // retrieves the change numbers as an arrayList of ANSN1OctetString
-        cnOctetList = statei.toASN1ArrayList();
-
+        /* first put the Replication Server state */
         writer.writeStartSequence();
-        // a fake changenumber helps storing the LDAP server ID
-        ChangeNumber cn = new ChangeNumber(outime,1,sid);
-        writer.writeOctetString(cn.toString());
-
-        // the changenumbers that make the state
-        for (ByteString soci : cnOctetList)
         {
-          writer.writeOctetString(soci);
+          data.replServerDbState.writeTo(writer, protocolVersion);
         }
-
         writer.writeEndSequence();
+
+        // then the LDAP server data
+        writeServerStates(protocolVersion, writer, false /* DS */);
+
+        // then the RS server datas
+        writeServerStates(protocolVersion, writer, true /* RS */);
       }
-
-      // then the RS server datas
-      servers = data.rsStates.keySet();
-      for (Integer sid : servers)
-      {
-        ServerState statei = data.rsStates.get(sid).state;
-        Long outime = data.rsStates.get(sid).approxFirstMissingDate;
-
-        // retrieves the change numbers as an arrayList of ANSN1OctetString
-        cnOctetList = statei.toASN1ArrayList();
-
-        writer.writeStartSequence();
-        // a fake changenumber helps storing the LDAP server ID
-        ChangeNumber cn = new ChangeNumber(outime,0,sid);
-        writer.writeOctetString(cn.toString());
-
-        // the changenumbers that make the state
-        for (ByteString soci : cnOctetList)
-        {
-          writer.writeOctetString(soci);
-        }
-
-        writer.writeEndSequence();
-      }
-
       writer.writeEndSequence();
 
       if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -456,6 +417,39 @@
     }
   }
 
+  private void writeServerStates(short protocolVersion, ASN1Writer writer,
+      boolean writeRSStates) throws IOException
+  {
+    Map<Integer, ServerData> servers = writeRSStates ? data.rsStates
+        : data.ldapStates;
+    for (Map.Entry<Integer, ServerData> server : servers.entrySet())
+    {
+      writer.writeStartSequence();
+      {
+        /*
+         * A fake change number helps storing the LDAP server ID. The sequence
+         * number will be used to differentiate between an LDAP server (1) or an
+         * RS (0).
+         */
+        ChangeNumber cn = new ChangeNumber(
+            server.getValue().approxFirstMissingDate, writeRSStates ? 0 : 1,
+            server.getKey());
+        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+        {
+          writer.writeOctetString(cn.toByteString());
+        }
+        else
+        {
+          writer.writeOctetString(cn.toString());
+        }
+
+        // the changenumbers that make the state
+        server.getValue().state.writeTo(writer, protocolVersion);
+      }
+      writer.writeEndSequence();
+    }
+  }
+
   /**
    * Get the state of the replication server that sent this message.
    * @return The state.
diff --git a/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 377fea2..b8b834b 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -81,9 +81,15 @@
   public static final short REPLICATION_PROTOCOL_V6 = 6;
 
   /**
+   * The constant for the 7th version of the replication protocol.
+   * - compact encoding for length, CSNs, and server IDs.
+   */
+  public static final short REPLICATION_PROTOCOL_V7 = 7;
+
+  /**
    * The replication protocol version used by the instance of RS/DS in this VM.
    */
-  private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V6;
+  private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7;
 
   /**
    * Gets the current version of the replication protocol.
diff --git a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index 56a0215..7c7cbb5 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -246,7 +246,7 @@
         msg = new ECLUpdateMsg(buffer);
       break;
       case MSG_TYPE_CT_HEARTBEAT:
-        msg = new ChangeTimeHeartbeatMsg(buffer);
+        msg = new ChangeTimeHeartbeatMsg(buffer, version);
       break;
       case MSG_TYPE_REPL_SERVER_START_DS:
         msg = new ReplServerStartDSMsg(buffer);

--
Gitblit v1.10.0