mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.17.2014 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,17 +22,14 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.protocol.ProtocolVersion;
@@ -75,70 +72,6 @@
    serverIdToCSN.clear();
  }
  /**
   * Creates a new ServerState object from its encoded form.
   *
   * @param in The byte array containing the encoded ServerState form.
   * @param pos The position in the byte array where the encoded ServerState
   *            starts.
   * @param endpos The position in the byte array where the encoded ServerState
   *               ends.
   * @throws DataFormatException If the encoded form was not correct.
   */
  public ServerState(byte[] in, int pos, int endpos) throws DataFormatException
  {
    try
    {
      while (endpos > pos)
      {
        // FIXME JNR: why store the serverId separately from the CSN since the
        // CSN already contains the serverId?
        // read the ServerId
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int serverId = Integer.valueOf(serverIdString);
        pos += length +1;
        // read the CSN
        length = getNextLength(in, pos);
        String csnString = new String(in, pos, length, "UTF-8");
        CSN csn = new CSN(csnString);
        pos += length +1;
        // Add the serverId
        serverIdToCSN.put(serverId, csn);
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Get the length of the next String encoded in the in byte array.
   * This method is used to cut the different parts (serverIds, CSN)
   * of a server state.
   *
   * @param in the byte array where to calculate the string.
   * @param pos the position where to start from in the byte array.
   * @return the length of the next string.
   * @throws DataFormatException If the byte array does not end with null.
   */
  private int getNextLength(byte[] in, int pos) throws DataFormatException
  {
    int offset = pos;
    int length = 0;
    while (in[offset++] != 0)
    {
      if (offset >= in.length)
        throw new DataFormatException("byte[] is not a valid server state");
      length++;
    }
    return length;
  }
  /**
   * Forward update the Server State with a CSN. The provided CSN will be put on
   * the current object only if it is newer than the existing CSN for the same
@@ -151,7 +84,9 @@
  public boolean update(CSN csn)
  {
    if (csn == null)
    {
      return false;
    }
    saved = false;
@@ -191,7 +126,9 @@
  public boolean update(ServerState serverState)
  {
    if (serverState == null)
    {
      return false;
    }
    boolean updated = false;
    for (CSN csn : serverState.serverIdToCSN.values())
@@ -215,7 +152,9 @@
  public boolean removeCSN(CSN expectedCSN)
  {
    if (expectedCSN == null)
    {
      return false;
    }
    if (serverIdToCSN.remove(expectedCSN.getServerId(), expectedCSN))
    {
@@ -335,63 +274,18 @@
  }
  /**
   * Add the tail into resultByteArray at position pos.
   */
  private int addByteArray(byte[] tail, byte[] resultByteArray, int pos)
  {
    for (int i=0; i<tail.length; i++,pos++)
    {
      resultByteArray[pos] = tail[i];
    }
    resultByteArray[pos++] = 0;
    return pos;
  }
  /**
   * Encode this ServerState object and return its byte array representation.
   * Returns a copy of this ServerState's content as a Map of serverId => CSN.
   *
   * @return a byte array with an encoded representation of this object.
   * @throws UnsupportedEncodingException if UTF8 is not supported by the JVM.
   * @return a copy of this ServerState's content as a Map of serverId => CSN.
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  public Map<Integer, CSN> getServerIdToCSNMap()
  {
    // copy to protect from concurrent updates
    // that could change the number of elements in the Map
    final Map<Integer, CSN> copy = new HashMap<Integer, CSN>(serverIdToCSN);
    final int size = copy.size();
    List<String> idList = new ArrayList<String>(size);
    List<String> csnList = new ArrayList<String>(size);
    // calculate the total length needed to allocate byte array
    int length = 0;
    for (Entry<Integer, CSN> entry : copy.entrySet())
    {
      // serverId is useless, see comment in ServerState ctor
      final String serverIdStr = String.valueOf(entry.getKey());
      idList.add(serverIdStr);
      length += serverIdStr.length() + 1;
      final String csnStr = entry.getValue().toString();
      csnList.add(csnStr);
      length += csnStr.length() + 1;
    }
    byte[] result = new byte[length];
    // write the server state into the byte array
    int pos = 0;
    for (int i = 0; i < size; i++)
    {
      String str = idList.get(i);
      pos = addByteArray(str.getBytes("UTF-8"), result, pos);
      str = csnList.get(i);
      pos = addByteArray(str.getBytes("UTF-8"), result, pos);
    }
    return result;
    return new HashMap<Integer, CSN>(serverIdToCSN);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public Iterator<CSN> iterator()
  {
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -26,7 +26,10 @@
 */
package org.opends.server.replication.plugin;
import java.io.*;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -59,7 +62,10 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.*;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
@@ -2063,9 +2069,6 @@
        {
          msg.encode();
          pendingChanges.commitAndPushCommittedChanges(curCSN, msg);
        } catch (UnsupportedEncodingException e)
        {
          // will be caught at publish time.
        }
        catch (NoSuchElementException e)
        {
opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -22,13 +22,10 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -60,7 +57,7 @@
public class AckMsg extends ReplicationMsg
{
  /** CSN of the update that was acked. */
  private CSN csn;
  private final CSN csn;
  /**
   * Did some servers go in timeout when the matching update (corresponding to
@@ -159,50 +156,28 @@
   * @throws DataFormatException If in does not contain a properly encoded
   *                             AckMsg.
   */
  public AckMsg(byte[] in) throws DataFormatException
  AckMsg(byte[] in) throws DataFormatException
  {
    try
    /*
     * The message is stored in the form:
     * <operation type><CSN><has timeout><has degraded><has replay
     * error><failed server ids>
     */
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_ACK)
    {
      /*
       * The message is stored in the form:
       * <operation type><CSN><has timeout><has degraded><has replay
       * error><failed server ids>
       */
      throw new DataFormatException("byte[] is not a valid modify msg");
    }
      // First byte is the type
      if (in[0] != MSG_TYPE_ACK)
      {
        throw new DataFormatException("byte[] is not a valid modify msg");
      }
      int pos = 1;
    csn = scanner.nextCSNUTF8();
    hasTimeout = scanner.nextBoolean();
    hasWrongStatus = scanner.nextBoolean();
    hasReplayError = scanner.nextBoolean();
      // Read the CSN
      int length = getNextLength(in, pos);
      String csnStr = new String(in, pos, length, "UTF-8");
      csn = new CSN(csnStr);
      pos += length + 1;
      // Read the hasTimeout flag
      hasTimeout = in[pos++] == 1;
      // Read the hasWrongStatus flag
      hasWrongStatus = in[pos++] == 1;
      // Read the hasReplayError flag
      hasReplayError = in[pos++] == 1;
      // Read the list of failed server ids
      while (pos < in.length)
      {
        length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        Integer serverId = Integer.valueOf(serverIdString);
        failedServers.add(serverId);
        pos += length + 1;
      }
    } catch (UnsupportedEncodingException e)
    while (!scanner.isEmpty())
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      failedServers.add(scanner.nextIntUTF8());
    }
  }
@@ -216,53 +191,26 @@
    return csn;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    /*
     * The message is stored in the form:
     * <operation type><CSN><has timeout><has degraded><has replay
     * error><failed server ids>
     */
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_ACK);
    builder.appendUTF8(csn);
    builder.append(hasTimeout);
    builder.append(hasWrongStatus);
    builder.append(hasReplayError);
    for (int serverId : failedServers)
    {
      /*
       * The message is stored in the form:
       * <operation type><CSN><has timeout><has degraded><has replay
       * error><failed server ids>
       */
      ByteArrayOutputStream oStream = new ByteArrayOutputStream(200);
      // Put the type of the operation
      oStream.write(MSG_TYPE_ACK);
      // Put the CSN
      byte[] csnByte = csn.toString().getBytes("UTF-8");
      oStream.write(csnByte);
      oStream.write(0);
      // Put the hasTimeout flag
      oStream.write(hasTimeout ? 1 : 0);
      // Put the hasWrongStatus flag
      oStream.write(hasWrongStatus ? 1 : 0);
      // Put the hasReplayError flag
      oStream.write(hasReplayError ? 1 : 0);
      // Put the list of server ids
      for (Integer sid : failedServers)
      {
        byte[] byteServerId = String.valueOf(sid).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
      }
      return oStream.toByteArray();
    } catch (IOException e)
    {
      // never happens
      return null;
      builder.appendUTF8(serverId);
    }
    return builder.toByteArray();
  }
  /**
@@ -307,21 +255,8 @@
   */
  public String errorsToString()
  {
    String idList;
    if (failedServers.size() > 0)
    {
      idList = "[";
      int size = failedServers.size();
      for (int i=0 ; i<size ; i++) {
        idList += failedServers.get(i);
        if ( i != (size-1) )
          idList += ", ";
      }
      idList += "]";
    } else
    {
      idList="none";
    }
    final String idList =
        !failedServers.isEmpty() ? failedServers.toString() : "none";
    return "hasTimeout: " + (hasTimeout ? "yes" : "no")  + ", " +
      "hasWrongStatus: " + (hasWrongStatus ? "yes" : "no")  + ", " +
opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -64,7 +63,7 @@
   * Creates a new AddMessage.
   * @param op the operation to use when creating the message
   */
  public AddMsg(PostOperationAddOperation op)
  AddMsg(PostOperationAddOperation op)
  {
    super((AddContext) op.getAttachment(SYNCHROCONTEXT), op.getEntryDN());
@@ -143,22 +142,19 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid AddMsg
   * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm
   */
  public AddMsg(byte[] in) throws DataFormatException,
                                  UnsupportedEncodingException
  public AddMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_ADD;
    allowedPduTypes[1] = MSG_TYPE_ADD_V1;
    int pos = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_ADD, MSG_TYPE_ADD_V1);
    // protocol version has been read as part of the header
    if (protocolVersion <= 3)
      decodeBody_V123(in, pos);
    {
      decodeBody_V123(scanner);
    }
    else
    {
      decodeBody_V4(in, pos);
      decodeBody_V4(scanner);
    }
    if (protocolVersion==ProtocolVersion.getCurrentVersion())
    {
@@ -189,122 +185,37 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  public byte[] getBytes_V1()
  {
    int bodyLength = encodedAttributes.length;
    byte[] byteParentId = null;
    if (parentEntryUUID != null)
    {
      byteParentId = parentEntryUUID.getBytes("UTF-8");
      bodyLength += byteParentId.length + 1;
    }
    else
    {
      bodyLength += 1;
    }
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] resultByteArray = encodeHeader_V1(MSG_TYPE_ADD_V1, bodyLength);
    int pos = resultByteArray.length - bodyLength;
    if (byteParentId != null)
      pos = addByteArray(byteParentId, resultByteArray, pos);
    else
      resultByteArray[pos++] = 0;
    /* put the attributes */
    for (int i=0; i<encodedAttributes.length; i++,pos++)
    {
      resultByteArray[pos] = encodedAttributes[i];
    }
    return resultByteArray;
    final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_ADD_V1);
    builder.append(parentEntryUUID);
    builder.append(encodedAttributes);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V23() throws UnsupportedEncodingException
  public byte[] getBytes_V23()
  {
    // Put together the different encoded pieces
    int bodyLength = encodedAttributes.length;
    // Compute the total length of the body
    byte[] byteParentId = null;
    if (parentEntryUUID != null)
    {
      // Encode parentID now to get the length of the encoded bytes
      byteParentId = parentEntryUUID.getBytes("UTF-8");
      bodyLength += byteParentId.length + 1;
    }
    else
    {
      bodyLength += 1;
    }
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, bodyLength,
          ProtocolVersion.REPLICATION_PROTOCOL_V3);
    int pos = resultByteArray.length - bodyLength;
    if (byteParentId != null)
      pos = addByteArray(byteParentId, resultByteArray, pos);
    else
      resultByteArray[pos++] = 0;
    /* put the attributes */
    for (int i=0; i<encodedAttributes.length; i++,pos++)
    {
      resultByteArray[pos] = encodedAttributes[i];
    }
    return resultByteArray;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_ADD, ProtocolVersion.REPLICATION_PROTOCOL_V3);
    builder.append(parentEntryUUID);
    builder.append(encodedAttributes);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException
  public byte[] getBytes_V45(short protocolVersion)
  {
    // Put together the different encoded pieces
    int bodyLength = 0;
    // Compute the total length of the body
    byte[] byteParentId = null;
    if (parentEntryUUID != null)
    {
      // Encode parentID now to get the length of the encoded bytes
      byteParentId = parentEntryUUID.getBytes("UTF-8");
      bodyLength += byteParentId.length + 1;
    }
    else
    {
      bodyLength += 1;
    }
    byte[] byteAttrLen =
      String.valueOf(encodedAttributes.length).getBytes("UTF-8");
    bodyLength += byteAttrLen.length + 1;
    bodyLength += encodedAttributes.length + 1;
    byte[] byteEntryAttrLen =
      String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
    bodyLength += byteEntryAttrLen.length + 1;
    bodyLength += encodedEclIncludes.length + 1;
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] encodedMsg = encodeHeader(MSG_TYPE_ADD, bodyLength,
        reqProtocolVersion);
    int pos = encodedMsg.length - bodyLength;
    if (byteParentId != null)
      pos = addByteArray(byteParentId, encodedMsg, pos);
    else
      encodedMsg[pos++] = 0;
    pos = addByteArray(byteAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedAttributes, encodedMsg, pos);
    pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_ADD, protocolVersion);
    builder.append(parentEntryUUID);
    builder.appendUTF8(encodedAttributes.length);
    builder.appendZeroTerminated(encodedAttributes);
    builder.appendUTF8(encodedEclIncludes.length);
    builder.appendZeroTerminated(encodedEclIncludes);
    return builder.toByteArray();
  }
  private byte[] encodeAttributes(
@@ -368,11 +279,17 @@
      new LDAPAttribute(objectClass).write(writer);
      for (Attribute a : userAttributes)
      {
        new LDAPAttribute(a).write(writer);
      }
      if (operationalAttributes != null)
      {
        for (Attribute a : operationalAttributes)
        {
          new LDAPAttribute(a).write(writer);
        }
      }
    }
    catch(Exception e)
    {
@@ -385,89 +302,24 @@
  // Msg decoding
  // ============
  private void decodeBody_V123(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V123(ByteArrayScanner scanner)
      throws DataFormatException
  {
    // read the parent unique Id
    int length = getNextLength(in, pos);
    if (length != 0)
    {
      parentEntryUUID = new String(in, pos, length, "UTF-8");
      pos += length + 1;
    }
    else
    {
      parentEntryUUID = null;
      pos += 1;
    }
    // Read/Don't decode attributes : all the remaining bytes
    encodedAttributes = new byte[in.length-pos];
    int i =0;
    while (pos<in.length)
    {
      encodedAttributes[i++] = in[pos++];
    }
    parentEntryUUID = scanner.nextString();
    encodedAttributes = scanner.remainingBytes();
  }
  private void decodeBody_V4(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V4(ByteArrayScanner scanner)
      throws DataFormatException
  {
    // read the parent unique Id
    int length = getNextLength(in, pos);
    if (length != 0)
    {
      parentEntryUUID = new String(in, pos, length, "UTF-8");
      pos += length + 1;
    }
    else
    {
      parentEntryUUID = null;
      pos += 1;
    }
    parentEntryUUID = scanner.nextString();
    // Read attr len
    length = getNextLength(in, pos);
    int attrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    final int attrLen = scanner.nextIntUTF8();
    encodedAttributes = scanner.nextByteArray(attrLen);
    scanner.skipZeroSeparator();
    // Read/Don't decode attributes
    this.encodedAttributes = new byte[attrLen];
    try
    {
      System.arraycopy(in, pos, encodedAttributes, 0, attrLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    pos += attrLen + 1;
    // Read ecl attr len
    length = getNextLength(in, pos);
    int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode entry attributes
    encodedEclIncludes = new byte[eclAttrLen];
    try
    {
      System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    final int eclAttrLen = scanner.nextIntUTF8();
    encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
@@ -26,9 +26,15 @@
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DN;
/**
 * Byte array builder class encodes data into byte arrays to send messages over
@@ -42,8 +48,6 @@
public class ByteArrayBuilder
{
  /** This is the null byte, also known as zero byte. */
  public static final byte NULL_BYTE = 0;
  private final ByteStringBuilder builder;
  /**
@@ -51,7 +55,7 @@
   */
  public ByteArrayBuilder()
  {
    builder = new ByteStringBuilder();
    builder = new ByteStringBuilder(256);
  }
  /**
@@ -165,7 +169,8 @@
   */
  public ByteArrayBuilder appendStrings(Collection<String> col)
  {
    append(col.size());
    //append(int) would have been safer, but byte is compatible with legacy code
    append((byte) col.size());
    for (String s : col)
    {
      append(s);
@@ -174,23 +179,28 @@
  }
  /**
   * Append a String to this ByteArrayBuilder.
   * Append a String with a zero separator to this ByteArrayBuilder,
   * or only the zero separator if the string is null
   * or if the string length is zero.
   *
   * @param s
   *          the String to append.
   *          the String to append. Can be null.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(String s)
  {
    try
    {
      append(s.getBytes("UTF-8"));
      if (s != null && s.length() > 0)
      {
        append(s.getBytes("UTF-8"));
      }
      return appendZeroSeparator();
    }
    catch (UnsupportedEncodingException e)
    {
      throw new RuntimeException("Should never happen", e);
    }
    return this;
  }
  /**
@@ -220,17 +230,93 @@
    return this;
  }
  private ByteArrayBuilder append(byte[] sBytes)
  /**
   * Append a DN to this ByteArrayBuilder by converting it to a String then
   * encoding that string to a UTF-8 byte array.
   *
   * @param dn
   *          the DN to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(DN dn)
  {
    for (byte b : sBytes)
    {
      append(b);
    }
    append((byte) 0); // zero separator
    append(dn.toString());
    return this;
  }
  /**
   * Append all the bytes from the byte array to this ByteArrayBuilder.
   *
   * @param bytes
   *          the byte array to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(byte[] bytes)
  {
    builder.append(bytes);
    return this;
  }
  /**
   * Append all the bytes from the byte array to this ByteArrayBuilder
   * and then append a final zero byte separator for compatibility
   * with legacy implementations.
   *
   * @param bytes
   *          the byte array to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder appendZeroTerminated(byte[] bytes)
  {
    builder.append(bytes);
    return appendZeroSeparator();
  }
  private ByteArrayBuilder appendZeroSeparator()
  {
    builder.append((byte) 0);
    return this;
  }
  /**
   * Append the byte representation of a ServerState to this ByteArrayBuilder
   * and then append a final zero byte separator.
   * <p>
   * Caution: ServerState MUST be the last field. Because ServerState can
   * contain null character (string termination of serverId string ..) it cannot
   * be decoded using {@link ByteArrayScanner#nextString()} like the other
   * fields. The only way is to rely on the end of the input buffer: and that
   * forces the ServerState to be the last field. This should be changed if we
   * want to have more than one ServerState field.
   *
   * @param serverState
   *          the ServerState to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(ServerState serverState)
  {
    final Map<Integer, CSN> serverIdToCSN = serverState.getServerIdToCSNMap();
    for (Entry<Integer, CSN> entry : serverIdToCSN.entrySet())
    {
      // FIXME JNR: why append the serverId in addition to the CSN
      // since the CSN already contains the serverId?
      appendUTF8(entry.getKey()); // serverId
      appendUTF8(entry.getValue()); // CSN
    }
    return appendZeroSeparator(); // stupid legacy zero separator
  }
  /**
   * Returns a new ASN1Writer that will append bytes to this ByteArrayBuilder.
   *
   * @return a new ASN1Writer that will append bytes to this ByteArrayBuilder.
   */
  public ASN1Writer getASN1Writer()
  {
    return ASN1.getWriter(builder);
  }
  /**
   * Converts the content of this ByteStringBuilder to a byte array.
   *
   * @return the content of this ByteStringBuilder converted to a byte array.
opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
@@ -27,9 +27,14 @@
import java.util.Collection;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Reader;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * Byte array scanner class helps decode data from byte arrays received via
@@ -44,6 +49,7 @@
{
  private final ByteSequenceReader bytes;
  private final byte[] byteArray;
  /**
   * Builds a ByteArrayScanner object that will read from the supplied byte
@@ -55,6 +61,7 @@
  public ByteArrayScanner(byte[] bytes)
  {
    this.bytes = ByteString.wrap(bytes).asReader();
    this.byteArray = bytes;
  }
  /**
@@ -172,7 +179,7 @@
  /**
   * Reads the next UTF8-encoded string.
   *
   * @return the next UTF8-encoded string.
   * @return the next UTF8-encoded string or null if the string length is zero
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
@@ -180,9 +187,15 @@
  {
    try
    {
      final String s = bytes.getString(findZeroSeparator());
      bytes.skip(1); // skip the zero separator
      return s;
      final int offset = findZeroSeparator();
      if (offset > 0)
      {
        final String s = bytes.getString(offset);
        skipZeroSeparator();
        return s;
      }
      skipZeroSeparator();
      return null;
    }
    catch (IndexOutOfBoundsException e)
    {
@@ -220,7 +233,8 @@
  public <TCol extends Collection<String>> TCol nextStrings(TCol output)
      throws DataFormatException
  {
    final int colSize = nextInt();
    // nextInt() would have been safer, but byte is compatible with legacy code.
    final int colSize = nextByte();
    for (int i = 0; i < colSize; i++)
    {
      output.add(nextString());
@@ -269,6 +283,127 @@
  }
  /**
   * Reads the next DN.
   *
   * @return the next DN.
   * @throws DataFormatException
   *           if DN was incorrectly encoded or no more data can be read from
   *           the input
   */
  public DN nextDN() throws DataFormatException
  {
    try
    {
      return DN.decode(nextString());
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
  }
  /**
   * Return a new byte array containing all remaining bytes in this
   * ByteArrayScanner.
   *
   * @return new byte array containing all remaining bytes
   */
  public byte[] remainingBytes()
  {
    final int length = byteArray.length - bytes.position();
    return nextByteArray(length);
  }
  /**
   * Return a new byte array containing all remaining bytes in this
   * ByteArrayScanner bar the last one which is a zero terminated byte
   * (compatible with legacy code).
   *
   * @return new byte array containing all remaining bytes bar the last one
   */
  public byte[] remainingBytesZeroTerminated()
  {
    /* do not copy stupid legacy zero separator */
    final int length = byteArray.length - (bytes.position() + 1);
    final byte[] result = nextByteArray(length);
    bytes.skip(1); // ignore last (supposedly) zero byte
    return result;
  }
  /**
   * Return a new byte array containing the requested number of bytes.
   *
   * @param length
   *          the number of bytes to be read and copied to the new byte array.
   * @return new byte array containing the requested number of bytes.
   */
  public byte[] nextByteArray(final int length)
  {
    final byte[] result = new byte[length];
    System.arraycopy(byteArray, bytes.position(), result, 0, length);
    bytes.skip(length);
    return result;
  }
  /**
   * Reads the next ServerState.
   * <p>
   * Caution: ServerState MUST be the last field (see
   * {@link ByteArrayBuilder#append(ServerState)} javadoc).
   *
   * @return the next ServerState.
   * @throws DataFormatException
   *           if ServerState was incorrectly encoded or no more data can be
   *           read from the input
   * @see ByteArrayBuilder#append(ServerState)
   */
  public ServerState nextServerState() throws DataFormatException
  {
    final ServerState result = new ServerState();
    final int maxPos = byteArray.length - 1 /* stupid legacy zero separator */;
    while (bytes.position() < maxPos)
    {
      final int serverId = nextIntUTF8();
      final CSN csn = nextCSNUTF8();
      if (serverId != csn.getServerId())
      {
        throw new DataFormatException("Expected serverId=" + serverId
            + " to be the same as serverId for CSN=" + csn);
      }
      result.update(csn);
    }
    skipZeroSeparator();
    return result;
  }
  /**
   * Skips the next byte and verifies it is effectively the zero separator.
   *
   * @throws DataFormatException
   *           if the next byte is not the zero separator.
   */
  public void skipZeroSeparator() throws DataFormatException
  {
    if (bytes.peek() != (byte) 0)
    {
      throw new DataFormatException("Expected a zero separator at position "
          + bytes.position() + " but found byte " + bytes.peek());
    }
    bytes.skip(1);
  }
  /**
   * Returns a new ASN1Reader that will read bytes from this ByteArrayScanner.
   *
   * @return a new ASN1Reader that will read bytes from this ByteArrayScanner.
   */
  public ASN1Reader getASN1Reader()
  {
    return ASN1.getReader(bytes);
  }
  /**
   * Returns whether the scanner has more bytes to consume.
   *
   * @return true if the scanner has more bytes to consume, false otherwise.
@@ -278,4 +413,10 @@
    return bytes.remaining() == 0;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return bytes.toString();
  }
}
opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
@@ -22,11 +22,12 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerStatus;
/**
@@ -36,10 +37,10 @@
 */
public class ChangeStatusMsg extends ReplicationMsg
{
  // The status we want the DS to enter (used when from RS to DS)
  private ServerStatus requestedStatus = ServerStatus.INVALID_STATUS;
  // The new status the DS just entered (used when from DS to RS)
  private ServerStatus newStatus = ServerStatus.INVALID_STATUS;
  /** The status we want the DS to enter (used when from RS to DS) */
  private final ServerStatus requestedStatus;
  /** The new status the DS just entered (used when from DS to RS) */
  private ServerStatus newStatus;
  /**
   * Create a new ChangeStatusMsg.
@@ -61,25 +62,19 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ChangeStatusMsg.
   */
  public ChangeStatusMsg(byte[] encodedMsg) throws DataFormatException
  ChangeStatusMsg(byte[] encodedMsg) throws DataFormatException
  {
    /*
     * The message is stored in the form:
     * <message type><requested status><new status>
     */
    /* First byte is the type */
    if (encodedMsg[0] != ReplicationMsg.MSG_TYPE_CHANGE_STATUS)
    {
      throw new DataFormatException("byte[] is not a valid msg");
    }
    try
    {
      /* Then the requested status */
      if (encodedMsg[0] != ReplicationMsg.MSG_TYPE_CHANGE_STATUS)
      {
        throw new DataFormatException("byte[] is not a valid msg");
      }
      requestedStatus = ServerStatus.valueOf(encodedMsg[1]);
      /* Then the new status */
      newStatus = ServerStatus.valueOf(encodedMsg[2]);
    } catch (IllegalArgumentException e)
    {
@@ -87,9 +82,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
@@ -97,18 +90,12 @@
     * The message is stored in the form:
     * <message type><requested status><new status>
     */
    byte[] encodedMsg = new byte[3];
    /* Put the type of the operation */
    encodedMsg[0] = ReplicationMsg.MSG_TYPE_CHANGE_STATUS;
    /* Put the requested status */
    encodedMsg[1] = requestedStatus.getValue();
    /* Put the requested status */
    encodedMsg[2] = newStatus.getValue();
    return encodedMsg;
    return new byte[]
    {
      ReplicationMsg.MSG_TYPE_CHANGE_STATUS,
      requestedStatus.getValue(),
      newStatus.getValue()
    };
  }
  /**
@@ -129,9 +116,7 @@
    return newStatus;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.controls.SubtreeDeleteControl;
@@ -54,14 +53,14 @@
   *
   * @param operation the Operation from which the message must be created.
   */
  public DeleteMsg(PostOperationDeleteOperation operation)
  DeleteMsg(PostOperationDeleteOperation operation)
  {
    super((OperationContext) operation.getAttachment(SYNCHROCONTEXT),
           operation.getEntryDN());
    try
    {
      if (operation.getRequestControl(SubtreeDeleteControl.DECODER) != null)
        isSubtreeDelete = true;
      isSubtreeDelete =
          operation.getRequestControl(SubtreeDeleteControl.DECODER) != null;
    }
    catch(Exception e)
    {/* do nothing */}
@@ -84,19 +83,16 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid DeleteMsg
   * @throws UnsupportedEncodingException  If UTF8 is not supported by the jvm
   */
  public DeleteMsg(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  DeleteMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_DELETE;
    allowedPduTypes[1] = MSG_TYPE_DELETE_V1;
    int pos = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_DELETE, MSG_TYPE_DELETE_V1);
    // protocol version has been read as part of the header
    if (protocolVersion >= 4)
      decodeBody_V4(in, pos);
    {
      decodeBody_V4(scanner);
    }
    else
    {
      // Keep the previous protocol version behavior - when we don't know the
@@ -115,7 +111,9 @@
        InternalClientConnection.nextMessageID(), null, newDN);
    if (isSubtreeDelete)
    {
      del.addRequestControl(new SubtreeDeleteControl(false));
    }
    DeleteContext ctx = new DeleteContext(getCSN(), getEntryUUID());
    del.setAttachment(SYNCHROCONTEXT, ctx);
@@ -128,108 +126,47 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  public byte[] getBytes_V1()
  {
    return encodeHeader_V1(MSG_TYPE_DELETE_V1, 0);
    return encodeHeader_V1(MSG_TYPE_DELETE_V1)
        .toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V23() throws UnsupportedEncodingException
  public byte[] getBytes_V23()
  {
    return encodeHeader(MSG_TYPE_DELETE, 0,
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    return encodeHeader(MSG_TYPE_DELETE,ProtocolVersion.REPLICATION_PROTOCOL_V3)
        .toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException
  public byte[] getBytes_V45(short protocolVersion)
  {
    // Put together the different encoded pieces
    int bodyLength = 0;
    byte[] byteEntryAttrLen =
      String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
    bodyLength += byteEntryAttrLen.length + 1;
    bodyLength += encodedEclIncludes.length + 1;
    byte[] byteInitiatorsName = null;
    if (initiatorsName != null)
    {
      byteInitiatorsName = initiatorsName.getBytes("UTF-8");
      bodyLength += byteInitiatorsName.length + 1;
    }
    else
    {
      bodyLength++;
    }
    // subtree flag
    bodyLength++;
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] encodedMsg = encodeHeader(MSG_TYPE_DELETE, bodyLength,
        reqProtocolVersion);
    int pos = encodedMsg.length - bodyLength;
    if (byteInitiatorsName != null)
      pos = addByteArray(byteInitiatorsName, encodedMsg, pos);
    else
      encodedMsg[pos++] = 0;
    pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
    encodedMsg[pos++] = (byte) (isSubtreeDelete ? 1 : 0);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_DELETE, protocolVersion);
    builder.append(initiatorsName);
    builder.appendUTF8(encodedEclIncludes.length);
    builder.appendZeroTerminated(encodedEclIncludes);
    builder.append(isSubtreeDelete);
    return builder.toByteArray();
  }
  // ============
  // Msg decoding
  // ============
  private void decodeBody_V4(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V4(ByteArrayScanner scanner)
      throws DataFormatException
  {
    int length = getNextLength(in, pos);
    if (length != 0)
    {
      initiatorsName = new String(in, pos, length, "UTF-8");
      pos += length + 1;
    }
    else
    {
      initiatorsName = null;
      pos += 1;
    }
    initiatorsName = scanner.nextString();
    // Read ecl attr len
    length = getNextLength(in, pos);
    int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    // Skip the length
    pos += length + 1;
    final int eclAttrLen = scanner.nextIntUTF8();
    encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
    scanner.skipZeroSeparator();
    // Read/Don't decode entry attributes
    encodedEclIncludes = new byte[eclAttrLen];
    try
    {
      // Copy ecl attr
      System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
      // Skip the attrs
      pos += eclAttrLen +1;
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    // subtree flag
    isSubtreeDelete = (in[pos] == 1);
    isSubtreeDelete = scanner.nextBoolean();
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -54,65 +53,26 @@
   * @throws DataFormatException If the in does not contain a properly,
   *                             encoded message.
   */
  public DoneMsg(byte[] in) throws DataFormatException
  DoneMsg(byte[] in) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_DONE)
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_DONE)
        throw new DataFormatException("input is not a valid DoneMessage");
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Integer.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      throw new DataFormatException("input is not a valid DoneMessage");
    }
    this.senderID = scanner.nextIntUTF8();
    this.destination = scanner.nextIntUTF8();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1
                     + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_DONE;
      int pos = 1;
      /* put the sender */
      pos = addByteArray(senderBytes, resultByteArray, pos);
      /* put the destination */
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_DONE);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    return builder.toByteArray();
  }
}
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.MultiDomainServerState;
@@ -73,56 +72,33 @@
   * @param in The byte array containing the encoded form of the message.
   * @throws DataFormatException If the byte array does not contain
   *         a valid encoded form of the message.
   * @throws UnsupportedEncodingException when it occurs.
   * @throws NotSupportedOldVersionPDUException when it occurs.
   */
  public ECLUpdateMsg(byte[] in)
   throws DataFormatException,
          UnsupportedEncodingException,
          NotSupportedOldVersionPDUException
  ECLUpdateMsg(byte[] in) throws DataFormatException,
      NotSupportedOldVersionPDUException
  {
    try
    {
      if (in[0] != MSG_TYPE_ECL_UPDATE)
      final ByteArrayScanner scanner = new ByteArrayScanner(in);
      if (scanner.nextByte() != MSG_TYPE_ECL_UPDATE)
      {
        throw new DataFormatException("byte[] is not a valid " +
            getClass().getCanonicalName());
      }
      int pos = 1;
      // Decode the cookie
      int length = getNextLength(in, pos);
      String cookieStr = new String(in, pos, length, "UTF-8");
      this.cookie = new MultiDomainServerState(cookieStr);
      pos += length + 1;
      // Decode the baseDN
      length = getNextLength(in, pos);
      this.baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      // Decode the changeNumber
      length = getNextLength(in, pos);
      this.changeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      this.cookie = new MultiDomainServerState(scanner.nextString());
      this.baseDN = scanner.nextDN();
      this.changeNumber = scanner.nextIntUTF8();
      // Decode the msg
      /* Read the mods : all the remaining bytes but the terminating 0 */
      length = in.length - pos - 1;
      byte[] encodedMsg = new byte[length];
      System.arraycopy(in, pos, encodedMsg, 0, length);
      ReplicationMsg rmsg = ReplicationMsg.generateMsg(
            encodedMsg, ProtocolVersion.getCurrentVersion());
      this.updateMsg = (LDAPUpdateMsg)rmsg;
      this.updateMsg = (LDAPUpdateMsg) ReplicationMsg.generateMsg(
          scanner.remainingBytesZeroTerminated(),
          ProtocolVersion.getCurrentVersion());
    }
    catch(DirectoryException de)
    {
      throw new DataFormatException(de.toString());
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
@@ -162,9 +138,7 @@
    return updateMsg;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -175,39 +149,19 @@
    " serviceId: " + baseDN + "]";
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException
  {
    byte[] byteCookie = String.valueOf(cookie).getBytes("UTF-8");
    byte[] byteBaseDN = String.valueOf(baseDN).getBytes("UTF-8");
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_ECL_UPDATE);
    builder.append(String.valueOf(cookie));
    builder.append(baseDN);
    // FIXME JNR Changing the line below to use long would require a protocol
    // version change. Leave it like this for now until the need arises.
    byte[] byteChangeNumber =
        Integer.toString((int) changeNumber).getBytes("UTF-8");
    byte[] byteUpdateMsg = updateMsg.getBytes(protocolVersion);
    int length = 1 + byteCookie.length +
                 1 + byteBaseDN.length +
                 1 + byteChangeNumber.length +
                 1 + byteUpdateMsg.length + 1;
    byte[] resultByteArray = new byte[length];
    /* Encode type */
    resultByteArray[0] = MSG_TYPE_ECL_UPDATE;
    int pos = 1;
    // Encode all fields
    pos = addByteArray(byteCookie, resultByteArray, pos);
    pos = addByteArray(byteBaseDN, resultByteArray, pos);
    pos = addByteArray(byteChangeNumber, resultByteArray, pos);
    pos = addByteArray(byteUpdateMsg, resultByteArray, pos);
    return resultByteArray;
    builder.appendUTF8((int) changeNumber);
    builder.appendZeroTerminated(updateMsg.getBytes(protocolVersion));
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -37,51 +36,39 @@
 */
public class EntryMsg extends RoutableMsg
{
  // The byte array containing the bytes of the entry transported
  private byte[] entryByteArray;
  /** The byte array containing the bytes of the entry transported. */
  private final byte[] entryByteArray;
  private int msgId = -1; // from V4
  /**
   * Creates a new EntryMsg.
   *
   * @param sender      The sender of this message.
   * @param serverID      The sender of this message.
   * @param destination The destination of this message.
   * @param entryBytes  The bytes of the entry.
   * @param msgId       Message counter.
   */
  public EntryMsg(
      int sender,
      int destination,
      byte[] entryBytes,
      int msgId)
  public EntryMsg(int serverID, int destination, byte[] entryBytes, int msgId)
  {
    super(sender, destination);
    this.entryByteArray = new byte[entryBytes.length];
    System.arraycopy(entryBytes, 0, this.entryByteArray, 0, entryBytes.length);
    this.msgId = msgId;
    this(serverID, destination, entryBytes, 0, entryBytes.length, msgId);
  }
  /**
   * Creates a new EntryMsg.
   *
   * @param serverID    The sender of this message.
   * @param i           The destination of this message.
   * @param destination The destination of this message.
   * @param entryBytes  The bytes of the entry.
   * @param pos         The starting Position in the array.
   * @param startPos    The starting Position in the array.
   * @param length      Number of array elements to be copied.
   * @param msgId       Message counter.
   */
  public EntryMsg(
      int serverID,
      int i,
      byte[] entryBytes,
      int pos,
      int length,
      int msgId)
  public EntryMsg(int serverID, int destination, byte[] entryBytes, int startPos,
      int length, int msgId)
  {
    super(serverID, i);
    super(serverID, destination);
    this.entryByteArray = new byte[length];
    System.arraycopy(entryBytes, pos, this.entryByteArray, 0, length);
    System.arraycopy(entryBytes, startPos, this.entryByteArray, 0, length);
    this.msgId = msgId;
  }
@@ -93,47 +80,22 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public EntryMsg(byte[] in, short version) throws DataFormatException
  EntryMsg(byte[] in, short version) throws DataFormatException
  {
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_ENTRY)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_ENTRY)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderIDString = new String(in, pos, length, "UTF-8");
      this.senderID = Integer.valueOf(senderIDString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length +1;
      // msgCnt
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // msgCnt
        length = getNextLength(in, pos);
        String msgcntString = new String(in, pos, length, "UTF-8");
        this.msgId = Integer.valueOf(msgcntString);
        pos += length +1;
      }
      // data
      length = in.length - (pos + 1);
      this.entryByteArray = new byte[length];
      System.arraycopy(in, pos, entryByteArray, 0, length);
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    catch (UnsupportedEncodingException e)
    this.senderID = scanner.nextIntUTF8();
    this.destination = scanner.nextIntUTF8();
    if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      this.msgId = scanner.nextIntUTF8();
    }
    this.entryByteArray = scanner.remainingBytesZeroTerminated();
  }
  /**
@@ -145,46 +107,20 @@
    return entryByteArray;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      byte[] msgCntBytes = null;
      byte[] entryBytes = entryByteArray;
      int length = 1 + senderBytes.length +
                   1 + destinationBytes.length +
                   1 + entryBytes.length + 1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        msgCntBytes = String.valueOf(msgId).getBytes("UTF-8");
        length += (1 + msgCntBytes.length);
      }
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_ENTRY;
      int pos = 1;
      pos = addByteArray(senderBytes, resultByteArray, pos);
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        pos = addByteArray(msgCntBytes, resultByteArray, pos);
      pos = addByteArray(entryBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_ENTRY);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      return null;
      builder.appendUTF8(msgId);
    }
    builder.appendZeroTerminated(entryByteArray);
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
@@ -22,20 +22,17 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import org.opends.messages.Message;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This message is part of the replication protocol.
 * This message is sent by a server or a replication server when an error
@@ -43,18 +40,21 @@
 */
public class ErrorMsg extends RoutableMsg
{
  // The tracer object for the debug logger
  /** The tracer object for the debug logger */
  private static final DebugTracer TRACER = getTracer();
  // Specifies the messageID built from the error that was detected
  private int msgID;
  /** Specifies the messageID built from the error that was detected */
  private final int msgID;
  // Specifies the complementary details about the error that was detected
  private Message details = null;
  /** Specifies the complementary details about the error that was detected */
  private final Message details;
  // The time of creation of this message.
  //                                        protocol version previous to V4
  private Long creationTime = System.currentTimeMillis();
  /**
   * The time of creation of this message.
   * <p>
   * protocol version previous to V4
   */
  private long creationTime = System.currentTimeMillis();
  /**
   * Creates an ErrorMsg providing the destination server.
@@ -63,8 +63,7 @@
   * @param destination The destination server or servers of this message.
   * @param details The message containing the details of the error.
   */
  public ErrorMsg(int sender, int destination,
                      Message details)
  public ErrorMsg(int sender, int destination, Message details)
  {
    super(sender, destination);
    this.msgID  = details.getDescriptor().getId();
@@ -72,8 +71,10 @@
    this.creationTime = System.currentTimeMillis();
    if (debugEnabled())
      TRACER.debugInfo(" Creating error message" + this.toString()
          + " " + stackTraceToSingleLineString(new Exception("trace")));
    {
      TRACER.debugInfo(" Creating error message" + this + " "
          + stackTraceToSingleLineString(new Exception("trace")));
    }
  }
  /**
@@ -90,7 +91,9 @@
    this.creationTime = System.currentTimeMillis();
    if (debugEnabled())
      TRACER.debugInfo(this.toString());
    {
      TRACER.debugInfo(toString());
    }
  }
  /**
@@ -101,53 +104,23 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded message.
   */
  public ErrorMsg(byte[] in, short version)
  throws DataFormatException
  ErrorMsg(byte[] in, short version) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_ERROR)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_ERROR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      destination = Integer.valueOf(serverIdString);
      pos += length +1;
      // MsgID
      length = getNextLength(in, pos);
      String msgIdString = new String(in, pos, length, "UTF-8");
      msgID = Integer.valueOf(msgIdString);
      pos += length +1;
      // Details
      length = getNextLength(in, pos);
      details = Message.raw(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // Creation Time
        length = getNextLength(in, pos);
        String creationTimeString = new String(in, pos, length, "UTF-8");
        creationTime = Long.valueOf(creationTimeString);
        pos += length +1;
      }
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    catch (UnsupportedEncodingException e)
    senderID = scanner.nextIntUTF8();
    destination = scanner.nextIntUTF8();
    msgID = scanner.nextIntUTF8();
    details = Message.raw(scanner.nextString());
    if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      creationTime = scanner.nextLongUTF8();
    }
  }
@@ -175,60 +148,21 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8");
      byte[] byteDetails = details.toString().getBytes("UTF-8");
      byte[] byteCreationTime = null;
      int length = 1 + byteSender.length + 1
                     + byteDestination.length + 1
                     + byteErrMsgId.length + 1
                     + byteDetails.length + 1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        byteCreationTime = creationTime.toString().getBytes("UTF-8");
        length += byteCreationTime.length + 1;
      }
      byte[] resultByteArray = new byte[length];
      // put the type of the operation
      resultByteArray[0] = MSG_TYPE_ERROR;
      int pos = 1;
      // sender
      pos = addByteArray(byteSender, resultByteArray, pos);
      // destination
      pos = addByteArray(byteDestination, resultByteArray, pos);
      // MsgId
      pos = addByteArray(byteErrMsgId, resultByteArray, pos);
      // details
      pos = addByteArray(byteDetails, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // creation time
        pos = addByteArray(byteCreationTime, resultByteArray, pos);
      }
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_ERROR);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    builder.appendUTF8(msgID);
    builder.append(details.toString());
    if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      return null;
      builder.appendUTF8(creationTime);
    }
    return builder.toByteArray();
  }
  /**
@@ -236,6 +170,7 @@
   *
   * @return the string representation of this message.
   */
  @Override
  public String toString()
  {
    return "ErrorMessage=["+
@@ -254,7 +189,7 @@
   *
   * @return the creation time of this message.
   */
  public Long getCreationTime()
  public long getCreationTime()
  {
    return creationTime;
  }
opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
@@ -22,9 +22,8 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.util.zip.DataFormatException;
@@ -52,20 +51,18 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public HeartbeatMsg(byte[] in) throws DataFormatException
  HeartbeatMsg(byte[] in) throws DataFormatException
  {
    /* The heartbeat message is encoded in the form :
     * <msg-type>
     */
    /* first byte is the type */
    if (in.length != 1 || in[0] != MSG_TYPE_HEARTBEAT)
    {
      throw new DataFormatException("Input is not a valid Heartbeat Message.");
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
@@ -73,13 +70,7 @@
     * The heartbeat message contains:
     * <msg-type>
     */
    int length = 1;
    byte[] resultByteArray = new byte[length];
    /* put the message type */
    resultByteArray[0] = MSG_TYPE_HEARTBEAT;
    return resultByteArray;
    return new byte[] { MSG_TYPE_HEARTBEAT };
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
@@ -22,14 +22,12 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is used by LDAP server or by Replication Servers to
 * update the send window of the remote entities.
@@ -43,7 +41,6 @@
{
  private final int numAck;
  /**
   * Create a new message..
   *
@@ -65,84 +62,37 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the message.
   */
  public InitializeRcvAckMsg(byte[] in) throws DataFormatException
  InitializeRcvAckMsg(byte[] in) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    if (scanner.nextByte() != MSG_TYPE_INITIALIZE_RCV_ACK)
    {
      // msg type
      if (in[0] != MSG_TYPE_INITIALIZE_RCV_ACK)
        throw new DataFormatException("input is not a valid "
            + this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      destination = Integer.valueOf(serverIdString);
      pos += length +1;
      // value fo the ack
      length = getNextLength(in, pos);
      String numAckStr = new String(in, pos, length, "UTF-8");
      pos += length +1;
      numAck = Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    senderID = scanner.nextIntUTF8();
    destination = scanner.nextIntUTF8();
    numAck = scanner.nextIntUTF8();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try {
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
      int length = 1 + byteSender.length + 1
                     + byteDestination.length + 1
                     + byteNumAck.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_INITIALIZE_RCV_ACK;
      int pos = 1;
      // sender
      pos = addByteArray(byteSender, resultByteArray, pos);
      // destination
      pos = addByteArray(byteDestination, resultByteArray, pos);
      // ack value
      pos = addByteArray(byteNumAck, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_INITIALIZE_RCV_ACK);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    builder.appendUTF8(numAck);
    return builder.toByteArray();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return this.getClass().getSimpleName()  + "=["+
    return getClass().getSimpleName() + "=[" +
      " sender=" + this.senderID +
      " destination=" + this.destination +
      " msgID=" + this.numAck + "]";
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
@@ -22,15 +22,13 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is part of the replication protocol.
@@ -40,7 +38,7 @@
 */
public class InitializeRequestMsg extends RoutableMsg
{
  private DN baseDN;
  private final DN baseDN;
  private int initWindow = 0;
  /**
@@ -66,51 +64,22 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public InitializeRequestMsg(byte[] in, short version)
  throws DataFormatException
  InitializeRequestMsg(byte[] in, short version) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_INITIALIZE_REQUEST)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_INITIALIZE_REQUEST)
        throw new DataFormatException(
            "input is not a valid InitializeRequestMessage");
      int pos = 1;
      // baseDN
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // sender
      length = getNextLength(in, pos);
      String sourceServerIdString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(sourceServerIdString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationServerIdString = new String(in, pos, length, "UTF-8");
      destination = Integer.valueOf(destinationServerIdString);
      pos += length +1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        length = getNextLength(in, pos);
        String initWindowString = new String(in, pos, length, "UTF-8");
        initWindow = Integer.valueOf(initWindowString);
        pos += length +1;
      }
      throw new DataFormatException(
          "input is not a valid InitializeRequestMessage");
    }
    catch (UnsupportedEncodingException e)
    baseDN = scanner.nextDN();
    senderID = scanner.nextIntUTF8();
    destination = scanner.nextIntUTF8();
    if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
      initWindow = scanner.nextIntUTF8();
    }
  }
@@ -128,54 +97,20 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] baseDNBytes = baseDN.toNormalizedString().getBytes("UTF-8");
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      byte[] initWindowBytes = null;
      int length = 1 + baseDNBytes.length + 1 + senderBytes.length + 1
        + destinationBytes.length + 1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        initWindowBytes = String.valueOf(initWindow).getBytes("UTF-8");
        length += initWindowBytes.length + 1;
      }
      byte[] resultByteArray = new byte[length];
      // type of the operation
      resultByteArray[0] = MSG_TYPE_INITIALIZE_REQUEST;
      int pos = 1;
      // baseDN
      pos = addByteArray(baseDNBytes, resultByteArray, pos);
      // sender
      pos = addByteArray(senderBytes, resultByteArray, pos);
      // destination
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        pos = addByteArray(initWindowBytes, resultByteArray, pos);
      }
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_INITIALIZE_REQUEST);
    builder.append(baseDN);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      return null;
      builder.appendUTF8(initWindow);
    }
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
@@ -22,15 +22,13 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is part of the replication protocol.
@@ -39,17 +37,17 @@
 */
public class InitializeTargetMsg extends RoutableMsg
{
  private DN baseDN;
  private final DN baseDN;
  /** Specifies the number of entries expected to be exported. */
  private long entryCount;
  private final long entryCount;
  /**
   * Specifies the serverID of the server that requested this export to happen.
   * It allows a server that previously sent an InitializeRequestMessage to know
   * that the current message is related to its own request.
   */
  private int requestorID;
  private final int requestorID;
  private int initWindow;
@@ -80,63 +78,24 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public InitializeTargetMsg(byte[] in, short version)
  throws DataFormatException
  InitializeTargetMsg(byte[] in, short version) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_INITIALIZE_TARGET)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_INITIALIZE_TARGET)
        throw new DataFormatException(
            "input is not a valid InitializeDestinationMessage");
      int pos = 1;
      // destination
      int length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length +1;
      // baseDN
      length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // sender
      length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(senderString);
      pos += length +1;
      // requestor
      length = getNextLength(in, pos);
      String requestorString = new String(in, pos, length, "UTF-8");
      requestorID = Integer.valueOf(requestorString);
      pos += length +1;
      // entryCount
      length = getNextLength(in, pos);
      String entryCountString = new String(in, pos, length, "UTF-8");
      entryCount = Long.valueOf(entryCountString);
      pos += length +1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        length = getNextLength(in, pos);
        String initWindowString = new String(in, pos, length, "UTF-8");
        initWindow = Integer.valueOf(initWindowString);
        pos += length +1;
      }
      throw new DataFormatException(
          "input is not a valid InitializeDestinationMessage");
    }
    catch (UnsupportedEncodingException e)
    destination = scanner.nextIntUTF8();
    baseDN = scanner.nextDN();
    senderID = scanner.nextIntUTF8();
    requestorID = scanner.nextIntUTF8();
    entryCount = scanner.nextLongUTF8();
    if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
      initWindow = scanner.nextIntUTF8();
    }
  }
@@ -185,66 +144,22 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  throws UnsupportedEncodingException
  {
    try
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_INITIALIZE_TARGET);
    builder.appendUTF8(destination);
    builder.append(baseDN);
    builder.appendUTF8(senderID);
    builder.appendUTF8(requestorID);
    builder.appendUTF8(entryCount);
    if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    {
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8");
      byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8");
      byte[] byteInitWindow = null;
      int length = 1 + byteDestination.length + 1
                     + byteDn.length + 1
                     + byteSender.length + 1
                     + byteRequestor.length + 1
                     + byteEntryCount.length + 1;
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        byteInitWindow = String.valueOf(initWindow).getBytes("UTF-8");
        length += byteInitWindow.length + 1;
      }
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_INITIALIZE_TARGET;
      int pos = 1;
      /* put the destination */
      pos = addByteArray(byteDestination, resultByteArray, pos);
      /* put the baseDN and a terminating 0 */
      pos = addByteArray(byteDn, resultByteArray, pos);
      /* put the sender */
      pos = addByteArray(byteSender, resultByteArray, pos);
      /* put the requestorID */
      pos = addByteArray(byteRequestor, resultByteArray, pos);
      /* put the entryCount */
      pos = addByteArray(byteEntryCount, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        /* put the initWindow */
        pos = addByteArray(byteInitWindow, resultByteArray, pos);
      }
      return resultByteArray;
      builder.appendUTF8(initWindow);
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.zip.DataFormatException;
@@ -83,7 +82,7 @@
   * @param dn The DN of the entry on which the change
   *           that caused the creation of this object happened
   */
  public LDAPUpdateMsg(OperationContext ctx, DN dn)
  LDAPUpdateMsg(OperationContext ctx, DN dn)
  {
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.csn = ctx.getCSN();
@@ -101,7 +100,7 @@
   * @param dn        The DN of the entry on which the change
   *                  that caused the creation of this object happened
   */
  public LDAPUpdateMsg(CSN csn, String entryUUID, DN dn)
  LDAPUpdateMsg(CSN csn, String entryUUID, DN dn)
  {
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.csn = csn;
@@ -117,27 +116,19 @@
   */
  public static LDAPUpdateMsg generateMsg(PostOperationOperation op)
  {
    LDAPUpdateMsg msg = null;
    switch (op.getOperationType())
    {
    case MODIFY :
      msg = new ModifyMsg((PostOperationModifyOperation) op);
      break;
      return new ModifyMsg((PostOperationModifyOperation) op);
    case ADD:
      msg = new AddMsg((PostOperationAddOperation) op);
      break;
      return new AddMsg((PostOperationAddOperation) op);
    case DELETE :
      msg = new DeleteMsg((PostOperationDeleteOperation) op);
      break;
      return new DeleteMsg((PostOperationDeleteOperation) op);
    case MODIFY_DN :
      msg = new ModifyDNMsg( (PostOperationModifyDNOperation) op);
      break;
      return new ModifyDNMsg( (PostOperationModifyDNOperation) op);
    default:
      return null;
    }
    return msg;
  }
  /**
@@ -210,138 +201,62 @@
   * of a synchronized portion of code.
   *
   * This method is not synchronized and therefore not MT safe.
   *
   * @throws UnsupportedEncodingException when encoding fails.
   */
  public void encode() throws UnsupportedEncodingException
  public void encode()
  {
    bytes = getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * Encode the common header for all the UpdateMsg. This uses the current
   * protocol version.
   *
   * @param type the type of UpdateMsg to encode.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMsg.
   * @param version The ProtocolVersion to use when encoding.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMsg as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   */
  /** {@inheritDoc} */
  @Override
  public byte[] encodeHeader(byte type, int additionalLength, short version)
    throws UnsupportedEncodingException
  public ByteArrayBuilder encodeHeader(byte msgType, short protocolVersion)
  {
    byte[] byteDn = dn.toString().getBytes("UTF-8");
    byte[] csnByte = getCSN().toString().getBytes("UTF-8");
    byte[] byteEntryuuid = getEntryUUID().getBytes("UTF-8");
    /* The message header is stored in the form :
     * <operation type><protocol version><CSN><dn><entryuuid><assured>
     * <assured mode> <safe data level>
     * the length of result byte array is therefore :
     *   1 + 1 + CSN length + 1 + dn length + 1 + uuid length + 1 + 1
     *   + 1 + 1 + additional_length
     */
    int length = 8 + csnByte.length + byteDn.length
                 + byteEntryuuid.length + additionalLength;
    byte[] encodedMsg = new byte[length];
    // put the type of the operation
    encodedMsg[0] = type;
    // put the protocol version
    encodedMsg[1] = (byte) version;
    int pos = 2;
    // Put the CSN
    pos = addByteArray(csnByte, encodedMsg, pos);
    // Put the DN and a terminating 0
    pos = addByteArray(byteDn, encodedMsg, pos);
    // Put the entry uuid and a terminating 0
    pos = addByteArray(byteEntryuuid, encodedMsg, pos);
    // Put the assured flag
    encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
    // Put the assured mode
    encodedMsg[pos++] = assuredMode.getValue();
    // Put the safe data level
    encodedMsg[pos++] = safeDataLevel;
    return encodedMsg;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(msgType);
    builder.append((byte) protocolVersion);
    builder.appendUTF8(csn);
    builder.append(dn);
    builder.append(entryUUID);
    builder.append(assuredFlag);
    builder.append(assuredMode.getValue());
    builder.append(safeDataLevel);
    return builder;
  }
  /**
   * Encode the common header for all the UpdateMessage. This uses the version
   * 1 of the replication protocol (used for compatibility purpose).
   *
   * @param type the type of UpdateMessage to encode.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param msgType the type of UpdateMessage to encode.
   * @return a byte array builder containing the common header
   */
  public byte[] encodeHeader_V1(byte type, int additionalLength)
    throws UnsupportedEncodingException
  ByteArrayBuilder encodeHeader_V1(byte msgType)
  {
    byte[] byteDn = dn.toString().getBytes("UTF-8");
    byte[] csnByte = getCSN().toString().getBytes("UTF-8");
    byte[] byteEntryuuid = getEntryUUID().getBytes("UTF-8");
    /* The message header is stored in the form :
     * <operation type><CSN><dn><assured><entryuuid><change>
     * the length of result byte array is therefore :
     *   1 + CSN length + 1 + dn length + 1  + 1 +
     *   uuid length + 1 + additional_length
     */
    int length = 5 + csnByte.length + byteDn.length
                 + byteEntryuuid.length + additionalLength;
    byte[] encodedMsg = new byte[length];
    // put the type of the operation
    encodedMsg[0] = type;
    int pos = 1;
    // put the CSN
    pos = addByteArray(csnByte, encodedMsg, pos);
    // put the assured information
    encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
    // put the DN and a terminating 0
    pos = addByteArray(byteDn, encodedMsg, pos);
    // put the entry uuid and a terminating 0
    pos = addByteArray(byteEntryuuid, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(msgType);
    builder.appendUTF8(csn);
    builder.append(assuredFlag);
    builder.append(dn);
    builder.append(entryUUID);
    return builder;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      return getBytes_V1();
    }
    else if (reqProtocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      return getBytes_V23();
    }
@@ -351,7 +266,7 @@
      if (bytes == null)
      {
        // this is the current version of the protocol
        bytes = getBytes_V45(reqProtocolVersion);
        bytes = getBytes_V45(protocolVersion);
      }
      return bytes;
    }
@@ -362,45 +277,35 @@
   * 1 of the replication protocol (used for compatibility purpose).
   *
   * @return The byte array representation of this Message.
   *
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public abstract byte[] getBytes_V1() throws UnsupportedEncodingException;
  protected abstract byte[] getBytes_V1();
  /**
   * Get the byte array representation of this Message. This uses the version
   * 2 of the replication protocol (used for compatibility purpose).
   *
   * @return The byte array representation of this Message.
   *
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public abstract byte[] getBytes_V23() throws UnsupportedEncodingException;
  protected abstract byte[] getBytes_V23();
  /**
   * Get the byte array representation of this Message. This uses the provided
   * version number which must be version 4 or newer.
   * @param reqProtocolVersion TODO
   *
   * @param protocolVersion the actual protocol version to encode into
   * @return The byte array representation of this Message.
   *
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public abstract byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException;
  protected abstract byte[] getBytes_V45(short protocolVersion);
  /**
   * Encode a list of attributes.
   */
   static private byte[] encodeAttributes(Collection<Attribute> attributes)
   private static byte[] encodeAttributes(Collection<Attribute> attributes)
   {
     if (attributes==null)
     {
       return new byte[0];
     }
     try
     {
       ByteStringBuilder byteBuilder = new ByteStringBuilder();
@@ -424,151 +329,62 @@
  /**
   * Decode the Header part of this Update Message, and check its type.
   *
   * @param types The allowed types of this Update Message.
   * @param encodedMsg the encoded form of the UpdateMsg.
   * @return the position at which the remaining part of the message starts.
   * @param scanner the encoded form of the UpdateMsg.
   * @param allowedTypes The allowed types of this Update Message.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
   public int decodeHeader(byte[] types, byte[] encodedMsg)
                          throws DataFormatException
   {
     // first byte is the type
     boolean foundMatchingType = false;
     for (byte type : types)
     {
       if (type == encodedMsg[0])
       {
         foundMatchingType = true;
         break;
       }
     }
     if (!foundMatchingType)
       throw new DataFormatException("byte[] is not a valid update msg: "
           + encodedMsg[0]);
     /*
      * For older protocol version PDUs, decode the matching version header
      * instead.
      */
     if ((encodedMsg[0] == MSG_TYPE_ADD_V1) ||
         (encodedMsg[0] == MSG_TYPE_DELETE_V1) ||
         (encodedMsg[0] == MSG_TYPE_MODIFYDN_V1) ||
         (encodedMsg[0] == MSG_TYPE_MODIFY_V1))
     {
       return decodeHeader_V1(encodedMsg);
     }
     // read the protocol version
     protocolVersion = encodedMsg[1];
     try
     {
       // Read the CSN
       int pos = 2;
       int length = getNextLength(encodedMsg, pos);
       String csnStr = new String(encodedMsg, pos, length, "UTF-8");
       pos += length + 1;
       csn = new CSN(csnStr);
       // Read the dn
       length = getNextLength(encodedMsg, pos);
       dn = DN.decode(new String(encodedMsg, pos, length, "UTF-8"));
       pos += length + 1;
       // Read the entryuuid
       length = getNextLength(encodedMsg, pos);
       entryUUID = new String(encodedMsg, pos, length, "UTF-8");
       pos += length + 1;
       // Read the assured information
       assuredFlag = encodedMsg[pos++] == 1;
       // Read the assured mode
       assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
       // Read the safe data level
       safeDataLevel = encodedMsg[pos++];
       return pos;
     }
     catch (UnsupportedEncodingException e)
     {
       throw new DataFormatException("UTF-8 is not supported by this jvm.");
     }
     catch (IllegalArgumentException e)
     {
       throw new DataFormatException(e.getLocalizedMessage());
     }
     catch (DirectoryException e)
     {
       throw new DataFormatException(e.getLocalizedMessage());
     }
  }
  /**
   * Decode the Header part of this Update Message, and check its type. This
   * uses the version 1 of the replication protocol (used for compatibility
   * purpose).
   *
   * @param encodedMsg the encoded form of the UpdateMessage.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
  public int decodeHeader_V1(byte[] encodedMsg)
                          throws DataFormatException
  void decodeHeader(ByteArrayScanner scanner, byte... allowedTypes)
      throws DataFormatException
  {
    if ((encodedMsg[0] != MSG_TYPE_ADD_V1) &&
      (encodedMsg[0] != MSG_TYPE_DELETE_V1) &&
      (encodedMsg[0] != MSG_TYPE_MODIFYDN_V1) &&
      (encodedMsg[0] != MSG_TYPE_MODIFY_V1))
      throw new DataFormatException("byte[] is not a valid update msg: expected"
        + " a V1 PDU, received: " + encodedMsg[0]);
    // Force version to V1 (other new parameters take their default values
    // (assured stuff...))
    protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
    try
    final byte msgType = scanner.nextByte();
    if (!isTypeAllowed(msgType, allowedTypes))
    {
      // read the CSN
      int pos = 1;
      int length = getNextLength(encodedMsg, pos);
      String csnStr = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      csn = new CSN(csnStr);
      // read the assured information
      assuredFlag = encodedMsg[pos++] == 1;
      // read the dn
      length = getNextLength(encodedMsg, pos);
      dn = DN.decode(new String(encodedMsg, pos, length, "UTF-8"));
      pos += length + 1;
      // read the entryuuid
      length = getNextLength(encodedMsg, pos);
      entryUUID = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      return pos;
      throw new DataFormatException("byte[] is not a valid update msg: "
          + msgType);
    }
    catch (UnsupportedEncodingException e)
    if (msgType == MSG_TYPE_ADD_V1
        || msgType == MSG_TYPE_DELETE_V1
        || msgType == MSG_TYPE_MODIFYDN_V1
        || msgType == MSG_TYPE_MODIFY_V1)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      /*
       * For older protocol versions, decode the matching version header instead
       */
      // Force version to V1 (other new parameters take their default values
      // (assured stuff...))
      protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
      csn = scanner.nextCSNUTF8();
      assuredFlag = scanner.nextBoolean();
      dn = scanner.nextDN();
      entryUUID = scanner.nextString();
    }
    catch (DirectoryException e)
    else
    {
      throw new DataFormatException(e.getLocalizedMessage());
      protocolVersion = scanner.nextByte();
      csn = scanner.nextCSNUTF8();
      dn = scanner.nextDN();
      entryUUID = scanner.nextString();
      assuredFlag = scanner.nextBoolean();
      assuredMode = AssuredMode.valueOf(scanner.nextByte());
      safeDataLevel = scanner.nextByte();
    }
  }
  /**
   * Return the number of bytes used by this message.
   *
   * @return The number of bytes used by this message.
   */
  private boolean isTypeAllowed(final byte msgType, byte... allowedTypes)
  {
    for (byte allowedType : allowedTypes)
    {
      if (msgType == allowedType)
      {
        return true;
      }
    }
    return false;
  }
  /** {@inheritDoc} */
  @Override
  public abstract int size();
@@ -613,7 +429,7 @@
   * @throws LDAPException when it occurs.
   * @throws ASN1Exception when it occurs.
   */
  public ArrayList<RawAttribute> decodeRawAttributes(byte[] in)
  ArrayList<RawAttribute> decodeRawAttributes(byte[] in)
  throws LDAPException, ASN1Exception
  {
    ArrayList<RawAttribute> rattr = new ArrayList<RawAttribute>();
@@ -642,7 +458,7 @@
   * @throws LDAPException when it occurs.
   * @throws ASN1Exception when it occurs.
   */
  public ArrayList<Attribute> decodeAttributes(byte[] in)
  ArrayList<Attribute> decodeAttributes(byte[] in)
  throws LDAPException, ASN1Exception
  {
    ArrayList<Attribute> lattr = new ArrayList<Attribute>();
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -67,10 +66,8 @@
    newSuperiorEntryUUID = ctx.getNewSuperiorEntryUUID();
    deleteOldRdn = operation.deleteOldRDN();
    if (operation.getRawNewSuperior() != null)
      newSuperior = operation.getRawNewSuperior().toString();
    else
      newSuperior = null;
    final ByteString rawNewSuperior = operation.getRawNewSuperior();
    newSuperior = rawNewSuperior != null ? rawNewSuperior.toString() : null;
    newRDN = operation.getRawNewRDN().toString();
  }
@@ -129,23 +126,19 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid ModifyDNMsg.
   * @throws UnsupportedEncodingException If UTF8 is not supported.
   */
  public ModifyDNMsg(byte[] in) throws DataFormatException,
                                       UnsupportedEncodingException
  ModifyDNMsg(byte[] in) throws DataFormatException
  {
    // Decode header
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_MODIFYDN;
    allowedPduTypes[1] = MSG_TYPE_MODIFYDN_V1;
    int pos = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_MODIFYDN, MSG_TYPE_MODIFYDN_V1);
    // protocol version has been read as part of the header
    if (protocolVersion <= 3)
      decodeBody_V123(in, pos);
    {
      decodeBody_V123(scanner, in[0]);
    }
    else
    {
      decodeBody_V4(in, pos);
      decodeBody_V4(scanner);
    }
    if (protocolVersion==ProtocolVersion.getCurrentVersion())
@@ -184,349 +177,81 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  public byte[] getBytes_V1()
  {
    byte[] byteNewRdn = newRDN.getBytes("UTF-8");
    byte[] byteNewSuperior = null;
    byte[] byteNewSuperiorId = null;
    // calculate the length necessary to encode the parameters
    int bodyLength = byteNewRdn.length + 1 + 1;
    if (newSuperior != null)
    {
      byteNewSuperior = newSuperior.getBytes("UTF-8");
      bodyLength += byteNewSuperior.length + 1;
    }
    else
      bodyLength += 1;
    if (newSuperiorEntryUUID != null)
    {
      byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
      bodyLength += byteNewSuperiorId.length + 1;
    }
    else
      bodyLength += 1;
    byte[] encodedMsg = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, bodyLength);
    int pos = encodedMsg.length - bodyLength;
    /* put the new RDN and a terminating 0 */
    pos = addByteArray(byteNewRdn, encodedMsg, pos);
    /* put the newsuperior and a terminating 0 */
    if (newSuperior != null)
    {
      pos = addByteArray(byteNewSuperior, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the newsuperiorId and a terminating 0 */
    if (newSuperiorEntryUUID != null)
    {
      pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the deleteoldrdn flag */
    if (deleteOldRdn)
      encodedMsg[pos++] = 1;
    else
      encodedMsg[pos++] = 0;
    return encodedMsg;
    final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1);
    builder.append(newRDN);
    builder.append(newSuperior);
    builder.append(newSuperiorEntryUUID);
    builder.append(deleteOldRdn);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V23() throws UnsupportedEncodingException
  public byte[] getBytes_V23()
  {
    // Encoding V2 / V3
    byte[] byteNewRdn = newRDN.getBytes("UTF-8");
    byte[] byteNewSuperior = null;
    byte[] byteNewSuperiorId = null;
    // calculate the length necessary to encode the parameters
    int length = byteNewRdn.length + 1 + 1;
    if (newSuperior != null)
    {
      byteNewSuperior = newSuperior.getBytes("UTF-8");
      length += byteNewSuperior.length + 1;
    }
    else
      length += 1;
    if (newSuperiorEntryUUID != null)
    {
      byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
      length += byteNewSuperiorId.length + 1;
    }
    else
      length += 1;
    length += encodedMods.length + 1;
    /* encode the header in a byte[] large enough to also contain mods.. */
    byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFYDN, length,
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    int pos = encodedMsg.length - length;
    /* put the new RDN and a terminating 0 */
    pos = addByteArray(byteNewRdn, encodedMsg, pos);
    /* put the newsuperior and a terminating 0 */
    if (newSuperior != null)
    {
      pos = addByteArray(byteNewSuperior, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the newsuperiorId and a terminating 0 */
    if (newSuperiorEntryUUID != null)
    {
      pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the deleteoldrdn flag */
    if (deleteOldRdn)
      encodedMsg[pos++] = 1;
    else
      encodedMsg[pos++] = 0;
    /* add the mods */
    if (encodedMods.length > 0)
    {
      pos = encodedMsg.length - (encodedMods.length + 1);
      addByteArray(encodedMods, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_MODIFYDN,ProtocolVersion.REPLICATION_PROTOCOL_V3);
    builder.append(newRDN);
    builder.append(newSuperior);
    builder.append(newSuperiorEntryUUID);
    builder.append(deleteOldRdn);
    builder.appendZeroTerminated(encodedMods);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException
  public byte[] getBytes_V45(short protocolVersion)
  {
    byte[] byteNewSuperior = null;
    byte[] byteNewSuperiorId = null;
    // calculate the length necessary to encode the parameters
    byte[] byteNewRdn = newRDN.getBytes("UTF-8");
    int bodyLength = byteNewRdn.length + 1 + 1;
    if (newSuperior != null)
    {
      byteNewSuperior = newSuperior.getBytes("UTF-8");
      bodyLength += byteNewSuperior.length + 1;
    }
    else
      bodyLength += 1;
    if (newSuperiorEntryUUID != null)
    {
      byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
      bodyLength += byteNewSuperiorId.length + 1;
    }
    else
      bodyLength += 1;
    byte[] byteModsLen =
      String.valueOf(encodedMods.length).getBytes("UTF-8");
    bodyLength += byteModsLen.length + 1;
    bodyLength += encodedMods.length + 1;
    byte[] byteEntryAttrLen =
      String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
    bodyLength += byteEntryAttrLen.length + 1;
    bodyLength += encodedEclIncludes.length + 1;
    /* encode the header in a byte[] large enough to also contain mods.. */
    byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFYDN, bodyLength,
        reqProtocolVersion);
    int pos = encodedMsg.length - bodyLength;
    /* put the new RDN and a terminating 0 */
    pos = addByteArray(byteNewRdn, encodedMsg, pos);
    /* put the newsuperior and a terminating 0 */
    if (newSuperior != null)
    {
      pos = addByteArray(byteNewSuperior, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the newsuperiorId and a terminating 0 */
    if (newSuperiorEntryUUID != null)
    {
      pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the deleteoldrdn flag */
    if (deleteOldRdn)
      encodedMsg[pos++] = 1;
    else
      encodedMsg[pos++] = 0;
    pos = addByteArray(byteModsLen, encodedMsg, pos);
    pos = addByteArray(encodedMods, encodedMsg, pos);
    pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_MODIFYDN, protocolVersion);
    builder.append(newRDN);
    builder.append(newSuperior);
    builder.append(newSuperiorEntryUUID);
    builder.append(deleteOldRdn);
    builder.appendUTF8(encodedMods.length);
    builder.appendZeroTerminated(encodedMods);
    builder.appendUTF8(encodedEclIncludes.length);
    builder.appendZeroTerminated(encodedEclIncludes);
    return builder.toByteArray();
  }
  // ============
  // Msg decoding
  // ============
  private void decodeBody_V123(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V123(ByteArrayScanner scanner, byte msgType)
      throws DataFormatException
  {
    /* read the newRDN
     * first calculate the length then construct the string
     */
    int length = getNextLength(in, pos);
    newRDN = new String(in, pos, length, "UTF-8");
    pos += length + 1;
    /* read the newSuperior
     * first calculate the length then construct the string
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperior = new String(in, pos, length, "UTF-8");
    else
      newSuperior = null;
    pos += length + 1;
    /* read the new parent Id
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperiorEntryUUID = new String(in, pos, length, "UTF-8");
    else
      newSuperiorEntryUUID = null;
    pos += length + 1;
    /* get the deleteoldrdn flag */
    deleteOldRdn = in[pos] != 0;
    pos++;
    newRDN = scanner.nextString();
    newSuperior = scanner.nextString();
    newSuperiorEntryUUID = scanner.nextString();
    deleteOldRdn = scanner.nextBoolean();
    // For easiness (no additional method), simply compare PDU type to
    // know if we have to read the mods of V2
    if (in[0] == MSG_TYPE_MODIFYDN)
    if (msgType == MSG_TYPE_MODIFYDN)
    {
      /* Read the mods : all the remaining bytes but the terminating 0 */
      length = in.length - pos - 1;
      if (length > 0) // Otherwise, there is only the trailing 0 byte which we
        // do not need to read
      {
        encodedMods = new byte[length];
        try
        {
          System.arraycopy(in, pos, encodedMods, 0, length);
        } catch (IndexOutOfBoundsException e)
        {
          throw new DataFormatException(e.getMessage());
        } catch (ArrayStoreException e)
        {
          throw new DataFormatException(e.getMessage());
        } catch (NullPointerException e)
        {
          throw new DataFormatException(e.getMessage());
        }
      }
      encodedMods = scanner.remainingBytesZeroTerminated();
    }
  }
  private void decodeBody_V4(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V4(ByteArrayScanner scanner)
      throws DataFormatException
  {
    /* read the newRDN
     * first calculate the length then construct the string
     */
    int length = getNextLength(in, pos);
    newRDN = new String(in, pos, length, "UTF-8");
    pos += length + 1;
    newRDN = scanner.nextString();
    newSuperior = scanner.nextString();
    newSuperiorEntryUUID = scanner.nextString();
    deleteOldRdn = scanner.nextBoolean();
    /* read the newSuperior
     * first calculate the length then construct the string
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperior = new String(in, pos, length, "UTF-8");
    else
      newSuperior = null;
    pos += length + 1;
    final int modsLen = scanner.nextIntUTF8();
    encodedMods = scanner.nextByteArray(modsLen);
    scanner.skipZeroSeparator();
    /* read the new parent Id
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperiorEntryUUID = new String(in, pos, length, "UTF-8");
    else
      newSuperiorEntryUUID = null;
    pos += length + 1;
    /* get the deleteoldrdn flag */
    deleteOldRdn = in[pos] != 0;
    pos++;
    // Read mods len
    length = getNextLength(in, pos);
    int modsLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode attributes
    this.encodedMods = new byte[modsLen];
    try
    {
      System.arraycopy(in, pos, encodedMods, 0, modsLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    pos += modsLen + 1;
    // Read ecl attr len
    length = getNextLength(in, pos);
    int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode entry attributes
    encodedEclIncludes = new byte[eclAttrLen];
    try
    {
      System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    final int eclAttrLen = scanner.nextIntUTF8();
    encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -50,7 +49,7 @@
   *
   * @param op The operation to use for building the message
   */
  public ModifyMsg(PostOperationModifyOperation op)
  ModifyMsg(PostOperationModifyOperation op)
  {
    super((OperationContext) op.getAttachment(OperationContext.SYNCHROCONTEXT),
          op.getEntryDN());
@@ -77,41 +76,35 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
   * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
   */
  public ModifyMsg(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  ModifyMsg(byte[] in) throws DataFormatException
  {
    // Decode header
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_MODIFY;
    allowedPduTypes[1] = MSG_TYPE_MODIFY_V1;
    int pos = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_MODIFY, MSG_TYPE_MODIFY_V1);
    // protocol version has been read as part of the header
    if (protocolVersion <= 3)
      decodeBody_V123(in, pos);
    {
      decodeBody_V123(scanner);
    }
    else
      decodeBody_V4(in, pos);
    {
      decodeBody_V4(scanner);
    }
    if (protocolVersion==ProtocolVersion.getCurrentVersion())
    {
      bytes = in;
    }
  }
  /**
   * Creates a new Modify message from a V1 byte[].
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
   * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
   *
   * @return The created ModifyMsg.
   * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
   */
  public static ModifyMsg createV1(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  static ModifyMsg createV1(byte[] in) throws DataFormatException
  {
    ModifyMsg msg = new ModifyMsg(in);
@@ -127,7 +120,9 @@
      DN newDN) throws LDAPException, ASN1Exception, DataFormatException
  {
    if (newDN == null)
    {
      newDN = getDN();
    }
    List<RawModification> ldapmods = decodeRawMods(encodedMods);
@@ -178,134 +173,53 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  public byte[] getBytes_V1()
  {
    /* encode the header in a byte[] large enough to also contain the mods */
    byte[] encodedMsg = encodeHeader_V1(MSG_TYPE_MODIFY_V1, encodedMods.length +
      1);
    /* add the mods */
    int pos = encodedMsg.length - (encodedMods.length + 1);
    addByteArray(encodedMods, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_MODIFY_V1);
    builder.append(encodedMods);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V23() throws UnsupportedEncodingException
  public byte[] getBytes_V23()
  {
    // Encoding V2 / V3
    /* encode the header in a byte[] large enough to also contain mods */
    byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1,
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    /* add the mods */
    int pos = encodedMsg.length - (encodedMods.length + 1);
    addByteArray(encodedMods, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_MODIFY, ProtocolVersion.REPLICATION_PROTOCOL_V3);
    builder.append(encodedMods);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException
  public byte[] getBytes_V45(short protocolVersion)
  {
    int bodyLength = 0;
    byte[] byteModsLen =
      String.valueOf(encodedMods.length).getBytes("UTF-8");
    bodyLength += byteModsLen.length + 1;
    bodyLength += encodedMods.length + 1;
    byte[] byteEntryAttrLen =
      String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
    bodyLength += byteEntryAttrLen.length + 1;
    bodyLength += encodedEclIncludes.length + 1;
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, bodyLength,
        reqProtocolVersion);
    int pos = encodedMsg.length - bodyLength;
    pos = addByteArray(byteModsLen, encodedMsg, pos);
    pos = addByteArray(encodedMods, encodedMsg, pos);
    pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_MODIFY, protocolVersion);
    builder.appendUTF8(encodedMods.length);
    builder.append(encodedMods);
    builder.appendUTF8(encodedEclIncludes.length);
    builder.append(encodedEclIncludes);
    return builder.toByteArray();
  }
  // ============
  // Msg decoding
  // ============
  private void decodeBody_V123(byte[] in, int pos)
  throws DataFormatException
  private void decodeBody_V123(ByteArrayScanner scanner)
      throws DataFormatException
  {
    // Read and store the mods, in encoded form
    // all the remaining bytes but the terminating 0 */
    int length = in.length - pos - 1;
    encodedMods = new byte[length];
    try
    {
      System.arraycopy(in, pos, encodedMods, 0, length);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    encodedMods = scanner.remainingBytes();
  }
  private void decodeBody_V4(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V4(ByteArrayScanner scanner)
      throws DataFormatException
  {
    // Read mods len
    int length = getNextLength(in, pos);
    int modsLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    final int modsLen = scanner.nextIntUTF8();
    this.encodedMods = scanner.nextByteArray(modsLen);
    // Read/Don't decode mods
    this.encodedMods = new byte[modsLen];
    try
    {
      System.arraycopy(in, pos, encodedMods, 0, modsLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    pos += modsLen + 1;
    // Read ecl attr len
    length = getNextLength(in, pos);
    int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode entry attributes
    encodedEclIncludes = new byte[eclAttrLen];
    try
    {
      System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    final int eclAttrLen = scanner.nextIntUTF8();
    encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
  }
}
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,21 +27,16 @@
package org.opends.server.replication.protocol;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Reader;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
/**
 * This message is part of the replication protocol.
@@ -69,7 +64,7 @@
   * first missing change for each LDAP server connected to a Replication
   * Server.
   */
  static class ServerData
  private static class ServerData
  {
    private ServerState state;
    private long approxFirstMissingDate;
@@ -79,7 +74,7 @@
   * Data structure to manage the state of this replication server
   * and the state information for the servers connected to it.
   */
  static class SubTopoMonitorData
  private static class SubTopoMonitorData
  {
    /** This replication server DbState. */
    private ServerState replServerDbState;
@@ -91,7 +86,7 @@
        new HashMap<Integer, ServerData>();
  }
  private SubTopoMonitorData data = new SubTopoMonitorData();
  private final SubTopoMonitorData data = new SubTopoMonitorData();
  /**
   * Creates a new MonitorMsg.
@@ -120,18 +115,22 @@
   * @param state The server state.
   * @param approxFirstMissingDate  The approximation of the date
   * of the older missing change. null when none.
   * @param isLDAP Specifies whether the server is a LS or a RS
   * @param isLDAPServer Specifies whether the server is a DS or a RS
   */
  public void setServerState(int serverId, ServerState state,
      long approxFirstMissingDate, boolean isLDAP)
      long approxFirstMissingDate, boolean isLDAPServer)
  {
    ServerData sd = new ServerData();
    final ServerData sd = new ServerData();
    sd.state = state;
    sd.approxFirstMissingDate = approxFirstMissingDate;
    if (isLDAP)
    if (isLDAPServer)
    {
      data.ldapStates.put(serverId, sd);
    }
    else
    {
      data.rsStates.put(serverId, sd);
    }
  }
  /**
@@ -154,7 +153,6 @@
    return data.rsStates.get(serverId).state;
  }
  /**
   * Get the approximation of the date of the older missing change for the
   * LDAP Server with the provided server Id.
@@ -185,69 +183,32 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public MonitorMsg(byte[] in, short version) throws DataFormatException
  MonitorMsg(byte[] in, short version) throws DataFormatException
  {
    ByteSequenceReader reader = ByteString.wrap(in).asReader();
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR)
    {
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      try
      {
        /* first byte is the type */
        if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
          throw new DataFormatException("input is not a valid " +
              this.getClass().getCanonicalName());
        int pos = 1;
        // sender
        int length = getNextLength(in, pos);
        String senderIDString = new String(in, pos, length, "UTF-8");
        this.senderID = Integer.valueOf(senderIDString);
        pos += length +1;
        // destination
        length = getNextLength(in, pos);
        String destinationString = new String(in, pos, length, "UTF-8");
        this.destination = Integer.valueOf(destinationString);
        pos += length +1;
        reader.position(pos);
      }
      catch (UnsupportedEncodingException e)
      {
        throw new DataFormatException("UTF-8 is not supported by this jvm.");
      }
      this.senderID = scanner.nextIntUTF8();
      this.destination = scanner.nextIntUTF8();
    }
    else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      this.senderID = scanner.nextShort();
      this.destination = scanner.nextShort();
    }
    else
    {
      if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      /*
       * V4 and above uses integers for its serverIds while V2 and V3
       * use shorts.
       */
      if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
      {
        // sender
        this.senderID = reader.getShort();
        // destination
        this.destination = reader.getShort();
      }
      else
      {
        // sender
        this.senderID = reader.getInt();
        // destination
        this.destination = reader.getInt();
      }
      this.senderID = scanner.nextInt();
      this.destination = scanner.nextInt();
    }
    ASN1Reader asn1Reader = ASN1.getReader(reader);
    ASN1Reader asn1Reader = scanner.getASN1Reader();
    try
    {
      asn1Reader.readStartSequence();
@@ -297,13 +258,7 @@
        else
        {
          // the next states are the server states
          ServerData sd = new ServerData();
          sd.state = newState;
          sd.approxFirstMissingDate = outime;
          if (isLDAPServer)
            data.ldapStates.put(serverId, sd);
          else
            data.rsStates.put(serverId, sd);
          setServerState(serverId, newState, outime, isLDAPServer);
        }
      }
      asn1Reader.readEndSequence();
@@ -312,39 +267,19 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      ByteStringBuilder byteBuilder = new ByteStringBuilder();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        /* put the type of the operation */
        byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
        /*
         * V4 and above uses integers for its serverIds while V2 and V3
         * use shorts.
         */
        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          byteBuilder.append(senderID);
          byteBuilder.append(destination);
        }
        else
        {
          byteBuilder.append((short)senderID);
          byteBuilder.append((short)destination);
        }
      }
      final ByteArrayBuilder builder = new ByteArrayBuilder();
      builder.append(MSG_TYPE_REPL_SERVER_MONITOR);
      append(builder, senderID, protocolVersion);
      append(builder, destination, protocolVersion);
      /* Put the serverStates ... */
      ASN1Writer writer = ASN1.getWriter(byteBuilder);
      ASN1Writer writer = builder.getASN1Writer();
      writer.writeStartSequence();
      {
        /* first put the Replication Server state */
@@ -354,39 +289,18 @@
        }
        writer.writeEndSequence();
        // then the LDAP server data
        // then the DS + RS server data
        writeServerStates(protocolVersion, writer, false /* DS */);
        // then the RS server datas
        writeServerStates(protocolVersion, writer, true /* RS */);
      }
      writer.writeEndSequence();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        return byteBuilder.toByteArray();
        // legacy coding mistake
        builder.append((byte) 0);
      }
      else
      {
        byte[] temp = byteBuilder.toByteArray();
        byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
        byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
        int length = 1 +  1 + senderBytes.length +
        1 + destinationBytes.length + temp.length +1;
        byte[] resultByteArray = new byte[length];
        /* put the type of the operation */
        resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
        int pos = 1;
        pos = addByteArray(senderBytes, resultByteArray, pos);
        pos = addByteArray(destinationBytes, resultByteArray, pos);
        pos = addByteArray(temp, resultByteArray, pos);
        return resultByteArray;
      }
      return builder.toByteArray();
    }
    catch (Exception e)
    {
@@ -394,6 +308,23 @@
    }
  }
  private void append(final ByteArrayBuilder builder, int data,
      short protocolVersion)
  {
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      builder.appendUTF8(data);
    }
    else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      builder.append((short) data);
    }
    else // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4
    {
      builder.append(data);
    }
  }
  private void writeServerStates(short protocolVersion, ASN1Writer writer,
      boolean writeRSStates) throws IOException
  {
@@ -454,8 +385,6 @@
    return data.rsStates.keySet().iterator();
  }
  /**
   * Get the destination.
   *
@@ -466,8 +395,6 @@
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
@@ -478,15 +405,11 @@
    return senderID;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    StringBuilder stateS = new StringBuilder("\nRState:[");
    final StringBuilder stateS = new StringBuilder("\nRState:[");
    stateS.append(data.replServerDbState);
    stateS.append("]");
@@ -502,10 +425,10 @@
    stateS.append("\nRSStates:[");
    for (Entry<Integer, ServerData> entry : data.rsStates.entrySet())
    {
      ServerData sd = entry.getValue();
      final ServerData sd = entry.getValue();
      stateS.append("\n[RSState(").append(entry.getKey()).append(")=")
            .append(sd.state).append("]").append(" afmd=")
            .append(sd.approxFirstMissingDate + "]");
            .append(sd.approxFirstMissingDate).append("]");
    }
    return getClass().getCanonicalName() +
    "[ sender=" + this.senderID +
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -46,8 +45,6 @@
   */
  private final int senderID;
  /**
   * Creates a message.
   *
@@ -70,70 +67,30 @@
   * @throws DataFormatException
   *           If the in does not contain a properly, encoded message.
   */
  public MonitorRequestMsg(byte[] in) throws DataFormatException
  MonitorRequestMsg(byte[] in) throws DataFormatException
  {
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
        throw new DataFormatException("input is not a valid "
            + this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Integer.valueOf(senderString);
      pos += length + 1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length + 1;
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    this.senderID = scanner.nextIntUTF8();
    this.destination = scanner.nextIntUTF8();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR_REQUEST;
      int pos = 1;
      /* put the sender */
      pos = addByteArray(senderBytes, resultByteArray, pos);
      /* put the destination */
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_REPL_SERVER_MONITOR_REQUEST);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    return builder.toByteArray();
  }
  /**
   * Get the destination.
   *
@@ -144,8 +101,6 @@
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
@@ -156,13 +111,12 @@
    return senderID;
  }
  /**
   * Returns a string representation of the message.
   *
   * @return the string representation of this message.
   */
  @Override
  public String toString()
  {
    return "[" + getClass().getCanonicalName() + " sender=" + senderID
opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
@@ -22,16 +22,14 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * Message sent by a replication server to a directory server in reply to the
@@ -39,17 +37,17 @@
 */
public class ReplServerStartDSMsg extends StartMsg
{
  private int serverId;
  private String serverURL;
  private DN baseDN;
  private int windowSize;
  private ServerState serverState;
  private final int serverId;
  private final String serverURL;
  private final DN baseDN;
  private final int windowSize;
  private final ServerState serverState;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * Threshold value used by the RS to determine if a DS must be put in
@@ -61,12 +59,12 @@
  /**
   * The weight affected to the replication server.
   */
  private int weight = -1;
  private final int weight;
  /**
   * Number of currently connected DS to the replication server.
   */
  private int connectedDSNumber = -1;
  private final int connectedDSNumber;
  /**
   * Create a ReplServerStartDSMsg.
@@ -115,100 +113,25 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ReplServerStartDSMsg.
   */
  public ReplServerStartDSMsg(byte[] in) throws DataFormatException
  ReplServerStartDSMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[1];
    allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START_DS;
    headerLength = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_REPL_SERVER_START_DS);
    try
    {
      /* The ReplServerStartDSMsg payload is stored in the form :
       * <baseDN><serverId><serverURL><windowSize><sslEncryption>
       * <degradedStatusThreshold><weight><connectedDSNumber>
       * <serverState>
       */
      /* first bytes are the header */
      int pos = headerLength;
      /* read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /**
       * read the degraded status threshold
       */
      length = getNextLength(in, pos);
      degradedStatusThreshold =
        Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      /*
       * read the weight
       */
      length = getNextLength(in, pos);
      String weightString = new String(in, pos, length, "UTF-8");
      weight = Integer.valueOf(weightString);
      pos += length +1;
      /*
       * read the connected DS number
       */
      length = getNextLength(in, pos);
      String connectedDSNumberString = new String(in, pos, length, "UTF-8");
      connectedDSNumber = Integer.valueOf(connectedDSNumberString);
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
    /* The ReplServerStartDSMsg payload is stored in the form :
     * <baseDN><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><weight><connectedDSNumber>
     * <serverState>
     */
    baseDN = scanner.nextDN();
    serverId = scanner.nextIntUTF8();
    serverURL = scanner.nextString();
    windowSize = scanner.nextIntUTF8();
    sslEncryption = Boolean.valueOf(scanner.nextString());//FIXME
    degradedStatusThreshold =scanner.nextIntUTF8();
    weight = scanner.nextIntUTF8();
    connectedDSNumber = scanner.nextIntUTF8();
    serverState = scanner.nextServerState();
  }
  /**
@@ -248,72 +171,28 @@
    return this.serverState;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short sessionProtocolVersion)
     throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    /* The ReplServerStartDSMsg is stored in the form :
     * <operation type><baseDN><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><weight><connectedDSNumber>
     * <serverState>
     */
    byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
    byte[] byteServerState = serverState.getBytes();
    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
    byte[] byteSSLEncryption =
      String.valueOf(sslEncryption).getBytes("UTF-8");
    byte[] byteDegradedStatusThreshold =
      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
    byte[] byteWeight =
      String.valueOf(weight).getBytes("UTF-8");
    byte[] byteConnectedDSNumber =
      String.valueOf(connectedDSNumber).getBytes("UTF-8");
    int length = byteDn.length + 1 + byteServerId.length + 1 +
      byteServerUrl.length + 1 + byteWindowSize.length + 1 +
      byteSSLEncryption.length + 1 + byteDegradedStatusThreshold.length + 1 +
      byteWeight.length + 1 + byteConnectedDSNumber.length + 1 +
      byteServerState.length + 1;
    /* encode the header in a byte[] large enough */
    byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START_DS,
        length, sessionProtocolVersion);
    int pos = headerLength;
    /* put the baseDN and a terminating 0 */
    pos = addByteArray(byteDn, resultByteArray, pos);
    /* put the ServerId */
    pos = addByteArray(byteServerId, resultByteArray, pos);
    /* put the ServerURL */
    pos = addByteArray(byteServerUrl, resultByteArray, pos);
    /* put the window size */
    pos = addByteArray(byteWindowSize, resultByteArray, pos);
    /* put the SSL Encryption setting */
    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
    /* put the degraded status threshold */
    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the weight */
    pos = addByteArray(byteWeight, resultByteArray, pos);
    /* put the connected DS number */
    pos = addByteArray(byteConnectedDSNumber, resultByteArray, pos);
    /* put the ServerState */
    pos = addByteArray(byteServerState, resultByteArray, pos);
    return resultByteArray;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    encodeHeader(MSG_TYPE_REPL_SERVER_START_DS, builder, protocolVersion);
    builder.append(baseDN);
    builder.appendUTF8(serverId);
    builder.append(serverURL);
    builder.appendUTF8(windowSize);
    builder.append(Boolean.toString(sslEncryption));
    builder.appendUTF8(degradedStatusThreshold);
    builder.appendUTF8(weight);
    builder.appendUTF8(connectedDSNumber);
    // Caution: ServerState MUST be the last field.
    builder.append(serverState);
    return builder.toByteArray();
  }
  /**
@@ -356,9 +235,7 @@
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -22,16 +22,14 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * Message sent by a replication server to another replication server
@@ -39,17 +37,17 @@
 */
public class ReplServerStartMsg extends StartMsg
{
  private Integer serverId;
  private String serverURL;
  private DN baseDN;
  private int windowSize;
  private ServerState serverState;
  private final int serverId;
  private final String serverURL;
  private final DN baseDN;
  private final int windowSize;
  private final ServerState serverState;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * NOTE: Starting from protocol V4, we introduce a dedicated PDU for answering
@@ -106,166 +104,28 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ReplServerStartMsg.
   */
  public ReplServerStartMsg(byte[] in) throws DataFormatException
  ReplServerStartMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START;
    allowedPduTypes[1] = MSG_TYPE_REPL_SERVER_START_V1;
    headerLength = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner,
        MSG_TYPE_REPL_SERVER_START, MSG_TYPE_REPL_SERVER_START_V1);
    // Protocol version has been read as part of the header:
    // decode the body according to the protocol version read in the header
    switch(protocolVersion)
    /* The ReplServerStartMsg payload is stored in the form :
     * <baseDN><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><serverState>
     */
    baseDN = scanner.nextDN();
    serverId = scanner.nextIntUTF8();
    serverURL = scanner.nextString();
    windowSize = scanner.nextIntUTF8();
    sslEncryption = Boolean.valueOf(scanner.nextString());
    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        decodeBody_V1(in, headerLength);
        return;
      degradedStatusThreshold = scanner.nextIntUTF8();
    }
    try
    {
      /* The ReplServerStartMsg payload is stored in the form :
       * <baseDN><serverId><serverURL><windowSize><sslEncryption>
       * <degradedStatusThreshold><serverState>
       */
      /* first bytes are the header */
      int pos = headerLength;
      /* read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /**
       * read the degraded status threshold
       */
      length = getNextLength(in, pos);
      degradedStatusThreshold =
        Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
  }
  /**
   * Decodes the body of a just received ReplServerStartMsg. The body is in the
   * passed array, and starts at the provided location. This is for a PDU
   * encoded in V1 protocol version.
   * @param in A byte array containing the body for the ReplServerStartMsg
   * @param pos The position in the array where the decoding should start
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ReplServerStartMsg.
   */
  public void decodeBody_V1(byte[] in, int pos) throws DataFormatException
  {
    try
    {
      /* The ReplServerStartMsg payload is stored in the form :
       * <baseDN><serverId><serverURL><windowSize><sslEncryption>
       * <serverState>
       */
      /* read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
    serverState = scanner.nextServerState();
  }
  /**
@@ -305,69 +165,43 @@
    return this.serverState;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short sessionProtocolVersion)
     throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    // If an older version requested, encode in the requested way
    switch(sessionProtocolVersion)
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        return getBytes_V1();
      /*
       * The ReplServerStartMessage is stored in the form :
       * <operation type><basedn><serverid><serverURL><windowsize><serverState>
       */
      encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1, builder);
      builder.append(baseDN);
      builder.appendUTF8(serverId);
      builder.append(serverURL);
      builder.appendUTF8(windowSize);
      builder.append(Boolean.toString(sslEncryption));
      // Caution: ServerState MUST be the last field.
      builder.append(serverState);
    }
    /* The ReplServerStartMsg is stored in the form :
     * <operation type><baseDN><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><serverState>
     */
    byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
    byte[] byteServerState = serverState.getBytes();
    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
    byte[] byteSSLEncryption =
      String.valueOf(sslEncryption).getBytes("UTF-8");
    byte[] byteDegradedStatusThreshold =
      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
    int length = byteDn.length + 1 + byteServerId.length + 1 +
      byteServerUrl.length + 1 + byteWindowSize.length + 1 +
      byteSSLEncryption.length + 1 +
      byteDegradedStatusThreshold.length + 1 +
      byteServerState.length + 1;
    /* encode the header in a byte[] large enough */
    byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length,
        sessionProtocolVersion);
    int pos = headerLength;
    /* put the baseDN and a terminating 0 */
    pos = addByteArray(byteDn, resultByteArray, pos);
    /* put the ServerId */
    pos = addByteArray(byteServerId, resultByteArray, pos);
    /* put the ServerURL */
    pos = addByteArray(byteServerUrl, resultByteArray, pos);
    /* put the window size */
    pos = addByteArray(byteWindowSize, resultByteArray, pos);
    /* put the SSL Encryption setting */
    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
    /* put the degraded status threshold */
    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the ServerState */
    pos = addByteArray(byteServerState, resultByteArray, pos);
    return resultByteArray;
    else
    {
      /* The ReplServerStartMsg is stored in the form :
       * <operation type><baseDN><serverId><serverURL><windowSize><sslEncryption>
       * <degradedStatusThreshold><serverState>
       */
      encodeHeader(MSG_TYPE_REPL_SERVER_START, builder, protocolVersion);
      builder.append(baseDN);
      builder.appendUTF8(serverId);
      builder.append(serverURL);
      builder.appendUTF8(windowSize);
      builder.append(Boolean.toString(sslEncryption));
      builder.appendUTF8(degradedStatusThreshold);
      // Caution: ServerState MUST be the last field.
      builder.append(serverState);
    }
    return builder.toByteArray();
  }
  /**
@@ -410,9 +244,7 @@
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -428,64 +260,4 @@
      "\ndegradedStatusThreshold: " + degradedStatusThreshold +
      "\nwindowSize: " + windowSize;
  }
  /**
   * Get the byte array representation of this Message. This uses the version
   * 1 of the replication protocol (used for compatibility purpose).
   *
   * @return The byte array representation of this Message.
   *
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  {
    /*
     * The ReplServerStartMessage is stored in the form :
     * <operation type><basedn><serverid><serverURL><windowsize><serverState>
     */
    try {
      byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 + byteWindowSize.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough */
      byte resultByteArray[] = encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1,
        length);
      int pos = headerLength;
      /* put the baseDN and a terminating 0 */
      pos = addByteArray(byteDn, resultByteArray, pos);
      /* put the ServerId */
      pos = addByteArray(byteServerId, resultByteArray, pos);
      /* put the ServerURL */
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
      /* put the window size */
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      /* put the SSL Encryption setting */
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      /* put the ServerState */
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -106,15 +105,8 @@
   *          The protocol version to use for serialization. The version should
   *          normally be older than the current one.
   * @return The encoded PDU.
   * @throws UnsupportedEncodingException
   *           When the encoding of the message failed because the UTF-8
   *           encoding is not supported or the requested protocol version to
   *           use is not supported by this PDU.
   */
  public abstract byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException;
  public abstract byte[] getBytes(short protocolVersion);
  /**
   * Generates a ReplicationMsg from its encoded form. This un-serialization is
@@ -128,15 +120,12 @@
   * @return The generated SynchronizationMessage.
   * @throws DataFormatException
   *           If the encoded form was not a valid msg.
   * @throws UnsupportedEncodingException
   *           If UTF8 is not supported.
   * @throws NotSupportedOldVersionPDUException
   *           If the PDU is part of an old protocol version and we do not
   *           support it.
   */
  public static ReplicationMsg generateMsg(byte[] buffer, short protocolVersion)
      throws DataFormatException, UnsupportedEncodingException,
      NotSupportedOldVersionPDUException
      throws DataFormatException, NotSupportedOldVersionPDUException
  {
    switch (buffer[0])
    {
@@ -214,51 +203,4 @@
      throw new DataFormatException("received message with unknown type");
    }
  }
  /**
   * Concatenate the tail byte array into the resultByteArray.
   * The resultByteArray must be large enough before calling this method.
   *
   * @param tail the byte array to concatenate.
   * @param resultByteArray The byte array to concatenate to.
   * @param pos the position where to concatenate.
   * @return the next position to use in the resultByteArray.
   */
  protected static int addByteArray(byte[] tail, byte[] resultByteArray,
    int pos)
  {
    for (int i=0; i<tail.length; i++,pos++)
    {
      resultByteArray[pos] = tail[i];
    }
    resultByteArray[pos++] = 0;
    return pos;
  }
  /**
   * Get the length of the next String encoded in the in byte array.
   *
   * @param in
   *          the byte array where to calculate the string.
   * @param pos
   *          the position where to start from in the byte array.
   * @return the length of the next string.
   * @throws DataFormatException
   *           If the byte array does not end with null.
   */
  protected static int getNextLength(byte[] in, int pos)
      throws DataFormatException
  {
    int offset = pos;
    int length = 0;
    while (in[offset++] != 0)
    {
      if (offset >= in.length)
        throw new DataFormatException("byte[] is not a valid msg");
      length++;
    }
    return length;
  }
}
opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
@@ -22,23 +22,19 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is used by an LDAP server to communicate to the topology
 * that the generation must be reset for the domain.
 */
public class ResetGenerationIdMsg extends ReplicationMsg
{
  private long generationId;
  private final long generationId;
  /**
   * Creates a new message.
@@ -57,52 +53,25 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the WindowMessage.
   */
  public ResetGenerationIdMsg(byte[] in) throws DataFormatException
  ResetGenerationIdMsg(byte[] in) throws DataFormatException
  {
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    if (scanner.nextByte() != MSG_TYPE_RESET_GENERATION_ID)
    {
      if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
        throw new
        DataFormatException("input is not a valid GenerationId Message");
      int pos = 1;
      /* read the generationId */
      int length = getNextLength(in, pos);
      generationId = Long.valueOf(new String(in, pos, length,
      "UTF-8"));
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      throw new DataFormatException(
          "input is not a valid GenerationId Message");
    }
    generationId = scanner.nextLongUTF8();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      /* Put the message type */
      oStream.write(MSG_TYPE_RESET_GENERATION_ID);
      // Put the generationId
      oStream.write(String.valueOf(generationId).getBytes("UTF-8"));
      oStream.write(0);
      return oStream.toByteArray();
    }
    catch (IOException e)
    {
      // never happens
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_RESET_GENERATION_ID);
    builder.appendUTF8(generationId);
    return builder.toByteArray();
  }
  /**
@@ -115,9 +84,7 @@
    return this.generationId;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
@@ -38,26 +37,25 @@
 */
public class ServerStartECLMsg extends StartMsg
{
  private String serverURL;
  private int maxReceiveQueue;
  private int maxSendQueue;
  private int maxReceiveDelay;
  private int maxSendDelay;
  private int windowSize;
  private ServerState serverState = null;
  private final String serverURL;
  private final int maxReceiveQueue;
  private final int maxSendQueue;
  private final int maxReceiveDelay;
  private final int maxSendDelay;
  private final int windowSize;
  private final ServerState serverState;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  private final long heartbeatInterval;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * Creates a new ServerStartMsg. This message is to be sent by an LDAP
@@ -108,86 +106,21 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMsg.
   */
  public ServerStartECLMsg(byte[] in) throws DataFormatException
  ServerStartECLMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[1];
    allowedPduTypes[0] = MSG_TYPE_START_ECL;
    headerLength = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_START_ECL);
    try
    {
      /* first bytes are the header */
      int pos = headerLength;
      /*
       * read the ServerURL
       */
      int length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the maxReceiveDelay
       */
      length = getNextLength(in, pos);
      maxReceiveDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxReceiveQueue
       */
      length = getNextLength(in, pos);
      maxReceiveQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxSendDelay
       */
      length = getNextLength(in, pos);
      maxSendDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxSendQueue
       */
      length = getNextLength(in, pos);
      maxSendQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the windowSize
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the heartbeatInterval
       */
      length = getNextLength(in, pos);
      heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of sererid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    serverURL = scanner.nextString();
    maxReceiveDelay = scanner.nextIntUTF8();
    maxReceiveQueue = scanner.nextIntUTF8();
    maxSendDelay = scanner.nextIntUTF8();
    maxSendQueue = scanner.nextIntUTF8();
    windowSize = scanner.nextIntUTF8();
    heartbeatInterval = scanner.nextIntUTF8();
    // FIXME awful encoding
    sslEncryption = Boolean.valueOf(scanner.nextString());
    serverState = scanner.nextServerState();
  }
  /**
@@ -244,69 +177,24 @@
    return serverState;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short sessionProtocolVersion)
  {
    try {
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteMaxRecvDelay =
                     String.valueOf(maxReceiveDelay).getBytes("UTF-8");
      byte[] byteMaxRecvQueue =
                     String.valueOf(maxReceiveQueue).getBytes("UTF-8");
      byte[] byteMaxSendDelay =
                     String.valueOf(maxSendDelay).getBytes("UTF-8");
      byte[] byteMaxSendQueue =
                     String.valueOf(maxSendQueue).getBytes("UTF-8");
      byte[] byteWindowSize =
                     String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteHeartbeatInterval =
                     String.valueOf(heartbeatInterval).getBytes("UTF-8");
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      int length = byteServerUrl.length + 1 +
                   byteMaxRecvDelay.length + 1 +
                   byteMaxRecvQueue.length + 1 +
                   byteMaxSendDelay.length + 1 +
                   byteMaxSendQueue.length + 1 +
                   byteWindowSize.length + 1 +
                   byteHeartbeatInterval.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length,
          sessionProtocolVersion);
      int pos = headerLength;
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
      pos = addByteArray(byteMaxRecvDelay, resultByteArray, pos);
      pos = addByteArray(byteMaxRecvQueue, resultByteArray, pos);
      pos = addByteArray(byteMaxSendDelay, resultByteArray, pos);
      pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    encodeHeader(MSG_TYPE_START_ECL, builder, sessionProtocolVersion);
    builder.append(serverURL);
    builder.appendUTF8(maxReceiveDelay);
    builder.appendUTF8(maxReceiveQueue);
    builder.appendUTF8(maxSendDelay);
    builder.appendUTF8(maxSendQueue);
    builder.appendUTF8(windowSize);
    builder.appendUTF8(heartbeatInterval);
    // FIXME awful encoding
    builder.append(Boolean.toString(sslEncryption));
    // Caution: ServerState MUST be the last field.
    builder.append(serverState);
    return builder.toByteArray();
  }
  /**
@@ -343,13 +231,11 @@
    return sslEncryption;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return this.getClass().getCanonicalName() + " content: " +
    return getClass().getCanonicalName() + " content: " +
      "\nprotocolVersion: " + protocolVersion +
      "\ngenerationId: " + generationId +
      "\ngroupId: " + groupId +
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -22,16 +22,14 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is used by LDAP server when they first connect.
@@ -40,27 +38,28 @@
 */
public class ServerStartMsg extends StartMsg
{
  private int serverId; // Id of the LDAP server that sent this message
  private String serverURL;
  private DN baseDN;
  private int maxReceiveQueue;
  private int maxSendQueue;
  private int maxReceiveDelay;
  private int maxSendDelay;
  private int windowSize;
  private ServerState serverState = null;
  /** Id of the LDAP server that sent this message */
  private final int serverId;
  private final String serverURL;
  private final DN baseDN;
  private final int maxReceiveQueue;
  private final int maxSendQueue;
  private final int maxReceiveDelay;
  private final int maxSendDelay;
  private final int windowSize;
  private final ServerState serverState;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  private final long heartbeatInterval;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * Creates a new ServerStartMsg. This message is to be sent by an LDAP
@@ -108,107 +107,22 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMsg.
   */
  public ServerStartMsg(byte[] in) throws DataFormatException
  ServerStartMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[1];
    allowedPduTypes[0] = MSG_TYPE_SERVER_START;
    headerLength = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_SERVER_START);
    try
    {
      /* first bytes are the header */
      int pos = headerLength;
      /*
       * read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the maxReceiveDelay
       */
      length = getNextLength(in, pos);
      maxReceiveDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxReceiveQueue
       */
      length = getNextLength(in, pos);
      maxReceiveQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxSendDelay
       */
      length = getNextLength(in, pos);
      maxSendDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxSendQueue
       */
      length = getNextLength(in, pos);
      maxSendQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the windowSize
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the heartbeatInterval
       */
      length = getNextLength(in, pos);
      heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of sererid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
    baseDN = scanner.nextDN();
    serverId = scanner.nextIntUTF8();
    serverURL = scanner.nextString();
    maxReceiveDelay = scanner.nextIntUTF8();
    maxReceiveQueue = scanner.nextIntUTF8();
    maxSendDelay = scanner.nextIntUTF8();
    maxSendQueue = scanner.nextIntUTF8();
    windowSize = scanner.nextIntUTF8();
    heartbeatInterval = scanner.nextIntUTF8();
    sslEncryption = Boolean.valueOf(scanner.nextString());
    serverState = scanner.nextServerState();
  }
  /**
@@ -284,76 +198,26 @@
    return serverState;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short sessionProtocolVersion)
  public byte[] getBytes(short protocolVersion)
  {
    try {
      byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteMaxRecvDelay =
                     String.valueOf(maxReceiveDelay).getBytes("UTF-8");
      byte[] byteMaxRecvQueue =
                     String.valueOf(maxReceiveQueue).getBytes("UTF-8");
      byte[] byteMaxSendDelay =
                     String.valueOf(maxSendDelay).getBytes("UTF-8");
      byte[] byteMaxSendQueue =
                     String.valueOf(maxSendQueue).getBytes("UTF-8");
      byte[] byteWindowSize =
                     String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteHeartbeatInterval =
                     String.valueOf(heartbeatInterval).getBytes("UTF-8");
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    encodeHeader(MSG_TYPE_SERVER_START, builder, protocolVersion);
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 +
                   byteMaxRecvDelay.length + 1 +
                   byteMaxRecvQueue.length + 1 +
                   byteMaxSendDelay.length + 1 +
                   byteMaxSendQueue.length + 1 +
                   byteWindowSize.length + 1 +
                   byteHeartbeatInterval.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length,
          sessionProtocolVersion);
      int pos = headerLength;
      pos = addByteArray(byteDn, resultByteArray, pos);
      pos = addByteArray(byteServerId, resultByteArray, pos);
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
      pos = addByteArray(byteMaxRecvDelay, resultByteArray, pos);
      pos = addByteArray(byteMaxRecvQueue, resultByteArray, pos);
      pos = addByteArray(byteMaxSendDelay, resultByteArray, pos);
      pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    builder.append(baseDN);
    builder.appendUTF8(serverId);
    builder.append(serverURL);
    builder.appendUTF8(maxReceiveDelay);
    builder.appendUTF8(maxReceiveQueue);
    builder.appendUTF8(maxSendDelay);
    builder.appendUTF8(maxSendQueue);
    builder.appendUTF8(windowSize);
    builder.appendUTF8(heartbeatInterval);
    builder.append(Boolean.toString(sslEncryption));
    // Caution: ServerState MUST be the last field.
    builder.append(serverState);
    return builder.toByteArray();
  }
  /**
@@ -390,9 +254,7 @@
    return sslEncryption;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -22,12 +22,10 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -150,7 +148,7 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public StartECLSessionMsg(byte[] in) throws DataFormatException
  StartECLSessionMsg(byte[] in) throws DataFormatException
  {
    /*
     * The message is stored in the form:
@@ -158,68 +156,25 @@
     * <list of referrals urls>
     * (each referral url terminates with 0)
     */
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_START_ECL_SESSION)
    {
      // first bytes are the header
      int pos = 0;
      throw new DataFormatException("Input is not a valid "
          + getClass().getCanonicalName());
    }
      // first byte is the type
      if (in.length < 1 || in[pos++] != MSG_TYPE_START_ECL_SESSION)
      {
        throw new DataFormatException(
          "Input is not a valid " + this.getClass().getCanonicalName());
      }
      // start mode
      int length = getNextLength(in, pos);
      int requestType = Integer.parseInt(new String(in, pos, length, "UTF-8"));
      eclRequestType = ECLRequestType.values()[requestType];
      pos += length +1;
      length = getNextLength(in, pos);
      firstChangeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      length = getNextLength(in, pos);
      lastChangeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      length = getNextLength(in, pos);
      csn = new CSN(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      // persistentSearch mode
      length = getNextLength(in, pos);
      int persistent = Integer.parseInt(new String(in, pos, length, "UTF-8"));
      isPersistent = Persistent.values()[persistent];
      pos += length + 1;
      // generalized state
      length = getNextLength(in, pos);
      crossDomainServerState = new String(in, pos, length, "UTF-8");
      pos += length + 1;
      length = getNextLength(in, pos);
      operationId = new String(in, pos, length, "UTF-8");
      pos += length + 1;
      // excluded DN
      length = getNextLength(in, pos);
      String excludedDNsString = new String(in, pos, length, "UTF-8");
      if (excludedDNsString.length()>0)
      {
        String[] excludedDNsStr = excludedDNsString.split(";");
        Collections.addAll(this.excludedBaseDNs, excludedDNsStr);
      }
      pos += length + 1;
    } catch (UnsupportedEncodingException e)
    eclRequestType = ECLRequestType.values()[scanner.nextIntUTF8()];
    firstChangeNumber = scanner.nextIntUTF8();
    lastChangeNumber = scanner.nextIntUTF8();
    csn = scanner.nextCSNUTF8();
    isPersistent = Persistent.values()[scanner.nextIntUTF8()];
    crossDomainServerState = scanner.nextString();
    operationId = scanner.nextString();
    final String excludedDNsString = scanner.nextString();
    if (excludedDNsString.length() > 0)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    } catch (IllegalArgumentException e)
    {
      throw new DataFormatException(e.getMessage());
      Collections.addAll(excludedBaseDNs, excludedDNsString.split(";"));
    }
  }
@@ -238,71 +193,26 @@
    excludedBaseDNs = new HashSet<String>();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    String excludedBaseDNsString =
        StaticUtils.collectionToString(excludedBaseDNs, ";");
    try
    {
      byte[] byteMode = toBytes(eclRequestType.ordinal());
      // FIXME JNR Changing the lines below to use long would require a protocol
      // version change. Leave it like this for now until the need arises.
      byte[] byteChangeNumber = toBytes((int) firstChangeNumber);
      byte[] byteStopChangeNumber = toBytes((int) lastChangeNumber);
      byte[] byteCSN = csn.toString().getBytes("UTF-8");
      byte[] bytePsearch = toBytes(isPersistent.ordinal());
      byte[] byteGeneralizedState = toBytes(crossDomainServerState);
      byte[] byteOperationId = toBytes(operationId);
      byte[] byteExcludedDNs = toBytes(excludedBaseDNsString);
      int length =
        byteMode.length + 1 +
        byteChangeNumber.length + 1 +
        byteStopChangeNumber.length + 1 +
        byteCSN.length + 1 +
        bytePsearch.length + 1 +
        byteGeneralizedState.length + 1 +
        byteOperationId.length + 1 +
        byteExcludedDNs.length + 1 +
        1;
      byte[] resultByteArray = new byte[length];
      int pos = 0;
      resultByteArray[pos++] = MSG_TYPE_START_ECL_SESSION;
      pos = addByteArray(byteMode, resultByteArray, pos);
      pos = addByteArray(byteChangeNumber, resultByteArray, pos);
      pos = addByteArray(byteStopChangeNumber, resultByteArray, pos);
      pos = addByteArray(byteCSN, resultByteArray, pos);
      pos = addByteArray(bytePsearch, resultByteArray, pos);
      pos = addByteArray(byteGeneralizedState, resultByteArray, pos);
      pos = addByteArray(byteOperationId, resultByteArray, pos);
      pos = addByteArray(byteExcludedDNs, resultByteArray, pos);
      return resultByteArray;
    } catch (IOException e)
    {
      // never happens
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_START_ECL_SESSION);
    builder.appendUTF8(eclRequestType.ordinal());
    // FIXME JNR Changing the lines below to use long would require a protocol
    // version change. Leave it like this for now until the need arises.
    builder.appendUTF8((int) firstChangeNumber);
    builder.appendUTF8((int) lastChangeNumber);
    builder.appendUTF8(csn);
    builder.appendUTF8(isPersistent.ordinal());
    builder.append(crossDomainServerState);
    builder.append(operationId);
    builder.append(StaticUtils.collectionToString(excludedBaseDNs, ";"));
    return builder.toByteArray();
  }
  private byte[] toBytes(int i) throws UnsupportedEncodingException
  {
    return toBytes(String.valueOf(i));
  }
  private byte[] toBytes(String s) throws UnsupportedEncodingException
  {
    return String.valueOf(s).getBytes("UTF-8");
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -22,14 +22,12 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This abstract message class is the superclass for start messages used
 * by LDAP servers and Replication servers to initiate their communications.
@@ -43,12 +41,7 @@
  /** Generation id of data set we want to work with. */
  protected long  generationId;
  /** Group id of the replicated domain. */
  protected byte groupId = (byte)-1;
  /**
   * The length of the header of this message.
   */
  protected int headerLength;
  protected byte groupId = -1;
  /**
   * Create a new StartMsg.
@@ -66,7 +59,7 @@
   * @param generationId    The generationId for this server.
   *
   */
  public StartMsg(short protocolVersion, long generationId)
  StartMsg(short protocolVersion, long generationId)
  {
    this.protocolVersion = protocolVersion;
    this.generationId = generationId;
@@ -75,196 +68,105 @@
  /**
   * Encode the header for the start message.
   *
   * @param type The type of the message to create.
   * @param additionalLength Additional length needed to encode the remaining
   * @param msgType The type of the message to create.
   * @param builder Additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @param sessionProtocolVersion  The version to use when encoding the header.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param protocolVersion  The version to use when encoding the header.
   */
  public byte[] encodeHeader(
      byte type, int additionalLength,
      short sessionProtocolVersion)
  throws UnsupportedEncodingException
  void encodeHeader(byte msgType, ByteArrayBuilder builder, short protocolVersion)
  {
    byte[] byteGenerationID =
      String.valueOf(generationId).getBytes("UTF-8");
    /* The message header is stored in the form :
     * <message type><protocol version><generation id><group id>
     */
    int length = 1 + 1 + byteGenerationID.length + 1 + 1 +
                     additionalLength;
    byte[] encodedMsg = new byte[length];
    /* put the type of the operation */
    encodedMsg[0] = type;
    /* put the protocol version */
    encodedMsg[1] = (byte)sessionProtocolVersion;
    /* put the generationId */
    int pos = 2;
    pos = addByteArray(byteGenerationID, encodedMsg, pos);
    /* put the group id */
    encodedMsg[pos] = groupId;
    pos++;
    headerLength = pos;
    return encodedMsg;
    builder.append(msgType);
    builder.append((byte) protocolVersion);
    builder.appendUTF8(generationId);
    builder.append(groupId);
  }
  /**
   * Encode the header for the start message. This uses the version 1 of the
   * replication protocol (used for compatibility purpose).
   *
   * @param type The type of the message to create.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param msgType The type of the message to create.
   * @param builder The builder where to append the remaining part of the
   *                UpdateMessage.
   */
  public byte[] encodeHeader_V1(byte type, int additionalLength)
  throws UnsupportedEncodingException
  void encodeHeader_V1(byte msgType, ByteArrayBuilder builder)
  {
    byte[] byteGenerationID =
      String.valueOf(generationId).getBytes("UTF-8");
    /* The message header is stored in the form :
     * <message type><protocol version><generation id>
     */
    int length = 1 + 1 + 1 +
                     byteGenerationID.length + 1 +
                     additionalLength;
    byte[] encodedMsg = new byte[length];
    /* put the type of the operation */
    encodedMsg[0] = type;
    /* put the protocol version */
    encodedMsg[1] = (byte)ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL;
    encodedMsg[2] = (byte)0;
    /* put the generationId */
    int pos = 3;
    headerLength = addByteArray(byteGenerationID, encodedMsg, pos);
    return encodedMsg;
    builder.append(msgType);
    builder.append((byte) ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL);
    builder.append((byte) 0);
    builder.appendUTF8(generationId);
  }
  /**
   * Decode the Header part of this message, and check its type.
   *
   * @param types The allowed types of this message.
   * @param encodedMsg the encoded form of the message.
   * @return the position at which the remaining part of the message starts.
   * @param scanner where to read the message from.
   * @param allowedTypes The allowed types of this message.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
  public int decodeHeader(byte[] types, byte [] encodedMsg)
  throws DataFormatException
  void decodeHeader(final ByteArrayScanner scanner, byte... allowedTypes)
      throws DataFormatException
  {
    /* first byte is the type */
    boolean foundMatchingType = false;
    for (byte type : types) {
      if (type == encodedMsg[0]) {
        foundMatchingType = true;
        break;
      }
    final byte msgType = scanner.nextByte();
    if (!isTypeAllowed(allowedTypes, msgType))
    {
      throw new DataFormatException("byte[] is not a valid start msg: "
          + msgType);
    }
    if (!foundMatchingType)
      throw new DataFormatException("byte[] is not a valid start msg: " +
        encodedMsg[0]);
    final byte version = scanner.nextByte();
    // Filter for supported old versions PDUs
    if (encodedMsg[0] == MSG_TYPE_REPL_SERVER_START_V1)
      return decodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1, encodedMsg);
    try
    if (msgType == MSG_TYPE_REPL_SERVER_START_V1)
    {
      /* then read the version */
      short readVersion = (short)encodedMsg[1];
      if (readVersion < ProtocolVersion.REPLICATION_PROTOCOL_V2)
        throw new DataFormatException("Not a valid message: type is " +
          encodedMsg[0] + " but protocol version byte is " + readVersion +
          " instead of " + ProtocolVersion.getCurrentVersion());
      protocolVersion = readVersion;
      if (version != ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL)
      {
        throw new DataFormatException("Not a valid message: type is " + msgType
            + " but protocol version byte is " + version + " instead of "
            + ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL);
      }
      /* read the generationId */
      int pos = 2;
      int length = getNextLength(encodedMsg, pos);
      generationId = Long.valueOf(new String(encodedMsg, pos, length,
          "UTF-8"));
      pos += length +1;
      // Force version to V1
      // We need to translate the MSG_TYPE_REPL_SERVER_START_V1 version
      // into REPLICATION_PROTOCOL_V1 so that we only see V1 everywhere.
      protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
      /* read the group id */
      groupId = encodedMsg[pos];
      pos++;
      return pos;
    } catch (UnsupportedEncodingException e)
      // In V1, version was 1 (49) in string, so with a null
      // terminating string. Let's position the cursor at the next byte
      scanner.skipZeroSeparator();
      generationId = scanner.nextLongUTF8();
    }
    else
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      if (version < ProtocolVersion.REPLICATION_PROTOCOL_V2)
      {
        throw new DataFormatException("Not a valid message: type is " + msgType
            + " but protocol version byte is " + version + " instead of "
            + ProtocolVersion.getCurrentVersion());
      }
      protocolVersion = version;
      generationId = scanner.nextLongUTF8();
      groupId = scanner.nextByte();
    }
  }
  /**
   * Decode the Header part of this message, and check its type. This uses the
   * version 1 of the replication protocol (used for compatibility purpose).
   *
   * @param type The type of this message.
   * @param encodedMsg the encoded form of the message.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
  public int decodeHeader_V1(byte type, byte [] encodedMsg)
  throws DataFormatException
  private boolean isTypeAllowed(byte[] allowedTypes, final byte msgType)
  {
    if (encodedMsg[0] != type)
      throw new DataFormatException("byte[] is not a valid start msg: expected "
        + " a V1 PDU, received: " + encodedMsg[0]);
    if (encodedMsg[1] != ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL)
    for (byte allowedType : allowedTypes)
    {
      throw new DataFormatException("Not a valid message: type is " +
        type + " but protocol version byte is " + encodedMsg[1] + " instead of "
        + ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL);
      if (msgType == allowedType)
      {
        return true;
      }
    }
    // Force version to V1
    // We need to translate the MSG_TYPE_REPL_SERVER_START_V1 version
    // into REPLICATION_PROTOCOL_V1 so that we only see V1 everywhere.
    protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
    try
    {
      // In V1, version was 1 (49) in string, so with a null
      // terminating string. Let's position the cursor at the next byte
      int pos = 3;
      /* read the generationId */
      int length = getNextLength(encodedMsg, pos);
      generationId = Long.valueOf(new String(encodedMsg, pos, length,
          "UTF-8"));
      pos += length +1;
      return pos;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    return false;
  }
  /**
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -26,9 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.zip.DataFormatException;
@@ -80,7 +77,7 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public StartSessionMsg(byte[] in, short version) throws DataFormatException
  StartSessionMsg(byte[] in, short version) throws DataFormatException
  {
    if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
@@ -110,35 +107,21 @@
    this.safeDataLevel = safeDataLevel;
  }
  /**
   * Creates a new message with the given required parameters.
   * Assured mode is false.
   * @param status Status we are starting with
   * @param referralsURLs Referrals URLs to be used by peer DSs
   */
  public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs)
  {
    this.referralsURLs.addAll(referralsURLs);
    this.status = status;
    this.assuredFlag = false;
  }
  // ============
  // Msg encoding
  // ============
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    if (reqProtocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      return getBytes_V23();
    }
    else
    {
      return getBytes_V45(reqProtocolVersion);
      return getBytes_V45(protocolVersion);
    }
  }
@@ -157,7 +140,9 @@
      writer.writeStartSequence();
      for (String url : referralsURLs)
      {
        writer.writeOctetString(url);
      }
      writer.writeEndSequence();
      writer.writeStartSequence();
@@ -193,57 +178,37 @@
     * <list of referrals urls>
     * (each referral url terminates with 0)
     */
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_START_SESSION);
    builder.append(status.getValue());
    builder.append(assuredFlag);
    builder.append(assuredMode.getValue());
    builder.append(safeDataLevel);
    try
    if (referralsURLs.size() >= 1)
    {
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      /* Put the message type */
      oStream.write(MSG_TYPE_START_SESSION);
      // Put the status
      oStream.write(status.getValue());
      // Put the assured flag
      oStream.write(assuredFlag ? (byte) 1 : (byte) 0);
      // Put assured mode
      oStream.write(assuredMode.getValue());
      // Put safe data level
      oStream.write(safeDataLevel);
      // Put the referrals URLs
      if (referralsURLs.size() >= 1)
      for (String url : referralsURLs)
      {
        for (String url : referralsURLs)
        {
          byte[] byteArrayURL = url.getBytes("UTF-8");
          oStream.write(byteArrayURL);
          oStream.write(0);
        }
        builder.append(url);
      }
      return oStream.toByteArray();
    } catch (IOException e)
    {
      // never happens
      return null;
    }
    return builder.toByteArray();
  }
  // ============
  // Msg decoding
  // ============
  private void decode_V45(byte[] in, short version)
  throws DataFormatException
  private void decode_V45(byte[] in, short version) throws DataFormatException
  {
    ByteSequenceReader reader = ByteString.wrap(in).asReader();
    try
    {
      if (reader.get() != MSG_TYPE_START_SESSION)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      {
        throw new DataFormatException("input is not a valid "
            + getClass().getCanonicalName());
      }
      /*
      status = ServerStatus.valueOf(asn1Reader.readOctetString().byteAt(0));
@@ -279,8 +244,7 @@
        asn1Reader.readStartSequence();
        while (asn1Reader.hasNextElement())
        {
          String s = asn1Reader.readOctetStringAsString();
          this.eclIncludesForDeletes.add(s);
          this.eclIncludesForDeletes.add(asn1Reader.readOctetStringAsString());
        }
        asn1Reader.readEndSequence();
      }
@@ -296,8 +260,7 @@
    }
  }
  private void decode_V23(byte[] in)
  throws DataFormatException
  private void decode_V23(byte[] in) throws DataFormatException
  {
    /*
     * The message is stored in the form:
@@ -305,46 +268,22 @@
     * <list of referrals urls>
     * (each referral url terminates with 0)
     */
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_START_SESSION)
    {
      /* first byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_START_SESSION)
      {
        throw new DataFormatException(
          "Input is not a valid " + this.getClass().getCanonicalName());
      }
      throw new DataFormatException(
          "Input is not a valid " + getClass().getCanonicalName());
    }
      /* Read the status */
      status = ServerStatus.valueOf(in[1]);
    status = ServerStatus.valueOf(scanner.nextByte());
    assuredFlag = scanner.nextBoolean();
    assuredMode = AssuredMode.valueOf(scanner.nextByte());
    safeDataLevel = scanner.nextByte();
      /* Read the assured flag */
      assuredFlag = in[2] == 1;
      /* Read the assured mode */
      assuredMode = AssuredMode.valueOf(in[3]);
      /* Read the safe data level */
      safeDataLevel = in[4];
      /* Read the referrals URLs */
      int pos = 5;
      while (pos < in.length)
      {
        /*
         * Read the next URL
         * first calculate the length then construct the string
         */
        int length = getNextLength(in, pos);
        referralsURLs.add(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
      }
    } catch (UnsupportedEncodingException e)
    while (!scanner.isEmpty())
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    } catch (IllegalArgumentException e)
    {
      throw new DataFormatException(e.getMessage());
      referralsURLs.add(scanner.nextString());
    }
  }
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -26,9 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.zip.DataFormatException;
@@ -37,6 +34,8 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
/**
 * This class defines a message that is sent:
 * - By a RS to the other RSs in the topology, containing:
@@ -68,173 +67,102 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public TopologyMsg(byte[] in, short version) throws DataFormatException
  TopologyMsg(byte[] in, short version) throws DataFormatException
  {
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_TOPOLOGY)
    {
      /* First byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
      {
        throw new DataFormatException(
          "Input is not a valid " + getClass().getCanonicalName());
      }
      int pos = 1;
      /* Read number of following DS info entries */
      byte nDsInfo = in[pos++];
      /* Read the DS info entries */
      Map<Integer, DSInfo> replicaInfos =
          new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
      while (nDsInfo > 0 && pos < in.length)
      {
        /* Read DS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int dsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read DS URL */
        String dsUrl;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
        {
          length = getNextLength(in, pos);
          dsUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
        }
        else
        {
          dsUrl = "";
        }
        /* Read RS id */
        length = getNextLength(in, pos);
        serverIdString = new String(in, pos, length, "UTF-8");
        int rsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read DS status */
        ServerStatus status = ServerStatus.valueOf(in[pos++]);
        /* Read DS assured flag */
        boolean assuredFlag = in[pos++] == 1;
        /* Read DS assured mode */
        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
        /* Read DS safe data level */
        byte safeDataLevel = in[pos++];
        /* Read DS group id */
        byte groupId = in[pos++];
        /* Read number of referrals URLs */
        List<String> refUrls = new ArrayList<String>();
        pos = readStrings(in, pos, refUrls);
        Set<String> attrs = new HashSet<String>();
        Set<String> delattrs = new HashSet<String>();
        short protocolVersion = -1;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          pos = readStrings(in, pos, attrs);
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
          {
            pos = readStrings(in, pos, delattrs);
          }
          else
          {
            // Default to using the same set of attributes for deletes.
            delattrs.addAll(attrs);
          }
          /* Read Protocol version */
          protocolVersion = in[pos++];
        }
        /* Now create DSInfo and store it */
        replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
            status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
            attrs, delattrs, protocolVersion));
        nDsInfo--;
      }
      /* Read number of following RS info entries */
      byte nRsInfo = in[pos++];
      /* Read the RS info entries */
      List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
      while (nRsInfo > 0 && pos < in.length)
      {
        /* Read RS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int id = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read RS group id */
        byte groupId = in[pos++];
        int weight = 1;
        String serverUrl = null;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          length = getNextLength(in, pos);
          serverUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
          /* Read RS weight */
          length = getNextLength(in, pos);
          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
          pos += length + 1;
        }
        /* Now create RSInfo and store it */
        rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
        nRsInfo--;
      }
      this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
      this.rsInfos = Collections.unmodifiableList(rsInfos);
      throw new DataFormatException("Input is not a valid "
          + getClass().getCanonicalName());
    }
    catch (UnsupportedEncodingException e)
    // Read the DS info entries, first read number of them
    int nDsInfo = scanner.nextByte();
    final Map<Integer, DSInfo> replicaInfos =
        new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
    while (nDsInfo > 0 && !scanner.isEmpty())
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      final DSInfo dsInfo = nextDSInfo(scanner, version);
      replicaInfos.put(dsInfo.getDsId(), dsInfo);
      nDsInfo--;
    }
    // Read the RS info entries
    int nRsInfo = scanner.nextByte();
    final List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
    while (nRsInfo > 0 && !scanner.isEmpty())
    {
      rsInfos.add(nextRSInfo(scanner, version));
      nRsInfo--;
    }
    this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
    this.rsInfos = Collections.unmodifiableList(rsInfos);
  }
  private int readStrings(byte[] in, int pos, Collection<String> outputCol)
      throws DataFormatException, UnsupportedEncodingException
  private DSInfo nextDSInfo(ByteArrayScanner scanner, short version)
      throws DataFormatException
  {
    byte nAttrs = in[pos++];
    byte nRead = 0;
    // Read all elements until expected number read
    while (nRead != nAttrs && pos < in.length)
    final int dsId = scanner.nextIntUTF8();
    final String dsUrl =
        version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString();
    final int rsId = scanner.nextIntUTF8();
    final long generationId = scanner.nextLongUTF8();
    final ServerStatus status = ServerStatus.valueOf(scanner.nextByte());
    final boolean assuredFlag = scanner.nextBoolean();
    final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte());
    final byte safeDataLevel = scanner.nextByte();
    final byte groupId = scanner.nextByte();
    final List<String> refUrls = new ArrayList<String>();
    scanner.nextStrings(refUrls);
    final Set<String> attrs = new HashSet<String>();
    final Set<String> delattrs = new HashSet<String>();
    short protocolVersion = -1;
    if (version >= REPLICATION_PROTOCOL_V4)
    {
      int length = getNextLength(in, pos);
      outputCol.add(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      nRead++;
      scanner.nextStrings(attrs);
      if (version >= REPLICATION_PROTOCOL_V5)
      {
        scanner.nextStrings(delattrs);
      }
      else
      {
        // Default to using the same set of attributes for deletes.
        delattrs.addAll(attrs);
      }
      protocolVersion = scanner.nextByte();
    }
    return pos;
    return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag,
        assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs,
        protocolVersion);
  }
  private RSInfo nextRSInfo(ByteArrayScanner scanner, short version)
      throws DataFormatException
  {
    final int rsId = scanner.nextIntUTF8();
    final long generationId = scanner.nextLongUTF8();
    final byte groupId = scanner.nextByte();
    int weight = 1;
    String serverUrl = null;
    if (version >= REPLICATION_PROTOCOL_V4)
    {
      serverUrl = scanner.nextString();
      weight = scanner.nextIntUTF8();
    }
    return new RSInfo(rsId, serverUrl, generationId, groupId, weight);
  }
  /**
   * Creates a new  message of the currently connected servers.
   * Creates a new message of the currently connected servers.
   *
   * @param dsInfos The collection of currently connected DS servers ID.
   * @param rsInfos The list of currently connected RS servers ID.
@@ -272,122 +200,62 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version) throws UnsupportedEncodingException
  public byte[] getBytes(short version)
  {
    try
    /**
     * Message has the following form:
     * <pdu type><number of following DSInfo entries>[<DSInfo>]*
     * <number of following RSInfo entries>[<RSInfo>]*
     */
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_TOPOLOGY);
    // Put DS infos
    builder.append((byte) replicaInfos.size());
    for (DSInfo dsInfo : replicaInfos.values())
    {
      /**
       * Message has the following form:
       * <pdu type><number of following DSInfo entries>[<DSInfo>]*
       * <number of following RSInfo entries>[<RSInfo>]*
       */
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      /* Put the message type */
      oStream.write(MSG_TYPE_TOPOLOGY);
      // Put number of following DS info entries
      oStream.write((byte) replicaInfos.size());
      // Put DS info
      for (DSInfo dsInfo : replicaInfos.values())
      builder.appendUTF8(dsInfo.getDsId());
      if (version >= REPLICATION_PROTOCOL_V6)
      {
        // Put DS id
        byte[] byteServerId =
          String.valueOf(dsInfo.getDsId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
        {
          // Put DS URL
          oStream.write(dsInfo.getDsUrl().getBytes("UTF-8"));
          oStream.write(0);
        }
        // Put RS id
        byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        // Put the generation id
        oStream.write(String.valueOf(dsInfo.getGenerationId()).
            getBytes("UTF-8"));
        oStream.write(0);
        // Put DS status
        oStream.write(dsInfo.getStatus().getValue());
        // Put DS assured flag
        oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0);
        // Put DS assured mode
        oStream.write(dsInfo.getAssuredMode().getValue());
        // Put DS safe data level
        oStream.write(dsInfo.getSafeDataLevel());
        // Put DS group id
        oStream.write(dsInfo.getGroupId());
        writeStrings(oStream, dsInfo.getRefUrls());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          // Put ECL includes
          writeStrings(oStream, dsInfo.getEclIncludes());
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
          {
            writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
          }
          oStream.write(dsInfo.getProtocolVersion());
        }
        builder.append(dsInfo.getDsUrl());
      }
      builder.appendUTF8(dsInfo.getRsId());
      builder.appendUTF8(dsInfo.getGenerationId());
      builder.append(dsInfo.getStatus().getValue());
      builder.append(dsInfo.isAssured());
      builder.append(dsInfo.getAssuredMode().getValue());
      builder.append(dsInfo.getSafeDataLevel());
      builder.append(dsInfo.getGroupId());
      // Put number of following RS info entries
      oStream.write((byte) rsInfos.size());
      builder.appendStrings(dsInfo.getRefUrls());
      // Put RS info
      for (RSInfo rsInfo : rsInfos)
      if (version >= REPLICATION_PROTOCOL_V4)
      {
        // Put RS id
        byte[] byteServerId =
          String.valueOf(rsInfo.getId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        // Put the generation id
        oStream.write(String.valueOf(rsInfo.getGenerationId()).
          getBytes("UTF-8"));
        oStream.write(0);
        // Put RS group id
        oStream.write(rsInfo.getGroupId());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        builder.appendStrings(dsInfo.getEclIncludes());
        if (version >= REPLICATION_PROTOCOL_V5)
        {
          // Put server URL
          oStream.write(rsInfo.getServerUrl().getBytes("UTF-8"));
          oStream.write(0);
          // Put RS weight
          oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
          oStream.write(0);
          builder.appendStrings(dsInfo.getEclIncludesForDeletes());
        }
        builder.append((byte) dsInfo.getProtocolVersion());
      }
    }
      return oStream.toByteArray();
    }
    catch (IOException e)
    // Put RS infos
    builder.append((byte) rsInfos.size());
    for (RSInfo rsInfo : rsInfos)
    {
      // never happens
      throw new RuntimeException(e);
    }
  }
      builder.appendUTF8(rsInfo.getId());
      builder.appendUTF8(rsInfo.getGenerationId());
      builder.append(rsInfo.getGroupId());
  private void writeStrings(ByteArrayOutputStream oStream,
      Collection<String> col) throws IOException, UnsupportedEncodingException
  {
    // Put collection length as a byte
    oStream.write(col.size());
    for (String elem : col)
    {
      // Write the element and a 0 terminating byte
      oStream.write(elem.getBytes("UTF-8"));
      oStream.write(0);
      if (version >= REPLICATION_PROTOCOL_V4)
      {
        builder.append(rsInfo.getServerUrl());
        builder.appendUTF8(rsInfo.getWeight());
      }
    }
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
@@ -414,7 +282,7 @@
      + "CONNECTED RS SERVERS:"
      + "\n--------------------\n"
      + rsStr
      + (rsStr.equals("") ? "----------------------------\n" : "");
      + ("".equals(rsStr) ? "----------------------------\n" : "");
  }
  /**
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -22,16 +22,17 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
/**
 * Abstract class that must be extended to define a message
 * used for sending Updates between servers.
@@ -39,42 +40,31 @@
public class UpdateMsg extends ReplicationMsg
                                    implements Comparable<UpdateMsg>
{
  /**
   * Protocol version.
   */
  /** Protocol version. */
  protected short protocolVersion;
  /**
   * The CSN of this update.
   */
  /** The CSN of this update. */
  protected CSN csn;
  /**
   * True when the update must use assured replication.
   */
  /** True when the update must use assured replication. */
  protected boolean assuredFlag = false;
  /**
   * When assuredFlag is true, defines the requested assured mode.
   */
  /** When assuredFlag is true, defines the requested assured mode. */
  protected AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  /**
   * When assured mode is safe data, gives the requested level.
   */
  /** When assured mode is safe data, gives the requested level. */
  protected byte safeDataLevel = (byte)1;
  /**
   * The payload that must be encoded in this message.
   */
  private byte[] payload;
  /** The payload that must be encoded in this message. */
  private final byte[] payload;
  /**
   * Creates a new empty UpdateMsg.
   */
  protected UpdateMsg()
  {}
  {
    payload = null;
  }
  /**
   * Creates a new UpdateMsg with the given information.
@@ -85,25 +75,10 @@
   */
  UpdateMsg(byte[] bytes) throws DataFormatException
  {
    // Decode header
    int pos = decodeHeader(MSG_TYPE_GENERIC_UPDATE, bytes);
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    decodeHeader(MSG_TYPE_GENERIC_UPDATE, scanner);
    // Read the payload : all the remaining bytes but the terminating 0
    int length = bytes.length - pos;
    payload = new byte[length];
    try
    {
      System.arraycopy(bytes, pos, payload, 0, length);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    payload = scanner.remainingBytes();
  }
  /**
@@ -152,9 +127,7 @@
    assuredFlag = assured;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object obj)
  {
@@ -162,18 +135,14 @@
        csn.equals(((UpdateMsg) obj).csn);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    return csn.hashCode();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int compareTo(UpdateMsg msg)
  {
@@ -243,103 +212,50 @@
   * Encode the common header for all the UpdateMsg. This uses the current
   * protocol version.
   *
   * @param type the type of UpdateMsg to encode.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMsg.
   * @param version The ProtocolVersion to use when encoding.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMsg as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param msgType The type of UpdateMsg to encode.
   * @param protocolVersion The ProtocolVersion to use when encoding.
   * @return a byte array builder containing the common header
   */
  protected byte[] encodeHeader(byte type, int additionalLength, short version)
    throws UnsupportedEncodingException
  protected ByteArrayBuilder encodeHeader(byte msgType, short protocolVersion)
  {
    byte[] csnByte = getCSN().toString().getBytes("UTF-8");
    /* The message header is stored in the form :
     * <operation type><protocol version><CSN><assured>
     * <assured mode> <safe data level>
     * the length of result byte array is therefore :
     *   1 + 1 + CSN length + 1 + 1
     *   + 1 + 1 + additional_length
     */
    int length = 6 + csnByte.length + additionalLength;
    byte[] encodedMsg = new byte[length];
    // put the type of the operation
    encodedMsg[0] = type;
    // put the protocol version
    encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
    int pos = 2;
    // Put the CSN
    pos = addByteArray(csnByte, encodedMsg, pos);
    // Put the assured flag
    encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
    // Put the assured mode
    encodedMsg[pos++] = assuredMode.getValue();
    // Put the safe data level
    encodedMsg[pos++] = safeDataLevel;
    return encodedMsg;
    final ByteArrayBuilder builder =
        new ByteArrayBuilder(bytes(6) + csnsUTF8(1));
    builder.append(msgType);
    builder.append((byte) ProtocolVersion.getCurrentVersion());
    builder.appendUTF8(getCSN());
    builder.append(assuredFlag);
    builder.append(assuredMode.getValue());
    builder.append(safeDataLevel);
    return builder;
  }
  /**
   * Decode the Header part of this Update Message, and check its type.
   *
   * @param type The allowed type of this Update Message.
   * @param encodedMsg the encoded form of the UpdateMsg.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   * @param allowedType The allowed type of this Update Message.
   * @param scanner The encoded form of the UpdateMsg.
   * @throws DataFormatException
   *           if the scanner does not contain a valid common header.
   */
  protected int decodeHeader(byte type, byte[] encodedMsg)
                          throws DataFormatException
  protected void decodeHeader(byte allowedType, ByteArrayScanner scanner)
      throws DataFormatException
  {
    /* The message header is stored in the form :
     * <operation type><protocol version><CSN><assured>
     * <assured mode> <safe data level>
     */
    if (!(type == encodedMsg[0]))
    final byte msgType = scanner.nextByte();
    if (allowedType != msgType)
    {
      throw new DataFormatException("byte[] is not a valid update msg: "
        + encodedMsg[0]);
    // read the protocol version
    protocolVersion = encodedMsg[1];
    try
    {
      // Read the CSN
      int pos = 2;
      int length = getNextLength(encodedMsg, pos);
      String csnStr = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      csn = new CSN(csnStr);
      // Read the assured information
      assuredFlag = encodedMsg[pos++] == 1;
      // Read the assured mode
      assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
      // Read the safe data level
      safeDataLevel = encodedMsg[pos++];
      return pos;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    } catch (IllegalArgumentException e)
    {
      throw new DataFormatException(e.getMessage());
          + msgType);
    }
    protocolVersion = scanner.nextByte();
    csn = scanner.nextCSNUTF8();
    assuredFlag = scanner.nextBoolean();
    assuredMode = AssuredMode.valueOf(scanner.nextByte());
    safeDataLevel = scanner.nextByte();
  }
  /**
@@ -347,10 +263,8 @@
   * protocol version.
   *
   * @return The encoded representation of this update message.
   * @throws UnsupportedEncodingException
   *           If the message could not be encoded.
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  public byte[] getBytes()
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
@@ -364,20 +278,11 @@
   */
  @Override
  public byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException
  {
    // Encode the header in a byte[] large enough to also contain the payload
    byte[] resultByteArray = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
        payload.length, ProtocolVersion.getCurrentVersion());
    int pos = resultByteArray.length - payload.length;
    // Add the payload
    for (int i = 0; i < payload.length; i++, pos++)
    {
      resultByteArray[pos] = payload[i];
    }
    return resultByteArray;
    final ByteArrayBuilder builder = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
        ProtocolVersion.getCurrentVersion());
    builder.append(payload);
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
@@ -22,14 +22,12 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is used by LDAP server or by Replication Servers to
 * update the send window of the remote entities.
@@ -43,7 +41,6 @@
{
  private final int numAck;
  /**
   * Create a new WindowMsg.
   *
@@ -63,64 +60,28 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the WindowMsg.
   */
  public WindowMsg(byte[] in) throws DataFormatException
  WindowMsg(byte[] in) throws DataFormatException
  {
    /* The WindowMsg is encoded in the form :
     * <numAck>
     */
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_WINDOW)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_WINDOW)
        throw new DataFormatException("input is not a valid Window Message");
      int pos = 1;
      /*
       * read the number of acks contained in this message.
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      String numAckStr = new String(in, pos, length, "UTF-8");
      pos += length +1;
      numAck = Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      throw new DataFormatException("input is not a valid Window Message");
    }
    numAck = scanner.nextIntUTF8();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    /*
     * WindowMsg contains.
     * <numAck>
     */
    try {
      byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
      int length = 1 + byteNumAck.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_WINDOW;
      int pos = 1;
      pos = addByteArray(byteNumAck, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_WINDOW);
    builder.appendUTF8(numAck);
    return builder.toByteArray();
  }
  /**
   * Get the number of message acknowledged by the Window Message.
   *
@@ -131,9 +92,7 @@
    return numAck;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.server;
@@ -31,9 +31,7 @@
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.*;
/**
 * This is a facility class that is in fact an hack to optimize replication
@@ -48,28 +46,32 @@
 * when calling the isAssured() method.
 *
 */
public class NotAssuredUpdateMsg extends UpdateMsg
class NotAssuredUpdateMsg extends UpdateMsg
{
  // The real update message this message represents
  private UpdateMsg realUpdateMsg = null;
  /** The real update message this message represents */
  private final UpdateMsg realUpdateMsg;
  // V1 serialized form of the real message with assured flag set to false.
  // Ready to be sent.
  private byte[] realUpdateMsgNotAssuredBytesV1 = null;
  /**
   * V1 serialized form of the real message with assured flag set to false.
   * Ready to be sent.
   */
  private final byte[] realUpdateMsgNotAssuredBytesV1;
  // VLatest serialized form of the real message with assured flag set to false.
  // Ready to be sent.
  private byte[] realUpdateMsgNotAssuredBytesVLatest = null;
  /**
   * VLatest serialized form of the real message with assured flag set to false.
   * Ready to be sent.
   */
  private final byte[] realUpdateMsgNotAssuredBytesVLatest;
  /**
   * Creates a new empty UpdateMsg.
   * This class is only used by replication server code so constructor is not
   * public by security.
   *
   * @param updateMsg The real underlying update message this object represents.
   * @throws UnsupportedEncodingException  When the pre-encoding of the message
   *         failed because the UTF-8 encoding is not supported or the
   *         requested protocol version to use is not supported by this PDU.
   *
   */
  NotAssuredUpdateMsg(UpdateMsg updateMsg) throws UnsupportedEncodingException
  {
@@ -85,18 +87,7 @@
       * Get the encoding form of the real message then overwrite the assured
       * flag to always be false.
       */
      byte[] origBytes = realUpdateMsg.getBytes(
        ProtocolVersion.REPLICATION_PROTOCOL_V1);
      // Clone the byte array to be able to modify it without problems
      // (ModifyMsg messages for instance do not return a cloned version of
      // their byte array)
      byte[] bytes = new byte[origBytes.length];
      System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
      int maxLen = bytes.length;
      int pos;
      int nZeroFound = 0; // Number of 0 value found
      boolean found = false;
      byte[] bytes = getRealUpdateMsgBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
      /* Look for assured flag position:
       * The message header is stored in the form :
@@ -107,23 +98,7 @@
       * See LDAPUpdateMsg.encodeHeader_V1() for more information
       */
      // Find end of CSN then end of dn
      for (pos = 1; pos < maxLen; pos++)
      {
        if (bytes[pos] == (byte) 0)
        {
          nZeroFound++;
          if (nZeroFound == 2) // 2 end of string to find
          {
            found = true;
            break;
          }
        }
      }
      if (!found)
        throw new UnsupportedEncodingException("Could not find end of CSN.");
      pos++;
      if (pos >= maxLen)
        throw new UnsupportedEncodingException("Reached end of packet.");
      int pos = findNthZeroByte(bytes, 1, 2);
      // Force assured flag to false
      bytes[pos] = 0;
@@ -135,16 +110,7 @@
       * Get the encoding form of the real message then overwrite the assured
       * flag to always be false.
       */
      origBytes = realUpdateMsg.getBytes(ProtocolVersion.getCurrentVersion());
      // Clone the byte array to be able to modify it without problems
      // (ModifyMsg messages for instance do not return a cloned version of
      // their byte array)
      bytes = new byte[origBytes.length];
      System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
      maxLen = bytes.length;
      nZeroFound = 0; // Number of 0 value found
      found = false;
      bytes = getRealUpdateMsgBytes(ProtocolVersion.getCurrentVersion());
      /* Look for assured flag position:
       * The message header is stored in the form :
@@ -156,48 +122,22 @@
       * See LDAPUpdateMsg.encodeHeader() for more information
       */
      // Find end of CSN then end of dn then end of uuid
      for (pos = 2; pos < maxLen; pos++)
      {
        if (bytes[pos] == (byte) 0)
        {
          nZeroFound++;
          if (nZeroFound == 3) // 3 end of string to find
          {
            found = true;
            break;
          }
        }
      }
      if (!found)
        throw new UnsupportedEncodingException("Could not find end of CSN.");
      pos++;
      if (pos >= maxLen)
        throw new UnsupportedEncodingException("Reached end of packet.");
      pos = findNthZeroByte(bytes, 2, 3);
      // Force assured flag to false
      bytes[pos] = 0;
      // Store computed VLATEST serialized form
      realUpdateMsgNotAssuredBytesVLatest = bytes;
    } else
    }
    else
    {
      realUpdateMsgNotAssuredBytesV1 = null;
      /**
       * Prepare VLATEST serialized form of the message:
       * Get the encoding form of the real message then overwrite the assured
       * flag to always be false.
       */
      byte[] origBytes = realUpdateMsg.getBytes(
          ProtocolVersion.getCurrentVersion());
      // Clone the byte array to be able to modify it without problems
      // (ModifyMsg messages for instance do not return a cloned version of
      // their byte array)
      byte[] bytes = new byte[origBytes.length];
      System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
      int maxLen = bytes.length;
      int pos;
      int nZeroFound = 0; // Number of 0 value found
      boolean found = false;
      byte[] bytes = getRealUpdateMsgBytes(ProtocolVersion.getCurrentVersion());
      // This is a generic update message
      /* Look for assured flag position:
@@ -210,23 +150,7 @@
       * See UpdateMsg.encodeHeader() for more  information
       */
      // Find end of CSN
      for (pos = 2; pos < maxLen; pos++)
      {
        if (bytes[pos] == (byte) 0)
        {
          nZeroFound++;
          if (nZeroFound == 1) // 1 end of string to find
          {
            found = true;
            break;
          }
        }
      }
      if (!found)
        throw new UnsupportedEncodingException("Could not find end of CSN.");
      pos++;
      if (pos >= maxLen)
        throw new UnsupportedEncodingException("Reached end of packet.");
      int pos = findNthZeroByte(bytes, 2, 1);
      // Force assured flag to false
      bytes[pos] = 0;
@@ -236,17 +160,52 @@
  }
  /**
   * {@inheritDoc}
   * Clones the byte array to be able to modify it without problems
   * (ModifyMsg messages for instance do not return a cloned version of
   * their byte array).
   */
  private byte[] getRealUpdateMsgBytes(final short protocolVersion)
  {
    byte[] origBytes = realUpdateMsg.getBytes(protocolVersion);
    byte[] bytes = new byte[origBytes.length];
    System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
    return bytes;
  }
  private int findNthZeroByte(byte[] bytes, int startPos, int nbToFind)
      throws UnsupportedEncodingException
  {
    final int maxLen = bytes.length;
    int nbZeroFound = 0; // Number of 0 values found
    for (int pos = startPos; pos < maxLen; pos++)
    {
      if (bytes[pos] == (byte) 0)
      {
        nbZeroFound++;
        if (nbZeroFound == nbToFind)
        {
          // nb of end of strings reached
          pos++;
          if (pos >= maxLen)
          {
            throw new UnsupportedEncodingException("Reached end of packet.");
          }
          return pos;
        }
      }
    }
    throw new UnsupportedEncodingException(
        "Could not find " + nbToFind + " zero bytes in byte array.");
  }
  /** {@inheritDoc} */
  @Override
  public CSN getCSN()
  {
    return realUpdateMsg.getCSN();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isAssured()
  {
@@ -254,9 +213,7 @@
    return false;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void setAssured(boolean assured)
  {
@@ -264,78 +221,59 @@
    // and we do not want to change the original real update message settings
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object obj)
  {
    // Compare with the underlying real update message
    if (obj != null)
    {
      if (obj.getClass() != realUpdateMsg.getClass())
        return false;
      return realUpdateMsg.getCSN().
        equals(((UpdateMsg)obj).getCSN());
    }
    else
    if (obj == null)
    {
      return false;
    }
    return obj.getClass() == realUpdateMsg.getClass()
        && realUpdateMsg.getCSN().equals(((UpdateMsg) obj).getCSN());
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    return realUpdateMsg.hashCode();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int compareTo(UpdateMsg msg)
  {
    return realUpdateMsg.compareTo(msg);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      return realUpdateMsgNotAssuredBytesV1;
    else
      return realUpdateMsgNotAssuredBytesVLatest;
    }
    return realUpdateMsgNotAssuredBytesVLatest;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public AssuredMode getAssuredMode()
  {
    return realUpdateMsg.getAssuredMode();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte getSafeDataLevel()
  {
    return realUpdateMsg.getSafeDataLevel();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void setAssuredMode(AssuredMode assuredMode)
  {
@@ -343,9 +281,7 @@
    // and we do not want to change the original real update message settings
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void setSafeDataLevel(byte safeDataLevel)
  {
@@ -353,49 +289,38 @@
    // and we do not want to change the original real update message settings
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public short getVersion()
  {
    return realUpdateMsg.getVersion();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int size()
  {
    return realUpdateMsg.size();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  protected byte[] encodeHeader(byte type, int additionalLength, short version)
    throws UnsupportedEncodingException
  protected ByteArrayBuilder encodeHeader(byte allowedType,
      short protocolVersion)
  {
    // Not called as only used by constructors using bytes
    return null;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int decodeHeader(byte type, byte[] encodedMsg)
                          throws DataFormatException
  protected void decodeHeader(byte msgType, ByteArrayScanner scanner)
      throws DataFormatException
  {
    // Not called as only used by getBytes methods
    return -1;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getPayload()
  {
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -25,7 +25,6 @@
 */
package org.opends.server.replication.server.changelog.file;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -35,7 +34,6 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -48,11 +46,9 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * Represents a replication server database for one server in the topology.
@@ -370,22 +366,12 @@
  /** Parser of records persisted in the ReplicaDB log. */
  private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
  {
    private static final DebugTracer TRACER = getTracer();
    @Override
    public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
    {
      final UpdateMsg message = record.getValue();
      try
      {
        return ByteString.wrap(message.getBytes());
      }
      catch (UnsupportedEncodingException e)
      {
        // should never happen
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        return ByteString.empty();
      }
      return ByteString.wrap(message.getBytes());
    }
    @Override
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -34,6 +34,8 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
@@ -56,14 +58,14 @@
 * <p>
 * This is the only class that should have code using the BDB interfaces.
 */
public class ReplicationDB
class ReplicationDB
{
  private Database db;
  private ReplicationDbEnv dbEnv;
  private ReplicationServer replicationServer;
  private int serverId;
  private DN baseDN;
  private final ReplicationDbEnv dbEnv;
  private final ReplicationServer replicationServer;
  private final int serverId;
  private final DN baseDN;
  /**
   * The lock used to provide exclusive access to the thread that close the db
@@ -120,7 +122,7 @@
   * @param dbEnv The Db environment to use to create the db.
   * @throws ChangelogException If a database problem happened
   */
  public ReplicationDB(int serverId, DN baseDN,
  ReplicationDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbEnv)
      throws ChangelogException
  {
@@ -188,7 +190,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void addEntry(UpdateMsg change) throws ChangelogException
  void addEntry(UpdateMsg change) throws ChangelogException
  {
    dbCloseLock.readLock().lock();
    try
@@ -200,7 +202,9 @@
      }
      final DatabaseEntry key = createReplicationKey(change.getCSN());
      final DatabaseEntry data = new ReplicationData(change);
      // Always keep messages in the replication DB with the current protocol
      // version
      final DatabaseEntry data = new DatabaseEntry(change.getBytes());
      insertCounterRecordIfNeeded(change.getCSN());
      db.put(null, key, data);
@@ -256,7 +260,7 @@
  /**
   * Shutdown the database.
   */
  public void shutdown()
  void shutdown()
  {
    dbCloseLock.writeLock().lock();
    try
@@ -286,8 +290,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public ReplServerDBCursor openReadCursor(CSN startCSN)
      throws ChangelogException
  ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
  {
    return new ReplServerDBCursor(startCSN);
  }
@@ -301,7 +304,7 @@
   *
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openDeleteCursor() throws ChangelogException
  ReplServerDBCursor openDeleteCursor() throws ChangelogException
  {
    return new ReplServerDBCursor();
  }
@@ -325,7 +328,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN readOldestCSN() throws ChangelogException
  CSN readOldestCSN() throws ChangelogException
  {
    dbCloseLock.readLock().lock();
@@ -381,7 +384,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN readNewestCSN() throws ChangelogException
  CSN readNewestCSN() throws ChangelogException
  {
    dbCloseLock.readLock().lock();
@@ -432,93 +435,7 @@
    }
  }
  /**
   * Try to find in the DB, the CSN right before the one passed as a parameter.
   *
   * @param csn
   *          The CSN from which we start searching.
   * @return the CSN right before the one passed as a parameter. Can return null
   *         if there is none.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN getPreviousCSN(CSN csn) throws ChangelogException
  {
    if (csn == null)
    {
      return null;
    }
    dbCloseLock.readLock().lock();
    Cursor cursor = null;
    try
    {
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        return null;
      }
      DatabaseEntry key = createReplicationKey(csn);
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS)
      {
        // We can move close to the CSN.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
        // else, there was no change previous to our CSN.
      }
      else
      {
        // We could not move the cursor past to the CSN
        // Check if the last change is older than CSN
        if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
      }
    }
    catch (DatabaseException e)
    {
      throw new ChangelogException(e);
    }
    finally
    {
      closeAndReleaseReadLock(cursor);
    }
    return null;
  }
  private CSN getRegularRecord(Cursor cursor, DatabaseEntry key,
      DatabaseEntry data) throws DatabaseException
  {
    final CSN csn = toCSN(key.getData());
    if (!isACounterRecord(csn))
    {
      return csn;
    }
    // There cannot be 2 counter record next to each other,
    // it is safe to return previous record which must exist
    if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
    {
      return toCSN(key.getData());
    }
    // database only contain a counter record, which should not be possible
    // let's just say no CSN
    return null;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -529,7 +446,7 @@
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
   */
  public class ReplServerDBCursor implements Closeable
  class ReplServerDBCursor implements Closeable
  {
    /**
     * The transaction that will protect the actions done with the cursor.
@@ -713,7 +630,7 @@
     * (per the Cursor documentation).
     * This should not be used in any other case.
     */
    public void abort()
    void abort()
    {
      synchronized (this)
      {
@@ -735,7 +652,7 @@
     * @throws ChangelogException
     *           In case of underlying database problem.
     */
    public CSN nextCSN() throws ChangelogException
    CSN nextCSN() throws ChangelogException
    {
      if (isClosed)
      {
@@ -761,7 +678,7 @@
     *
     * @return the next UpdateMsg.
     */
    public UpdateMsg next()
    UpdateMsg next()
    {
      if (isClosed)
      {
@@ -791,7 +708,8 @@
          {
            continue;
          }
          currentChange = ReplicationData.generateChange(data.getData());
          currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
              data.getData(), ProtocolVersion.getCurrentVersion());
        }
        catch (Exception e)
        {
@@ -806,7 +724,7 @@
           */
          Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
              .get(replicationServer.getServerId(),
                  (csn == null ? "" : csn.toString()),
                  (csn != null ? csn.toString() : ""),
                  e.getMessage());
          logError(message);
        }
@@ -819,7 +737,7 @@
     *
     * @throws ChangelogException In case of database problem.
     */
    public void delete() throws ChangelogException
    void delete() throws ChangelogException
    {
      if (isClosed)
      {
@@ -842,7 +760,7 @@
   *
   * @throws ChangelogException In case of database problem.
   */
  public void clear() throws ChangelogException
  void clear() throws ChangelogException
  {
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();
@@ -912,7 +830,7 @@
   * Encode the provided counter value in a database entry.
   * @return The database entry with the counter value encoded inside.
   */
  static private DatabaseEntry encodeCounterValue(int value)
  private static DatabaseEntry encodeCounterValue(int value)
  {
    DatabaseEntry entry = new DatabaseEntry();
    entry.setData(getBytes(String.valueOf(value)));
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationData.java
File was deleted
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 *      Portions Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
@@ -93,11 +93,6 @@
    String stringRep = serverState.toString();
    assertTrue(stringRep.contains(csn2.toString()));
    assertTrue(stringRep.contains(csn3.toString()));
    // Check getBytes
    byte[] b = serverState.getBytes();
    ServerState generatedServerState = new ServerState(b,0,b.length -1) ;
    assertEquals(b, generatedServerState.getBytes()) ;
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
@@ -27,13 +27,22 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.zip.DataFormatException;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteStringBuilder;
import org.testng.Assert;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Test for {@link ByteStringBuilder} and {@link ByteArrayScanner} classes.
 */
@@ -41,44 +50,194 @@
public class ByteArrayTest extends DirectoryServerTestCase
{
  private static final class IntegerRange implements Iterator<Object[]>
  {
    private int next;
    private final int endInclusive;
    public IntegerRange(int startInclusive, int endInclusive)
    {
      this.next = startInclusive;
      this.endInclusive = endInclusive;
    }
    @Override
    public boolean hasNext()
    {
      return next <= this.endInclusive;
    }
    @Override
    public Object[] next()
    {
      return new Object[] { next++ };
    }
    @Override
    public void remove() { /* unused */ }
  }
  @BeforeClass
  public void setup() throws Exception
  {
    TestCaseUtils.startFakeServer();
  }
  @AfterClass
  public void teardown() throws Exception
  {
    TestCaseUtils.shutdownFakeServer();
  }
  private final byte[] byteArray = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, };
  @Test
  public void testBuilderAppendMethodsAndScannerNextMethods() throws Exception
  {
    final boolean bo = true;
    final boolean boFalse = false;
    final boolean boTrue = true;
    final byte by = 80;
    final short sh = 42;
    final int i = sh + 1;
    final long l = i + 1;
    final String st = "Yay!";
    final String nullStr = null;
    final String str = "Yay!";
    final Collection<String> col = Arrays.asList("foo", "bar", "baz");
    final CSN csn = new CSN(42424242, 13, 42);
    final DN dn = DN.decode("dc=example,dc=com");
    final ServerState ss = new ServerState();
    ss.update(csn);
    byte[] bytes = new ByteArrayBuilder()
        .append(bo)
        .append(boTrue)
        .append(boFalse)
        .append(by)
        .append(sh)
        .append(i)
        .append(l)
        .append(st)
        .append(nullStr)
        .append(str)
        .appendStrings(col)
        .appendUTF8(i)
        .appendUTF8(l)
        .append(csn)
        .appendUTF8(csn)
        .append(dn)
        .appendZeroTerminated(byteArray)
        .append(byteArray)
        .append(ss)
        .toByteArray();
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    Assert.assertEquals(scanner.nextBoolean(), bo);
    Assert.assertEquals(scanner.nextByte(), by);
    Assert.assertEquals(scanner.nextShort(), sh);
    Assert.assertEquals(scanner.nextInt(), i);
    Assert.assertEquals(scanner.nextLong(), l);
    Assert.assertEquals(scanner.nextString(), st);
    Assert.assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
    Assert.assertEquals(scanner.nextIntUTF8(), i);
    Assert.assertEquals(scanner.nextLongUTF8(), l);
    Assert.assertEquals(scanner.nextCSN(), csn);
    Assert.assertEquals(scanner.nextCSNUTF8(), csn);
    Assert.assertTrue(scanner.isEmpty());
    assertFalse(scanner.isEmpty());
    assertEquals(scanner.nextBoolean(), boTrue);
    assertEquals(scanner.nextBoolean(), boFalse);
    assertEquals(scanner.nextByte(), by);
    assertEquals(scanner.nextShort(), sh);
    assertEquals(scanner.nextInt(), i);
    assertEquals(scanner.nextLong(), l);
    assertEquals(scanner.nextString(), nullStr);
    assertEquals(scanner.nextString(), str);
    assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
    assertEquals(scanner.nextIntUTF8(), i);
    assertEquals(scanner.nextLongUTF8(), l);
    assertEquals(scanner.nextCSN(), csn);
    assertEquals(scanner.nextCSNUTF8(), csn);
    assertEquals(scanner.nextDN(), dn);
    assertEquals(scanner.nextByteArray(byteArray.length), byteArray);
    scanner.skipZeroSeparator();
    assertEquals(scanner.nextByteArray(byteArray.length), byteArray);
    assertEquals(scanner.nextServerState().toString(), ss.toString());
    assertTrue(scanner.isEmpty());
  }
  @Test
  public void testByteArrayScanner_remainingBytes() throws Exception
  {
    final byte[] bytes = new ByteArrayBuilder().append(byteArray).toByteArray();
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    assertEquals(scanner.remainingBytes(), byteArray);
    assertTrue(scanner.isEmpty());
  }
  @Test
  public void testByteArrayScanner_remainingBytesZeroTerminated() throws Exception
  {
    final byte[] bytes =
        new ByteArrayBuilder().appendZeroTerminated(byteArray).toByteArray();
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    assertEquals(scanner.remainingBytesZeroTerminated(), byteArray);
    assertTrue(scanner.isEmpty());
  }
  @DataProvider
  public Iterator<Object[]> testCasesForNextMethodsWithEmptyByteArray()
  {
    return new IntegerRange(0, 7);
  }
  @Test(dataProvider = "testCasesForNextMethodsWithEmptyByteArray",
      expectedExceptions = DataFormatException.class)
  public void testByteArrayScanner_nextMethods_throwsExceptionWhenNoData(int testNumber) throws Exception
  {
    delegate(testNumber);
  }
  /**
   * TestNG does not like test methods with a return type other than void,
   * so used a delegate to simplify the code down below.
   */
  private Object delegate(int testNumber) throws DataFormatException
  {
    final ByteArrayScanner scanner = new ByteArrayScanner(new byte[0]);
    switch (testNumber)
    {
    case 0:
      return scanner.nextByte();
    case 1:
      return scanner.nextBoolean();
    case 2:
      return scanner.nextShort();
    case 3:
      return scanner.nextInt();
    case 4:
      return scanner.nextIntUTF8();
    case 5:
      return scanner.nextLong();
    case 6:
      return scanner.nextLongUTF8();
    case 7:
      return scanner.nextCSN();
    default:
      return null;
    }
  }
  @Test(expectedExceptions = IndexOutOfBoundsException.class)
  public void testByteArrayScanner_skipZeroSeparator_throwsExceptionWhenNoData() throws Exception
  {
    new ByteArrayScanner(new byte[0]).skipZeroSeparator();
  }
  @Test(expectedExceptions = DataFormatException.class)
  public void testByteArrayScanner_skipZeroSeparator_throwsExceptionWhenNoZeroSeparator() throws Exception
  {
    new ByteArrayScanner(new byte[] { 1 }).skipZeroSeparator();
  }
  @Test(expectedExceptions = DataFormatException.class)
  public void testByteArrayScanner_nextCSNUTF8_throwsExceptionWhenInvalidCSN() throws Exception
  {
    new ByteArrayScanner(new byte[] { 1, 0 }).nextCSNUTF8();
  }
  @Test(expectedExceptions = DataFormatException.class)
  public void testByteArrayScanner_nextDN_throwsExceptionWhenInvalidDN() throws Exception
  {
    final byte[] bytes = new ByteArrayBuilder().append("this is not a valid DN").toByteArray();
    new ByteArrayScanner(bytes).nextDN();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -38,6 +38,7 @@
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.ServerState;
@@ -894,9 +895,9 @@
      }
      // Send StartSessionMsg
      StartSessionMsg startSessionMsg =
        new StartSessionMsg(ServerStatus.NORMAL_STATUS,
        new ArrayList<String>());
      StartSessionMsg startSessionMsg = new StartSessionMsg(
          ServerStatus.NORMAL_STATUS, new ArrayList<String>(),
          false, AssuredMode.SAFE_DATA_MODE, (byte) 1);
      session.publish(startSessionMsg);
      // Read the TopologyMsg that should come back.