From 1c59d6c7d4e33c5b88fbe0692c1d50c0eab74c4a Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 20 Feb 2014 14:08:01 +0000
Subject: [PATCH] OPENDJ-1271 (CR-3008) dsreplication pre-external-initialization task fails with STOPPED_BY_ERROR

---
 opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java |  252 +++++++++++++++++++++----------------------------
 1 files changed, 108 insertions(+), 144 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
index d9e2e47..72b73d7 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -38,15 +38,14 @@
 import org.opends.server.replication.common.ServerStatus;
 
 /**
- *
  * This class defines a message that is sent:
  * - By a RS to the other RSs in the topology, containing:
- *   - the list of DSs directly connected to the RS in the DS list
- *   - only this RS in the RS list
+ *   - the DSs directly connected to the RS in the DS infos
+ *   - only this RS in the RS infos
  * - By a RS to his connected DSs, containing every DSs and RSs he knows.
  * In that case the message contains:
- *   - the list of every DS the RS knows except the destinator DS in the DS list
- *   - the list of every connected RSs (including the sending RS) in the RS list
+ *   - every DSs the RS knows except the destinator DS in the DS infos
+ *   - every connected RSs (including the sending RS) in the RS infos
  *
  * Exchanging these messages allows to have each RS or DS take
  * appropriate decisions according to the current topology:
@@ -56,10 +55,10 @@
  */
 public class TopologyMsg extends ReplicationMsg
 {
-  // Information for the DS known in the topology
-  private final List<DSInfo> dsList;
-  // Information for the RS known in the topology
-  private final List<RSInfo> rsList;
+  /** Information for the DSs (aka replicas) known in the topology. */
+  private final Map<Integer, DSInfo> replicaInfos;
+  /** Information for the RSs known in the topology. */
+  private final List<RSInfo> rsInfos;
 
   /**
    * Creates a new changelogInfo message from its encoded form.
@@ -77,18 +76,18 @@
       if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
       {
         throw new DataFormatException(
-          "Input is not a valid " + this.getClass().getCanonicalName());
+          "Input is not a valid " + getClass().getCanonicalName());
       }
 
       int pos = 1;
 
       /* Read number of following DS info entries */
-
       byte nDsInfo = in[pos++];
 
       /* Read the DS info entries */
-      List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
-      while ( (nDsInfo > 0) && (pos < in.length) )
+      Map<Integer, DSInfo> replicaInfos =
+          new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
+      while (nDsInfo > 0 && pos < in.length)
       {
         /* Read DS id */
         int length = getNextLength(in, pos);
@@ -110,26 +109,21 @@
         }
 
         /* Read RS id */
-        length =
-          getNextLength(in, pos);
-        serverIdString =
-          new String(in, pos, length, "UTF-8");
+        length = getNextLength(in, pos);
+        serverIdString = new String(in, pos, length, "UTF-8");
         int rsId = Integer.valueOf(serverIdString);
         pos += length + 1;
 
         /* Read the generation id */
         length = getNextLength(in, pos);
-        long generationId =
-          Long.valueOf(new String(in, pos, length,
-          "UTF-8"));
+        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
         pos += length + 1;
 
         /* Read DS status */
         ServerStatus status = ServerStatus.valueOf(in[pos++]);
 
         /* Read DS assured flag */
-        boolean assuredFlag;
-        assuredFlag = in[pos++] == 1;
+        boolean assuredFlag = in[pos++] == 1;
 
         /* Read DS assured mode */
         AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
@@ -142,50 +136,18 @@
 
         /* Read number of referrals URLs */
         List<String> refUrls = new ArrayList<String>();
-        byte nUrls = in[pos++];
-        byte nRead = 0;
-        /* Read urls until expected number read */
-        while ((nRead != nUrls) &&
-          (pos < in.length) //security
-          )
-        {
-          length = getNextLength(in, pos);
-          String url = new String(in, pos, length, "UTF-8");
-          refUrls.add(url);
-          pos += length + 1;
-          nRead++;
-        }
+        pos = readStrings(in, pos, refUrls);
 
         Set<String> attrs = new HashSet<String>();
         Set<String> delattrs = new HashSet<String>();
         short protocolVersion = -1;
         if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
         {
-          byte nAttrs = in[pos++];
-          nRead = 0;
-          /* Read attrs until expected number read */
-          while ((nRead != nAttrs) && (pos < in.length))
-          {
-            length = getNextLength(in, pos);
-            String attr = new String(in, pos, length, "UTF-8");
-            attrs.add(attr);
-            pos += length + 1;
-            nRead++;
-          }
+          pos = readStrings(in, pos, attrs);
 
           if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
           {
-            nAttrs = in[pos++];
-            nRead = 0;
-            /* Read attrs until expected number read */
-            while ((nRead != nAttrs) && (pos < in.length))
-            {
-              length = getNextLength(in, pos);
-              String attr = new String(in, pos, length, "UTF-8");
-              delattrs.add(attr);
-              pos += length + 1;
-              nRead++;
-            }
+            pos = readStrings(in, pos, delattrs);
           }
           else
           {
@@ -194,26 +156,23 @@
           }
 
           /* Read Protocol version */
-          protocolVersion = (short)in[pos++];
+          protocolVersion = in[pos++];
         }
 
-        /* Now create DSInfo and store it in list */
-
-        DSInfo dsInfo = new DSInfo(dsId, dsUrl, rsId, generationId, status,
-          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
-          delattrs, protocolVersion);
-        dsList.add(dsInfo);
+        /* Now create DSInfo and store it */
+        replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
+            status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
+            attrs, delattrs, protocolVersion));
 
         nDsInfo--;
       }
 
       /* Read number of following RS info entries */
-
       byte nRsInfo = in[pos++];
 
       /* Read the RS info entries */
-      List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
-      while ( (nRsInfo > 0) && (pos < in.length) )
+      List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
+      while (nRsInfo > 0 && pos < in.length)
       {
         /* Read RS id */
         int length = getNextLength(in, pos);
@@ -223,9 +182,7 @@
 
         /* Read the generation id */
         length = getNextLength(in, pos);
-        long generationId =
-          Long.valueOf(new String(in, pos, length,
-          "UTF-8"));
+        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
         pos += length + 1;
 
         /* Read RS group id */
@@ -245,47 +202,67 @@
           pos += length + 1;
         }
 
-        /* Now create RSInfo and store it in list */
-
-        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
-          weight);
-        rsList.add(rsInfo);
+        /* Now create RSInfo and store it */
+        rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
 
         nRsInfo--;
       }
 
-      this.dsList = Collections.unmodifiableList(dsList);
-      this.rsList = Collections.unmodifiableList(rsList);
-    } catch (UnsupportedEncodingException e)
+      this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
+      this.rsInfos = Collections.unmodifiableList(rsInfos);
+    }
+    catch (UnsupportedEncodingException e)
     {
       throw new DataFormatException("UTF-8 is not supported by this jvm.");
     }
   }
 
-  /**
-   * Creates a new  message from a list of the currently connected servers.
-   *
-   * @param dsList The list of currently connected DS servers ID.
-   * @param rsList The list of currently connected RS servers ID.
-   */
-  public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
+  private int readStrings(byte[] in, int pos, Collection<String> outputCol)
+      throws DataFormatException, UnsupportedEncodingException
   {
-    if (dsList == null || dsList.isEmpty())
+    byte nAttrs = in[pos++];
+    byte nRead = 0;
+    // Read all elements until expected number read
+    while (nRead != nAttrs && pos < in.length)
     {
-      this.dsList = Collections.emptyList();
+      int length = getNextLength(in, pos);
+      outputCol.add(new String(in, pos, length, "UTF-8"));
+      pos += length + 1;
+      nRead++;
+    }
+    return pos;
+  }
+
+  /**
+   * Creates a new  message of the currently connected servers.
+   *
+   * @param dsInfos The collection of currently connected DS servers ID.
+   * @param rsInfos The list of currently connected RS servers ID.
+   */
+  public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos)
+  {
+    if (dsInfos == null || dsInfos.isEmpty())
+    {
+      this.replicaInfos = Collections.emptyMap();
     }
     else
     {
-      this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
+      Map<Integer, DSInfo> replicas = new HashMap<Integer, DSInfo>();
+      for (DSInfo dsInfo : dsInfos)
+      {
+        replicas.put(dsInfo.getDsId(), dsInfo);
+      }
+      this.replicaInfos = Collections.unmodifiableMap(replicas);
     }
 
-    if (rsList == null || rsList.isEmpty())
+    if (rsInfos == null || rsInfos.isEmpty())
     {
-      this.rsList = Collections.emptyList();
+      this.rsInfos = Collections.emptyList();
     }
     else
     {
-      this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
+      this.rsInfos =
+          Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos));
     }
   }
 
@@ -293,12 +270,9 @@
   // Msg encoding
   // ============
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
-  public byte[] getBytes(short version)
-  throws UnsupportedEncodingException
+  public byte[] getBytes(short version) throws UnsupportedEncodingException
   {
     try
     {
@@ -313,10 +287,10 @@
       oStream.write(MSG_TYPE_TOPOLOGY);
 
       // Put number of following DS info entries
-      oStream.write((byte)dsList.size());
+      oStream.write((byte) replicaInfos.size());
 
       // Put DS info
-      for (DSInfo dsInfo : dsList)
+      for (DSInfo dsInfo : replicaInfos.values())
       {
         // Put DS id
         byte[] byteServerId =
@@ -330,8 +304,7 @@
           oStream.write(0);
         }
         // Put RS id
-        byteServerId =
-          String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
+        byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
         oStream.write(byteServerId);
         oStream.write(0);
         // Put the generation id
@@ -349,36 +322,16 @@
         // Put DS group id
         oStream.write(dsInfo.getGroupId());
 
-        List<String> refUrls = dsInfo.getRefUrls();
-        // Put number of following URLs as a byte
-        oStream.write(refUrls.size());
-        for (String url : refUrls)
-        {
-          // Write the url and a 0 terminating byte
-          oStream.write(url.getBytes("UTF-8"));
-          oStream.write(0);
-        }
+        writeStrings(oStream, dsInfo.getRefUrls());
 
         if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
         {
           // Put ECL includes
-          Set<String> attrs = dsInfo.getEclIncludes();
-          oStream.write(attrs.size());
-          for (String attr : attrs)
-          {
-            oStream.write(attr.getBytes("UTF-8"));
-            oStream.write(0);
-          }
+          writeStrings(oStream, dsInfo.getEclIncludes());
 
           if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
           {
-            Set<String> delattrs = dsInfo.getEclIncludesForDeletes();
-            oStream.write(delattrs.size());
-            for (String attr : delattrs)
-            {
-              oStream.write(attr.getBytes("UTF-8"));
-              oStream.write(0);
-            }
+            writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
           }
 
           oStream.write(dsInfo.getProtocolVersion());
@@ -386,10 +339,10 @@
       }
 
       // Put number of following RS info entries
-      oStream.write((byte)rsList.size());
+      oStream.write((byte) rsInfos.size());
 
       // Put RS info
-      for (RSInfo rsInfo : rsList)
+      for (RSInfo rsInfo : rsInfos)
       {
         // Put RS id
         byte[] byteServerId =
@@ -422,54 +375,65 @@
       // never happens
       throw new RuntimeException(e);
     }
-
   }
 
+  private void writeStrings(ByteArrayOutputStream oStream,
+      Collection<String> col) throws IOException, UnsupportedEncodingException
+  {
+    // Put collection length as a byte
+    oStream.write(col.size());
+    for (String elem : col)
+    {
+      // Write the element and a 0 terminating byte
+      oStream.write(elem.getBytes("UTF-8"));
+      oStream.write(0);
+    }
+  }
 
-
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public String toString()
   {
     String dsStr = "";
-    for (DSInfo dsInfo : dsList)
+    for (DSInfo dsInfo : replicaInfos.values())
     {
-      dsStr += dsInfo.toString() + "\n----------------------------\n";
+      dsStr += dsInfo + "\n----------------------------\n";
     }
 
     String rsStr = "";
-    for (RSInfo rsInfo : rsList)
+    for (RSInfo rsInfo : rsInfos)
     {
-      rsStr += rsInfo.toString() + "\n----------------------------\n";
+      rsStr += rsInfo + "\n----------------------------\n";
     }
 
-    return ("TopologyMsg content: "
+    return "TopologyMsg content:"
       + "\n----------------------------"
       + "\nCONNECTED DS SERVERS:"
       + "\n--------------------\n"
       + dsStr
       + "CONNECTED RS SERVERS:"
       + "\n--------------------\n"
-      + rsStr + (rsStr.equals("") ? "----------------------------\n" : ""));
+      + rsStr
+      + (rsStr.equals("") ? "----------------------------\n" : "");
   }
 
   /**
-   * Get the list of DS info.
-   * @return The list of DS info
+   * Get the DS infos.
+   *
+   * @return The DS infos
    */
-  public List<DSInfo> getDsList()
+  public Map<Integer, DSInfo> getReplicaInfos()
   {
-    return dsList;
+    return replicaInfos;
   }
 
   /**
-   * Get the list of RS info.
-   * @return The list of RS info
+   * Get the RS infos.
+   *
+   * @return The RS infos
    */
-  public List<RSInfo> getRsList()
+  public List<RSInfo> getRsInfos()
   {
-    return rsList;
+    return rsInfos;
   }
 }

--
Gitblit v1.10.0