From 4fd152ec8ba98ac9a70202dbac2b3a579df1033a Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Mon, 10 Jun 2013 14:25:54 +0000
Subject: [PATCH] Fix OPENDJ-951: Reduce size and frequency of replication MonitorMsg
---
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java | 114 +++++++++++++++++++++++++++------------------------------
1 files changed, 54 insertions(+), 60 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
index c20d33a..c616ff4 100644
--- a/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,12 +27,12 @@
*/
package org.opends.server.replication.protocol;
+import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
-import java.util.Set;
+import java.util.Map;
import org.opends.server.replication.common.ServerState;
import org.opends.server.protocols.asn1.ASN1Reader;
@@ -277,8 +277,16 @@
// loop on the list of CN of the state
while(asn1Reader.hasNextElement())
{
- String s = asn1Reader.readOctetStringAsString();
- ChangeNumber cn = new ChangeNumber(s);
+ ChangeNumber cn;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ {
+ cn = ChangeNumber.valueOf(asn1Reader.readOctetString());
+ }
+ else
+ {
+ cn = ChangeNumber.valueOf(asn1Reader.readOctetStringAsString());
+ }
+
if ((data.replServerDbState != null) && (serverId == 0))
{
// we are on the first CN that is a fake CN to store the serverId
@@ -323,7 +331,6 @@
*/
@Override
public byte[] getBytes()
- throws UnsupportedEncodingException
{
return getBytes(ProtocolVersion.getCurrentVersion());
}
@@ -333,12 +340,10 @@
*/
@Override
public byte[] getBytes(short protocolVersion)
- throws UnsupportedEncodingException
{
try
{
ByteStringBuilder byteBuilder = new ByteStringBuilder();
- ASN1Writer writer = ASN1.getWriter(byteBuilder);
if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
{
@@ -362,66 +367,22 @@
}
/* Put the serverStates ... */
+ ASN1Writer writer = ASN1.getWriter(byteBuilder);
writer.writeStartSequence();
-
- /* first put the Replication Server state */
- writer.writeStartSequence();
- ArrayList<ByteString> cnOctetList =
- data.replServerDbState.toASN1ArrayList();
- for (ByteString soci : cnOctetList)
{
- writer.writeOctetString(soci);
- }
- writer.writeEndSequence();
-
- // then the LDAP server data
- Set<Integer> servers = data.ldapStates.keySet();
- for (Integer sid : servers)
- {
- ServerState statei = data.ldapStates.get(sid).state;
- Long outime = data.ldapStates.get(sid).approxFirstMissingDate;
-
- // retrieves the change numbers as an arrayList of ANSN1OctetString
- cnOctetList = statei.toASN1ArrayList();
-
+ /* first put the Replication Server state */
writer.writeStartSequence();
- // a fake changenumber helps storing the LDAP server ID
- ChangeNumber cn = new ChangeNumber(outime,1,sid);
- writer.writeOctetString(cn.toString());
-
- // the changenumbers that make the state
- for (ByteString soci : cnOctetList)
{
- writer.writeOctetString(soci);
+ data.replServerDbState.writeTo(writer, protocolVersion);
}
-
writer.writeEndSequence();
+
+ // then the LDAP server data
+ writeServerStates(protocolVersion, writer, false /* DS */);
+
+ // then the RS server datas
+ writeServerStates(protocolVersion, writer, true /* RS */);
}
-
- // then the RS server datas
- servers = data.rsStates.keySet();
- for (Integer sid : servers)
- {
- ServerState statei = data.rsStates.get(sid).state;
- Long outime = data.rsStates.get(sid).approxFirstMissingDate;
-
- // retrieves the change numbers as an arrayList of ANSN1OctetString
- cnOctetList = statei.toASN1ArrayList();
-
- writer.writeStartSequence();
- // a fake changenumber helps storing the LDAP server ID
- ChangeNumber cn = new ChangeNumber(outime,0,sid);
- writer.writeOctetString(cn.toString());
-
- // the changenumbers that make the state
- for (ByteString soci : cnOctetList)
- {
- writer.writeOctetString(soci);
- }
-
- writer.writeEndSequence();
- }
-
writer.writeEndSequence();
if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
@@ -456,6 +417,39 @@
}
}
+ private void writeServerStates(short protocolVersion, ASN1Writer writer,
+ boolean writeRSStates) throws IOException
+ {
+ Map<Integer, ServerData> servers = writeRSStates ? data.rsStates
+ : data.ldapStates;
+ for (Map.Entry<Integer, ServerData> server : servers.entrySet())
+ {
+ writer.writeStartSequence();
+ {
+ /*
+ * A fake change number helps storing the LDAP server ID. The sequence
+ * number will be used to differentiate between an LDAP server (1) or an
+ * RS (0).
+ */
+ ChangeNumber cn = new ChangeNumber(
+ server.getValue().approxFirstMissingDate, writeRSStates ? 0 : 1,
+ server.getKey());
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+ {
+ writer.writeOctetString(cn.toByteString());
+ }
+ else
+ {
+ writer.writeOctetString(cn.toString());
+ }
+
+ // the changenumbers that make the state
+ server.getValue().state.writeTo(writer, protocolVersion);
+ }
+ writer.writeEndSequence();
+ }
+ }
+
/**
* Get the state of the replication server that sent this message.
* @return The state.
--
Gitblit v1.10.0