From a5131f44a6afa554af8f4c82c7ffd3d4ceac1bd4 Mon Sep 17 00:00:00 2001
From: Matthew Swift <matthew.swift@forgerock.com>
Date: Fri, 04 Feb 2011 12:50:58 +0000
Subject: [PATCH] OPEN - issue OPENDJ-26: Fix OpenDS issue 4585: ConcurrentModificationException in ReplicationBroker https://bugster.forgerock.org/jira/browse/OPENDJ-26
---
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java | 359 ++++++++++++++++++++++++++++++-----------------------------
1 files changed, 183 insertions(+), 176 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index c26fd77..2fdf43b 100644
--- a/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -23,16 +23,14 @@
*
*
* Copyright 2007-2010 Sun Microsystems, Inc.
+ * Portions Copyright 2011 ForgeRock AS
*/
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
+import java.util.*;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.AssuredMode;
@@ -60,9 +58,9 @@
public class TopologyMsg extends ReplicationMsg
{
// Information for the DS known in the topology
- private List<DSInfo> dsList = new ArrayList<DSInfo>();
+ private final List<DSInfo> dsList;
// Information for the RS known in the topology
- private List<RSInfo> rsList = new ArrayList<RSInfo>();
+ private final List<RSInfo> rsList;
/**
* Creates a new changelogInfo message from its encoded form.
@@ -74,7 +72,168 @@
*/
public TopologyMsg(byte[] in, short version) throws DataFormatException
{
- decode(in, version);
+ try
+ {
+ /* First byte is the type */
+ if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
+ {
+ throw new DataFormatException(
+ "Input is not a valid " + this.getClass().getCanonicalName());
+ }
+
+ int pos = 1;
+
+ /* Read number of following DS info entries */
+
+ byte nDsInfo = in[pos++];
+
+ /* Read the DS info entries */
+ List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
+ while ( (nDsInfo > 0) && (pos < in.length) )
+ {
+ /* Read DS id */
+ int length = getNextLength(in, pos);
+ String serverIdString = new String(in, pos, length, "UTF-8");
+ int dsId = Integer.valueOf(serverIdString);
+ pos += length + 1;
+
+ /* Read RS id */
+ length =
+ getNextLength(in, pos);
+ serverIdString =
+ new String(in, pos, length, "UTF-8");
+ int rsId = Integer.valueOf(serverIdString);
+ pos += length + 1;
+
+ /* Read the generation id */
+ length = getNextLength(in, pos);
+ long generationId =
+ Long.valueOf(new String(in, pos, length,
+ "UTF-8"));
+ pos += length + 1;
+
+ /* Read DS status */
+ ServerStatus status = ServerStatus.valueOf(in[pos++]);
+
+ /* Read DS assured flag */
+ boolean assuredFlag;
+ if (in[pos++] == 1)
+ {
+ assuredFlag = true;
+ } else
+ {
+ assuredFlag = false;
+ }
+
+ /* Read DS assured mode */
+ AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
+
+ /* Read DS safe data level */
+ byte safeDataLevel = in[pos++];
+
+ /* Read DS group id */
+ byte groupId = in[pos++];
+
+ /* Read number of referrals URLs */
+ List<String> refUrls = new ArrayList<String>();
+ byte nUrls = in[pos++];
+ byte nRead = 0;
+ /* Read urls until expected number read */
+ while ((nRead != nUrls) &&
+ (pos < in.length) //security
+ )
+ {
+ length = getNextLength(in, pos);
+ String url = new String(in, pos, length, "UTF-8");
+ refUrls.add(url);
+ pos += length + 1;
+ nRead++;
+ }
+
+ Set<String> attrs = new HashSet<String>();
+ short protocolVersion = -1;
+ if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ byte nAttrs = in[pos++];
+ nRead = 0;
+ /* Read attrs until expected number read */
+ while ((nRead != nAttrs) &&
+ (pos < in.length) //security
+ )
+ {
+ length = getNextLength(in, pos);
+ String attr = new String(in, pos, length, "UTF-8");
+ attrs.add(attr);
+ pos += length + 1;
+ nRead++;
+ }
+ /* Read Protocol version */
+ protocolVersion = Short.valueOf(in[pos++]);
+ }
+
+ /* Now create DSInfo and store it in list */
+
+ DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
+ assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
+ protocolVersion);
+ dsList.add(dsInfo);
+
+ nDsInfo--;
+ }
+
+ /* Read number of following RS info entries */
+
+ byte nRsInfo = in[pos++];
+
+ /* Read the RS info entries */
+ List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+ while ( (nRsInfo > 0) && (pos < in.length) )
+ {
+ /* Read RS id */
+ int length = getNextLength(in, pos);
+ String serverIdString = new String(in, pos, length, "UTF-8");
+ int id = Integer.valueOf(serverIdString);
+ pos += length + 1;
+
+ /* Read the generation id */
+ length = getNextLength(in, pos);
+ long generationId =
+ Long.valueOf(new String(in, pos, length,
+ "UTF-8"));
+ pos += length + 1;
+
+ /* Read RS group id */
+ byte groupId = in[pos++];
+
+ int weight = 1;
+ String serverUrl = null;
+ if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
+ {
+ length = getNextLength(in, pos);
+ serverUrl = new String(in, pos, length, "UTF-8");
+ pos += length + 1;
+
+ /* Read RS weight */
+ length = getNextLength(in, pos);
+ weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
+ pos += length + 1;
+ }
+
+ /* Now create RSInfo and store it in list */
+
+ RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
+ weight);
+ rsList.add(rsInfo);
+
+ nRsInfo--;
+ }
+
+ this.dsList = Collections.unmodifiableList(dsList);
+ this.rsList = Collections.unmodifiableList(rsList);
+ } catch (UnsupportedEncodingException e)
+ {
+ throw new DataFormatException("UTF-8 is not supported by this jvm.");
+ }
}
/**
@@ -85,10 +244,23 @@
*/
public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
{
- if (dsList != null) // null means no info, let empty list from init time
- this.dsList = dsList;
- if (rsList != null) // null means no info, let empty list from init time
- this.rsList = rsList;
+ if (dsList == null || dsList.isEmpty())
+ {
+ this.dsList = Collections.emptyList();
+ }
+ else
+ {
+ this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
+ }
+
+ if (rsList == null || rsList.isEmpty())
+ {
+ this.rsList = Collections.emptyList();
+ }
+ else
+ {
+ this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
+ }
}
// ============
@@ -219,172 +391,7 @@
}
- // ============
- // Msg decoding
- // ============
- private void decode(byte[] in, short version)
- throws DataFormatException
- {
- try
- {
- /* First byte is the type */
- if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
- {
- throw new DataFormatException(
- "Input is not a valid " + this.getClass().getCanonicalName());
- }
-
- int pos = 1;
-
- /* Read number of following DS info entries */
-
- byte nDsInfo = in[pos++];
-
- /* Read the DS info entries */
- while ( (nDsInfo > 0) && (pos < in.length) )
- {
- /* Read DS id */
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int dsId = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read RS id */
- length =
- getNextLength(in, pos);
- serverIdString =
- new String(in, pos, length, "UTF-8");
- int rsId = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read the generation id */
- length = getNextLength(in, pos);
- long generationId =
- Long.valueOf(new String(in, pos, length,
- "UTF-8"));
- pos += length + 1;
-
- /* Read DS status */
- ServerStatus status = ServerStatus.valueOf(in[pos++]);
-
- /* Read DS assured flag */
- boolean assuredFlag;
- if (in[pos++] == 1)
- {
- assuredFlag = true;
- } else
- {
- assuredFlag = false;
- }
-
- /* Read DS assured mode */
- AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
-
- /* Read DS safe data level */
- byte safeDataLevel = in[pos++];
-
- /* Read DS group id */
- byte groupId = in[pos++];
-
- /* Read number of referrals URLs */
- List<String> refUrls = new ArrayList<String>();
- byte nUrls = in[pos++];
- byte nRead = 0;
- /* Read urls until expected number read */
- while ((nRead != nUrls) &&
- (pos < in.length) //security
- )
- {
- length = getNextLength(in, pos);
- String url = new String(in, pos, length, "UTF-8");
- refUrls.add(url);
- pos += length + 1;
- nRead++;
- }
-
- Set<String> attrs = new HashSet<String>();
- short protocolVersion = -1;
- if (version>=ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- byte nAttrs = in[pos++];
- nRead = 0;
- /* Read attrs until expected number read */
- while ((nRead != nAttrs) &&
- (pos < in.length) //security
- )
- {
- length = getNextLength(in, pos);
- String attr = new String(in, pos, length, "UTF-8");
- attrs.add(attr);
- pos += length + 1;
- nRead++;
- }
- /* Read Protocol version */
- protocolVersion = Short.valueOf(in[pos++]);
- }
-
- /* Now create DSInfo and store it in list */
-
- DSInfo dsInfo = new DSInfo(dsId, rsId, generationId, status,
- assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
- protocolVersion);
- dsList.add(dsInfo);
-
- nDsInfo--;
- }
-
- /* Read number of following RS info entries */
-
- byte nRsInfo = in[pos++];
-
- /* Read the RS info entries */
- while ( (nRsInfo > 0) && (pos < in.length) )
- {
- /* Read RS id */
- int length = getNextLength(in, pos);
- String serverIdString = new String(in, pos, length, "UTF-8");
- int id = Integer.valueOf(serverIdString);
- pos += length + 1;
-
- /* Read the generation id */
- length = getNextLength(in, pos);
- long generationId =
- Long.valueOf(new String(in, pos, length,
- "UTF-8"));
- pos += length + 1;
-
- /* Read RS group id */
- byte groupId = in[pos++];
-
- int weight = 1;
- String serverUrl = null;
- if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
- {
- length = getNextLength(in, pos);
- serverUrl = new String(in, pos, length, "UTF-8");
- pos += length + 1;
-
- /* Read RS weight */
- length = getNextLength(in, pos);
- weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
- pos += length + 1;
- }
-
- /* Now create RSInfo and store it in list */
-
- RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
- weight);
- rsList.add(rsInfo);
-
- nRsInfo--;
- }
-
- } catch (UnsupportedEncodingException e)
- {
- throw new DataFormatException("UTF-8 is not supported by this jvm.");
- }
- }
/**
* {@inheritDoc}
--
Gitblit v1.10.0