mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.17.2014 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -22,16 +22,17 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
/**
 * Abstract class that must be extended to define a message
 * used for sending Updates between servers.
@@ -39,42 +40,31 @@
public class UpdateMsg extends ReplicationMsg
                                    implements Comparable<UpdateMsg>
{
  /**
   * Protocol version.
   */
  /** Protocol version. */
  protected short protocolVersion;
  /**
   * The CSN of this update.
   */
  /** The CSN of this update. */
  protected CSN csn;
  /**
   * True when the update must use assured replication.
   */
  /** True when the update must use assured replication. */
  protected boolean assuredFlag = false;
  /**
   * When assuredFlag is true, defines the requested assured mode.
   */
  /** When assuredFlag is true, defines the requested assured mode. */
  protected AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  /**
   * When assured mode is safe data, gives the requested level.
   */
  /** When assured mode is safe data, gives the requested level. */
  protected byte safeDataLevel = (byte)1;
  /**
   * The payload that must be encoded in this message.
   */
  private byte[] payload;
  /** The payload that must be encoded in this message. */
  private final byte[] payload;
  /**
   * Creates a new empty UpdateMsg.
   */
  protected UpdateMsg()
  {}
  {
    payload = null;
  }
  /**
   * Creates a new UpdateMsg with the given information.
@@ -85,25 +75,10 @@
   */
  UpdateMsg(byte[] bytes) throws DataFormatException
  {
    // Decode header
    int pos = decodeHeader(MSG_TYPE_GENERIC_UPDATE, bytes);
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    decodeHeader(MSG_TYPE_GENERIC_UPDATE, scanner);
    // Read the payload : all the remaining bytes but the terminating 0
    int length = bytes.length - pos;
    payload = new byte[length];
    try
    {
      System.arraycopy(bytes, pos, payload, 0, length);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    payload = scanner.remainingBytes();
  }
  /**
@@ -152,9 +127,7 @@
    assuredFlag = assured;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object obj)
  {
@@ -162,18 +135,14 @@
        csn.equals(((UpdateMsg) obj).csn);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    return csn.hashCode();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int compareTo(UpdateMsg msg)
  {
@@ -243,103 +212,50 @@
   * Encode the common header for all the UpdateMsg. This uses the current
   * protocol version.
   *
   * @param type the type of UpdateMsg to encode.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMsg.
   * @param version The ProtocolVersion to use when encoding.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMsg as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param msgType The type of UpdateMsg to encode.
   * @param protocolVersion The ProtocolVersion to use when encoding.
   * @return a byte array builder containing the common header
   */
  protected byte[] encodeHeader(byte type, int additionalLength, short version)
    throws UnsupportedEncodingException
  protected ByteArrayBuilder encodeHeader(byte msgType, short protocolVersion)
  {
    byte[] csnByte = getCSN().toString().getBytes("UTF-8");
    /* The message header is stored in the form :
     * <operation type><protocol version><CSN><assured>
     * <assured mode> <safe data level>
     * the length of result byte array is therefore :
     *   1 + 1 + CSN length + 1 + 1
     *   + 1 + 1 + additional_length
     */
    int length = 6 + csnByte.length + additionalLength;
    byte[] encodedMsg = new byte[length];
    // put the type of the operation
    encodedMsg[0] = type;
    // put the protocol version
    encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
    int pos = 2;
    // Put the CSN
    pos = addByteArray(csnByte, encodedMsg, pos);
    // Put the assured flag
    encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
    // Put the assured mode
    encodedMsg[pos++] = assuredMode.getValue();
    // Put the safe data level
    encodedMsg[pos++] = safeDataLevel;
    return encodedMsg;
    final ByteArrayBuilder builder =
        new ByteArrayBuilder(bytes(6) + csnsUTF8(1));
    builder.append(msgType);
    builder.append((byte) ProtocolVersion.getCurrentVersion());
    builder.appendUTF8(getCSN());
    builder.append(assuredFlag);
    builder.append(assuredMode.getValue());
    builder.append(safeDataLevel);
    return builder;
  }
  /**
   * Decode the Header part of this Update Message, and check its type.
   *
   * @param type The allowed type of this Update Message.
   * @param encodedMsg the encoded form of the UpdateMsg.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   * @param allowedType The allowed type of this Update Message.
   * @param scanner The encoded form of the UpdateMsg.
   * @throws DataFormatException
   *           if the scanner does not contain a valid common header.
   */
  protected int decodeHeader(byte type, byte[] encodedMsg)
                          throws DataFormatException
  protected void decodeHeader(byte allowedType, ByteArrayScanner scanner)
      throws DataFormatException
  {
    /* The message header is stored in the form :
     * <operation type><protocol version><CSN><assured>
     * <assured mode> <safe data level>
     */
    if (!(type == encodedMsg[0]))
    final byte msgType = scanner.nextByte();
    if (allowedType != msgType)
    {
      throw new DataFormatException("byte[] is not a valid update msg: "
        + encodedMsg[0]);
    // read the protocol version
    protocolVersion = encodedMsg[1];
    try
    {
      // Read the CSN
      int pos = 2;
      int length = getNextLength(encodedMsg, pos);
      String csnStr = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      csn = new CSN(csnStr);
      // Read the assured information
      assuredFlag = encodedMsg[pos++] == 1;
      // Read the assured mode
      assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
      // Read the safe data level
      safeDataLevel = encodedMsg[pos++];
      return pos;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    } catch (IllegalArgumentException e)
    {
      throw new DataFormatException(e.getMessage());
          + msgType);
    }
    protocolVersion = scanner.nextByte();
    csn = scanner.nextCSNUTF8();
    assuredFlag = scanner.nextBoolean();
    assuredMode = AssuredMode.valueOf(scanner.nextByte());
    safeDataLevel = scanner.nextByte();
  }
  /**
@@ -347,10 +263,8 @@
   * protocol version.
   *
   * @return The encoded representation of this update message.
   * @throws UnsupportedEncodingException
   *           If the message could not be encoded.
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  public byte[] getBytes()
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
@@ -364,20 +278,11 @@
   */
  @Override
  public byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException
  {
    // Encode the header in a byte[] large enough to also contain the payload
    byte[] resultByteArray = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
        payload.length, ProtocolVersion.getCurrentVersion());
    int pos = resultByteArray.length - payload.length;
    // Add the payload
    for (int i = 0; i < payload.length; i++, pos++)
    {
      resultByteArray[pos] = payload[i];
    }
    return resultByteArray;
    final ByteArrayBuilder builder = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
        ProtocolVersion.getCurrentVersion());
    builder.append(payload);
    return builder.toByteArray();
  }
  /**