From 4fd152ec8ba98ac9a70202dbac2b3a579df1033a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 10 Jun 2013 14:25:54 +0000
Subject: [PATCH] Fix OPENDJ-951: Reduce size and frequency of replication MonitorMsg

---
 opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java |  114 +++++++++++++++++++++++++++------------------------------
 1 files changed, 54 insertions(+), 60 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index c20d33a..c616ff4 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,12 +27,12 @@
  */
 package org.opends.server.replication.protocol;
 
+import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.zip.DataFormatException;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Set;
+import java.util.Map;
 
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.protocols.asn1.ASN1Reader;
@@ -277,8 +277,16 @@
         // loop on the list of CN of the state
         while(asn1Reader.hasNextElement())
         {
-          String s = asn1Reader.readOctetStringAsString();
-          ChangeNumber cn = new ChangeNumber(s);
+          ChangeNumber cn;
+          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+          {
+            cn = ChangeNumber.valueOf(asn1Reader.readOctetString());
+          }
+          else
+          {
+            cn = ChangeNumber.valueOf(asn1Reader.readOctetStringAsString());
+          }
+
           if ((data.replServerDbState != null) && (serverId == 0))
           {
             // we are on the first CN that is a fake CN to store the serverId
@@ -323,7 +331,6 @@
    */
   @Override
   public byte[] getBytes()
-  throws UnsupportedEncodingException
   {
     return getBytes(ProtocolVersion.getCurrentVersion());
   }
@@ -333,12 +340,10 @@
    */
   @Override
   public byte[] getBytes(short protocolVersion)
-     throws UnsupportedEncodingException
   {
     try
     {
       ByteStringBuilder byteBuilder = new ByteStringBuilder();
-      ASN1Writer writer = ASN1.getWriter(byteBuilder);
 
       if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
       {
@@ -362,66 +367,22 @@
       }
 
       /* Put the serverStates ... */
+      ASN1Writer writer = ASN1.getWriter(byteBuilder);
       writer.writeStartSequence();
-
-      /* first put the Replication Server state */
-      writer.writeStartSequence();
-      ArrayList<ByteString> cnOctetList =
-        data.replServerDbState.toASN1ArrayList();
-      for (ByteString soci : cnOctetList)
       {
-        writer.writeOctetString(soci);
-      }
-      writer.writeEndSequence();
-
-      // then the LDAP server data
-      Set<Integer> servers = data.ldapStates.keySet();
-      for (Integer sid : servers)
-      {
-        ServerState statei = data.ldapStates.get(sid).state;
-        Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
-
-        // retrieves the change numbers as an arrayList of ANSN1OctetString
-        cnOctetList = statei.toASN1ArrayList();
-
+        /* first put the Replication Server state */
         writer.writeStartSequence();
-        // a fake changenumber helps storing the LDAP server ID
-        ChangeNumber cn = new ChangeNumber(outime,1,sid);
-        writer.writeOctetString(cn.toString());
-
-        // the changenumbers that make the state
-        for (ByteString soci : cnOctetList)
         {
-          writer.writeOctetString(soci);
+          data.replServerDbState.writeTo(writer, protocolVersion);
         }
-
         writer.writeEndSequence();
+
+        // then the LDAP server data
+        writeServerStates(protocolVersion, writer, false /* DS */);
+
+        // then the RS server datas
+        writeServerStates(protocolVersion, writer, true /* RS */);
       }
-
-      // then the RS server datas
-      servers = data.rsStates.keySet();
-      for (Integer sid : servers)
-      {
-        ServerState statei = data.rsStates.get(sid).state;
-        Long outime = data.rsStates.get(sid).approxFirstMissingDate;
-
-        // retrieves the change numbers as an arrayList of ANSN1OctetString
-        cnOctetList = statei.toASN1ArrayList();
-
-        writer.writeStartSequence();
-        // a fake changenumber helps storing the LDAP server ID
-        ChangeNumber cn = new ChangeNumber(outime,0,sid);
-        writer.writeOctetString(cn.toString());
-
-        // the changenumbers that make the state
-        for (ByteString soci : cnOctetList)
-        {
-          writer.writeOctetString(soci);
-        }
-
-        writer.writeEndSequence();
-      }
-
       writer.writeEndSequence();
 
       if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -456,6 +417,39 @@
     }
   }
 
+  private void writeServerStates(short protocolVersion, ASN1Writer writer,
+      boolean writeRSStates) throws IOException
+  {
+    Map<Integer, ServerData> servers = writeRSStates ? data.rsStates
+        : data.ldapStates;
+    for (Map.Entry<Integer, ServerData> server : servers.entrySet())
+    {
+      writer.writeStartSequence();
+      {
+        /*
+         * A fake change number helps storing the LDAP server ID. The sequence
+         * number will be used to differentiate between an LDAP server (1) or an
+         * RS (0).
+         */
+        ChangeNumber cn = new ChangeNumber(
+            server.getValue().approxFirstMissingDate, writeRSStates ? 0 : 1,
+            server.getKey());
+        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+        {
+          writer.writeOctetString(cn.toByteString());
+        }
+        else
+        {
+          writer.writeOctetString(cn.toString());
+        }
+
+        // the changenumbers that make the state
+        server.getValue().state.writeTo(writer, protocolVersion);
+      }
+      writer.writeEndSequence();
+    }
+  }
+
   /**
    * Get the state of the replication server that sent this message.
    * @return The state.

--
Gitblit v1.10.0