mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.17.2014 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -26,9 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.zip.DataFormatException;
@@ -37,6 +34,8 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
/**
 * This class defines a message that is sent:
 * - By a RS to the other RSs in the topology, containing:
@@ -68,173 +67,102 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public TopologyMsg(byte[] in, short version) throws DataFormatException
  TopologyMsg(byte[] in, short version) throws DataFormatException
  {
    // NOTE(review): this span looks like a diff hunk whose +/- markers were
    // lost: index-based parsing over `in`/`pos` is interleaved with
    // ByteArrayScanner-based parsing of the same fields (two signatures, a
    // dangling `try`). Confirm which lines are live before editing.
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    // The first value taken from the scanner is compared to the topology
    // message type.
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_TOPOLOGY)
    {
      /* First byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
      {
        throw new DataFormatException(
          "Input is not a valid " + getClass().getCanonicalName());
      }
      int pos = 1;
      /* Read number of following DS info entries */
      byte nDsInfo = in[pos++];
      /* Read the DS info entries */
      // The count is a signed byte, so Math.max keeps the initial capacity
      // from being negative.
      Map<Integer, DSInfo> replicaInfos =
          new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
      // Stops as soon as the input is exhausted, even if fewer than nDsInfo
      // entries have been read.
      while (nDsInfo > 0 && pos < in.length)
      {
        /* Read DS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int dsId = Integer.valueOf(serverIdString);
        // Skip the string plus one extra byte (presumably its terminator,
        // matching the 0 byte the encoder writes after each string).
        pos += length + 1;
        /* Read DS URL */
        // Only read from protocol V6 onwards; otherwise an empty URL is used.
        String dsUrl;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
        {
          length = getNextLength(in, pos);
          dsUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
        }
        else
        {
          dsUrl = "";
        }
        /* Read RS id */
        length = getNextLength(in, pos);
        serverIdString = new String(in, pos, length, "UTF-8");
        int rsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read DS status */
        ServerStatus status = ServerStatus.valueOf(in[pos++]);
        /* Read DS assured flag */
        boolean assuredFlag = in[pos++] == 1;
        /* Read DS assured mode */
        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
        /* Read DS safe data level */
        byte safeDataLevel = in[pos++];
        /* Read DS group id */
        byte groupId = in[pos++];
        /* Read number of referrals URLs */
        List<String> refUrls = new ArrayList<String>();
        pos = readStrings(in, pos, refUrls);
        Set<String> attrs = new HashSet<String>();
        Set<String> delattrs = new HashSet<String>();
        // Stays -1 when version < V4, where no protocol version byte is read.
        short protocolVersion = -1;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          pos = readStrings(in, pos, attrs);
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
          {
            pos = readStrings(in, pos, delattrs);
          }
          else
          {
            // Default to using the same set of attributes for deletes.
            delattrs.addAll(attrs);
          }
          /* Read Protocol version */
          protocolVersion = in[pos++];
        }
        /* Now create DSInfo and store it */
        replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
            status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
            attrs, delattrs, protocolVersion));
        nDsInfo--;
      }
      /* Read number of following RS info entries */
      byte nRsInfo = in[pos++];
      /* Read the RS info entries */
      List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
      while (nRsInfo > 0 && pos < in.length)
      {
        /* Read RS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int id = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read RS group id */
        byte groupId = in[pos++];
        // Defaults kept when version < V4, where URL and weight are not read.
        int weight = 1;
        String serverUrl = null;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          length = getNextLength(in, pos);
          serverUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
          /* Read RS weight */
          length = getNextLength(in, pos);
          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
          pos += length + 1;
        }
        /* Now create RSInfo and store it */
        rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
        nRsInfo--;
      }
      this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
      this.rsInfos = Collections.unmodifiableList(rsInfos);
      throw new DataFormatException("Input is not a valid "
          + getClass().getCanonicalName());
    }
    catch (UnsupportedEncodingException e)
    // Read the DS info entries, first read number of them
    int nDsInfo = scanner.nextByte();
    // Math.max keeps the initial capacity non-negative if the count read
    // above is negative.
    final Map<Integer, DSInfo> replicaInfos =
        new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
    // Stops early once the scanner reports it is empty, even if fewer than
    // nDsInfo entries were decoded.
    while (nDsInfo > 0 && !scanner.isEmpty())
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      final DSInfo dsInfo = nextDSInfo(scanner, version);
      // Entries are keyed by DS id, so a repeated id replaces the earlier one.
      replicaInfos.put(dsInfo.getDsId(), dsInfo);
      nDsInfo--;
    }
    // Read the RS info entries
    int nRsInfo = scanner.nextByte();
    final List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
    while (nRsInfo > 0 && !scanner.isEmpty())
    {
      rsInfos.add(nextRSInfo(scanner, version));
      nRsInfo--;
    }
    // Store read-only views of the decoded collections.
    this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
    this.rsInfos = Collections.unmodifiableList(rsInfos);
  }
  // NOTE(review): this span looks like a diff hunk whose +/- markers were
  // lost: the index-based readStrings(byte[], int, Collection) helper is
  // interleaved with the scanner-based nextDSInfo(ByteArrayScanner, short).
  // Confirm which lines are live before editing.
  private int readStrings(byte[] in, int pos, Collection<String> outputCol)
      throws DataFormatException, UnsupportedEncodingException
  private DSInfo nextDSInfo(ByteArrayScanner scanner, short version)
      throws DataFormatException
  {
    byte nAttrs = in[pos++];
    byte nRead = 0;
    // Read all elements until expected number read
    while (nRead != nAttrs && pos < in.length)
    // Fields are read in a fixed order: DS id, optional DS URL, RS id,
    // generation id, status, assured flag, assured mode, safe data level,
    // group id, then the referral URL list.
    final int dsId = scanner.nextIntUTF8();
    // No DS URL is read for versions before V6; an empty string is used.
    final String dsUrl =
        version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString();
    final int rsId = scanner.nextIntUTF8();
    final long generationId = scanner.nextLongUTF8();
    final ServerStatus status = ServerStatus.valueOf(scanner.nextByte());
    final boolean assuredFlag = scanner.nextBoolean();
    final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte());
    final byte safeDataLevel = scanner.nextByte();
    final byte groupId = scanner.nextByte();
    final List<String> refUrls = new ArrayList<String>();
    scanner.nextStrings(refUrls);
    final Set<String> attrs = new HashSet<String>();
    final Set<String> delattrs = new HashSet<String>();
    // Stays -1 when version < V4, where no protocol version byte is read.
    short protocolVersion = -1;
    if (version >= REPLICATION_PROTOCOL_V4)
    {
      int length = getNextLength(in, pos);
      outputCol.add(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      nRead++;
      scanner.nextStrings(attrs);
      if (version >= REPLICATION_PROTOCOL_V5)
      {
        scanner.nextStrings(delattrs);
      }
      else
      {
        // Default to using the same set of attributes for deletes.
        delattrs.addAll(attrs);
      }
      protocolVersion = scanner.nextByte();
    }
    return pos;
    return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag,
        assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs,
        protocolVersion);
  }
  private RSInfo nextRSInfo(ByteArrayScanner scanner, short version)
      throws DataFormatException
  {
    final int rsId = scanner.nextIntUTF8();
    final long generationId = scanner.nextLongUTF8();
    final byte groupId = scanner.nextByte();
    int weight = 1;
    String serverUrl = null;
    if (version >= REPLICATION_PROTOCOL_V4)
    {
      serverUrl = scanner.nextString();
      weight = scanner.nextIntUTF8();
    }
    return new RSInfo(rsId, serverUrl, generationId, groupId, weight);
  }
  /**
   * Creates a new  message of the currently connected servers.
   * Creates a new message of the currently connected servers.
   *
   * @param dsInfos The collection of currently connected DS servers ID.
   * @param rsInfos The list of currently connected RS servers ID.
@@ -272,122 +200,62 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version) throws UnsupportedEncodingException
  public byte[] getBytes(short version)
  {
    // NOTE(review): this span looks like a diff hunk whose +/- markers were
    // lost: ByteArrayOutputStream-based encoding (plus the writeStrings
    // helper) is interleaved with ByteArrayBuilder-based encoding of the same
    // fields. Confirm which lines are live before editing.
    try
    /**
     * Message has the following form:
     * <pdu type><number of following DSInfo entries>[<DSInfo>]*
     * <number of following RSInfo entries>[<RSInfo>]*
     */
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_TOPOLOGY);
    // Put DS infos
    // NOTE(review): the entry count is narrowed to a single byte; a size
    // above 127 becomes negative after the cast - confirm that topologies
    // cannot reach that size.
    builder.append((byte) replicaInfos.size());
    for (DSInfo dsInfo : replicaInfos.values())
    {
      /**
       * Message has the following form:
       * <pdu type><number of following DSInfo entries>[<DSInfo>]*
       * <number of following RSInfo entries>[<RSInfo>]*
       */
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      /* Put the message type */
      oStream.write(MSG_TYPE_TOPOLOGY);
      // Put number of following DS info entries
      oStream.write((byte) replicaInfos.size());
      // Put DS info
      for (DSInfo dsInfo : replicaInfos.values())
      builder.appendUTF8(dsInfo.getDsId());
      // The DS URL is only written for protocol V6 and later.
      if (version >= REPLICATION_PROTOCOL_V6)
      {
        // Put DS id
        byte[] byteServerId =
          String.valueOf(dsInfo.getDsId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
        {
          // Put DS URL
          oStream.write(dsInfo.getDsUrl().getBytes("UTF-8"));
          oStream.write(0);
        }
        // Put RS id
        byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        // Put the generation id
        oStream.write(String.valueOf(dsInfo.getGenerationId()).
            getBytes("UTF-8"));
        oStream.write(0);
        // Put DS status
        oStream.write(dsInfo.getStatus().getValue());
        // Put DS assured flag
        oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0);
        // Put DS assured mode
        oStream.write(dsInfo.getAssuredMode().getValue());
        // Put DS safe data level
        oStream.write(dsInfo.getSafeDataLevel());
        // Put DS group id
        oStream.write(dsInfo.getGroupId());
        writeStrings(oStream, dsInfo.getRefUrls());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put ECL includes
          writeStrings(oStream, dsInfo.getEclIncludes());
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
          {
            writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
          }
          oStream.write(dsInfo.getProtocolVersion());
        }
        builder.append(dsInfo.getDsUrl());
      }
      builder.appendUTF8(dsInfo.getRsId());
      builder.appendUTF8(dsInfo.getGenerationId());
      builder.append(dsInfo.getStatus().getValue());
      builder.append(dsInfo.isAssured());
      builder.append(dsInfo.getAssuredMode().getValue());
      builder.append(dsInfo.getSafeDataLevel());
      builder.append(dsInfo.getGroupId());
      // Put number of following RS info entries
      oStream.write((byte) rsInfos.size());
      builder.appendStrings(dsInfo.getRefUrls());
      // Put RS info
      for (RSInfo rsInfo : rsInfos)
      // ECL include lists and the DS protocol version are only written from
      // protocol V4 onwards.
      if (version >= REPLICATION_PROTOCOL_V4)
      {
        // Put RS id
        byte[] byteServerId =
          String.valueOf(rsInfo.getId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        // Put the generation id
        oStream.write(String.valueOf(rsInfo.getGenerationId()).
          getBytes("UTF-8"));
        oStream.write(0);
        // Put RS group id
        oStream.write(rsInfo.getGroupId());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        builder.appendStrings(dsInfo.getEclIncludes());
        // The delete-specific include list is only written from V5 onwards.
        if (version >= REPLICATION_PROTOCOL_V5)
        {
          // Put server URL
          oStream.write(rsInfo.getServerUrl().getBytes("UTF-8"));
          oStream.write(0);
          // Put RS weight
          oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
          oStream.write(0);
          builder.appendStrings(dsInfo.getEclIncludesForDeletes());
        }
        builder.append((byte) dsInfo.getProtocolVersion());
      }
    }
      return oStream.toByteArray();
    }
    catch (IOException e)
    // Put RS infos
    // NOTE(review): same single-byte narrowing of the count as for DS infos.
    builder.append((byte) rsInfos.size());
    for (RSInfo rsInfo : rsInfos)
    {
      // never happens
      throw new RuntimeException(e);
    }
  }
      builder.appendUTF8(rsInfo.getId());
      builder.appendUTF8(rsInfo.getGenerationId());
      builder.append(rsInfo.getGroupId());
  private void writeStrings(ByteArrayOutputStream oStream,
      Collection<String> col) throws IOException, UnsupportedEncodingException
  {
    // Put collection length as a byte
    oStream.write(col.size());
    for (String elem : col)
    {
      // Write the element and a 0 terminating byte
      oStream.write(elem.getBytes("UTF-8"));
      oStream.write(0);
      // The server URL and weight are only written from protocol V4 onwards.
      if (version >= REPLICATION_PROTOCOL_V4)
      {
        builder.append(rsInfo.getServerUrl());
        builder.appendUTF8(rsInfo.getWeight());
      }
    }
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
@@ -414,7 +282,7 @@
      + "CONNECTED RS SERVERS:"
      + "\n--------------------\n"
      + rsStr
      + (rsStr.equals("") ? "----------------------------\n" : "");
      + ("".equals(rsStr) ? "----------------------------\n" : "");
  }
  /**