mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.08.2014 1c59d6c7d4e33c5b88fbe0692c1d50c0eab74c4a
opendj3-server-dev/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -38,15 +38,14 @@
import org.opends.server.replication.common.ServerStatus;
/**
 *
 * This class defines a message that is sent:
 * - By a RS to the other RSs in the topology, containing:
 *   - the list of DSs directly connected to the RS in the DS list
 *   - only this RS in the RS list
 *   - the DSs directly connected to the RS in the DS infos
 *   - only this RS in the RS infos
 * - By a RS to his connected DSs, containing every DSs and RSs he knows.
 * In that case the message contains:
 *   - the list of every DS the RS knows except the destinator DS in the DS list
 *   - the list of every connected RSs (including the sending RS) in the RS list
 *   - every DSs the RS knows except the destinator DS in the DS infos
 *   - every connected RSs (including the sending RS) in the RS infos
 *
 * Exchanging these messages allows to have each RS or DS take
 * appropriate decisions according to the current topology:
@@ -56,10 +55,10 @@
 */
public class TopologyMsg extends ReplicationMsg
{
  // Information for the DS known in the topology
  private final List<DSInfo> dsList;
  // Information for the RS known in the topology
  private final List<RSInfo> rsList;
  /** Information for the DSs (aka replicas) known in the topology. */
  private final Map<Integer, DSInfo> replicaInfos;
  /** Information for the RSs known in the topology. */
  private final List<RSInfo> rsInfos;
  /**
   * Creates a new changelogInfo message from its encoded form.
@@ -77,18 +76,18 @@
      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
      {
        throw new DataFormatException(
          "Input is not a valid " + this.getClass().getCanonicalName());
          "Input is not a valid " + getClass().getCanonicalName());
      }
      int pos = 1;
      /* Read number of following DS info entries */
      byte nDsInfo = in[pos++];
      /* Read the DS info entries */
      List<DSInfo> dsList = new ArrayList<DSInfo>(Math.max(0, nDsInfo));
      while ( (nDsInfo > 0) && (pos < in.length) )
      Map<Integer, DSInfo> replicaInfos =
          new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
      while (nDsInfo > 0 && pos < in.length)
      {
        /* Read DS id */
        int length = getNextLength(in, pos);
@@ -110,26 +109,21 @@
        }
        /* Read RS id */
        length =
          getNextLength(in, pos);
        serverIdString =
          new String(in, pos, length, "UTF-8");
        length = getNextLength(in, pos);
        serverIdString = new String(in, pos, length, "UTF-8");
        int rsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read DS status */
        ServerStatus status = ServerStatus.valueOf(in[pos++]);
        /* Read DS assured flag */
        boolean assuredFlag;
        assuredFlag = in[pos++] == 1;
        boolean assuredFlag = in[pos++] == 1;
        /* Read DS assured mode */
        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
@@ -142,50 +136,18 @@
        /* Read number of referrals URLs */
        List<String> refUrls = new ArrayList<String>();
        byte nUrls = in[pos++];
        byte nRead = 0;
        /* Read urls until expected number read */
        while ((nRead != nUrls) &&
          (pos < in.length) //security
          )
        {
          length = getNextLength(in, pos);
          String url = new String(in, pos, length, "UTF-8");
          refUrls.add(url);
          pos += length + 1;
          nRead++;
        }
        pos = readStrings(in, pos, refUrls);
        Set<String> attrs = new HashSet<String>();
        Set<String> delattrs = new HashSet<String>();
        short protocolVersion = -1;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          byte nAttrs = in[pos++];
          nRead = 0;
          /* Read attrs until expected number read */
          while ((nRead != nAttrs) && (pos < in.length))
          {
            length = getNextLength(in, pos);
            String attr = new String(in, pos, length, "UTF-8");
            attrs.add(attr);
            pos += length + 1;
            nRead++;
          }
          pos = readStrings(in, pos, attrs);
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
          {
            nAttrs = in[pos++];
            nRead = 0;
            /* Read attrs until expected number read */
            while ((nRead != nAttrs) && (pos < in.length))
            {
              length = getNextLength(in, pos);
              String attr = new String(in, pos, length, "UTF-8");
              delattrs.add(attr);
              pos += length + 1;
              nRead++;
            }
            pos = readStrings(in, pos, delattrs);
          }
          else
          {
@@ -194,26 +156,23 @@
          }
          /* Read Protocol version */
          protocolVersion = (short)in[pos++];
          protocolVersion = in[pos++];
        }
        /* Now create DSInfo and store it in list */
        DSInfo dsInfo = new DSInfo(dsId, dsUrl, rsId, generationId, status,
          assuredFlag, assuredMode, safeDataLevel, groupId, refUrls, attrs,
          delattrs, protocolVersion);
        dsList.add(dsInfo);
        /* Now create DSInfo and store it */
        replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
            status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
            attrs, delattrs, protocolVersion));
        nDsInfo--;
      }
      /* Read number of following RS info entries */
      byte nRsInfo = in[pos++];
      /* Read the RS info entries */
      List<RSInfo> rsList = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
      while ( (nRsInfo > 0) && (pos < in.length) )
      List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
      while (nRsInfo > 0 && pos < in.length)
      {
        /* Read RS id */
        int length = getNextLength(in, pos);
@@ -223,9 +182,7 @@
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId =
          Long.valueOf(new String(in, pos, length,
          "UTF-8"));
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read RS group id */
@@ -245,47 +202,67 @@
          pos += length + 1;
        }
        /* Now create RSInfo and store it in list */
        RSInfo rsInfo = new RSInfo(id, serverUrl, generationId, groupId,
          weight);
        rsList.add(rsInfo);
        /* Now create RSInfo and store it */
        rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
        nRsInfo--;
      }
      this.dsList = Collections.unmodifiableList(dsList);
      this.rsList = Collections.unmodifiableList(rsList);
    } catch (UnsupportedEncodingException e)
      this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
      this.rsInfos = Collections.unmodifiableList(rsInfos);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Creates a new  message from a list of the currently connected servers.
   *
   * @param dsList The list of currently connected DS servers ID.
   * @param rsList The list of currently connected RS servers ID.
   */
  public TopologyMsg(List<DSInfo> dsList, List<RSInfo> rsList)
  private int readStrings(byte[] in, int pos, Collection<String> outputCol)
      throws DataFormatException, UnsupportedEncodingException
  {
    if (dsList == null || dsList.isEmpty())
    byte nAttrs = in[pos++];
    byte nRead = 0;
    // Read all elements until expected number read
    while (nRead != nAttrs && pos < in.length)
    {
      this.dsList = Collections.emptyList();
      int length = getNextLength(in, pos);
      outputCol.add(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      nRead++;
    }
    return pos;
  }
  /**
   * Creates a new  message of the currently connected servers.
   *
   * @param dsInfos The collection of currently connected DS servers ID.
   * @param rsInfos The list of currently connected RS servers ID.
   */
  public TopologyMsg(Collection<DSInfo> dsInfos, List<RSInfo> rsInfos)
  {
    if (dsInfos == null || dsInfos.isEmpty())
    {
      this.replicaInfos = Collections.emptyMap();
    }
    else
    {
      this.dsList = Collections.unmodifiableList(new ArrayList<DSInfo>(dsList));
      Map<Integer, DSInfo> replicas = new HashMap<Integer, DSInfo>();
      for (DSInfo dsInfo : dsInfos)
      {
        replicas.put(dsInfo.getDsId(), dsInfo);
      }
      this.replicaInfos = Collections.unmodifiableMap(replicas);
    }
    if (rsList == null || rsList.isEmpty())
    if (rsInfos == null || rsInfos.isEmpty())
    {
      this.rsList = Collections.emptyList();
      this.rsInfos = Collections.emptyList();
    }
    else
    {
      this.rsList = Collections.unmodifiableList(new ArrayList<RSInfo>(rsList));
      this.rsInfos =
          Collections.unmodifiableList(new ArrayList<RSInfo>(rsInfos));
    }
  }
@@ -293,12 +270,9 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  throws UnsupportedEncodingException
  public byte[] getBytes(short version) throws UnsupportedEncodingException
  {
    try
    {
@@ -313,10 +287,10 @@
      oStream.write(MSG_TYPE_TOPOLOGY);
      // Put number of following DS info entries
      oStream.write((byte)dsList.size());
      oStream.write((byte) replicaInfos.size());
      // Put DS info
      for (DSInfo dsInfo : dsList)
      for (DSInfo dsInfo : replicaInfos.values())
      {
        // Put DS id
        byte[] byteServerId =
@@ -330,8 +304,7 @@
          oStream.write(0);
        }
        // Put RS id
        byteServerId =
          String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
        byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        // Put the generation id
@@ -349,36 +322,16 @@
        // Put DS group id
        oStream.write(dsInfo.getGroupId());
        List<String> refUrls = dsInfo.getRefUrls();
        // Put number of following URLs as a byte
        oStream.write(refUrls.size());
        for (String url : refUrls)
        {
          // Write the url and a 0 terminating byte
          oStream.write(url.getBytes("UTF-8"));
          oStream.write(0);
        }
        writeStrings(oStream, dsInfo.getRefUrls());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put ECL includes
          Set<String> attrs = dsInfo.getEclIncludes();
          oStream.write(attrs.size());
          for (String attr : attrs)
          {
            oStream.write(attr.getBytes("UTF-8"));
            oStream.write(0);
          }
          writeStrings(oStream, dsInfo.getEclIncludes());
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
          {
            Set<String> delattrs = dsInfo.getEclIncludesForDeletes();
            oStream.write(delattrs.size());
            for (String attr : delattrs)
            {
              oStream.write(attr.getBytes("UTF-8"));
              oStream.write(0);
            }
            writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
          }
          oStream.write(dsInfo.getProtocolVersion());
@@ -386,10 +339,10 @@
      }
      // Put number of following RS info entries
      oStream.write((byte)rsList.size());
      oStream.write((byte) rsInfos.size());
      // Put RS info
      for (RSInfo rsInfo : rsList)
      for (RSInfo rsInfo : rsInfos)
      {
        // Put RS id
        byte[] byteServerId =
@@ -422,54 +375,65 @@
      // never happens
      throw new RuntimeException(e);
    }
  }
  private void writeStrings(ByteArrayOutputStream oStream,
      Collection<String> col) throws IOException, UnsupportedEncodingException
  {
    // Put collection length as a byte
    oStream.write(col.size());
    for (String elem : col)
    {
      // Write the element and a 0 terminating byte
      oStream.write(elem.getBytes("UTF-8"));
      oStream.write(0);
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    String dsStr = "";
    for (DSInfo dsInfo : dsList)
    for (DSInfo dsInfo : replicaInfos.values())
    {
      dsStr += dsInfo.toString() + "\n----------------------------\n";
      dsStr += dsInfo + "\n----------------------------\n";
    }
    String rsStr = "";
    for (RSInfo rsInfo : rsList)
    for (RSInfo rsInfo : rsInfos)
    {
      rsStr += rsInfo.toString() + "\n----------------------------\n";
      rsStr += rsInfo + "\n----------------------------\n";
    }
    return ("TopologyMsg content: "
    return "TopologyMsg content:"
      + "\n----------------------------"
      + "\nCONNECTED DS SERVERS:"
      + "\n--------------------\n"
      + dsStr
      + "CONNECTED RS SERVERS:"
      + "\n--------------------\n"
      + rsStr + (rsStr.equals("") ? "----------------------------\n" : ""));
      + rsStr
      + (rsStr.equals("") ? "----------------------------\n" : "");
  }
  /**
   * Get the list of DS info.
   * @return The list of DS info
   * Get the DS infos.
   *
   * @return The DS infos
   */
  public List<DSInfo> getDsList()
  public Map<Integer, DSInfo> getReplicaInfos()
  {
    return dsList;
    return replicaInfos;
  }
  /**
   * Get the list of RS info.
   * @return The list of RS info
   * Get the RS infos.
   *
   * @return The RS infos
   */
  public List<RSInfo> getRsList()
  public List<RSInfo> getRsInfos()
  {
    return rsList;
    return rsInfos;
  }
}