From 76ebd1ad82e2a1fc421519f09c62b948e9376e8a Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Tue, 06 Oct 2009 12:34:32 +0000
Subject: [PATCH] Entry attributes for ECL - Protocol V4
---
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 284 +++++++++++++++++++++++++++++++++++---------------------
1 files changed, 177 insertions(+), 107 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index b30f0dd..fd34473 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -22,7 +22,7 @@
* CDDL HEADER END
*
*
- * Copyright 2007-2008 Sun Microsystems, Inc.
+ * Copyright 2007-2009 Sun Microsystems, Inc.
*/
package org.opends.server.replication.protocol;
@@ -30,8 +30,11 @@
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import java.util.zip.DataFormatException;
+
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.DSInfo;
import org.opends.server.replication.common.RSInfo;
@@ -62,13 +65,165 @@
private List<RSInfo> rsList = new ArrayList<RSInfo>();
/**
+ * The protocolVersion that should be used when serializing this message.
+ */
+ private final short protocolVersion;
+
+ /**
* Creates a new changelogInfo message from its encoded form.
*
* @param in The byte array containing the encoded form of the message.
+ * @param version The protocol version to use to decode the msg.
* @throws java.util.zip.DataFormatException If the byte array does not
* contain a valid encoded form of the message.
*/
- public TopologyMsg(byte[] in) throws DataFormatException
+ public TopologyMsg(byte[] in, short version) throws DataFormatException
+ {
+ protocolVersion = ProtocolVersion.getCurrentVersion();
+ decode(in, version);
+ }
+
+ /**
+ * Creates a new message from a list of the currently connected servers.
+ *
+ * @param dsList The list of currently connected DS servers ID.
+ * @param rsList The list of currently connected RS servers ID.
+ * @param version The protocol version to use to decode the msg.
+ */
+ public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList, short version)
+ {
+ if (dsList != null) // null means no info, let empty list from init time
+ this.dsList = dsList;
+ if (rsList != null) // null means no info, let empty list from init time
+ this.rsList = rsList;
+ this.protocolVersion = version;
+ }
+
+ /**
+ * Creates a new message from a list of the currently connected servers.
+ *
+ * @param dsList The list of currently connected DS servers ID.
+ * @param rsList The list of currently connected RS servers ID.
+ */
+ public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
+ {
+ if (dsList != null) // null means no info, let empty list from init time
+ this.dsList = dsList;
+ if (rsList != null) // null means no info, let empty list from init time
+ this.rsList = rsList;
+ this.protocolVersion = ProtocolVersion.getCurrentVersion();
+ }
+
+ // ============
+ // Msg encoding
+ // ============
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public byte[] getBytes()
+ throws UnsupportedEncodingException
+ {
+ try
+ {
+ /**
+ * Message has the following form:
+ * <pdu type><number of following DSInfo entries>[<DSInfo>]*
+ * <number of following RSInfo entries>[<RSInfo>]*
+ */
+ ByteArrayOutputStream oStream = new ByteArrayOutputStream();
+
+ /* Put the message type */
+ oStream.write(MSG_TYPE_TOPOLOGY);
+
+ // Put number of following DS info entries
+ oStream.write((byte)dsList.size());
+
+ // Put DS info
+ for (DSInfo dsInfo : dsList)
+ {
+ // Put DS id
+ byte[] byteServerId =
+ String.valueOf(dsInfo.getDsId()).getBytes("UTF-8");
+ oStream.write(byteServerId);
+ oStream.write(0);
+ // Put RS id
+ byteServerId =
+ String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
+ oStream.write(byteServerId);
+ oStream.write(0);
+ // Put the generation id
+ oStream.write(String.valueOf(dsInfo.getGenerationId()).
+ getBytes("UTF-8"));
+ oStream.write(0);
+ // Put DS status
+ oStream.write(dsInfo.getStatus().getValue());
+ // Put DS assured flag
+ oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0);
+ // Put DS assured mode
+ oStream.write(dsInfo.getAssuredMode().getValue());
+ // Put DS safe data level
+ oStream.write(dsInfo.getSafeDataLevel());
+ // Put DS group id
+ oStream.write(dsInfo.getGroupId());
+
+ List<String> refUrls = dsInfo.getRefUrls();
+ // Put number of following URLs as a byte
+ oStream.write(refUrls.size());
+ for (String url : refUrls)
+ {
+ // Write the url and a 0 terminating byte
+ oStream.write(url.getBytes("UTF-8"));
+ oStream.write(0);
+ }
+
+ if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ Set<String> attrs = dsInfo.getEclIncludes();
+ oStream.write(attrs.size());
+ for (String attr : attrs)
+ {
+ oStream.write(attr.getBytes("UTF-8"));
+ oStream.write(0);
+ }
+ }
+ }
+
+ // Put number of following RS info entries
+ oStream.write((byte)rsList.size());
+
+ // Put RS info
+ for (RSInfo rsInfo : rsList)
+ {
+ // Put RS id
+ byte[] byteServerId =
+ String.valueOf(rsInfo.getId()).getBytes("UTF-8");
+ oStream.write(byteServerId);
+ oStream.write(0);
+ // Put the generation id
+ oStream.write(String.valueOf(rsInfo.getGenerationId()).
+ getBytes("UTF-8"));
+ oStream.write(0);
+ // Put DS group id
+ oStream.write(rsInfo.getGroupId());
+ }
+
+ return oStream.toByteArray();
+ } catch (IOException e)
+ {
+ // never happens
+ return null;
+ }
+
+ }
+
+ // ============
+ // Msg decoding
+ // ============
+
+ private void decode(byte[] in, short version)
+ throws DataFormatException
{
try
{
@@ -151,10 +306,29 @@
nRead++;
}
+ Set<String> attrs = new HashSet<String>();
+ if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ byte nAttrs = in[pos++];
+ nRead = 0;
+ /* Read attrs until expected number read */
+ while ((nRead != nAttrs) &&
+ (pos < in.length) //security
+ )
+ {
+ length = getNextLength(in, pos);
+ String attr = new String(in, pos, length, "UTF-8");
+ attrs.add(attr);
+ pos +=
+ length + 1;
+ nRead++;
+ }
+ }
+
/* Now create DSInfo and store it in list */
DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
- assuredFlag, assuredMode, safeDataLevel, groupId, refUrls);
+ assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs);
dsList.add(dsInfo);
nDsInfo--;
@@ -197,110 +371,6 @@
{
throw new DataFormatException("UTF-8 is not supported by this jvm.");
}
-
- }
-
- /**
- * Creates a new ReplServerInfo message from a list of the currently
- * connected servers.
- *
- * @param dsList The list of currently connected DS servers ID.
- * @param rsList The list of currently connected RS servers ID.
- */
- public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
- {
- if (dsList != null) // null means no info, let empty list from init time
- this.dsList = dsList;
- if (rsList != null) // null means no info, let empty list from init time
- this.rsList = rsList;
- }
-
- /**
- * {@inheritDoc}
- */
- @Override
- public byte[] getBytes()
- {
- try
- {
- /**
- * Message has the following form:
- * <pdu type><number of following DSInfo entries>[<DSInfo>]*
- * <number of following RSInfo entries>[<RSInfo>]*
- */
- ByteArrayOutputStream oStream = new ByteArrayOutputStream();
-
- /* Put the message type */
- oStream.write(MSG_TYPE_TOPOLOGY);
-
- // Put number of following DS info entries
- oStream.write((byte)dsList.size());
-
- // Put DS info
- for (DSInfo dsInfo : dsList)
- {
- // Put DS id
- byte[] byteServerId =
- String.valueOf(dsInfo.getDsId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- // Put RS id
- byteServerId =
- String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- // Put the generation id
- oStream.write(String.valueOf(dsInfo.getGenerationId()).
- getBytes("UTF-8"));
- oStream.write(0);
- // Put DS status
- oStream.write(dsInfo.getStatus().getValue());
- // Put DS assured flag
- oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0);
- // Put DS assured mode
- oStream.write(dsInfo.getAssuredMode().getValue());
- // Put DS safe data level
- oStream.write(dsInfo.getSafeDataLevel());
- // Put DS group id
- oStream.write(dsInfo.getGroupId());
-
- List<String> refUrls = dsInfo.getRefUrls();
- // Put number of following URLs as a byte
- oStream.write(refUrls.size());
- for (String url : refUrls)
- {
- // Write the url and a 0 terminating byte
- oStream.write(url.getBytes("UTF-8"));
- oStream.write(0);
- }
- }
-
- // Put number of following RS info entries
- oStream.write((byte)rsList.size());
-
- // Put RS info
- for (RSInfo rsInfo : rsList)
- {
- // Put RS id
- byte[] byteServerId =
- String.valueOf(rsInfo.getId()).getBytes("UTF-8");
- oStream.write(byteServerId);
- oStream.write(0);
- // Put the generation id
- oStream.write(String.valueOf(rsInfo.getGenerationId()).
- getBytes("UTF-8"));
- oStream.write(0);
- // Put DS group id
- oStream.write(rsInfo.getGroupId());
- }
-
- return oStream.toByteArray();
- } catch (IOException e)
- {
- // never happens
- return null;
- }
-
}
/**
--
Gitblit v1.10.0