From 1c59d6c7d4e33c5b88fbe0692c1d50c0eab74c4a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 20 Feb 2014 14:08:01 +0000
Subject: [PATCH] OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR
---
opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 252 +++++++++++++++++++++----------------------------
1 files changed, 108 insertions(+), 144 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index d9e2e47..72b73d7 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -38,15 +38,14 @@
import org.opends.server.replication.common.ServerStatus;
/**
- *
* This class defines a message that is sent:
* - By a RS to the other RSs in the topology, containing:
- * - the list of DSs directly connected to the RS in the DS list
- * - only this RS in the RS list
+ * - the DSs directly connected to the RS in the DS infos
+ * - only this RS in the RS infos
* - By a RS to his connected DSs, containing every DSs and RSs he knows.
* In that case the message contains:
- * - the list of every DS the RS knows except the destinator DS in the DS list
- * - the list of every connected RSs (including the sending RS) in the RS list
+ * - every DSs the RS knows except the destinator DS in the DS infos
+ * - every connected RSs (including the sending RS) in the RS infos
*
* Exchanging these messages allows to have each RS or DS take
* appropriate decisions according to the current topology:
@@ -56,10 +55,10 @@
*/
public class TopologyMsg extends ReplicationMsg
{
- // Information for the DS known in the topology
- private final List<DSInfo> dsList;
- // Information for the RS known in the topology
- private final List<RSInfo> rsList;
+ /** Information for the DSs (aka replicas) known in the topology. */
+ private final Map<Integer, DSInfo> replicaInfos;
+ /** Information for the RSs known in the topology. */
+ private final List<RSInfo> rsInfos;
/**
* Creates a new changelogInfo message from its encoded form.
@@ -77,18 +76,18 @@
if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
{
throw new DataFormatException(
- "Input is not a valid " + this.getClass().getCanonicalName());
+ "Input is not a valid " + getClass().getCanonicalName());
}
int pos = 1;
/* Read number of following DS info entries */
-
byte nDsInfo = in[pos++];
/* Read the DS info entries */
- List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
- while ( (nDsInfo > 0) && (pos < in.length) )
+ Map<Integer, DSInfo> replicaInfos =
+ new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
+ while (nDsInfo > 0 && pos < in.length)
{
/* Read DS id */
int length = getNextLength(in, pos);
@@ -110,26 +109,21 @@
}
/* Read RS id */
- length =
- getNextLength(in, pos);
- serverIdString =
- new String(in, pos, length, "UTF-8");
+ length = getNextLength(in, pos);
+ serverIdString = new String(in, pos, length, "UTF-8");
int rsId = Integer.valueOf(serverIdString);
pos += length + 1;
/* Read the generation id */
length = getNextLength(in, pos);
- long generationId =
- Long.valueOf(new String(in, pos, length,
- "UTF-8"));
+ long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
pos += length + 1;
/* Read DS status */
ServerStatus status = ServerStatus.valueOf(in[pos++]);
/* Read DS assured flag */
- boolean assuredFlag;
- assuredFlag = in[pos++] == 1;
+ boolean assuredFlag = in[pos++] == 1;
/* Read DS assured mode */
AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
@@ -142,50 +136,18 @@
/* Read number of referrals URLs */
List<String> refUrls = new ArrayList<String>();
- byte nUrls = in[pos++];
- byte nRead = 0;
- /* Read urls until expected number read */
- while ((nRead != nUrls) &&
- (pos < in.length) //security
- )
- {
- length = getNextLength(in, pos);
- String url = new String(in, pos, length, "UTF-8");
- refUrls.add(url);
- pos += length + 1;
- nRead++;
- }
+ pos = readStrings(in, pos, refUrls);
Set<String> attrs = new HashSet<String>();
Set<String> delattrs = new HashSet<String>();
short protocolVersion = -1;
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
- byte nAttrs = in[pos++];
- nRead = 0;
- /* Read attrs until expected number read */
- while ((nRead != nAttrs) && (pos < in.length))
- {
- length = getNextLength(in, pos);
- String attr = new String(in, pos, length, "UTF-8");
- attrs.add(attr);
- pos += length + 1;
- nRead++;
- }
+ pos = readStrings(in, pos, attrs);
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
{
- nAttrs = in[pos++];
- nRead = 0;
- /* Read attrs until expected number read */
- while ((nRead != nAttrs) && (pos < in.length))
- {
- length = getNextLength(in, pos);
- String attr = new String(in, pos, length, "UTF-8");
- delattrs.add(attr);
- pos += length + 1;
- nRead++;
- }
+ pos = readStrings(in, pos, delattrs);
}
else
{
@@ -194,26 +156,23 @@
}
/* Read Protocol version */
- protocolVersion = (short)in[pos++];
+ protocolVersion = in[pos++];
}
- /* Now create DSInfo and store it in list */
-
- DSInfo dsInfo = new DSInfo(dsId, dsUrl, rsId, generationId, status,
- assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
- delattrs, protocolVersion);
- dsList.add(dsInfo);
+ /* Now create DSInfo and store it */
+ replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
+ status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
+ attrs, delattrs, protocolVersion));
nDsInfo--;
}
/* Read number of following RS info entries */
-
byte nRsInfo = in[pos++];
/* Read the RS info entries */
- List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
- while ( (nRsInfo > 0) && (pos < in.length) )
+ List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+ while (nRsInfo > 0 && pos < in.length)
{
/* Read RS id */
int length = getNextLength(in, pos);
@@ -223,9 +182,7 @@
/* Read the generation id */
length = getNextLength(in, pos);
- long generationId =
- Long.valueOf(new String(in, pos, length,
- "UTF-8"));
+ long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
pos += length + 1;
/* Read RS group id */
@@ -245,47 +202,67 @@
pos += length + 1;
}
- /* Now create RSInfo and store it in list */
-
- RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
- weight);
- rsList.add(rsInfo);
+ /* Now create RSInfo and store it */
+ rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
nRsInfo--;
}
- this.dsList = Collections.unmodifiableList(dsList);
- this.rsList = Collections.unmodifiableList(rsList);
- } catch (UnsupportedEncodingException e)
+ this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
+ this.rsInfos = Collections.unmodifiableList(rsInfos);
+ }
+ catch (UnsupportedEncodingException e)
{
throw new DataFormatException("UTF-8 is not supported by this jvm.");
}
}
- /**
- * Creates a new message from a list of the currently connected servers.
- *
- * @param dsList The list of currently connected DS servers ID.
- * @param rsList The list of currently connected RS servers ID.
- */
- public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
+ private int readStrings(byte[] in, int pos, Collection<String> outputCol)
+ throws DataFormatException, UnsupportedEncodingException
{
- if (dsList == null || dsList.isEmpty())
+ byte nAttrs = in[pos++];
+ byte nRead = 0;
+ // Read all elements until expected number read
+ while (nRead != nAttrs && pos < in.length)
{
- this.dsList = Collections.emptyList();
+ int length = getNextLength(in, pos);
+ outputCol.add(new String(in, pos, length, "UTF-8"));
+ pos += length + 1;
+ nRead++;
+ }
+ return pos;
+ }
+
+ /**
+ * Creates a new message of the currently connected servers.
+ *
+ * @param dsInfos The collection of currently connected DS servers ID.
+ * @param rsInfos The list of currently connected RS servers ID.
+ */
+ public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos)
+ {
+ if (dsInfos == null || dsInfos.isEmpty())
+ {
+ this.replicaInfos = Collections.emptyMap();
}
else
{
- this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
+ Map<Integer, DSInfo> replicas = new HashMap<Integer, DSInfo>();
+ for (DSInfo dsInfo : dsInfos)
+ {
+ replicas.put(dsInfo.getDsId(), dsInfo);
+ }
+ this.replicaInfos = Collections.unmodifiableMap(replicas);
}
- if (rsList == null || rsList.isEmpty())
+ if (rsInfos == null || rsInfos.isEmpty())
{
- this.rsList = Collections.emptyList();
+ this.rsInfos = Collections.emptyList();
}
else
{
- this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
+ this.rsInfos =
+ Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos));
}
}
@@ -293,12 +270,9 @@
// Msg encoding
// ============
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
- public byte[] getBytes(short version)
- throws UnsupportedEncodingException
+ public byte[] getBytes(short version) throws UnsupportedEncodingException
{
try
{
@@ -313,10 +287,10 @@
oStream.write(MSG_TYPE_TOPOLOGY);
// Put number of following DS info entries
- oStream.write((byte)dsList.size());
+ oStream.write((byte) replicaInfos.size());
// Put DS info
- for (DSInfo dsInfo : dsList)
+ for (DSInfo dsInfo : replicaInfos.values())
{
// Put DS id
byte[] byteServerId =
@@ -330,8 +304,7 @@
oStream.write(0);
}
// Put RS id
- byteServerId =
- String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
+ byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
oStream.write(byteServerId);
oStream.write(0);
// Put the generation id
@@ -349,36 +322,16 @@
// Put DS group id
oStream.write(dsInfo.getGroupId());
- List<String> refUrls = dsInfo.getRefUrls();
- // Put number of following URLs as a byte
- oStream.write(refUrls.size());
- for (String url : refUrls)
- {
- // Write the url and a 0 terminating byte
- oStream.write(url.getBytes("UTF-8"));
- oStream.write(0);
- }
+ writeStrings(oStream, dsInfo.getRefUrls());
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
{
// Put ECL includes
- Set<String> attrs = dsInfo.getEclIncludes();
- oStream.write(attrs.size());
- for (String attr : attrs)
- {
- oStream.write(attr.getBytes("UTF-8"));
- oStream.write(0);
- }
+ writeStrings(oStream, dsInfo.getEclIncludes());
if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
{
- Set<String> delattrs = dsInfo.getEclIncludesForDeletes();
- oStream.write(delattrs.size());
- for (String attr : delattrs)
- {
- oStream.write(attr.getBytes("UTF-8"));
- oStream.write(0);
- }
+ writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
}
oStream.write(dsInfo.getProtocolVersion());
@@ -386,10 +339,10 @@
}
// Put number of following RS info entries
- oStream.write((byte)rsList.size());
+ oStream.write((byte) rsInfos.size());
// Put RS info
- for (RSInfo rsInfo : rsList)
+ for (RSInfo rsInfo : rsInfos)
{
// Put RS id
byte[] byteServerId =
@@ -422,54 +375,65 @@
// never happens
throw new RuntimeException(e);
}
-
}
+ private void writeStrings(ByteArrayOutputStream oStream,
+ Collection<String> col) throws IOException, UnsupportedEncodingException
+ {
+ // Put collection length as a byte
+ oStream.write(col.size());
+ for (String elem : col)
+ {
+ // Write the element and a 0 terminating byte
+ oStream.write(elem.getBytes("UTF-8"));
+ oStream.write(0);
+ }
+ }
-
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
String dsStr = "";
- for (DSInfo dsInfo : dsList)
+ for (DSInfo dsInfo : replicaInfos.values())
{
- dsStr += dsInfo.toString() + "\n----------------------------\n";
+ dsStr += dsInfo + "\n----------------------------\n";
}
String rsStr = "";
- for (RSInfo rsInfo : rsList)
+ for (RSInfo rsInfo : rsInfos)
{
- rsStr += rsInfo.toString() + "\n----------------------------\n";
+ rsStr += rsInfo + "\n----------------------------\n";
}
- return ("TopologyMsg content: "
+ return "TopologyMsg content:"
+ "\n----------------------------"
+ "\nCONNECTED DS SERVERS:"
+ "\n--------------------\n"
+ dsStr
+ "CONNECTED RS SERVERS:"
+ "\n--------------------\n"
- + rsStr + (rsStr.equals("") ? "----------------------------\n" : ""));
+ + rsStr
+ + (rsStr.equals("") ? "----------------------------\n" : "");
}
/**
- * Get the list of DS info.
- * @return The list of DS info
+ * Get the DS infos.
+ *
+ * @return The DS infos
*/
- public List<DSInfo> getDsList()
+ public Map<Integer, DSInfo> getReplicaInfos()
{
- return dsList;
+ return replicaInfos;
}
/**
- * Get the list of RS info.
- * @return The list of RS info
+ * Get the RS infos.
+ *
+ * @return The RS infos
*/
- public List<RSInfo> getRsList()
+ public List<RSInfo> getRsInfos()
{
- return rsList;
+ return rsInfos;
}
}
--
Gitblit v1.10.0