mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
23.17.2014 88cfe5045d77d433ce02b0ef10ee84c9d4fb15e2
(CR-3599) Convert all protocols message to use ByteArrayBuilder + ByteArrayScanner

The downside of this change is that now there could be superfluous byte array copies when appending to a ByteArrayBuilder while code before was building the final byte array once after the final size was known.
If this proves to be a performance hit, it could be optimized later by building the final byte array on the call to ByteArrayBuilder.toByteArray().

Since OpenDJ 2.8 will introduce a protocol version change, it could be a good time to further cleanup the protocol by always encoding ints, longs, CSNs and booleans in their most compact representations. It should also be possible to remove zero terminated madness for anything else than strings.



ByteArrayBuilder.java, ByteArrayScanner.java:
Changed existing code to exactly match legacy behaviour.
Added methods to simplify protocol (de)serialization.
In ByteArrayBuilder ctor, set the default byte array size to 256 to avoid too copying byte arrays too many times initially.

ByteArrayTest.java:
Added tests for new functionalities.
Increased coverage.


AckMsg.java:
In errorsToString(), used List.toString().

MonitorMsg.java:
In ctor, used setServerState().

ReplicationMsg.java:
Removed now unused methods getNextLength() and addByteArray().

StartSessionMsg.java:
Removed ctor only called from tests.

ReplicationServerTest.java:
Consequence of removing a ctor for StartSessionMsg.

NotAssuredUpdateMsg.java:
Extracted methods getRealUpdateMsgBytes(), findNthZeroByte(),


ServerState.java:
Moved ctor code to ByteArrayScanner.nextServerState() + removed getNextLength().
Moved getBytes() code to ByteArrayBuilder.append(ServerState) + removed addByteArray().
Added method getServerIdToCSNMap().

ServerStateTest.java:
Consequence of the change to ServerState class.


ReplicationData.java: REMOVED, not used anymore

ReplicationDB.java:
Consequence of removing ReplicationData.
Removed dead methods getPreviousCSN() and getRegularRecord().


*Msg.java:
Inlined a few methods (getBytes_V1() for example) to make the code clearer.

*.java:
Removed comments paraphrasing the code.
Removed now unnecessary exception handling + declarations for UnsupportedEncodingExceptions.
Added final keywords.
Reduced visibility of class members.
Fixed javadocs.
1 files deleted
39 files modified
5716 ■■■■ changed files
opends/src/server/org/opends/server/replication/common/ServerState.java 130 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/AckMsg.java 113 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/AddMsg.java 240 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java 112 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java 149 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java 49 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java 127 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/DoneMsg.java 66 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java 84 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java 126 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java 159 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java 21 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java 90 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java 103 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java 135 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java 348 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java 387 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java 164 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java 195 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java 74 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java 197 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java 326 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java 64 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java 63 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java 194 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java 228 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java 148 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartMsg.java 216 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java 123 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java 314 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java 199 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/protocol/WindowMsg.java 69 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java 251 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 138 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationData.java 80 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java 7 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java 193 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java 7 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/common/ServerState.java
@@ -22,17 +22,14 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.protocol.ProtocolVersion;
@@ -75,70 +72,6 @@
    serverIdToCSN.clear();
  }
  /**
   * Creates a new ServerState object from its encoded form.
   *
   * @param in The byte array containing the encoded ServerState form.
   * @param pos The position in the byte array where the encoded ServerState
   *            starts.
   * @param endpos The position in the byte array where the encoded ServerState
   *               ends.
   * @throws DataFormatException If the encoded form was not correct.
   */
  public ServerState(byte[] in, int pos, int endpos) throws DataFormatException
  {
    try
    {
      while (endpos > pos)
      {
        // FIXME JNR: why store the serverId separately from the CSN since the
        // CSN already contains the serverId?
        // read the ServerId
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int serverId = Integer.valueOf(serverIdString);
        pos += length +1;
        // read the CSN
        length = getNextLength(in, pos);
        String csnString = new String(in, pos, length, "UTF-8");
        CSN csn = new CSN(csnString);
        pos += length +1;
        // Add the serverId
        serverIdToCSN.put(serverId, csn);
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Get the length of the next String encoded in the in byte array.
   * This method is used to cut the different parts (serverIds, CSN)
   * of a server state.
   *
   * @param in the byte array where to calculate the string.
   * @param pos the position where to start from in the byte array.
   * @return the length of the next string.
   * @throws DataFormatException If the byte array does not end with null.
   */
  private int getNextLength(byte[] in, int pos) throws DataFormatException
  {
    int offset = pos;
    int length = 0;
    while (in[offset++] != 0)
    {
      if (offset >= in.length)
        throw new DataFormatException("byte[] is not a valid server state");
      length++;
    }
    return length;
  }
  /**
   * Forward update the Server State with a CSN. The provided CSN will be put on
   * the current object only if it is newer than the existing CSN for the same
@@ -151,7 +84,9 @@
  public boolean update(CSN csn)
  {
    if (csn == null)
    {
      return false;
    }
    saved = false;
@@ -191,7 +126,9 @@
  public boolean update(ServerState serverState)
  {
    if (serverState == null)
    {
      return false;
    }
    boolean updated = false;
    for (CSN csn : serverState.serverIdToCSN.values())
@@ -215,7 +152,9 @@
  public boolean removeCSN(CSN expectedCSN)
  {
    if (expectedCSN == null)
    {
      return false;
    }
    if (serverIdToCSN.remove(expectedCSN.getServerId(), expectedCSN))
    {
@@ -335,63 +274,18 @@
  }
  /**
   * Add the tail into resultByteArray at position pos.
   */
  private int addByteArray(byte[] tail, byte[] resultByteArray, int pos)
  {
    for (int i=0; i<tail.length; i++,pos++)
    {
      resultByteArray[pos] = tail[i];
    }
    resultByteArray[pos++] = 0;
    return pos;
  }
  /**
   * Encode this ServerState object and return its byte array representation.
   * Returns a copy of this ServerState's content as a Map of serverId => CSN.
   *
   * @return a byte array with an encoded representation of this object.
   * @throws UnsupportedEncodingException if UTF8 is not supported by the JVM.
   * @return a copy of this ServerState's content as a Map of serverId => CSN.
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  public Map<Integer, CSN> getServerIdToCSNMap()
  {
    // copy to protect from concurrent updates
    // that could change the number of elements in the Map
    final Map<Integer, CSN> copy = new HashMap<Integer, CSN>(serverIdToCSN);
    final int size = copy.size();
    List<String> idList = new ArrayList<String>(size);
    List<String> csnList = new ArrayList<String>(size);
    // calculate the total length needed to allocate byte array
    int length = 0;
    for (Entry<Integer, CSN> entry : copy.entrySet())
    {
      // serverId is useless, see comment in ServerState ctor
      final String serverIdStr = String.valueOf(entry.getKey());
      idList.add(serverIdStr);
      length += serverIdStr.length() + 1;
      final String csnStr = entry.getValue().toString();
      csnList.add(csnStr);
      length += csnStr.length() + 1;
    }
    byte[] result = new byte[length];
    // write the server state into the byte array
    int pos = 0;
    for (int i = 0; i < size; i++)
    {
      String str = idList.get(i);
      pos = addByteArray(str.getBytes("UTF-8"), result, pos);
      str = csnList.get(i);
      pos = addByteArray(str.getBytes("UTF-8"), result, pos);
    }
    return result;
    return new HashMap<Integer, CSN>(serverIdToCSN);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public Iterator<CSN> iterator()
  {
opends/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -26,7 +26,10 @@
 */
package org.opends.server.replication.plugin;
import java.io.*;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.util.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -59,7 +62,10 @@
import org.opends.server.protocols.ldap.LDAPControl;
import org.opends.server.protocols.ldap.LDAPFilter;
import org.opends.server.protocols.ldap.LDAPModification;
import org.opends.server.replication.common.*;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.common.ServerStatus;
import org.opends.server.replication.common.StatusMachineEvent;
import org.opends.server.replication.protocol.*;
import org.opends.server.replication.service.ReplicationBroker;
import org.opends.server.replication.service.ReplicationDomain;
@@ -2063,9 +2069,6 @@
        {
          msg.encode();
          pendingChanges.commitAndPushCommittedChanges(curCSN, msg);
        } catch (UnsupportedEncodingException e)
        {
          // will be caught at publish time.
        }
        catch (NoSuchElementException e)
        {
opends/src/server/org/opends/server/replication/protocol/AckMsg.java
@@ -22,13 +22,10 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -60,7 +57,7 @@
public class AckMsg extends ReplicationMsg
{
  /** CSN of the update that was acked. */
  private CSN csn;
  private final CSN csn;
  /**
   * Did some servers go in timeout when the matching update (corresponding to
@@ -159,50 +156,28 @@
   * @throws DataFormatException If in does not contain a properly encoded
   *                             AckMsg.
   */
  public AckMsg(byte[] in) throws DataFormatException
  {
    try
  AckMsg(byte[] in) throws DataFormatException
    {
      /*
       * The message is stored in the form:
       * <operation type><CSN><has timeout><has degraded><has replay
       * error><failed server ids>
       */
      // First byte is the type
      if (in[0] != MSG_TYPE_ACK)
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_ACK)
      {
        throw new DataFormatException("byte[] is not a valid modify msg");
      }
      int pos = 1;
      // Read the CSN
      int length = getNextLength(in, pos);
      String csnStr = new String(in, pos, length, "UTF-8");
      csn = new CSN(csnStr);
      pos += length + 1;
    csn = scanner.nextCSNUTF8();
    hasTimeout = scanner.nextBoolean();
    hasWrongStatus = scanner.nextBoolean();
    hasReplayError = scanner.nextBoolean();
      // Read the hasTimeout flag
      hasTimeout = in[pos++] == 1;
      // Read the hasWrongStatus flag
      hasWrongStatus = in[pos++] == 1;
      // Read the hasReplayError flag
      hasReplayError = in[pos++] == 1;
      // Read the list of failed server ids
      while (pos < in.length)
    while (!scanner.isEmpty())
      {
        length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        Integer serverId = Integer.valueOf(serverIdString);
        failedServers.add(serverId);
        pos += length + 1;
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      failedServers.add(scanner.nextIntUTF8());
    }
  }
@@ -216,53 +191,26 @@
    return csn;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      /*
       * The message is stored in the form:
       * <operation type><CSN><has timeout><has degraded><has replay
       * error><failed server ids>
       */
      ByteArrayOutputStream oStream = new ByteArrayOutputStream(200);
      // Put the type of the operation
      oStream.write(MSG_TYPE_ACK);
      // Put the CSN
      byte[] csnByte = csn.toString().getBytes("UTF-8");
      oStream.write(csnByte);
      oStream.write(0);
      // Put the hasTimeout flag
      oStream.write(hasTimeout ? 1 : 0);
      // Put the hasWrongStatus flag
      oStream.write(hasWrongStatus ? 1 : 0);
      // Put the hasReplayError flag
      oStream.write(hasReplayError ? 1 : 0);
      // Put the list of server ids
      for (Integer sid : failedServers)
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_ACK);
    builder.appendUTF8(csn);
    builder.append(hasTimeout);
    builder.append(hasWrongStatus);
    builder.append(hasReplayError);
    for (int serverId : failedServers)
      {
        byte[] byteServerId = String.valueOf(sid).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
      builder.appendUTF8(serverId);
      }
      return oStream.toByteArray();
    } catch (IOException e)
    {
      // never happens
      return null;
    }
    return builder.toByteArray();
  }
  /**
@@ -307,21 +255,8 @@
   */
  public String errorsToString()
  {
    String idList;
    if (failedServers.size() > 0)
    {
      idList = "[";
      int size = failedServers.size();
      for (int i=0 ; i<size ; i++) {
        idList += failedServers.get(i);
        if ( i != (size-1) )
          idList += ", ";
      }
      idList += "]";
    } else
    {
      idList="none";
    }
    final String idList =
        !failedServers.isEmpty() ? failedServers.toString() : "none";
    return "hasTimeout: " + (hasTimeout ? "yes" : "no")  + ", " +
      "hasWrongStatus: " + (hasWrongStatus ? "yes" : "no")  + ", " +
opends/src/server/org/opends/server/replication/protocol/AddMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -64,7 +63,7 @@
   * Creates a new AddMessage.
   * @param op the operation to use when creating the message
   */
  public AddMsg(PostOperationAddOperation op)
  AddMsg(PostOperationAddOperation op)
  {
    super((AddContext) op.getAttachment(SYNCHROCONTEXT), op.getEntryDN());
@@ -143,22 +142,19 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid AddMsg
   * @throws UnsupportedEncodingException If UTF8 is not supported by the jvm
   */
  public AddMsg(byte[] in) throws DataFormatException,
                                  UnsupportedEncodingException
  public AddMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_ADD;
    allowedPduTypes[1] = MSG_TYPE_ADD_V1;
    int pos = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_ADD, MSG_TYPE_ADD_V1);
    // protocol version has been read as part of the header
    if (protocolVersion <= 3)
      decodeBody_V123(in, pos);
    {
      decodeBody_V123(scanner);
    }
    else
    {
      decodeBody_V4(in, pos);
      decodeBody_V4(scanner);
    }
    if (protocolVersion==ProtocolVersion.getCurrentVersion())
    {
@@ -189,122 +185,37 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  public byte[] getBytes_V1()
  {
    int bodyLength = encodedAttributes.length;
    byte[] byteParentId = null;
    if (parentEntryUUID != null)
    {
      byteParentId = parentEntryUUID.getBytes("UTF-8");
      bodyLength += byteParentId.length + 1;
    }
    else
    {
      bodyLength += 1;
    }
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] resultByteArray = encodeHeader_V1(MSG_TYPE_ADD_V1, bodyLength);
    int pos = resultByteArray.length - bodyLength;
    if (byteParentId != null)
      pos = addByteArray(byteParentId, resultByteArray, pos);
    else
      resultByteArray[pos++] = 0;
    /* put the attributes */
    for (int i=0; i<encodedAttributes.length; i++,pos++)
    {
      resultByteArray[pos] = encodedAttributes[i];
    }
    return resultByteArray;
    final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_ADD_V1);
    builder.append(parentEntryUUID);
    builder.append(encodedAttributes);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V23() throws UnsupportedEncodingException
  public byte[] getBytes_V23()
  {
    // Put together the different encoded pieces
    int bodyLength = encodedAttributes.length;
    // Compute the total length of the body
    byte[] byteParentId = null;
    if (parentEntryUUID != null)
    {
      // Encode parentID now to get the length of the encoded bytes
      byteParentId = parentEntryUUID.getBytes("UTF-8");
      bodyLength += byteParentId.length + 1;
    }
    else
    {
      bodyLength += 1;
    }
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] resultByteArray = encodeHeader(MSG_TYPE_ADD, bodyLength,
          ProtocolVersion.REPLICATION_PROTOCOL_V3);
    int pos = resultByteArray.length - bodyLength;
    if (byteParentId != null)
      pos = addByteArray(byteParentId, resultByteArray, pos);
    else
      resultByteArray[pos++] = 0;
    /* put the attributes */
    for (int i=0; i<encodedAttributes.length; i++,pos++)
    {
      resultByteArray[pos] = encodedAttributes[i];
    }
    return resultByteArray;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_ADD, ProtocolVersion.REPLICATION_PROTOCOL_V3);
    builder.append(parentEntryUUID);
    builder.append(encodedAttributes);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException
  public byte[] getBytes_V45(short protocolVersion)
  {
    // Put together the different encoded pieces
    int bodyLength = 0;
    // Compute the total length of the body
    byte[] byteParentId = null;
    if (parentEntryUUID != null)
    {
      // Encode parentID now to get the length of the encoded bytes
      byteParentId = parentEntryUUID.getBytes("UTF-8");
      bodyLength += byteParentId.length + 1;
    }
    else
    {
      bodyLength += 1;
    }
    byte[] byteAttrLen =
      String.valueOf(encodedAttributes.length).getBytes("UTF-8");
    bodyLength += byteAttrLen.length + 1;
    bodyLength += encodedAttributes.length + 1;
    byte[] byteEntryAttrLen =
      String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
    bodyLength += byteEntryAttrLen.length + 1;
    bodyLength += encodedEclIncludes.length + 1;
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] encodedMsg = encodeHeader(MSG_TYPE_ADD, bodyLength,
        reqProtocolVersion);
    int pos = encodedMsg.length - bodyLength;
    if (byteParentId != null)
      pos = addByteArray(byteParentId, encodedMsg, pos);
    else
      encodedMsg[pos++] = 0;
    pos = addByteArray(byteAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedAttributes, encodedMsg, pos);
    pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_ADD, protocolVersion);
    builder.append(parentEntryUUID);
    builder.appendUTF8(encodedAttributes.length);
    builder.appendZeroTerminated(encodedAttributes);
    builder.appendUTF8(encodedEclIncludes.length);
    builder.appendZeroTerminated(encodedEclIncludes);
    return builder.toByteArray();
  }
  private byte[] encodeAttributes(
@@ -368,12 +279,18 @@
      new LDAPAttribute(objectClass).write(writer);
      for (Attribute a : userAttributes)
      {
        new LDAPAttribute(a).write(writer);
      }
      if (operationalAttributes != null)
      {
        for (Attribute a : operationalAttributes)
        {
          new LDAPAttribute(a).write(writer);
    }
      }
    }
    catch(Exception e)
    {
      // Do something
@@ -385,89 +302,24 @@
  // Msg decoding
  // ============
  private void decodeBody_V123(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V123(ByteArrayScanner scanner)
      throws DataFormatException
  {
    // read the parent unique Id
    int length = getNextLength(in, pos);
    if (length != 0)
    {
      parentEntryUUID = new String(in, pos, length, "UTF-8");
      pos += length + 1;
    }
    else
    {
      parentEntryUUID = null;
      pos += 1;
    parentEntryUUID = scanner.nextString();
    encodedAttributes = scanner.remainingBytes();
    }
    // Read/Don't decode attributes : all the remaining bytes
    encodedAttributes = new byte[in.length-pos];
    int i =0;
    while (pos<in.length)
  private void decodeBody_V4(ByteArrayScanner scanner)
      throws DataFormatException
    {
      encodedAttributes[i++] = in[pos++];
    }
  }
    parentEntryUUID = scanner.nextString();
  private void decodeBody_V4(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  {
    // read the parent unique Id
    int length = getNextLength(in, pos);
    if (length != 0)
    {
      parentEntryUUID = new String(in, pos, length, "UTF-8");
      pos += length + 1;
    }
    else
    {
      parentEntryUUID = null;
      pos += 1;
    }
    final int attrLen = scanner.nextIntUTF8();
    encodedAttributes = scanner.nextByteArray(attrLen);
    scanner.skipZeroSeparator();
    // Read attr len
    length = getNextLength(in, pos);
    int attrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode attributes
    this.encodedAttributes = new byte[attrLen];
    try
    {
      System.arraycopy(in, pos, encodedAttributes, 0, attrLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    pos += attrLen + 1;
    // Read ecl attr len
    length = getNextLength(in, pos);
    int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode entry attributes
    encodedEclIncludes = new byte[eclAttrLen];
    try
    {
      System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    final int eclAttrLen = scanner.nextIntUTF8();
    encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
@@ -26,9 +26,15 @@
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;
import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DN;
/**
 * Byte array builder class encodes data into byte arrays to send messages over
@@ -42,8 +48,6 @@
public class ByteArrayBuilder
{
  /** This is the null byte, also known as zero byte. */
  public static final byte NULL_BYTE = 0;
  private final ByteStringBuilder builder;
  /**
@@ -51,7 +55,7 @@
   */
  public ByteArrayBuilder()
  {
    builder = new ByteStringBuilder();
    builder = new ByteStringBuilder(256);
  }
  /**
@@ -165,7 +169,8 @@
   */
  public ByteArrayBuilder appendStrings(Collection<String> col)
  {
    append(col.size());
    //append(int) would have been safer, but byte is compatible with legacy code
    append((byte) col.size());
    for (String s : col)
    {
      append(s);
@@ -174,23 +179,28 @@
  }
  /**
   * Append a String to this ByteArrayBuilder.
   * Append a String with a zero separator to this ByteArrayBuilder,
   * or only the zero separator if the string is null
   * or if the string length is zero.
   *
   * @param s
   *          the String to append.
   *          the String to append. Can be null.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(String s)
  {
    try
    {
      if (s != null && s.length() > 0)
      {
      append(s.getBytes("UTF-8"));
    }
      return appendZeroSeparator();
    }
    catch (UnsupportedEncodingException e)
    {
      throw new RuntimeException("Should never happen", e);
    }
    return this;
  }
  /**
@@ -220,17 +230,93 @@
    return this;
  }
  private ByteArrayBuilder append(byte[] sBytes)
  /**
   * Append a DN to this ByteArrayBuilder by converting it to a String then
   * encoding that string to a UTF-8 byte array.
   *
   * @param dn
   *          the DN to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(DN dn)
  {
    for (byte b : sBytes)
    {
      append(b);
    }
    append((byte) 0); // zero separator
    append(dn.toString());
    return this;
  }
  /**
   * Append all the bytes from the byte array to this ByteArrayBuilder.
   *
   * @param bytes
   *          the byte array to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(byte[] bytes)
  {
    builder.append(bytes);
    return this;
  }
  /**
   * Append all the bytes from the byte array to this ByteArrayBuilder
   * and then append a final zero byte separator for compatibility
   * with legacy implementations.
   *
   * @param bytes
   *          the byte array to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder appendZeroTerminated(byte[] bytes)
  {
    builder.append(bytes);
    return appendZeroSeparator();
  }
  private ByteArrayBuilder appendZeroSeparator()
  {
    builder.append((byte) 0);
    return this;
  }
  /**
   * Append the byte representation of a ServerState to this ByteArrayBuilder
   * and then append a final zero byte separator.
   * <p>
   * Caution: ServerState MUST be the last field. Because ServerState can
   * contain null character (string termination of serverId string ..) it cannot
   * be decoded using {@link ByteArrayScanner#nextString()} like the other
   * fields. The only way is to rely on the end of the input buffer: and that
   * forces the ServerState to be the last field. This should be changed if we
   * want to have more than one ServerState field.
   *
   * @param serverState
   *          the ServerState to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(ServerState serverState)
  {
    final Map<Integer, CSN> serverIdToCSN = serverState.getServerIdToCSNMap();
    for (Entry<Integer, CSN> entry : serverIdToCSN.entrySet())
    {
      // FIXME JNR: why append the serverId in addition to the CSN
      // since the CSN already contains the serverId?
      appendUTF8(entry.getKey()); // serverId
      appendUTF8(entry.getValue()); // CSN
    }
    return appendZeroSeparator(); // stupid legacy zero separator
  }
  /**
   * Returns a new ASN1Writer that will append bytes to this ByteArrayBuilder.
   *
   * @return a new ASN1Writer that will append bytes to this ByteArrayBuilder.
   */
  public ASN1Writer getASN1Writer()
  {
    return ASN1.getWriter(builder);
  }
  /**
   * Converts the content of this ByteStringBuilder to a byte array.
   *
   * @return the content of this ByteStringBuilder converted to a byte array.
opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
@@ -27,9 +27,14 @@
import java.util.Collection;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Reader;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * Byte array scanner class helps decode data from byte arrays received via
@@ -44,6 +49,7 @@
{
  private final ByteSequenceReader bytes;
  private final byte[] byteArray;
  /**
   * Builds a ByteArrayScanner object that will read from the supplied byte
@@ -55,6 +61,7 @@
  public ByteArrayScanner(byte[] bytes)
  {
    this.bytes = ByteString.wrap(bytes).asReader();
    this.byteArray = bytes;
  }
  /**
@@ -172,7 +179,7 @@
  /**
   * Reads the next UTF8-encoded string.
   *
   * @return the next UTF8-encoded string.
   * @return the next UTF8-encoded string or null if the string length is zero
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
@@ -180,10 +187,16 @@
  {
    try
    {
      final String s = bytes.getString(findZeroSeparator());
      bytes.skip(1); // skip the zero separator
      final int offset = findZeroSeparator();
      if (offset > 0)
      {
        final String s = bytes.getString(offset);
        skipZeroSeparator();
      return s;
    }
      skipZeroSeparator();
      return null;
    }
    catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
@@ -220,7 +233,8 @@
  public <TCol extends Collection<String>> TCol nextStrings(TCol output)
      throws DataFormatException
  {
    final int colSize = nextInt();
    // nextInt() would have been safer, but byte is compatible with legacy code.
    final int colSize = nextByte();
    for (int i = 0; i < colSize; i++)
    {
      output.add(nextString());
@@ -269,6 +283,127 @@
  }
  /**
   * Reads the next DN.
   *
   * @return the next DN.
   * @throws DataFormatException
   *           if DN was incorrectly encoded or no more data can be read from
   *           the input
   */
  public DN nextDN() throws DataFormatException
  {
    try
    {
      return DN.decode(nextString());
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
  }
  /**
   * Return a new byte array containing all remaining bytes in this
   * ByteArrayScanner.
   *
   * @return new byte array containing all remaining bytes
   */
  public byte[] remainingBytes()
  {
    final int length = byteArray.length - bytes.position();
    return nextByteArray(length);
  }
  /**
   * Return a new byte array containing all remaining bytes in this
   * ByteArrayScanner bar the last one which is a zero terminated byte
   * (compatible with legacy code).
   *
   * @return new byte array containing all remaining bytes bar the last one
   */
  public byte[] remainingBytesZeroTerminated()
  {
    /* do not copy stupid legacy zero separator */
    final int length = byteArray.length - (bytes.position() + 1);
    final byte[] result = nextByteArray(length);
    bytes.skip(1); // ignore last (supposedly) zero byte
    return result;
  }
  /**
   * Return a new byte array containing the requested number of bytes.
   *
   * @param length
   *          the number of bytes to be read and copied to the new byte array.
   * @return new byte array containing the requested number of bytes.
   */
  public byte[] nextByteArray(final int length)
  {
    final byte[] result = new byte[length];
    System.arraycopy(byteArray, bytes.position(), result, 0, length);
    bytes.skip(length);
    return result;
  }
  /**
   * Reads the next ServerState.
   * <p>
   * Caution: ServerState MUST be the last field (see
   * {@link ByteArrayBuilder#append(ServerState)} javadoc).
   *
   * @return the next ServerState.
   * @throws DataFormatException
   *           if ServerState was incorrectly encoded or no more data can be
   *           read from the input
   * @see ByteArrayBuilder#append(ServerState)
   */
  public ServerState nextServerState() throws DataFormatException
  {
    final ServerState result = new ServerState();
    final int maxPos = byteArray.length - 1 /* stupid legacy zero separator */;
    while (bytes.position() < maxPos)
    {
      final int serverId = nextIntUTF8();
      final CSN csn = nextCSNUTF8();
      if (serverId != csn.getServerId())
      {
        throw new DataFormatException("Expected serverId=" + serverId
            + " to be the same as serverId for CSN=" + csn);
      }
      result.update(csn);
    }
    skipZeroSeparator();
    return result;
  }
  /**
   * Skips the next byte and verifies it is effectively the zero separator.
   *
   * @throws DataFormatException
   *           if the next byte is not the zero separator.
   */
  public void skipZeroSeparator() throws DataFormatException
  {
    if (bytes.peek() != (byte) 0)
    {
      throw new DataFormatException("Expected a zero separator at position "
          + bytes.position() + " but found byte " + bytes.peek());
    }
    bytes.skip(1);
  }
  /**
   * Returns a new ASN1Reader that will read bytes from this ByteArrayScanner.
   *
   * @return a new ASN1Reader that will read bytes from this ByteArrayScanner.
   */
  public ASN1Reader getASN1Reader()
  {
    return ASN1.getReader(bytes);
  }
  /**
   * Returns whether the scanner has more bytes to consume.
   *
   * @return true if the scanner has more bytes to consume, false otherwise.
@@ -278,4 +413,10 @@
    return bytes.remaining() == 0;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return bytes.toString();
  }
}
opends/src/server/org/opends/server/replication/protocol/ChangeStatusMsg.java
@@ -22,11 +22,12 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerStatus;
/**
@@ -36,10 +37,10 @@
 */
public class ChangeStatusMsg extends ReplicationMsg
{
  // The status we want the DS to enter (used when from RS to DS)
  private ServerStatus requestedStatus = ServerStatus.INVALID_STATUS;
  // The new status the DS just entered (used when from DS to RS)
  private ServerStatus newStatus = ServerStatus.INVALID_STATUS;
  /** The status we want the DS to enter (used when from RS to DS) */
  private final ServerStatus requestedStatus;
  /** The new status the DS just entered (used when from DS to RS) */
  private ServerStatus newStatus;
  /**
   * Create a new ChangeStatusMsg.
@@ -61,25 +62,19 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ChangeStatusMsg.
   */
  public ChangeStatusMsg(byte[] encodedMsg) throws DataFormatException
  ChangeStatusMsg(byte[] encodedMsg) throws DataFormatException
  {
    /*
     * The message is stored in the form:
     * <message type><requested status><new status>
     */
    /* First byte is the type */
    try
    {
    if (encodedMsg[0] != ReplicationMsg.MSG_TYPE_CHANGE_STATUS)
    {
      throw new DataFormatException("byte[] is not a valid msg");
    }
    try
    {
      /* Then the requested status */
      requestedStatus = ServerStatus.valueOf(encodedMsg[1]);
      /* Then the new status */
      newStatus = ServerStatus.valueOf(encodedMsg[2]);
    } catch (IllegalArgumentException e)
    {
@@ -87,9 +82,7 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
@@ -97,18 +90,12 @@
     * The message is stored in the form:
     * <message type><requested status><new status>
     */
    byte[] encodedMsg = new byte[3];
    /* Put the type of the operation */
    encodedMsg[0] = ReplicationMsg.MSG_TYPE_CHANGE_STATUS;
    /* Put the requested status */
    encodedMsg[1] = requestedStatus.getValue();
    /* Put the requested status */
    encodedMsg[2] = newStatus.getValue();
    return encodedMsg;
    return new byte[]
    {
      ReplicationMsg.MSG_TYPE_CHANGE_STATUS,
      requestedStatus.getValue(),
      newStatus.getValue()
    };
  }
  /**
@@ -129,9 +116,7 @@
    return newStatus;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/DeleteMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.controls.SubtreeDeleteControl;
@@ -54,14 +53,14 @@
   *
   * @param operation the Operation from which the message must be created.
   */
  public DeleteMsg(PostOperationDeleteOperation operation)
  DeleteMsg(PostOperationDeleteOperation operation)
  {
    super((OperationContext) operation.getAttachment(SYNCHROCONTEXT),
           operation.getEntryDN());
    try
    {
      if (operation.getRequestControl(SubtreeDeleteControl.DECODER) != null)
        isSubtreeDelete = true;
      isSubtreeDelete =
          operation.getRequestControl(SubtreeDeleteControl.DECODER) != null;
    }
    catch(Exception e)
    {/* do nothing */}
@@ -84,19 +83,16 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid DeleteMsg
   * @throws UnsupportedEncodingException  If UTF8 is not supported by the jvm
   */
  public DeleteMsg(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  DeleteMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_DELETE;
    allowedPduTypes[1] = MSG_TYPE_DELETE_V1;
    int pos = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_DELETE, MSG_TYPE_DELETE_V1);
    // protocol version has been read as part of the header
    if (protocolVersion >= 4)
      decodeBody_V4(in, pos);
    {
      decodeBody_V4(scanner);
    }
    else
    {
      // Keep the previous protocol version behavior - when we don't know the
@@ -115,7 +111,9 @@
        InternalClientConnection.nextMessageID(), null, newDN);
    if (isSubtreeDelete)
    {
      del.addRequestControl(new SubtreeDeleteControl(false));
    }
    DeleteContext ctx = new DeleteContext(getCSN(), getEntryUUID());
    del.setAttachment(SYNCHROCONTEXT, ctx);
@@ -128,108 +126,47 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  public byte[] getBytes_V1()
  {
    return encodeHeader_V1(MSG_TYPE_DELETE_V1, 0);
    return encodeHeader_V1(MSG_TYPE_DELETE_V1)
        .toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V23() throws UnsupportedEncodingException
  public byte[] getBytes_V23()
  {
    return encodeHeader(MSG_TYPE_DELETE, 0,
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    return encodeHeader(MSG_TYPE_DELETE,ProtocolVersion.REPLICATION_PROTOCOL_V3)
        .toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException
  public byte[] getBytes_V45(short protocolVersion)
  {
    // Put together the different encoded pieces
    int bodyLength = 0;
    byte[] byteEntryAttrLen =
      String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
    bodyLength += byteEntryAttrLen.length + 1;
    bodyLength += encodedEclIncludes.length + 1;
    byte[] byteInitiatorsName = null;
    if (initiatorsName != null)
    {
      byteInitiatorsName = initiatorsName.getBytes("UTF-8");
      bodyLength += byteInitiatorsName.length + 1;
    }
    else
    {
      bodyLength++;
    }
    // subtree flag
    bodyLength++;
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] encodedMsg = encodeHeader(MSG_TYPE_DELETE, bodyLength,
        reqProtocolVersion);
    int pos = encodedMsg.length - bodyLength;
    if (byteInitiatorsName != null)
      pos = addByteArray(byteInitiatorsName, encodedMsg, pos);
    else
      encodedMsg[pos++] = 0;
    pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
    encodedMsg[pos++] = (byte) (isSubtreeDelete ? 1 : 0);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_DELETE, protocolVersion);
    builder.append(initiatorsName);
    builder.appendUTF8(encodedEclIncludes.length);
    builder.appendZeroTerminated(encodedEclIncludes);
    builder.append(isSubtreeDelete);
    return builder.toByteArray();
  }
  // ============
  // Msg decoding
  // ============
  private void decodeBody_V4(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V4(ByteArrayScanner scanner)
      throws DataFormatException
  {
    int length = getNextLength(in, pos);
    if (length != 0)
    {
      initiatorsName = new String(in, pos, length, "UTF-8");
      pos += length + 1;
    }
    else
    {
      initiatorsName = null;
      pos += 1;
    }
    initiatorsName = scanner.nextString();
    // Read ecl attr len
    length = getNextLength(in, pos);
    int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    // Skip the length
    pos += length + 1;
    final int eclAttrLen = scanner.nextIntUTF8();
    encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
    scanner.skipZeroSeparator();
    // Read/Don't decode entry attributes
    encodedEclIncludes = new byte[eclAttrLen];
    try
    {
      // Copy ecl attr
      System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
      // Skip the attrs
      pos += eclAttrLen +1;
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    // subtree flag
    isSubtreeDelete = (in[pos] == 1);
    isSubtreeDelete = scanner.nextBoolean();
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/protocol/DoneMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -54,65 +53,26 @@
   * @throws DataFormatException If the in does not contain a properly,
   *                             encoded message.
   */
  public DoneMsg(byte[] in) throws DataFormatException
  DoneMsg(byte[] in) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_DONE)
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_DONE)
        throw new DataFormatException("input is not a valid DoneMessage");
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Integer.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    this.senderID = scanner.nextIntUTF8();
    this.destination = scanner.nextIntUTF8();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1
                     + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_DONE;
      int pos = 1;
      /* put the sender */
      pos = addByteArray(senderBytes, resultByteArray, pos);
      /* put the destination */
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_DONE);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    return builder.toByteArray();
  }
}
opends/src/server/org/opends/server/replication/protocol/ECLUpdateMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.MultiDomainServerState;
@@ -73,56 +72,33 @@
   * @param in The byte array containing the encoded form of the message.
   * @throws DataFormatException If the byte array does not contain
   *         a valid encoded form of the message.
   * @throws UnsupportedEncodingException when it occurs.
   * @throws NotSupportedOldVersionPDUException when it occurs.
   */
  public ECLUpdateMsg(byte[] in)
   throws DataFormatException,
          UnsupportedEncodingException,
  ECLUpdateMsg(byte[] in) throws DataFormatException,
          NotSupportedOldVersionPDUException
  {
    try
    {
      if (in[0] != MSG_TYPE_ECL_UPDATE)
      final ByteArrayScanner scanner = new ByteArrayScanner(in);
      if (scanner.nextByte() != MSG_TYPE_ECL_UPDATE)
      {
        throw new DataFormatException("byte[] is not a valid " +
            getClass().getCanonicalName());
      }
      int pos = 1;
      // Decode the cookie
      int length = getNextLength(in, pos);
      String cookieStr = new String(in, pos, length, "UTF-8");
      this.cookie = new MultiDomainServerState(cookieStr);
      pos += length + 1;
      // Decode the baseDN
      length = getNextLength(in, pos);
      this.baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      // Decode the changeNumber
      length = getNextLength(in, pos);
      this.changeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      this.cookie = new MultiDomainServerState(scanner.nextString());
      this.baseDN = scanner.nextDN();
      this.changeNumber = scanner.nextIntUTF8();
      // Decode the msg
      /* Read the mods : all the remaining bytes but the terminating 0 */
      length = in.length - pos - 1;
      byte[] encodedMsg = new byte[length];
      System.arraycopy(in, pos, encodedMsg, 0, length);
      ReplicationMsg rmsg = ReplicationMsg.generateMsg(
            encodedMsg, ProtocolVersion.getCurrentVersion());
      this.updateMsg = (LDAPUpdateMsg)rmsg;
      this.updateMsg = (LDAPUpdateMsg) ReplicationMsg.generateMsg(
          scanner.remainingBytesZeroTerminated(),
          ProtocolVersion.getCurrentVersion());
    }
    catch(DirectoryException de)
    {
      throw new DataFormatException(de.toString());
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
@@ -162,9 +138,7 @@
    return updateMsg;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -175,39 +149,19 @@
    " serviceId: " + baseDN + "]";
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException
  {
    byte[] byteCookie = String.valueOf(cookie).getBytes("UTF-8");
    byte[] byteBaseDN = String.valueOf(baseDN).getBytes("UTF-8");
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_ECL_UPDATE);
    builder.append(String.valueOf(cookie));
    builder.append(baseDN);
    // FIXME JNR Changing the line below to use long would require a protocol
    // version change. Leave it like this for now until the need arises.
    byte[] byteChangeNumber =
        Integer.toString((int) changeNumber).getBytes("UTF-8");
    byte[] byteUpdateMsg = updateMsg.getBytes(protocolVersion);
    int length = 1 + byteCookie.length +
                 1 + byteBaseDN.length +
                 1 + byteChangeNumber.length +
                 1 + byteUpdateMsg.length + 1;
    byte[] resultByteArray = new byte[length];
    /* Encode type */
    resultByteArray[0] = MSG_TYPE_ECL_UPDATE;
    int pos = 1;
    // Encode all fields
    pos = addByteArray(byteCookie, resultByteArray, pos);
    pos = addByteArray(byteBaseDN, resultByteArray, pos);
    pos = addByteArray(byteChangeNumber, resultByteArray, pos);
    pos = addByteArray(byteUpdateMsg, resultByteArray, pos);
    return resultByteArray;
    builder.appendUTF8((int) changeNumber);
    builder.appendZeroTerminated(updateMsg.getBytes(protocolVersion));
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/EntryMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -37,51 +36,39 @@
 */
public class EntryMsg extends RoutableMsg
{
  // The byte array containing the bytes of the entry transported
  private byte[] entryByteArray;
  /** The byte array containing the bytes of the entry transported. */
  private final byte[] entryByteArray;
  private int msgId = -1; // from V4
  /**
   * Creates a new EntryMsg.
   *
   * @param sender      The sender of this message.
   * @param serverID      The sender of this message.
   * @param destination The destination of this message.
   * @param entryBytes  The bytes of the entry.
   * @param msgId       Message counter.
   */
  public EntryMsg(
      int sender,
      int destination,
      byte[] entryBytes,
      int msgId)
  public EntryMsg(int serverID, int destination, byte[] entryBytes, int msgId)
  {
    super(sender, destination);
    this.entryByteArray = new byte[entryBytes.length];
    System.arraycopy(entryBytes, 0, this.entryByteArray, 0, entryBytes.length);
    this.msgId = msgId;
    this(serverID, destination, entryBytes, 0, entryBytes.length, msgId);
  }
  /**
   * Creates a new EntryMsg.
   *
   * @param serverID    The sender of this message.
   * @param i           The destination of this message.
   * @param destination The destination of this message.
   * @param entryBytes  The bytes of the entry.
   * @param pos         The starting Position in the array.
   * @param startPos    The starting Position in the array.
   * @param length      Number of array elements to be copied.
   * @param msgId       Message counter.
   */
  public EntryMsg(
      int serverID,
      int i,
      byte[] entryBytes,
      int pos,
      int length,
      int msgId)
  public EntryMsg(int serverID, int destination, byte[] entryBytes, int startPos,
      int length, int msgId)
  {
    super(serverID, i);
    super(serverID, destination);
    this.entryByteArray = new byte[length];
    System.arraycopy(entryBytes, pos, this.entryByteArray, 0, length);
    System.arraycopy(entryBytes, startPos, this.entryByteArray, 0, length);
    this.msgId = msgId;
  }
@@ -93,47 +80,22 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public EntryMsg(byte[] in, short version) throws DataFormatException
  EntryMsg(byte[] in, short version) throws DataFormatException
  {
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_ENTRY)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_ENTRY)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderIDString = new String(in, pos, length, "UTF-8");
      this.senderID = Integer.valueOf(senderIDString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length +1;
      // msgCnt
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    this.senderID = scanner.nextIntUTF8();
    this.destination = scanner.nextIntUTF8();
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // msgCnt
        length = getNextLength(in, pos);
        String msgcntString = new String(in, pos, length, "UTF-8");
        this.msgId = Integer.valueOf(msgcntString);
        pos += length +1;
      this.msgId = scanner.nextIntUTF8();
      }
      // data
      length = in.length - (pos + 1);
      this.entryByteArray = new byte[length];
      System.arraycopy(in, pos, entryByteArray, 0, length);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    this.entryByteArray = scanner.remainingBytesZeroTerminated();
  }
  /**
@@ -145,46 +107,20 @@
    return entryByteArray;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      byte[] msgCntBytes = null;
      byte[] entryBytes = entryByteArray;
      int length = 1 + senderBytes.length +
                   1 + destinationBytes.length +
                   1 + entryBytes.length + 1;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_ENTRY);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        msgCntBytes = String.valueOf(msgId).getBytes("UTF-8");
        length += (1 + msgCntBytes.length);
      builder.appendUTF8(msgId);
      }
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_ENTRY;
      int pos = 1;
      pos = addByteArray(senderBytes, resultByteArray, pos);
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        pos = addByteArray(msgCntBytes, resultByteArray, pos);
      pos = addByteArray(entryBytes, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    builder.appendZeroTerminated(entryByteArray);
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/ErrorMsg.java
@@ -22,20 +22,17 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import org.opends.messages.Message;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.messages.Message;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.util.StaticUtils.*;
/**
 * This message is part of the replication protocol.
 * This message is sent by a server or a replication server when an error
@@ -43,18 +40,21 @@
 */
public class ErrorMsg extends RoutableMsg
{
  // The tracer object for the debug logger
  /** The tracer object for the debug logger */
  private static final DebugTracer TRACER = getTracer();
  // Specifies the messageID built from the error that was detected
  private int msgID;
  /** Specifies the messageID built from the error that was detected */
  private final int msgID;
  // Specifies the complementary details about the error that was detected
  private Message details = null;
  /** Specifies the complementary details about the error that was detected */
  private final Message details;
  // The time of creation of this message.
  //                                        protocol version previous to V4
  private Long creationTime = System.currentTimeMillis();
  /**
   * The time of creation of this message.
   * <p>
   * protocol version previous to V4
   */
  private long creationTime = System.currentTimeMillis();
  /**
   * Creates an ErrorMsg providing the destination server.
@@ -63,8 +63,7 @@
   * @param destination The destination server or servers of this message.
   * @param details The message containing the details of the error.
   */
  public ErrorMsg(int sender, int destination,
                      Message details)
  public ErrorMsg(int sender, int destination, Message details)
  {
    super(sender, destination);
    this.msgID  = details.getDescriptor().getId();
@@ -72,8 +71,10 @@
    this.creationTime = System.currentTimeMillis();
    if (debugEnabled())
      TRACER.debugInfo(" Creating error message" + this.toString()
          + " " + stackTraceToSingleLineString(new Exception("trace")));
    {
      TRACER.debugInfo(" Creating error message" + this + " "
          + stackTraceToSingleLineString(new Exception("trace")));
    }
  }
  /**
@@ -90,7 +91,9 @@
    this.creationTime = System.currentTimeMillis();
    if (debugEnabled())
      TRACER.debugInfo(this.toString());
    {
      TRACER.debugInfo(toString());
    }
  }
  /**
@@ -101,53 +104,23 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded message.
   */
  public ErrorMsg(byte[] in, short version)
  throws DataFormatException
  ErrorMsg(byte[] in, short version) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_ERROR)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_ERROR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      destination = Integer.valueOf(serverIdString);
      pos += length +1;
      // MsgID
      length = getNextLength(in, pos);
      String msgIdString = new String(in, pos, length, "UTF-8");
      msgID = Integer.valueOf(msgIdString);
      pos += length +1;
      // Details
      length = getNextLength(in, pos);
      details = Message.raw(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    senderID = scanner.nextIntUTF8();
    destination = scanner.nextIntUTF8();
    msgID = scanner.nextIntUTF8();
    details = Message.raw(scanner.nextString());
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // Creation Time
        length = getNextLength(in, pos);
        String creationTimeString = new String(in, pos, length, "UTF-8");
        creationTime = Long.valueOf(creationTimeString);
        pos += length +1;
      }
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      creationTime = scanner.nextLongUTF8();
    }
  }
@@ -175,60 +148,21 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteErrMsgId = String.valueOf(msgID).getBytes("UTF-8");
      byte[] byteDetails = details.toString().getBytes("UTF-8");
      byte[] byteCreationTime = null;
      int length = 1 + byteSender.length + 1
                     + byteDestination.length + 1
                     + byteErrMsgId.length + 1
                     + byteDetails.length + 1;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_ERROR);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    builder.appendUTF8(msgID);
    builder.append(details.toString());
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        byteCreationTime = creationTime.toString().getBytes("UTF-8");
        length += byteCreationTime.length + 1;
      builder.appendUTF8(creationTime);
      }
      byte[] resultByteArray = new byte[length];
      // put the type of the operation
      resultByteArray[0] = MSG_TYPE_ERROR;
      int pos = 1;
      // sender
      pos = addByteArray(byteSender, resultByteArray, pos);
      // destination
      pos = addByteArray(byteDestination, resultByteArray, pos);
      // MsgId
      pos = addByteArray(byteErrMsgId, resultByteArray, pos);
      // details
      pos = addByteArray(byteDetails, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // creation time
        pos = addByteArray(byteCreationTime, resultByteArray, pos);
      }
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    return builder.toByteArray();
  }
  /**
@@ -236,6 +170,7 @@
   *
   * @return the string representation of this message.
   */
  @Override
  public String toString()
  {
    return "ErrorMessage=["+
@@ -254,7 +189,7 @@
   *
   * @return the creation time of this message.
   */
  public Long getCreationTime()
  public long getCreationTime()
  {
    return creationTime;
  }
opends/src/server/org/opends/server/replication/protocol/HeartbeatMsg.java
@@ -22,9 +22,8 @@
 *
 *
 *      Copyright 2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.util.zip.DataFormatException;
@@ -52,20 +51,18 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public HeartbeatMsg(byte[] in) throws DataFormatException
  HeartbeatMsg(byte[] in) throws DataFormatException
  {
    /* The heartbeat message is encoded in the form :
     * <msg-type>
     */
    /* first byte is the type */
    if (in.length != 1 || in[0] != MSG_TYPE_HEARTBEAT)
    {
      throw new DataFormatException("Input is not a valid Heartbeat Message.");
  }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
@@ -73,13 +70,7 @@
     * The heartbeat message contains:
     * <msg-type>
     */
    int length = 1;
    byte[] resultByteArray = new byte[length];
    /* put the message type */
    resultByteArray[0] = MSG_TYPE_HEARTBEAT;
    return resultByteArray;
    return new byte[] { MSG_TYPE_HEARTBEAT };
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/protocol/InitializeRcvAckMsg.java
@@ -22,14 +22,12 @@
 *
 *
 *      Copyright 2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is used by LDAP server or by Replication Servers to
 * update the send window of the remote entities.
@@ -43,7 +41,6 @@
{
  private final int numAck;
  /**
   * Create a new message..
   *
@@ -65,84 +62,37 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the message.
   */
  public InitializeRcvAckMsg(byte[] in) throws DataFormatException
  InitializeRcvAckMsg(byte[] in) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    if (scanner.nextByte() != MSG_TYPE_INITIALIZE_RCV_ACK)
    {
      // msg type
      if (in[0] != MSG_TYPE_INITIALIZE_RCV_ACK)
        throw new DataFormatException("input is not a valid "
            + this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(senderString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      destination = Integer.valueOf(serverIdString);
      pos += length +1;
      // value fo the ack
      length = getNextLength(in, pos);
      String numAckStr = new String(in, pos, length, "UTF-8");
      pos += length +1;
      numAck = Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
          + getClass().getCanonicalName());
  }
  /**
   * {@inheritDoc}
   */
    senderID = scanner.nextIntUTF8();
    destination = scanner.nextIntUTF8();
    numAck = scanner.nextIntUTF8();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try {
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
      int length = 1 + byteSender.length + 1
                     + byteDestination.length + 1
                     + byteNumAck.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_INITIALIZE_RCV_ACK;
      int pos = 1;
      // sender
      pos = addByteArray(byteSender, resultByteArray, pos);
      // destination
      pos = addByteArray(byteDestination, resultByteArray, pos);
      // ack value
      pos = addByteArray(byteNumAck, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_INITIALIZE_RCV_ACK);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    builder.appendUTF8(numAck);
    return builder.toByteArray();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return this.getClass().getSimpleName()  + "=["+
    return getClass().getSimpleName() + "=[" +
      " sender=" + this.senderID +
      " destination=" + this.destination +
      " msgID=" + this.numAck + "]";
opends/src/server/org/opends/server/replication/protocol/InitializeRequestMsg.java
@@ -22,15 +22,13 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is part of the replication protocol.
@@ -40,7 +38,7 @@
 */
public class InitializeRequestMsg extends RoutableMsg
{
  private DN baseDN;
  private final DN baseDN;
  private int initWindow = 0;
  /**
@@ -66,51 +64,22 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public InitializeRequestMsg(byte[] in, short version)
  throws DataFormatException
  InitializeRequestMsg(byte[] in, short version) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_INITIALIZE_REQUEST)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_INITIALIZE_REQUEST)
        throw new DataFormatException(
            "input is not a valid InitializeRequestMessage");
      int pos = 1;
      // baseDN
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // sender
      length = getNextLength(in, pos);
      String sourceServerIdString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(sourceServerIdString);
      pos += length +1;
      // destination
      length = getNextLength(in, pos);
      String destinationServerIdString = new String(in, pos, length, "UTF-8");
      destination = Integer.valueOf(destinationServerIdString);
      pos += length +1;
    }
    baseDN = scanner.nextDN();
    senderID = scanner.nextIntUTF8();
    destination = scanner.nextIntUTF8();
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        length = getNextLength(in, pos);
        String initWindowString = new String(in, pos, length, "UTF-8");
        initWindow = Integer.valueOf(initWindowString);
        pos += length +1;
      }
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
      initWindow = scanner.nextIntUTF8();
    }
  }
@@ -128,54 +97,20 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  {
    try {
      byte[] baseDNBytes = baseDN.toNormalizedString().getBytes("UTF-8");
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      byte[] initWindowBytes = null;
      int length = 1 + baseDNBytes.length + 1 + senderBytes.length + 1
        + destinationBytes.length + 1;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_INITIALIZE_REQUEST);
    builder.append(baseDN);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        initWindowBytes = String.valueOf(initWindow).getBytes("UTF-8");
        length += initWindowBytes.length + 1;
      builder.appendUTF8(initWindow);
      }
      byte[] resultByteArray = new byte[length];
      // type of the operation
      resultByteArray[0] = MSG_TYPE_INITIALIZE_REQUEST;
      int pos = 1;
      // baseDN
      pos = addByteArray(baseDNBytes, resultByteArray, pos);
      // sender
      pos = addByteArray(senderBytes, resultByteArray, pos);
      // destination
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        pos = addByteArray(initWindowBytes, resultByteArray, pos);
      }
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/InitializeTargetMsg.java
@@ -22,15 +22,13 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is part of the replication protocol.
@@ -39,17 +37,17 @@
 */
public class InitializeTargetMsg extends RoutableMsg
{
  private DN baseDN;
  private final DN baseDN;
  /** Specifies the number of entries expected to be exported. */
  private long entryCount;
  private final long entryCount;
  /**
   * Specifies the serverID of the server that requested this export to happen.
   * It allows a server that previously sent an InitializeRequestMessage to know
   * that the current message is related to its own request.
   */
  private int requestorID;
  private final int requestorID;
  private int initWindow;
@@ -80,63 +78,24 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded InitializeMessage.
   */
  public InitializeTargetMsg(byte[] in, short version)
  throws DataFormatException
  InitializeTargetMsg(byte[] in, short version) throws DataFormatException
  {
    super();
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_INITIALIZE_TARGET)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_INITIALIZE_TARGET)
        throw new DataFormatException(
            "input is not a valid InitializeDestinationMessage");
      int pos = 1;
      // destination
      int length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length +1;
      // baseDN
      length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // sender
      length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      senderID = Integer.valueOf(senderString);
      pos += length +1;
      // requestor
      length = getNextLength(in, pos);
      String requestorString = new String(in, pos, length, "UTF-8");
      requestorID = Integer.valueOf(requestorString);
      pos += length +1;
      // entryCount
      length = getNextLength(in, pos);
      String entryCountString = new String(in, pos, length, "UTF-8");
      entryCount = Long.valueOf(entryCountString);
      pos += length +1;
    }
    destination = scanner.nextIntUTF8();
    baseDN = scanner.nextDN();
    senderID = scanner.nextIntUTF8();
    requestorID = scanner.nextIntUTF8();
    entryCount = scanner.nextLongUTF8();
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        // init window
        length = getNextLength(in, pos);
        String initWindowString = new String(in, pos, length, "UTF-8");
        initWindow = Integer.valueOf(initWindowString);
        pos += length +1;
      }
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
      initWindow = scanner.nextIntUTF8();
    }
  }
@@ -185,66 +144,22 @@
  // Msg encoding
  // ============
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version)
  throws UnsupportedEncodingException
  {
    try
    {
      byte[] byteDestination = String.valueOf(destination).getBytes("UTF-8");
      byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
      byte[] byteSender = String.valueOf(senderID).getBytes("UTF-8");
      byte[] byteRequestor = String.valueOf(requestorID).getBytes("UTF-8");
      byte[] byteEntryCount = String.valueOf(entryCount).getBytes("UTF-8");
      byte[] byteInitWindow = null;
      int length = 1 + byteDestination.length + 1
                     + byteDn.length + 1
                     + byteSender.length + 1
                     + byteRequestor.length + 1
                     + byteEntryCount.length + 1;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_INITIALIZE_TARGET);
    builder.appendUTF8(destination);
    builder.append(baseDN);
    builder.appendUTF8(senderID);
    builder.appendUTF8(requestorID);
    builder.appendUTF8(entryCount);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        byteInitWindow = String.valueOf(initWindow).getBytes("UTF-8");
        length += byteInitWindow.length + 1;
      builder.appendUTF8(initWindow);
      }
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_INITIALIZE_TARGET;
      int pos = 1;
      /* put the destination */
      pos = addByteArray(byteDestination, resultByteArray, pos);
      /* put the baseDN and a terminating 0 */
      pos = addByteArray(byteDn, resultByteArray, pos);
      /* put the sender */
      pos = addByteArray(byteSender, resultByteArray, pos);
      /* put the requestorID */
      pos = addByteArray(byteRequestor, resultByteArray, pos);
      /* put the entryCount */
      pos = addByteArray(byteEntryCount, resultByteArray, pos);
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      {
        /* put the initWindow */
        pos = addByteArray(byteInitWindow, resultByteArray, pos);
      }
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/LDAPUpdateMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.zip.DataFormatException;
@@ -83,7 +82,7 @@
   * @param dn The DN of the entry on which the change
   *           that caused the creation of this object happened
   */
  public LDAPUpdateMsg(OperationContext ctx, DN dn)
  LDAPUpdateMsg(OperationContext ctx, DN dn)
  {
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.csn = ctx.getCSN();
@@ -101,7 +100,7 @@
   * @param dn        The DN of the entry on which the change
   *                  that caused the creation of this object happened
   */
  public LDAPUpdateMsg(CSN csn, String entryUUID, DN dn)
  LDAPUpdateMsg(CSN csn, String entryUUID, DN dn)
  {
    this.protocolVersion = ProtocolVersion.getCurrentVersion();
    this.csn = csn;
@@ -117,27 +116,19 @@
   */
  public static LDAPUpdateMsg generateMsg(PostOperationOperation op)
  {
    LDAPUpdateMsg msg = null;
    switch (op.getOperationType())
    {
    case MODIFY :
      msg = new ModifyMsg((PostOperationModifyOperation) op);
      break;
      return new ModifyMsg((PostOperationModifyOperation) op);
    case ADD:
      msg = new AddMsg((PostOperationAddOperation) op);
      break;
      return new AddMsg((PostOperationAddOperation) op);
    case DELETE :
      msg = new DeleteMsg((PostOperationDeleteOperation) op);
      break;
      return new DeleteMsg((PostOperationDeleteOperation) op);
    case MODIFY_DN :
      msg = new ModifyDNMsg( (PostOperationModifyDNOperation) op);
      break;
      return new ModifyDNMsg( (PostOperationModifyDNOperation) op);
    default:
      return null;
    }
    return msg;
  }
  /**
@@ -210,138 +201,62 @@
   * of a synchronized portion of code.
   *
   * This method is not synchronized and therefore not MT safe.
   *
   * @throws UnsupportedEncodingException when encoding fails.
   */
  public void encode() throws UnsupportedEncodingException
  public void encode()
  {
    bytes = getBytes(ProtocolVersion.getCurrentVersion());
  }
  /**
   * Encode the common header for all the UpdateMsg. This uses the current
   * protocol version.
   *
   * @param type the type of UpdateMsg to encode.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMsg.
   * @param version The ProtocolVersion to use when encoding.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMsg as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   */
  /** {@inheritDoc} */
  @Override
  public byte[] encodeHeader(byte type, int additionalLength, short version)
    throws UnsupportedEncodingException
  public ByteArrayBuilder encodeHeader(byte msgType, short protocolVersion)
  {
    byte[] byteDn = dn.toString().getBytes("UTF-8");
    byte[] csnByte = getCSN().toString().getBytes("UTF-8");
    byte[] byteEntryuuid = getEntryUUID().getBytes("UTF-8");
    /* The message header is stored in the form :
     * <operation type><protocol version><CSN><dn><entryuuid><assured>
     * <assured mode> <safe data level>
     * the length of result byte array is therefore :
     *   1 + 1 + CSN length + 1 + dn length + 1 + uuid length + 1 + 1
     *   + 1 + 1 + additional_length
     */
    int length = 8 + csnByte.length + byteDn.length
                 + byteEntryuuid.length + additionalLength;
    byte[] encodedMsg = new byte[length];
    // put the type of the operation
    encodedMsg[0] = type;
    // put the protocol version
    encodedMsg[1] = (byte) version;
    int pos = 2;
    // Put the CSN
    pos = addByteArray(csnByte, encodedMsg, pos);
    // Put the DN and a terminating 0
    pos = addByteArray(byteDn, encodedMsg, pos);
    // Put the entry uuid and a terminating 0
    pos = addByteArray(byteEntryuuid, encodedMsg, pos);
    // Put the assured flag
    encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
    // Put the assured mode
    encodedMsg[pos++] = assuredMode.getValue();
    // Put the safe data level
    encodedMsg[pos++] = safeDataLevel;
    return encodedMsg;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(msgType);
    builder.append((byte) protocolVersion);
    builder.appendUTF8(csn);
    builder.append(dn);
    builder.append(entryUUID);
    builder.append(assuredFlag);
    builder.append(assuredMode.getValue());
    builder.append(safeDataLevel);
    return builder;
  }
  /**
   * Encode the common header for all the UpdateMessage. This uses the version
   * 1 of the replication protocol (used for compatibility purpose).
   *
   * @param type the type of UpdateMessage to encode.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param msgType the type of UpdateMessage to encode.
   * @return a byte array builder containing the common header
   */
  public byte[] encodeHeader_V1(byte type, int additionalLength)
    throws UnsupportedEncodingException
  ByteArrayBuilder encodeHeader_V1(byte msgType)
  {
    byte[] byteDn = dn.toString().getBytes("UTF-8");
    byte[] csnByte = getCSN().toString().getBytes("UTF-8");
    byte[] byteEntryuuid = getEntryUUID().getBytes("UTF-8");
    /* The message header is stored in the form :
     * <operation type><CSN><dn><assured><entryuuid><change>
     * the length of result byte array is therefore :
     *   1 + CSN length + 1 + dn length + 1  + 1 +
     *   uuid length + 1 + additional_length
     */
    int length = 5 + csnByte.length + byteDn.length
                 + byteEntryuuid.length + additionalLength;
    byte[] encodedMsg = new byte[length];
    // put the type of the operation
    encodedMsg[0] = type;
    int pos = 1;
    // put the CSN
    pos = addByteArray(csnByte, encodedMsg, pos);
    // put the assured information
    encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
    // put the DN and a terminating 0
    pos = addByteArray(byteDn, encodedMsg, pos);
    // put the entry uuid and a terminating 0
    pos = addByteArray(byteEntryuuid, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(msgType);
    builder.appendUTF8(csn);
    builder.append(assuredFlag);
    builder.append(dn);
    builder.append(entryUUID);
    return builder;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      return getBytes_V1();
    }
    else if (reqProtocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      return getBytes_V23();
    }
@@ -351,7 +266,7 @@
      if (bytes == null)
      {
        // this is the current version of the protocol
        bytes = getBytes_V45(reqProtocolVersion);
        bytes = getBytes_V45(protocolVersion);
      }
      return bytes;
    }
@@ -362,45 +277,35 @@
   * 1 of the replication protocol (used for compatibility purpose).
   *
   * @return The byte array representation of this Message.
   *
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public abstract byte[] getBytes_V1() throws UnsupportedEncodingException;
  protected abstract byte[] getBytes_V1();
  /**
   * Get the byte array representation of this Message. This uses the version
   * 2 of the replication protocol (used for compatibility purpose).
   *
   * @return The byte array representation of this Message.
   *
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public abstract byte[] getBytes_V23() throws UnsupportedEncodingException;
  protected abstract byte[] getBytes_V23();
  /**
   * Get the byte array representation of this Message. This uses the provided
   * version number which must be version 4 or newer.
   * @param reqProtocolVersion TODO
   *
   * @param protocolVersion the actual protocol version to encode into
   * @return The byte array representation of this Message.
   *
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public abstract byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException;
  protected abstract byte[] getBytes_V45(short protocolVersion);
  /**
   * Encode a list of attributes.
   */
   static private byte[] encodeAttributes(Collection<Attribute> attributes)
   private static byte[] encodeAttributes(Collection<Attribute> attributes)
   {
     if (attributes==null)
     {
       return new byte[0];
     }
     try
     {
       ByteStringBuilder byteBuilder = new ByteStringBuilder();
@@ -424,151 +329,62 @@
  /**
   * Decode the Header part of this Update Message, and check its type.
   *
   * @param types The allowed types of this Update Message.
   * @param encodedMsg the encoded form of the UpdateMsg.
   * @return the position at which the remaining part of the message starts.
   * @param scanner the encoded form of the UpdateMsg.
   * @param allowedTypes The allowed types of this Update Message.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
   public int decodeHeader(byte[] types, byte[] encodedMsg)
  void decodeHeader(ByteArrayScanner scanner, byte... allowedTypes)
                          throws DataFormatException
   {
     // first byte is the type
     boolean foundMatchingType = false;
     for (byte type : types)
    final byte msgType = scanner.nextByte();
    if (!isTypeAllowed(msgType, allowedTypes))
     {
       if (type == encodedMsg[0])
       {
         foundMatchingType = true;
         break;
       }
     }
     if (!foundMatchingType)
       throw new DataFormatException("byte[] is not a valid update msg: "
           + encodedMsg[0]);
          + msgType);
    }
    if (msgType == MSG_TYPE_ADD_V1
        || msgType == MSG_TYPE_DELETE_V1
        || msgType == MSG_TYPE_MODIFYDN_V1
        || msgType == MSG_TYPE_MODIFY_V1)
    {
     /*
      * For older protocol version PDUs, decode the matching version header
      * instead.
       * For older protocol versions, decode the matching version header instead
      */
     if ((encodedMsg[0] == MSG_TYPE_ADD_V1) ||
         (encodedMsg[0] == MSG_TYPE_DELETE_V1) ||
         (encodedMsg[0] == MSG_TYPE_MODIFYDN_V1) ||
         (encodedMsg[0] == MSG_TYPE_MODIFY_V1))
     {
       return decodeHeader_V1(encodedMsg);
     }
     // read the protocol version
     protocolVersion = encodedMsg[1];
     try
     {
       // Read the CSN
       int pos = 2;
       int length = getNextLength(encodedMsg, pos);
       String csnStr = new String(encodedMsg, pos, length, "UTF-8");
       pos += length + 1;
       csn = new CSN(csnStr);
       // Read the dn
       length = getNextLength(encodedMsg, pos);
       dn = DN.decode(new String(encodedMsg, pos, length, "UTF-8"));
       pos += length + 1;
       // Read the entryuuid
       length = getNextLength(encodedMsg, pos);
       entryUUID = new String(encodedMsg, pos, length, "UTF-8");
       pos += length + 1;
       // Read the assured information
       assuredFlag = encodedMsg[pos++] == 1;
       // Read the assured mode
       assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
       // Read the safe data level
       safeDataLevel = encodedMsg[pos++];
       return pos;
     }
     catch (UnsupportedEncodingException e)
     {
       throw new DataFormatException("UTF-8 is not supported by this jvm.");
     }
     catch (IllegalArgumentException e)
     {
       throw new DataFormatException(e.getLocalizedMessage());
     }
     catch (DirectoryException e)
     {
       throw new DataFormatException(e.getLocalizedMessage());
     }
  }
  /**
   * Decode the Header part of this Update Message, and check its type. This
   * uses the version 1 of the replication protocol (used for compatibility
   * purpose).
   *
   * @param encodedMsg the encoded form of the UpdateMessage.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
  public int decodeHeader_V1(byte[] encodedMsg)
                          throws DataFormatException
  {
    if ((encodedMsg[0] != MSG_TYPE_ADD_V1) &&
      (encodedMsg[0] != MSG_TYPE_DELETE_V1) &&
      (encodedMsg[0] != MSG_TYPE_MODIFYDN_V1) &&
      (encodedMsg[0] != MSG_TYPE_MODIFY_V1))
      throw new DataFormatException("byte[] is not a valid update msg: expected"
        + " a V1 PDU, received: " + encodedMsg[0]);
    // Force version to V1 (other new parameters take their default values
    // (assured stuff...))
    protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
    try
      csn = scanner.nextCSNUTF8();
      assuredFlag = scanner.nextBoolean();
      dn = scanner.nextDN();
      entryUUID = scanner.nextString();
    }
    else
    {
      // read the CSN
      int pos = 1;
      int length = getNextLength(encodedMsg, pos);
      String csnStr = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      csn = new CSN(csnStr);
      // read the assured information
      assuredFlag = encodedMsg[pos++] == 1;
      // read the dn
      length = getNextLength(encodedMsg, pos);
      dn = DN.decode(new String(encodedMsg, pos, length, "UTF-8"));
      pos += length + 1;
      // read the entryuuid
      length = getNextLength(encodedMsg, pos);
      entryUUID = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      return pos;
      protocolVersion = scanner.nextByte();
      csn = scanner.nextCSNUTF8();
      dn = scanner.nextDN();
      entryUUID = scanner.nextString();
      assuredFlag = scanner.nextBoolean();
      assuredMode = AssuredMode.valueOf(scanner.nextByte());
      safeDataLevel = scanner.nextByte();
    }
    catch (UnsupportedEncodingException e)
  }
  private boolean isTypeAllowed(final byte msgType, byte... allowedTypes)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    for (byte allowedType : allowedTypes)
    {
      throw new DataFormatException(e.getLocalizedMessage());
      if (msgType == allowedType)
      {
        return true;
    }
  }
    return false;
  }
  /**
   * Return the number of bytes used by this message.
   *
   * @return The number of bytes used by this message.
   */
  /** {@inheritDoc} */
  @Override
  public abstract int size();
@@ -613,7 +429,7 @@
   * @throws LDAPException when it occurs.
   * @throws ASN1Exception when it occurs.
   */
  public ArrayList<RawAttribute> decodeRawAttributes(byte[] in)
  ArrayList<RawAttribute> decodeRawAttributes(byte[] in)
  throws LDAPException, ASN1Exception
  {
    ArrayList<RawAttribute> rattr = new ArrayList<RawAttribute>();
@@ -642,7 +458,7 @@
   * @throws LDAPException when it occurs.
   * @throws ASN1Exception when it occurs.
   */
  public ArrayList<Attribute> decodeAttributes(byte[] in)
  ArrayList<Attribute> decodeAttributes(byte[] in)
  throws LDAPException, ASN1Exception
  {
    ArrayList<Attribute> lattr = new ArrayList<Attribute>();
opends/src/server/org/opends/server/replication/protocol/ModifyDNMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -67,10 +66,8 @@
    newSuperiorEntryUUID = ctx.getNewSuperiorEntryUUID();
    deleteOldRdn = operation.deleteOldRDN();
    if (operation.getRawNewSuperior() != null)
      newSuperior = operation.getRawNewSuperior().toString();
    else
      newSuperior = null;
    final ByteString rawNewSuperior = operation.getRawNewSuperior();
    newSuperior = rawNewSuperior != null ? rawNewSuperior.toString() : null;
    newRDN = operation.getRawNewRDN().toString();
  }
@@ -129,23 +126,19 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException The input byte[] is not a valid ModifyDNMsg.
   * @throws UnsupportedEncodingException If UTF8 is not supported.
   */
  public ModifyDNMsg(byte[] in) throws DataFormatException,
                                       UnsupportedEncodingException
  ModifyDNMsg(byte[] in) throws DataFormatException
  {
    // Decode header
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_MODIFYDN;
    allowedPduTypes[1] = MSG_TYPE_MODIFYDN_V1;
    int pos = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_MODIFYDN, MSG_TYPE_MODIFYDN_V1);
    // protocol version has been read as part of the header
    if (protocolVersion <= 3)
      decodeBody_V123(in, pos);
    {
      decodeBody_V123(scanner, in[0]);
    }
    else
    {
      decodeBody_V4(in, pos);
      decodeBody_V4(scanner);
    }
    if (protocolVersion==ProtocolVersion.getCurrentVersion())
@@ -184,349 +177,81 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  public byte[] getBytes_V1()
  {
    byte[] byteNewRdn = newRDN.getBytes("UTF-8");
    byte[] byteNewSuperior = null;
    byte[] byteNewSuperiorId = null;
    // calculate the length necessary to encode the parameters
    int bodyLength = byteNewRdn.length + 1 + 1;
    if (newSuperior != null)
    {
      byteNewSuperior = newSuperior.getBytes("UTF-8");
      bodyLength += byteNewSuperior.length + 1;
    }
    else
      bodyLength += 1;
    if (newSuperiorEntryUUID != null)
    {
      byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
      bodyLength += byteNewSuperiorId.length + 1;
    }
    else
      bodyLength += 1;
    byte[] encodedMsg = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1, bodyLength);
    int pos = encodedMsg.length - bodyLength;
    /* put the new RDN and a terminating 0 */
    pos = addByteArray(byteNewRdn, encodedMsg, pos);
    /* put the newsuperior and a terminating 0 */
    if (newSuperior != null)
    {
      pos = addByteArray(byteNewSuperior, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the newsuperiorId and a terminating 0 */
    if (newSuperiorEntryUUID != null)
    {
      pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the deleteoldrdn flag */
    if (deleteOldRdn)
      encodedMsg[pos++] = 1;
    else
      encodedMsg[pos++] = 0;
    return encodedMsg;
    final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_MODIFYDN_V1);
    builder.append(newRDN);
    builder.append(newSuperior);
    builder.append(newSuperiorEntryUUID);
    builder.append(deleteOldRdn);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V23() throws UnsupportedEncodingException
  public byte[] getBytes_V23()
  {
    // Encoding V2 / V3
    byte[] byteNewRdn = newRDN.getBytes("UTF-8");
    byte[] byteNewSuperior = null;
    byte[] byteNewSuperiorId = null;
    // calculate the length necessary to encode the parameters
    int length = byteNewRdn.length + 1 + 1;
    if (newSuperior != null)
    {
      byteNewSuperior = newSuperior.getBytes("UTF-8");
      length += byteNewSuperior.length + 1;
    }
    else
      length += 1;
    if (newSuperiorEntryUUID != null)
    {
      byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
      length += byteNewSuperiorId.length + 1;
    }
    else
      length += 1;
    length += encodedMods.length + 1;
    /* encode the header in a byte[] large enough to also contain mods.. */
    byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFYDN, length,
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    int pos = encodedMsg.length - length;
    /* put the new RDN and a terminating 0 */
    pos = addByteArray(byteNewRdn, encodedMsg, pos);
    /* put the newsuperior and a terminating 0 */
    if (newSuperior != null)
    {
      pos = addByteArray(byteNewSuperior, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the newsuperiorId and a terminating 0 */
    if (newSuperiorEntryUUID != null)
    {
      pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the deleteoldrdn flag */
    if (deleteOldRdn)
      encodedMsg[pos++] = 1;
    else
      encodedMsg[pos++] = 0;
    /* add the mods */
    if (encodedMods.length > 0)
    {
      pos = encodedMsg.length - (encodedMods.length + 1);
      addByteArray(encodedMods, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_MODIFYDN,ProtocolVersion.REPLICATION_PROTOCOL_V3);
    builder.append(newRDN);
    builder.append(newSuperior);
    builder.append(newSuperiorEntryUUID);
    builder.append(deleteOldRdn);
    builder.appendZeroTerminated(encodedMods);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException
  public byte[] getBytes_V45(short protocolVersion)
  {
    byte[] byteNewSuperior = null;
    byte[] byteNewSuperiorId = null;
    // calculate the length necessary to encode the parameters
    byte[] byteNewRdn = newRDN.getBytes("UTF-8");
    int bodyLength = byteNewRdn.length + 1 + 1;
    if (newSuperior != null)
    {
      byteNewSuperior = newSuperior.getBytes("UTF-8");
      bodyLength += byteNewSuperior.length + 1;
    }
    else
      bodyLength += 1;
    if (newSuperiorEntryUUID != null)
    {
      byteNewSuperiorId = newSuperiorEntryUUID.getBytes("UTF-8");
      bodyLength += byteNewSuperiorId.length + 1;
    }
    else
      bodyLength += 1;
    byte[] byteModsLen =
      String.valueOf(encodedMods.length).getBytes("UTF-8");
    bodyLength += byteModsLen.length + 1;
    bodyLength += encodedMods.length + 1;
    byte[] byteEntryAttrLen =
      String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
    bodyLength += byteEntryAttrLen.length + 1;
    bodyLength += encodedEclIncludes.length + 1;
    /* encode the header in a byte[] large enough to also contain mods.. */
    byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFYDN, bodyLength,
        reqProtocolVersion);
    int pos = encodedMsg.length - bodyLength;
    /* put the new RDN and a terminating 0 */
    pos = addByteArray(byteNewRdn, encodedMsg, pos);
    /* put the newsuperior and a terminating 0 */
    if (newSuperior != null)
    {
      pos = addByteArray(byteNewSuperior, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the newsuperiorId and a terminating 0 */
    if (newSuperiorEntryUUID != null)
    {
      pos = addByteArray(byteNewSuperiorId, encodedMsg, pos);
    }
    else
      encodedMsg[pos++] = 0;
    /* put the deleteoldrdn flag */
    if (deleteOldRdn)
      encodedMsg[pos++] = 1;
    else
      encodedMsg[pos++] = 0;
    pos = addByteArray(byteModsLen, encodedMsg, pos);
    pos = addByteArray(encodedMods, encodedMsg, pos);
    pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_MODIFYDN, protocolVersion);
    builder.append(newRDN);
    builder.append(newSuperior);
    builder.append(newSuperiorEntryUUID);
    builder.append(deleteOldRdn);
    builder.appendUTF8(encodedMods.length);
    builder.appendZeroTerminated(encodedMods);
    builder.appendUTF8(encodedEclIncludes.length);
    builder.appendZeroTerminated(encodedEclIncludes);
    return builder.toByteArray();
  }
  // ============
  // Msg decoding
  // ============
  private void decodeBody_V123(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V123(ByteArrayScanner scanner, byte msgType)
      throws DataFormatException
  {
    /* read the newRDN
     * first calculate the length then construct the string
     */
    int length = getNextLength(in, pos);
    newRDN = new String(in, pos, length, "UTF-8");
    pos += length + 1;
    /* read the newSuperior
     * first calculate the length then construct the string
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperior = new String(in, pos, length, "UTF-8");
    else
      newSuperior = null;
    pos += length + 1;
    /* read the new parent Id
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperiorEntryUUID = new String(in, pos, length, "UTF-8");
    else
      newSuperiorEntryUUID = null;
    pos += length + 1;
    /* get the deleteoldrdn flag */
    deleteOldRdn = in[pos] != 0;
    pos++;
    newRDN = scanner.nextString();
    newSuperior = scanner.nextString();
    newSuperiorEntryUUID = scanner.nextString();
    deleteOldRdn = scanner.nextBoolean();
    // For easiness (no additional method), simply compare PDU type to
    // know if we have to read the mods of V2
    if (in[0] == MSG_TYPE_MODIFYDN)
    if (msgType == MSG_TYPE_MODIFYDN)
    {
      /* Read the mods : all the remaining bytes but the terminating 0 */
      length = in.length - pos - 1;
      if (length > 0) // Otherwise, there is only the trailing 0 byte which we
        // do not need to read
      {
        encodedMods = new byte[length];
        try
        {
          System.arraycopy(in, pos, encodedMods, 0, length);
        } catch (IndexOutOfBoundsException e)
        {
          throw new DataFormatException(e.getMessage());
        } catch (ArrayStoreException e)
        {
          throw new DataFormatException(e.getMessage());
        } catch (NullPointerException e)
        {
          throw new DataFormatException(e.getMessage());
        }
      }
      encodedMods = scanner.remainingBytesZeroTerminated();
    }
  }
  private void decodeBody_V4(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V4(ByteArrayScanner scanner)
      throws DataFormatException
  {
    /* read the newRDN
     * first calculate the length then construct the string
     */
    int length = getNextLength(in, pos);
    newRDN = new String(in, pos, length, "UTF-8");
    pos += length + 1;
    newRDN = scanner.nextString();
    newSuperior = scanner.nextString();
    newSuperiorEntryUUID = scanner.nextString();
    deleteOldRdn = scanner.nextBoolean();
    /* read the newSuperior
     * first calculate the length then construct the string
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperior = new String(in, pos, length, "UTF-8");
    else
      newSuperior = null;
    pos += length + 1;
    final int modsLen = scanner.nextIntUTF8();
    encodedMods = scanner.nextByteArray(modsLen);
    scanner.skipZeroSeparator();
    /* read the new parent Id
     */
    length = getNextLength(in, pos);
    if (length != 0)
      newSuperiorEntryUUID = new String(in, pos, length, "UTF-8");
    else
      newSuperiorEntryUUID = null;
    pos += length + 1;
    /* get the deleteoldrdn flag */
    deleteOldRdn = in[pos] != 0;
    pos++;
    // Read mods len
    length = getNextLength(in, pos);
    int modsLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode attributes
    this.encodedMods = new byte[modsLen];
    try
    {
      System.arraycopy(in, pos, encodedMods, 0, modsLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    pos += modsLen + 1;
    // Read ecl attr len
    length = getNextLength(in, pos);
    int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode entry attributes
    encodedEclIncludes = new byte[eclAttrLen];
    try
    {
      System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    final int eclAttrLen = scanner.nextIntUTF8();
    encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
  }
  /** {@inheritDoc} */
opends/src/server/org/opends/server/replication/protocol/ModifyMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.zip.DataFormatException;
@@ -50,7 +49,7 @@
   *
   * @param op The operation to use for building the message
   */
  public ModifyMsg(PostOperationModifyOperation op)
  ModifyMsg(PostOperationModifyOperation op)
  {
    super((OperationContext) op.getAttachment(OperationContext.SYNCHROCONTEXT),
          op.getEntryDN());
@@ -77,41 +76,35 @@
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
   * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
   */
  public ModifyMsg(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  ModifyMsg(byte[] in) throws DataFormatException
  {
    // Decode header
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_MODIFY;
    allowedPduTypes[1] = MSG_TYPE_MODIFY_V1;
    int pos = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_MODIFY, MSG_TYPE_MODIFY_V1);
    // protocol version has been read as part of the header
    if (protocolVersion <= 3)
      decodeBody_V123(in, pos);
    {
      decodeBody_V123(scanner);
    }
    else
      decodeBody_V4(in, pos);
    {
      decodeBody_V4(scanner);
    }
    if (protocolVersion==ProtocolVersion.getCurrentVersion())
    {
      bytes = in;
    }
  }
  /**
   * Creates a new Modify message from a V1 byte[].
   *
   * @param in The byte[] from which the operation must be read.
   * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
   * @throws UnsupportedEncodingException If UTF8 is not supported by the JVM.
   *
   * @return The created ModifyMsg.
   * @throws DataFormatException If the input byte[] is not a valid ModifyMsg
   */
  public static ModifyMsg createV1(byte[] in) throws DataFormatException,
                                     UnsupportedEncodingException
  static ModifyMsg createV1(byte[] in) throws DataFormatException
  {
    ModifyMsg msg = new ModifyMsg(in);
@@ -127,7 +120,9 @@
      DN newDN) throws LDAPException, ASN1Exception, DataFormatException
  {
    if (newDN == null)
    {
      newDN = getDN();
    }
    List<RawModification> ldapmods = decodeRawMods(encodedMods);
@@ -178,134 +173,53 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  public byte[] getBytes_V1()
  {
    /* encode the header in a byte[] large enough to also contain the mods */
    byte[] encodedMsg = encodeHeader_V1(MSG_TYPE_MODIFY_V1, encodedMods.length +
      1);
    /* add the mods */
    int pos = encodedMsg.length - (encodedMods.length + 1);
    addByteArray(encodedMods, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder = encodeHeader_V1(MSG_TYPE_MODIFY_V1);
    builder.append(encodedMods);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V23() throws UnsupportedEncodingException
  public byte[] getBytes_V23()
  {
    // Encoding V2 / V3
    /* encode the header in a byte[] large enough to also contain mods */
    byte[] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, encodedMods.length + 1,
        ProtocolVersion.REPLICATION_PROTOCOL_V3);
    /* add the mods */
    int pos = encodedMsg.length - (encodedMods.length + 1);
    addByteArray(encodedMods, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_MODIFY, ProtocolVersion.REPLICATION_PROTOCOL_V3);
    builder.append(encodedMods);
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes_V45(short reqProtocolVersion)
      throws UnsupportedEncodingException
  public byte[] getBytes_V45(short protocolVersion)
  {
    int bodyLength = 0;
    byte[] byteModsLen =
      String.valueOf(encodedMods.length).getBytes("UTF-8");
    bodyLength += byteModsLen.length + 1;
    bodyLength += encodedMods.length + 1;
    byte[] byteEntryAttrLen =
      String.valueOf(encodedEclIncludes.length).getBytes("UTF-8");
    bodyLength += byteEntryAttrLen.length + 1;
    bodyLength += encodedEclIncludes.length + 1;
    /* encode the header in a byte[] large enough to also contain the mods */
    byte [] encodedMsg = encodeHeader(MSG_TYPE_MODIFY, bodyLength,
        reqProtocolVersion);
    int pos = encodedMsg.length - bodyLength;
    pos = addByteArray(byteModsLen, encodedMsg, pos);
    pos = addByteArray(encodedMods, encodedMsg, pos);
    pos = addByteArray(byteEntryAttrLen, encodedMsg, pos);
    pos = addByteArray(encodedEclIncludes, encodedMsg, pos);
    return encodedMsg;
    final ByteArrayBuilder builder =
        encodeHeader(MSG_TYPE_MODIFY, protocolVersion);
    builder.appendUTF8(encodedMods.length);
    builder.append(encodedMods);
    builder.appendUTF8(encodedEclIncludes.length);
    builder.append(encodedEclIncludes);
    return builder.toByteArray();
  }
  // ============
  // Msg decoding
  // ============
  private void decodeBody_V123(byte[] in, int pos)
  private void decodeBody_V123(ByteArrayScanner scanner)
  throws DataFormatException
  {
    // Read and store the mods, in encoded form
    // all the remaining bytes but the terminating 0 */
    int length = in.length - pos - 1;
    encodedMods = new byte[length];
    try
    {
      System.arraycopy(in, pos, encodedMods, 0, length);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    encodedMods = scanner.remainingBytes();
  }
  private void decodeBody_V4(byte[] in, int pos)
  throws DataFormatException, UnsupportedEncodingException
  private void decodeBody_V4(ByteArrayScanner scanner)
      throws DataFormatException
  {
    // Read mods len
    int length = getNextLength(in, pos);
    int modsLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    final int modsLen = scanner.nextIntUTF8();
    this.encodedMods = scanner.nextByteArray(modsLen);
    // Read/Don't decode mods
    this.encodedMods = new byte[modsLen];
    try
    {
      System.arraycopy(in, pos, encodedMods, 0, modsLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    pos += modsLen + 1;
    // Read ecl attr len
    length = getNextLength(in, pos);
    int eclAttrLen = Integer.valueOf(new String(in, pos, length,"UTF-8"));
    pos += length + 1;
    // Read/Don't decode entry attributes
    encodedEclIncludes = new byte[eclAttrLen];
    try
    {
      System.arraycopy(in, pos, encodedEclIncludes, 0, eclAttrLen);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    final int eclAttrLen = scanner.nextIntUTF8();
    encodedEclIncludes = scanner.nextByteArray(eclAttrLen);
  }
}
opends/src/server/org/opends/server/replication/protocol/MonitorMsg.java
@@ -27,21 +27,16 @@
package org.opends.server.replication.protocol;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.zip.DataFormatException;
import org.opends.server.protocols.asn1.ASN1;
import org.opends.server.protocols.asn1.ASN1Reader;
import org.opends.server.protocols.asn1.ASN1Writer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
/**
 * This message is part of the replication protocol.
@@ -69,7 +64,7 @@
   * first missing change for each LDAP server connected to a Replication
   * Server.
   */
  static class ServerData
  private static class ServerData
  {
    private ServerState state;
    private long approxFirstMissingDate;
@@ -79,7 +74,7 @@
   * Data structure to manage the state of this replication server
   * and the state information for the servers connected to it.
   */
  static class SubTopoMonitorData
  private static class SubTopoMonitorData
  {
    /** This replication server DbState. */
    private ServerState replServerDbState;
@@ -91,7 +86,7 @@
        new HashMap<Integer, ServerData>();
  }
  private SubTopoMonitorData data = new SubTopoMonitorData();
  private final SubTopoMonitorData data = new SubTopoMonitorData();
  /**
   * Creates a new MonitorMsg.
@@ -120,19 +115,23 @@
   * @param state The server state.
   * @param approxFirstMissingDate  The approximation of the date
   * of the older missing change. null when none.
   * @param isLDAP Specifies whether the server is a LS or a RS
   * @param isLDAPServer Specifies whether the server is a DS or a RS
   */
  public void setServerState(int serverId, ServerState state,
      long approxFirstMissingDate, boolean isLDAP)
      long approxFirstMissingDate, boolean isLDAPServer)
  {
    ServerData sd = new ServerData();
    final ServerData sd = new ServerData();
    sd.state = state;
    sd.approxFirstMissingDate = approxFirstMissingDate;
    if (isLDAP)
    if (isLDAPServer)
    {
      data.ldapStates.put(serverId, sd);
    }
    else
    {
      data.rsStates.put(serverId, sd);
  }
  }
  /**
   * Get the server state for the LDAP server with the provided serverId.
@@ -154,7 +153,6 @@
    return data.rsStates.get(serverId).state;
  }
  /**
   * Get the approximation of the date of the older missing change for the
   * LDAP Server with the provided server Id.
@@ -185,69 +183,32 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMessage.
   */
  public MonitorMsg(byte[] in, short version) throws DataFormatException
  MonitorMsg(byte[] in, short version) throws DataFormatException
  {
    ByteSequenceReader reader = ByteString.wrap(in).asReader();
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    if (scanner.nextByte() != MSG_TYPE_REPL_SERVER_MONITOR)
    {
      throw new DataFormatException("input is not a valid "
          + getClass().getCanonicalName());
    }
    if (version == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      try
      {
        /* first byte is the type */
        if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR)
          throw new DataFormatException("input is not a valid " +
              this.getClass().getCanonicalName());
        int pos = 1;
        // sender
        int length = getNextLength(in, pos);
        String senderIDString = new String(in, pos, length, "UTF-8");
        this.senderID = Integer.valueOf(senderIDString);
        pos += length +1;
        // destination
        length = getNextLength(in, pos);
        String destinationString = new String(in, pos, length, "UTF-8");
        this.destination = Integer.valueOf(destinationString);
        pos += length +1;
        reader.position(pos);
      this.senderID = scanner.nextIntUTF8();
      this.destination = scanner.nextIntUTF8();
      }
      catch (UnsupportedEncodingException e)
    else if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
      {
        throw new DataFormatException("UTF-8 is not supported by this jvm.");
      }
      this.senderID = scanner.nextShort();
      this.destination = scanner.nextShort();
    }
    else
    {
      if (reader.get() != MSG_TYPE_REPL_SERVER_MONITOR)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      /*
       * V4 and above uses integers for its serverIds while V2 and V3
       * use shorts.
       */
      if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
      {
        // sender
        this.senderID = reader.getShort();
        // destination
        this.destination = reader.getShort();
      }
      else
      {
        // sender
        this.senderID = reader.getInt();
        // destination
        this.destination = reader.getInt();
      }
      this.senderID = scanner.nextInt();
      this.destination = scanner.nextInt();
    }
    ASN1Reader asn1Reader = ASN1.getReader(reader);
    ASN1Reader asn1Reader = scanner.getASN1Reader();
    try
    {
      asn1Reader.readStartSequence();
@@ -297,13 +258,7 @@
        else
        {
          // the next states are the server states
          ServerData sd = new ServerData();
          sd.state = newState;
          sd.approxFirstMissingDate = outime;
          if (isLDAPServer)
            data.ldapStates.put(serverId, sd);
          else
            data.rsStates.put(serverId, sd);
          setServerState(serverId, newState, outime, isLDAPServer);
        }
      }
      asn1Reader.readEndSequence();
@@ -312,39 +267,19 @@
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      ByteStringBuilder byteBuilder = new ByteStringBuilder();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        /* put the type of the operation */
        byteBuilder.append(MSG_TYPE_REPL_SERVER_MONITOR);
        /*
         * V4 and above uses integers for its serverIds while V2 and V3
         * use shorts.
         */
        if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
        {
          byteBuilder.append(senderID);
          byteBuilder.append(destination);
        }
        else
        {
          byteBuilder.append((short)senderID);
          byteBuilder.append((short)destination);
        }
      }
      final ByteArrayBuilder builder = new ByteArrayBuilder();
      builder.append(MSG_TYPE_REPL_SERVER_MONITOR);
      append(builder, senderID, protocolVersion);
      append(builder, destination, protocolVersion);
      /* Put the serverStates ... */
      ASN1Writer writer = ASN1.getWriter(byteBuilder);
      ASN1Writer writer = builder.getASN1Writer();
      writer.writeStartSequence();
      {
        /* first put the Replication Server state */
@@ -354,39 +289,18 @@
        }
        writer.writeEndSequence();
        // then the LDAP server data
        // then the DS + RS server data
        writeServerStates(protocolVersion, writer, false /* DS */);
        // then the RS server datas
        writeServerStates(protocolVersion, writer, true /* RS */);
      }
      writer.writeEndSequence();
      if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
      if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
      {
        return byteBuilder.toByteArray();
        // legacy coding mistake
        builder.append((byte) 0);
      }
      else
      {
        byte[] temp = byteBuilder.toByteArray();
        byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
        byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
        int length = 1 +  1 + senderBytes.length +
        1 + destinationBytes.length + temp.length +1;
        byte[] resultByteArray = new byte[length];
        /* put the type of the operation */
        resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR;
        int pos = 1;
        pos = addByteArray(senderBytes, resultByteArray, pos);
        pos = addByteArray(destinationBytes, resultByteArray, pos);
        pos = addByteArray(temp, resultByteArray, pos);
        return resultByteArray;
      }
      return builder.toByteArray();
    }
    catch (Exception e)
    {
@@ -394,6 +308,23 @@
    }
  }
  private void append(final ByteArrayBuilder builder, int data,
      short protocolVersion)
  {
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      builder.appendUTF8(data);
    }
    else if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      builder.append((short) data);
    }
    else // protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V4
    {
      builder.append(data);
    }
  }
  private void writeServerStates(short protocolVersion, ASN1Writer writer,
      boolean writeRSStates) throws IOException
  {
@@ -454,8 +385,6 @@
    return data.rsStates.keySet().iterator();
  }
  /**
   * Get the destination.
   *
@@ -466,8 +395,6 @@
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
@@ -478,15 +405,11 @@
    return senderID;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    StringBuilder stateS = new StringBuilder("\nRState:[");
    final StringBuilder stateS = new StringBuilder("\nRState:[");
    stateS.append(data.replServerDbState);
    stateS.append("]");
@@ -502,10 +425,10 @@
    stateS.append("\nRSStates:[");
    for (Entry<Integer, ServerData> entry : data.rsStates.entrySet())
    {
      ServerData sd = entry.getValue();
      final ServerData sd = entry.getValue();
      stateS.append("\n[RSState(").append(entry.getKey()).append(")=")
            .append(sd.state).append("]").append(" afmd=")
            .append(sd.approxFirstMissingDate + "]");
            .append(sd.approxFirstMissingDate).append("]");
    }
    return getClass().getCanonicalName() +
    "[ sender=" + this.senderID +
opends/src/server/org/opends/server/replication/protocol/MonitorRequestMsg.java
@@ -26,7 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -46,8 +45,6 @@
   */
  private final int senderID;
  /**
   * Creates a message.
   *
@@ -70,69 +67,29 @@
   * @throws DataFormatException
   *           If the in does not contain a properly, encoded message.
   */
  public MonitorRequestMsg(byte[] in) throws DataFormatException
  MonitorRequestMsg(byte[] in) throws DataFormatException
  {
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
    {
      // First byte is the type
      if (in[0] != MSG_TYPE_REPL_SERVER_MONITOR_REQUEST)
        throw new DataFormatException("input is not a valid "
            + this.getClass().getCanonicalName());
      int pos = 1;
      // sender
      int length = getNextLength(in, pos);
      String senderString = new String(in, pos, length, "UTF-8");
      this.senderID = Integer.valueOf(senderString);
      pos += length + 1;
      // destination
      length = getNextLength(in, pos);
      String destinationString = new String(in, pos, length, "UTF-8");
      this.destination = Integer.valueOf(destinationString);
      pos += length + 1;
          + getClass().getCanonicalName());
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    this.senderID = scanner.nextIntUTF8();
    this.destination = scanner.nextIntUTF8();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      byte[] senderBytes = String.valueOf(senderID).getBytes("UTF-8");
      byte[] destinationBytes = String.valueOf(destination).getBytes("UTF-8");
      int length = 1 + senderBytes.length + 1 + destinationBytes.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_REPL_SERVER_MONITOR_REQUEST;
      int pos = 1;
      /* put the sender */
      pos = addByteArray(senderBytes, resultByteArray, pos);
      /* put the destination */
      pos = addByteArray(destinationBytes, resultByteArray, pos);
      return resultByteArray;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_REPL_SERVER_MONITOR_REQUEST);
    builder.appendUTF8(senderID);
    builder.appendUTF8(destination);
    return builder.toByteArray();
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
  /**
   * Get the destination.
@@ -144,8 +101,6 @@
    return destination;
  }
  /**
   * Get the server ID of the server that sent this message.
   *
@@ -156,13 +111,12 @@
    return senderID;
  }
  /**
   * Returns a string representation of the message.
   *
   * @return the string representation of this message.
   */
  @Override
  public String toString()
  {
    return "[" + getClass().getCanonicalName() + " sender=" + senderID
opends/src/server/org/opends/server/replication/protocol/ReplServerStartDSMsg.java
@@ -22,16 +22,14 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * Message sent by a replication server to a directory server in reply to the
@@ -39,17 +37,17 @@
 */
public class ReplServerStartDSMsg extends StartMsg
{
  private int serverId;
  private String serverURL;
  private DN baseDN;
  private int windowSize;
  private ServerState serverState;
  private final int serverId;
  private final String serverURL;
  private final DN baseDN;
  private final int windowSize;
  private final ServerState serverState;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * Threshold value used by the RS to determine if a DS must be put in
@@ -61,12 +59,12 @@
  /**
   * The weight affected to the replication server.
   */
  private int weight = -1;
  private final int weight;
  /**
   * Number of currently connected DS to the replication server.
   */
  private int connectedDSNumber = -1;
  private final int connectedDSNumber;
  /**
   * Create a ReplServerStartDSMsg.
@@ -115,100 +113,25 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ReplServerStartDSMsg.
   */
  public ReplServerStartDSMsg(byte[] in) throws DataFormatException
  ReplServerStartDSMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[1];
    allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START_DS;
    headerLength = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_REPL_SERVER_START_DS);
    try
    {
      /* The ReplServerStartDSMsg payload is stored in the form :
       * <baseDN><serverId><serverURL><windowSize><sslEncryption>
       * <degradedStatusThreshold><weight><connectedDSNumber>
       * <serverState>
       */
      /* first bytes are the header */
      int pos = headerLength;
      /* read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /**
       * read the degraded status threshold
       */
      length = getNextLength(in, pos);
      degradedStatusThreshold =
        Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      /*
       * read the weight
       */
      length = getNextLength(in, pos);
      String weightString = new String(in, pos, length, "UTF-8");
      weight = Integer.valueOf(weightString);
      pos += length +1;
      /*
       * read the connected DS number
       */
      length = getNextLength(in, pos);
      String connectedDSNumberString = new String(in, pos, length, "UTF-8");
      connectedDSNumber = Integer.valueOf(connectedDSNumberString);
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
    baseDN = scanner.nextDN();
    serverId = scanner.nextIntUTF8();
    serverURL = scanner.nextString();
    windowSize = scanner.nextIntUTF8();
    sslEncryption = Boolean.valueOf(scanner.nextString());//FIXME
    degradedStatusThreshold =scanner.nextIntUTF8();
    weight = scanner.nextIntUTF8();
    connectedDSNumber = scanner.nextIntUTF8();
    serverState = scanner.nextServerState();
  }
  /**
@@ -248,72 +171,28 @@
    return this.serverState;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short sessionProtocolVersion)
     throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    /* The ReplServerStartDSMsg is stored in the form :
     * <operation type><baseDN><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><weight><connectedDSNumber>
     * <serverState>
     */
    byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
    byte[] byteServerState = serverState.getBytes();
    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
    byte[] byteSSLEncryption =
      String.valueOf(sslEncryption).getBytes("UTF-8");
    byte[] byteDegradedStatusThreshold =
      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
    byte[] byteWeight =
      String.valueOf(weight).getBytes("UTF-8");
    byte[] byteConnectedDSNumber =
      String.valueOf(connectedDSNumber).getBytes("UTF-8");
    int length = byteDn.length + 1 + byteServerId.length + 1 +
      byteServerUrl.length + 1 + byteWindowSize.length + 1 +
      byteSSLEncryption.length + 1 + byteDegradedStatusThreshold.length + 1 +
      byteWeight.length + 1 + byteConnectedDSNumber.length + 1 +
      byteServerState.length + 1;
    /* encode the header in a byte[] large enough */
    byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START_DS,
        length, sessionProtocolVersion);
    int pos = headerLength;
    /* put the baseDN and a terminating 0 */
    pos = addByteArray(byteDn, resultByteArray, pos);
    /* put the ServerId */
    pos = addByteArray(byteServerId, resultByteArray, pos);
    /* put the ServerURL */
    pos = addByteArray(byteServerUrl, resultByteArray, pos);
    /* put the window size */
    pos = addByteArray(byteWindowSize, resultByteArray, pos);
    /* put the SSL Encryption setting */
    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
    /* put the degraded status threshold */
    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the weight */
    pos = addByteArray(byteWeight, resultByteArray, pos);
    /* put the connected DS number */
    pos = addByteArray(byteConnectedDSNumber, resultByteArray, pos);
    /* put the ServerState */
    pos = addByteArray(byteServerState, resultByteArray, pos);
    return resultByteArray;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    encodeHeader(MSG_TYPE_REPL_SERVER_START_DS, builder, protocolVersion);
    builder.append(baseDN);
    builder.appendUTF8(serverId);
    builder.append(serverURL);
    builder.appendUTF8(windowSize);
    builder.append(Boolean.toString(sslEncryption));
    builder.appendUTF8(degradedStatusThreshold);
    builder.appendUTF8(weight);
    builder.appendUTF8(connectedDSNumber);
    // Caution: ServerState MUST be the last field.
    builder.append(serverState);
    return builder.toByteArray();
  }
  /**
@@ -356,9 +235,7 @@
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/ReplServerStartMsg.java
@@ -22,16 +22,14 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * Message sent by a replication server to another replication server
@@ -39,17 +37,17 @@
 */
public class ReplServerStartMsg extends StartMsg
{
  private Integer serverId;
  private String serverURL;
  private DN baseDN;
  private int windowSize;
  private ServerState serverState;
  private final int serverId;
  private final String serverURL;
  private final DN baseDN;
  private final int windowSize;
  private final ServerState serverState;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * NOTE: Starting from protocol V4, we introduce a dedicated PDU for answering
@@ -106,166 +104,28 @@
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ReplServerStartMsg.
   */
  public ReplServerStartMsg(byte[] in) throws DataFormatException
  ReplServerStartMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[2];
    allowedPduTypes[0] = MSG_TYPE_REPL_SERVER_START;
    allowedPduTypes[1] = MSG_TYPE_REPL_SERVER_START_V1;
    headerLength = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner,
        MSG_TYPE_REPL_SERVER_START, MSG_TYPE_REPL_SERVER_START_V1);
    // Protocol version has been read as part of the header:
    // decode the body according to the protocol version read in the header
    switch(protocolVersion)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        decodeBody_V1(in, headerLength);
        return;
    }
    try
    {
      /* The ReplServerStartMsg payload is stored in the form :
       * <baseDN><serverId><serverURL><windowSize><sslEncryption>
       * <degradedStatusThreshold><serverState>
       */
    baseDN = scanner.nextDN();
    serverId = scanner.nextIntUTF8();
    serverURL = scanner.nextString();
    windowSize = scanner.nextIntUTF8();
    sslEncryption = Boolean.valueOf(scanner.nextString());
      /* first bytes are the header */
      int pos = headerLength;
      /* read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /**
       * read the degraded status threshold
       */
      length = getNextLength(in, pos);
      degradedStatusThreshold =
        Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    }
    catch (UnsupportedEncodingException e)
    if (protocolVersion > ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
      degradedStatusThreshold = scanner.nextIntUTF8();
  }
  /**
   * Decodes the body of a just received ReplServerStartMsg. The body is in the
   * passed array, and starts at the provided location. This is for a PDU
   * encoded in V1 protocol version.
   * @param in A byte array containing the body for the ReplServerStartMsg
   * @param pos The position in the array where the decoding should start
   * @throws DataFormatException If the in does not contain a properly
   *                             encoded ReplServerStartMsg.
   */
  public void decodeBody_V1(byte[] in, int pos) throws DataFormatException
  {
    try
    {
      /* The ReplServerStartMsg payload is stored in the form :
       * <baseDN><serverId><serverURL><windowSize><sslEncryption>
       * <serverState>
       */
      /* read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the window size
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of serverid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
    serverState = scanner.nextServerState();
  }
  /**
@@ -305,69 +165,43 @@
    return this.serverState;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short sessionProtocolVersion)
     throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    // If an older version requested, encode in the requested way
    switch(sessionProtocolVersion)
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      case ProtocolVersion.REPLICATION_PROTOCOL_V1:
        return getBytes_V1();
      /*
       * The ReplServerStartMessage is stored in the form :
       * <operation type><basedn><serverid><serverURL><windowsize><serverState>
       */
      encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1, builder);
      builder.append(baseDN);
      builder.appendUTF8(serverId);
      builder.append(serverURL);
      builder.appendUTF8(windowSize);
      builder.append(Boolean.toString(sslEncryption));
      // Caution: ServerState MUST be the last field.
      builder.append(serverState);
    }
    else
    {
    /* The ReplServerStartMsg is stored in the form :
     * <operation type><baseDN><serverId><serverURL><windowSize><sslEncryption>
     * <degradedStatusThreshold><serverState>
     */
    byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
    byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
    byte[] byteServerUrl = serverURL.getBytes("UTF-8");
    byte[] byteServerState = serverState.getBytes();
    byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
    byte[] byteSSLEncryption =
      String.valueOf(sslEncryption).getBytes("UTF-8");
    byte[] byteDegradedStatusThreshold =
      String.valueOf(degradedStatusThreshold).getBytes("UTF-8");
    int length = byteDn.length + 1 + byteServerId.length + 1 +
      byteServerUrl.length + 1 + byteWindowSize.length + 1 +
      byteSSLEncryption.length + 1 +
      byteDegradedStatusThreshold.length + 1 +
      byteServerState.length + 1;
    /* encode the header in a byte[] large enough */
    byte resultByteArray[] = encodeHeader(MSG_TYPE_REPL_SERVER_START, length,
        sessionProtocolVersion);
    int pos = headerLength;
    /* put the baseDN and a terminating 0 */
    pos = addByteArray(byteDn, resultByteArray, pos);
    /* put the ServerId */
    pos = addByteArray(byteServerId, resultByteArray, pos);
    /* put the ServerURL */
    pos = addByteArray(byteServerUrl, resultByteArray, pos);
    /* put the window size */
    pos = addByteArray(byteWindowSize, resultByteArray, pos);
    /* put the SSL Encryption setting */
    pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
    /* put the degraded status threshold */
    pos = addByteArray(byteDegradedStatusThreshold, resultByteArray, pos);
    /* put the ServerState */
    pos = addByteArray(byteServerState, resultByteArray, pos);
    return resultByteArray;
      encodeHeader(MSG_TYPE_REPL_SERVER_START, builder, protocolVersion);
      builder.append(baseDN);
      builder.appendUTF8(serverId);
      builder.append(serverURL);
      builder.appendUTF8(windowSize);
      builder.append(Boolean.toString(sslEncryption));
      builder.appendUTF8(degradedStatusThreshold);
      // Caution: ServerState MUST be the last field.
      builder.append(serverState);
    }
    return builder.toByteArray();
  }
  /**
@@ -410,9 +244,7 @@
    this.degradedStatusThreshold = degradedStatusThreshold;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -428,64 +260,4 @@
      "\ndegradedStatusThreshold: " + degradedStatusThreshold +
      "\nwindowSize: " + windowSize;
  }
  /**
   * Get the byte array representation of this Message. This uses the version
   * 1 of the replication protocol (used for compatibility purpose).
   *
   * @return The byte array representation of this Message.
   *
   * @throws UnsupportedEncodingException  When the encoding of the message
   *         failed because the UTF-8 encoding is not supported.
   */
  public byte[] getBytes_V1() throws UnsupportedEncodingException
  {
    /*
     * The ReplServerStartMessage is stored in the form :
     * <operation type><basedn><serverid><serverURL><windowsize><serverState>
     */
    try {
      byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      byte[] byteWindowSize = String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 + byteWindowSize.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough */
      byte resultByteArray[] = encodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1,
        length);
      int pos = headerLength;
      /* put the baseDN and a terminating 0 */
      pos = addByteArray(byteDn, resultByteArray, pos);
      /* put the ServerId */
      pos = addByteArray(byteServerId, resultByteArray, pos);
      /* put the ServerURL */
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
      /* put the window size */
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      /* put the SSL Encryption setting */
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      /* put the ServerState */
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
}
opends/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
@@ -106,15 +105,8 @@
   *          The protocol version to use for serialization. The version should
   *          normally be older than the current one.
   * @return The encoded PDU.
   * @throws UnsupportedEncodingException
   *           When the encoding of the message failed because the UTF-8
   *           encoding is not supported or the requested protocol version to
   *           use is not supported by this PDU.
   */
  public abstract byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException;
  public abstract byte[] getBytes(short protocolVersion);
  /**
   * Generates a ReplicationMsg from its encoded form. This un-serialization is
@@ -128,15 +120,12 @@
   * @return The generated SynchronizationMessage.
   * @throws DataFormatException
   *           If the encoded form was not a valid msg.
   * @throws UnsupportedEncodingException
   *           If UTF8 is not supported.
   * @throws NotSupportedOldVersionPDUException
   *           If the PDU is part of an old protocol version and we do not
   *           support it.
   */
  public static ReplicationMsg generateMsg(byte[] buffer, short protocolVersion)
      throws DataFormatException, UnsupportedEncodingException,
      NotSupportedOldVersionPDUException
      throws DataFormatException, NotSupportedOldVersionPDUException
  {
    switch (buffer[0])
    {
@@ -214,51 +203,4 @@
      throw new DataFormatException("received message with unknown type");
    }
  }
  /**
   * Concatenate the tail byte array into the resultByteArray.
   * The resultByteArray must be large enough before calling this method.
   *
   * @param tail the byte array to concatenate.
   * @param resultByteArray The byte array to concatenate to.
   * @param pos the position where to concatenate.
   * @return the next position to use in the resultByteArray.
   */
  protected static int addByteArray(byte[] tail, byte[] resultByteArray,
    int pos)
  {
    for (int i=0; i<tail.length; i++,pos++)
    {
      resultByteArray[pos] = tail[i];
    }
    resultByteArray[pos++] = 0;
    return pos;
  }
  /**
   * Get the length of the next String encoded in the in byte array.
   *
   * @param in
   *          the byte array where to calculate the string.
   * @param pos
   *          the position where to start from in the byte array.
   * @return the length of the next string.
   * @throws DataFormatException
   *           If the byte array does not end with null.
   */
  protected static int getNextLength(byte[] in, int pos)
      throws DataFormatException
  {
    int offset = pos;
    int length = 0;
    while (in[offset++] != 0)
    {
      if (offset >= in.length)
        throw new DataFormatException("byte[] is not a valid msg");
      length++;
    }
    return length;
  }
}
opends/src/server/org/opends/server/replication/protocol/ResetGenerationIdMsg.java
@@ -22,23 +22,19 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is used by an LDAP server to communicate to the topology
 * that the generation must be reset for the domain.
 */
public class ResetGenerationIdMsg extends ReplicationMsg
{
  private long generationId;
  private final long generationId;
  /**
   * Creates a new message.
@@ -57,52 +53,25 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the WindowMessage.
   */
  public ResetGenerationIdMsg(byte[] in) throws DataFormatException
  ResetGenerationIdMsg(byte[] in) throws DataFormatException
  {
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    if (scanner.nextByte() != MSG_TYPE_RESET_GENERATION_ID)
    {
      if (in[0] != MSG_TYPE_RESET_GENERATION_ID)
        throw new
        DataFormatException("input is not a valid GenerationId Message");
      int pos = 1;
      /* read the generationId */
      int length = getNextLength(in, pos);
      generationId = Long.valueOf(new String(in, pos, length,
      "UTF-8"));
      pos += length +1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      throw new DataFormatException(
          "input is not a valid GenerationId Message");
    }
    generationId = scanner.nextLongUTF8();
    }
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    try
    {
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      /* Put the message type */
      oStream.write(MSG_TYPE_RESET_GENERATION_ID);
      // Put the generationId
      oStream.write(String.valueOf(generationId).getBytes("UTF-8"));
      oStream.write(0);
      return oStream.toByteArray();
    }
    catch (IOException e)
    {
      // never happens
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_RESET_GENERATION_ID);
    builder.appendUTF8(generationId);
    return builder.toByteArray();
  }
  /**
@@ -115,9 +84,7 @@
    return this.generationId;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/ServerStartECLMsg.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
@@ -38,26 +37,25 @@
 */
public class ServerStartECLMsg extends StartMsg
{
  private String serverURL;
  private int maxReceiveQueue;
  private int maxSendQueue;
  private int maxReceiveDelay;
  private int maxSendDelay;
  private int windowSize;
  private ServerState serverState = null;
  private final String serverURL;
  private final int maxReceiveQueue;
  private final int maxSendQueue;
  private final int maxReceiveDelay;
  private final int maxSendDelay;
  private final int windowSize;
  private final ServerState serverState;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  private final long heartbeatInterval;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * Creates a new ServerStartMsg. This message is to be sent by an LDAP
@@ -108,86 +106,21 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMsg.
   */
  public ServerStartECLMsg(byte[] in) throws DataFormatException
  ServerStartECLMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[1];
    allowedPduTypes[0] = MSG_TYPE_START_ECL;
    headerLength = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_START_ECL);
    try
    {
      /* first bytes are the header */
      int pos = headerLength;
      /*
       * read the ServerURL
       */
      int length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the maxReceiveDelay
       */
      length = getNextLength(in, pos);
      maxReceiveDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxReceiveQueue
       */
      length = getNextLength(in, pos);
      maxReceiveQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxSendDelay
       */
      length = getNextLength(in, pos);
      maxSendDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxSendQueue
       */
      length = getNextLength(in, pos);
      maxSendQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the windowSize
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the heartbeatInterval
       */
      length = getNextLength(in, pos);
      heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of sererid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    serverURL = scanner.nextString();
    maxReceiveDelay = scanner.nextIntUTF8();
    maxReceiveQueue = scanner.nextIntUTF8();
    maxSendDelay = scanner.nextIntUTF8();
    maxSendQueue = scanner.nextIntUTF8();
    windowSize = scanner.nextIntUTF8();
    heartbeatInterval = scanner.nextIntUTF8();
    // FIXME awful encoding
    sslEncryption = Boolean.valueOf(scanner.nextString());
    serverState = scanner.nextServerState();
  }
  /**
@@ -244,69 +177,24 @@
    return serverState;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short sessionProtocolVersion)
  {
    try {
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteMaxRecvDelay =
                     String.valueOf(maxReceiveDelay).getBytes("UTF-8");
      byte[] byteMaxRecvQueue =
                     String.valueOf(maxReceiveQueue).getBytes("UTF-8");
      byte[] byteMaxSendDelay =
                     String.valueOf(maxSendDelay).getBytes("UTF-8");
      byte[] byteMaxSendQueue =
                     String.valueOf(maxSendQueue).getBytes("UTF-8");
      byte[] byteWindowSize =
                     String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteHeartbeatInterval =
                     String.valueOf(heartbeatInterval).getBytes("UTF-8");
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
      int length = byteServerUrl.length + 1 +
                   byteMaxRecvDelay.length + 1 +
                   byteMaxRecvQueue.length + 1 +
                   byteMaxSendDelay.length + 1 +
                   byteMaxSendQueue.length + 1 +
                   byteWindowSize.length + 1 +
                   byteHeartbeatInterval.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_START_ECL, length,
          sessionProtocolVersion);
      int pos = headerLength;
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
      pos = addByteArray(byteMaxRecvDelay, resultByteArray, pos);
      pos = addByteArray(byteMaxRecvQueue, resultByteArray, pos);
      pos = addByteArray(byteMaxSendDelay, resultByteArray, pos);
      pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    encodeHeader(MSG_TYPE_START_ECL, builder, sessionProtocolVersion);
    builder.append(serverURL);
    builder.appendUTF8(maxReceiveDelay);
    builder.appendUTF8(maxReceiveQueue);
    builder.appendUTF8(maxSendDelay);
    builder.appendUTF8(maxSendQueue);
    builder.appendUTF8(windowSize);
    builder.appendUTF8(heartbeatInterval);
    // FIXME awful encoding
    builder.append(Boolean.toString(sslEncryption));
    // Caution: ServerState MUST be the last field.
    builder.append(serverState);
    return builder.toByteArray();
  }
  /**
@@ -343,13 +231,11 @@
    return sslEncryption;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return this.getClass().getCanonicalName() + " content: " +
    return getClass().getCanonicalName() + " content: " +
      "\nprotocolVersion: " + protocolVersion +
      "\ngenerationId: " + generationId +
      "\ngroupId: " + groupId +
opends/src/server/org/opends/server/replication/protocol/ServerStartMsg.java
@@ -22,16 +22,14 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
/**
 * This message is used by LDAP server when they first connect.
@@ -40,27 +38,28 @@
 */
public class ServerStartMsg extends StartMsg
{
  private int serverId; // Id of the LDAP server that sent this message
  private String serverURL;
  private DN baseDN;
  private int maxReceiveQueue;
  private int maxSendQueue;
  private int maxReceiveDelay;
  private int maxSendDelay;
  private int windowSize;
  private ServerState serverState = null;
  /** Id of the LDAP server that sent this message */
  private final int serverId;
  private final String serverURL;
  private final DN baseDN;
  private final int maxReceiveQueue;
  private final int maxSendQueue;
  private final int maxReceiveDelay;
  private final int maxSendDelay;
  private final int windowSize;
  private final ServerState serverState;
  /**
   * The time in milliseconds between heartbeats from the replication
   * server.  Zero means heartbeats are off.
   */
  private long heartbeatInterval = 0;
  private final long heartbeatInterval;
  /**
   * Whether to continue using SSL to encrypt messages after the start
   * messages have been exchanged.
   */
  private boolean sslEncryption;
  private final boolean sslEncryption;
  /**
   * Creates a new ServerStartMsg. This message is to be sent by an LDAP
@@ -108,107 +107,22 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the ServerStartMsg.
   */
  public ServerStartMsg(byte[] in) throws DataFormatException
  ServerStartMsg(byte[] in) throws DataFormatException
  {
    byte[] allowedPduTypes = new byte[1];
    allowedPduTypes[0] = MSG_TYPE_SERVER_START;
    headerLength = decodeHeader(allowedPduTypes, in);
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    decodeHeader(scanner, MSG_TYPE_SERVER_START);
    try
    {
      /* first bytes are the header */
      int pos = headerLength;
      /*
       * read the dn
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      baseDN = DN.decode(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the ServerId
       */
      length = getNextLength(in, pos);
      String serverIdString = new String(in, pos, length, "UTF-8");
      serverId = Integer.valueOf(serverIdString);
      pos += length +1;
      /*
       * read the ServerURL
       */
      length = getNextLength(in, pos);
      serverURL = new String(in, pos, length, "UTF-8");
      pos += length +1;
      /*
       * read the maxReceiveDelay
       */
      length = getNextLength(in, pos);
      maxReceiveDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxReceiveQueue
       */
      length = getNextLength(in, pos);
      maxReceiveQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxSendDelay
       */
      length = getNextLength(in, pos);
      maxSendDelay = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the maxSendQueue
       */
      length = getNextLength(in, pos);
      maxSendQueue = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the windowSize
       */
      length = getNextLength(in, pos);
      windowSize = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the heartbeatInterval
       */
      length = getNextLength(in, pos);
      heartbeatInterval = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      /*
       * read the sslEncryption setting
       */
      length = getNextLength(in, pos);
      sslEncryption = Boolean.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      // Read the ServerState
      // Caution: ServerState MUST be the last field. Because ServerState can
      // contain null character (string termination of sererid string ..) it
      // cannot be decoded using getNextLength() like the other fields. The
      // only way is to rely on the end of the input buffer : and that forces
      // the ServerState to be the last. This should be changed and we want to
      // have more than one ServerState field.
      serverState = new ServerState(in, pos, in.length - 1);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
    catch (DirectoryException e)
    {
      throw new DataFormatException(e.getLocalizedMessage());
    }
    baseDN = scanner.nextDN();
    serverId = scanner.nextIntUTF8();
    serverURL = scanner.nextString();
    maxReceiveDelay = scanner.nextIntUTF8();
    maxReceiveQueue = scanner.nextIntUTF8();
    maxSendDelay = scanner.nextIntUTF8();
    maxSendQueue = scanner.nextIntUTF8();
    windowSize = scanner.nextIntUTF8();
    heartbeatInterval = scanner.nextIntUTF8();
    sslEncryption = Boolean.valueOf(scanner.nextString());
    serverState = scanner.nextServerState();
  }
  /**
@@ -284,76 +198,26 @@
    return serverState;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short sessionProtocolVersion)
  public byte[] getBytes(short protocolVersion)
  {
    try {
      byte[] byteDn = baseDN.toNormalizedString().getBytes("UTF-8");
      byte[] byteServerId = String.valueOf(serverId).getBytes("UTF-8");
      byte[] byteServerUrl = serverURL.getBytes("UTF-8");
      byte[] byteMaxRecvDelay =
                     String.valueOf(maxReceiveDelay).getBytes("UTF-8");
      byte[] byteMaxRecvQueue =
                     String.valueOf(maxReceiveQueue).getBytes("UTF-8");
      byte[] byteMaxSendDelay =
                     String.valueOf(maxSendDelay).getBytes("UTF-8");
      byte[] byteMaxSendQueue =
                     String.valueOf(maxSendQueue).getBytes("UTF-8");
      byte[] byteWindowSize =
                     String.valueOf(windowSize).getBytes("UTF-8");
      byte[] byteHeartbeatInterval =
                     String.valueOf(heartbeatInterval).getBytes("UTF-8");
      byte[] byteSSLEncryption =
                     String.valueOf(sslEncryption).getBytes("UTF-8");
      byte[] byteServerState = serverState.getBytes();
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    encodeHeader(MSG_TYPE_SERVER_START, builder, protocolVersion);
      int length = byteDn.length + 1 + byteServerId.length + 1 +
                   byteServerUrl.length + 1 +
                   byteMaxRecvDelay.length + 1 +
                   byteMaxRecvQueue.length + 1 +
                   byteMaxSendDelay.length + 1 +
                   byteMaxSendQueue.length + 1 +
                   byteWindowSize.length + 1 +
                   byteHeartbeatInterval.length + 1 +
                   byteSSLEncryption.length + 1 +
                   byteServerState.length + 1;
      /* encode the header in a byte[] large enough to also contain the mods */
      byte resultByteArray[] = encodeHeader(MSG_TYPE_SERVER_START, length,
          sessionProtocolVersion);
      int pos = headerLength;
      pos = addByteArray(byteDn, resultByteArray, pos);
      pos = addByteArray(byteServerId, resultByteArray, pos);
      pos = addByteArray(byteServerUrl, resultByteArray, pos);
      pos = addByteArray(byteMaxRecvDelay, resultByteArray, pos);
      pos = addByteArray(byteMaxRecvQueue, resultByteArray, pos);
      pos = addByteArray(byteMaxSendDelay, resultByteArray, pos);
      pos = addByteArray(byteMaxSendQueue, resultByteArray, pos);
      pos = addByteArray(byteWindowSize, resultByteArray, pos);
      pos = addByteArray(byteHeartbeatInterval, resultByteArray, pos);
      pos = addByteArray(byteSSLEncryption, resultByteArray, pos);
      pos = addByteArray(byteServerState, resultByteArray, pos);
      return resultByteArray;
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
    builder.append(baseDN);
    builder.appendUTF8(serverId);
    builder.append(serverURL);
    builder.appendUTF8(maxReceiveDelay);
    builder.appendUTF8(maxReceiveQueue);
    builder.appendUTF8(maxSendDelay);
    builder.appendUTF8(maxSendQueue);
    builder.appendUTF8(windowSize);
    builder.appendUTF8(heartbeatInterval);
    builder.append(Boolean.toString(sslEncryption));
    // Caution: ServerState MUST be the last field.
    builder.append(serverState);
    return builder.toByteArray();
  }
  /**
@@ -390,9 +254,7 @@
    return sslEncryption;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/StartECLSessionMsg.java
@@ -22,12 +22,10 @@
 *
 *
 *      Copyright 2009-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -150,7 +148,7 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public StartECLSessionMsg(byte[] in) throws DataFormatException
  StartECLSessionMsg(byte[] in) throws DataFormatException
  {
    /*
     * The message is stored in the form:
@@ -158,68 +156,25 @@
     * <list of referrals urls>
     * (each referral url terminates with 0)
     */
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_START_ECL_SESSION)
    {
      // first bytes are the header
      int pos = 0;
      // first byte is the type
      if (in.length < 1 || in[pos++] != MSG_TYPE_START_ECL_SESSION)
      {
        throw new DataFormatException(
          "Input is not a valid " + this.getClass().getCanonicalName());
      throw new DataFormatException("Input is not a valid "
          + getClass().getCanonicalName());
      }
      // start mode
      int length = getNextLength(in, pos);
      int requestType = Integer.parseInt(new String(in, pos, length, "UTF-8"));
      eclRequestType = ECLRequestType.values()[requestType];
      pos += length +1;
      length = getNextLength(in, pos);
      firstChangeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      length = getNextLength(in, pos);
      lastChangeNumber = Integer.valueOf(new String(in, pos, length, "UTF-8"));
      pos += length +1;
      length = getNextLength(in, pos);
      csn = new CSN(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      // persistentSearch mode
      length = getNextLength(in, pos);
      int persistent = Integer.parseInt(new String(in, pos, length, "UTF-8"));
      isPersistent = Persistent.values()[persistent];
      pos += length + 1;
      // generalized state
      length = getNextLength(in, pos);
      crossDomainServerState = new String(in, pos, length, "UTF-8");
      pos += length + 1;
      length = getNextLength(in, pos);
      operationId = new String(in, pos, length, "UTF-8");
      pos += length + 1;
      // excluded DN
      length = getNextLength(in, pos);
      String excludedDNsString = new String(in, pos, length, "UTF-8");
    eclRequestType = ECLRequestType.values()[scanner.nextIntUTF8()];
    firstChangeNumber = scanner.nextIntUTF8();
    lastChangeNumber = scanner.nextIntUTF8();
    csn = scanner.nextCSNUTF8();
    isPersistent = Persistent.values()[scanner.nextIntUTF8()];
    crossDomainServerState = scanner.nextString();
    operationId = scanner.nextString();
    final String excludedDNsString = scanner.nextString();
      if (excludedDNsString.length()>0)
      {
        String[] excludedDNsStr = excludedDNsString.split(";");
        Collections.addAll(this.excludedBaseDNs, excludedDNsStr);
      }
      pos += length + 1;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    } catch (IllegalArgumentException e)
    {
      throw new DataFormatException(e.getMessage());
      Collections.addAll(excludedBaseDNs, excludedDNsString.split(";"));
    }
  }
@@ -238,71 +193,26 @@
    excludedBaseDNs = new HashSet<String>();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    String excludedBaseDNsString =
        StaticUtils.collectionToString(excludedBaseDNs, ";");
    try
    {
      byte[] byteMode = toBytes(eclRequestType.ordinal());
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_START_ECL_SESSION);
    builder.appendUTF8(eclRequestType.ordinal());
      // FIXME JNR Changing the lines below to use long would require a protocol
      // version change. Leave it like this for now until the need arises.
      byte[] byteChangeNumber = toBytes((int) firstChangeNumber);
      byte[] byteStopChangeNumber = toBytes((int) lastChangeNumber);
      byte[] byteCSN = csn.toString().getBytes("UTF-8");
      byte[] bytePsearch = toBytes(isPersistent.ordinal());
      byte[] byteGeneralizedState = toBytes(crossDomainServerState);
      byte[] byteOperationId = toBytes(operationId);
      byte[] byteExcludedDNs = toBytes(excludedBaseDNsString);
      int length =
        byteMode.length + 1 +
        byteChangeNumber.length + 1 +
        byteStopChangeNumber.length + 1 +
        byteCSN.length + 1 +
        bytePsearch.length + 1 +
        byteGeneralizedState.length + 1 +
        byteOperationId.length + 1 +
        byteExcludedDNs.length + 1 +
        1;
      byte[] resultByteArray = new byte[length];
      int pos = 0;
      resultByteArray[pos++] = MSG_TYPE_START_ECL_SESSION;
      pos = addByteArray(byteMode, resultByteArray, pos);
      pos = addByteArray(byteChangeNumber, resultByteArray, pos);
      pos = addByteArray(byteStopChangeNumber, resultByteArray, pos);
      pos = addByteArray(byteCSN, resultByteArray, pos);
      pos = addByteArray(bytePsearch, resultByteArray, pos);
      pos = addByteArray(byteGeneralizedState, resultByteArray, pos);
      pos = addByteArray(byteOperationId, resultByteArray, pos);
      pos = addByteArray(byteExcludedDNs, resultByteArray, pos);
      return resultByteArray;
    } catch (IOException e)
    {
      // never happens
      return null;
    }
    builder.appendUTF8((int) firstChangeNumber);
    builder.appendUTF8((int) lastChangeNumber);
    builder.appendUTF8(csn);
    builder.appendUTF8(isPersistent.ordinal());
    builder.append(crossDomainServerState);
    builder.append(operationId);
    builder.append(StaticUtils.collectionToString(excludedBaseDNs, ";"));
    return builder.toByteArray();
  }
  private byte[] toBytes(int i) throws UnsupportedEncodingException
  {
    return toBytes(String.valueOf(i));
  }
  private byte[] toBytes(String s) throws UnsupportedEncodingException
  {
    return String.valueOf(s).getBytes("UTF-8");
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/protocol/StartMsg.java
@@ -22,14 +22,12 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This abstract message class is the superclass for start messages used
 * by LDAP servers and Replication servers to initiate their communications.
@@ -43,12 +41,7 @@
  /** Generation id of data set we want to work with. */
  protected long  generationId;
  /** Group id of the replicated domain. */
  protected byte groupId = (byte)-1;
  /**
   * The length of the header of this message.
   */
  protected int headerLength;
  protected byte groupId = -1;
  /**
   * Create a new StartMsg.
@@ -66,7 +59,7 @@
   * @param generationId    The generationId for this server.
   *
   */
  public StartMsg(short protocolVersion, long generationId)
  StartMsg(short protocolVersion, long generationId)
  {
    this.protocolVersion = protocolVersion;
    this.generationId = generationId;
@@ -75,171 +68,68 @@
  /**
   * Encode the header for the start message.
   *
   * @param type The type of the message to create.
   * @param additionalLength Additional length needed to encode the remaining
   * @param msgType The type of the message to create.
   * @param builder Additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @param sessionProtocolVersion  The version to use when encoding the header.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param protocolVersion  The version to use when encoding the header.
   */
  public byte[] encodeHeader(
      byte type, int additionalLength,
      short sessionProtocolVersion)
  throws UnsupportedEncodingException
  void encodeHeader(byte msgType, ByteArrayBuilder builder, short protocolVersion)
  {
    byte[] byteGenerationID =
      String.valueOf(generationId).getBytes("UTF-8");
    /* The message header is stored in the form :
     * <message type><protocol version><generation id><group id>
     */
    int length = 1 + 1 + byteGenerationID.length + 1 + 1 +
                     additionalLength;
    byte[] encodedMsg = new byte[length];
    /* put the type of the operation */
    encodedMsg[0] = type;
    /* put the protocol version */
    encodedMsg[1] = (byte)sessionProtocolVersion;
    /* put the generationId */
    int pos = 2;
    pos = addByteArray(byteGenerationID, encodedMsg, pos);
    /* put the group id */
    encodedMsg[pos] = groupId;
    pos++;
    headerLength = pos;
    return encodedMsg;
    builder.append(msgType);
    builder.append((byte) protocolVersion);
    builder.appendUTF8(generationId);
    builder.append(groupId);
  }
  /**
   * Encode the header for the start message. This uses the version 1 of the
   * replication protocol (used for compatibility purpose).
   *
   * @param type The type of the message to create.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMessage.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMessage as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param msgType The type of the message to create.
   * @param builder The builder where to append the remaining part of the
   *                UpdateMessage.
   */
  public byte[] encodeHeader_V1(byte type, int additionalLength)
  throws UnsupportedEncodingException
  void encodeHeader_V1(byte msgType, ByteArrayBuilder builder)
  {
    byte[] byteGenerationID =
      String.valueOf(generationId).getBytes("UTF-8");
    /* The message header is stored in the form :
     * <message type><protocol version><generation id>
     */
    int length = 1 + 1 + 1 +
                     byteGenerationID.length + 1 +
                     additionalLength;
    byte[] encodedMsg = new byte[length];
    /* put the type of the operation */
    encodedMsg[0] = type;
    /* put the protocol version */
    encodedMsg[1] = (byte)ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL;
    encodedMsg[2] = (byte)0;
    /* put the generationId */
    int pos = 3;
    headerLength = addByteArray(byteGenerationID, encodedMsg, pos);
    return encodedMsg;
    builder.append(msgType);
    builder.append((byte) ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL);
    builder.append((byte) 0);
    builder.appendUTF8(generationId);
  }
  /**
   * Decode the Header part of this message, and check its type.
   *
   * @param types The allowed types of this message.
   * @param encodedMsg the encoded form of the message.
   * @return the position at which the remaining part of the message starts.
   * @param scanner where to read the message from.
   * @param allowedTypes The allowed types of this message.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
  public int decodeHeader(byte[] types, byte [] encodedMsg)
  void decodeHeader(final ByteArrayScanner scanner, byte... allowedTypes)
  throws DataFormatException
  {
    /* first byte is the type */
    boolean foundMatchingType = false;
    for (byte type : types) {
      if (type == encodedMsg[0]) {
        foundMatchingType = true;
        break;
    final byte msgType = scanner.nextByte();
    if (!isTypeAllowed(allowedTypes, msgType))
    {
      throw new DataFormatException("byte[] is not a valid start msg: "
          + msgType);
      }
    }
    if (!foundMatchingType)
      throw new DataFormatException("byte[] is not a valid start msg: " +
        encodedMsg[0]);
    final byte version = scanner.nextByte();
    // Filter for supported old versions PDUs
    if (encodedMsg[0] == MSG_TYPE_REPL_SERVER_START_V1)
      return decodeHeader_V1(MSG_TYPE_REPL_SERVER_START_V1, encodedMsg);
    try
    if (msgType == MSG_TYPE_REPL_SERVER_START_V1)
    {
      /* then read the version */
      short readVersion = (short)encodedMsg[1];
      if (readVersion < ProtocolVersion.REPLICATION_PROTOCOL_V2)
        throw new DataFormatException("Not a valid message: type is " +
          encodedMsg[0] + " but protocol version byte is " + readVersion +
          " instead of " + ProtocolVersion.getCurrentVersion());
      protocolVersion = readVersion;
      /* read the generationId */
      int pos = 2;
      int length = getNextLength(encodedMsg, pos);
      generationId = Long.valueOf(new String(encodedMsg, pos, length,
          "UTF-8"));
      pos += length +1;
      /* read the group id */
      groupId = encodedMsg[pos];
      pos++;
      return pos;
    } catch (UnsupportedEncodingException e)
      if (version != ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * Decode the Header part of this message, and check its type. This uses the
   * version 1 of the replication protocol (used for compatibility purpose).
   *
   * @param type The type of this message.
   * @param encodedMsg the encoded form of the message.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   */
  public int decodeHeader_V1(byte type, byte [] encodedMsg)
  throws DataFormatException
  {
    if (encodedMsg[0] != type)
      throw new DataFormatException("byte[] is not a valid start msg: expected "
        + " a V1 PDU, received: " + encodedMsg[0]);
    if (encodedMsg[1] != ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL)
    {
      throw new DataFormatException("Not a valid message: type is " +
        type + " but protocol version byte is " + encodedMsg[1] + " instead of "
        throw new DataFormatException("Not a valid message: type is " + msgType
            + " but protocol version byte is " + version + " instead of "
        + ProtocolVersion.REPLICATION_PROTOCOL_V1_REAL);
    }
@@ -248,23 +138,35 @@
    // into REPLICATION_PROTOCOL_V1 so that we only see V1 everywhere.
    protocolVersion = ProtocolVersion.REPLICATION_PROTOCOL_V1;
    try
    {
      // In V1, version was 1 (49) in string, so with a null
      // terminating string. Let's position the cursor at the next byte
      int pos = 3;
      /* read the generationId */
      int length = getNextLength(encodedMsg, pos);
      generationId = Long.valueOf(new String(encodedMsg, pos, length,
          "UTF-8"));
      pos += length +1;
      return pos;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
      scanner.skipZeroSeparator();
      generationId = scanner.nextLongUTF8();
    }
    else
    {
      if (version < ProtocolVersion.REPLICATION_PROTOCOL_V2)
      {
        throw new DataFormatException("Not a valid message: type is " + msgType
            + " but protocol version byte is " + version + " instead of "
            + ProtocolVersion.getCurrentVersion());
      }
      protocolVersion = version;
      generationId = scanner.nextLongUTF8();
      groupId = scanner.nextByte();
    }
  }
  private boolean isTypeAllowed(byte[] allowedTypes, final byte msgType)
  {
    for (byte allowedType : allowedTypes)
    {
      if (msgType == allowedType)
      {
        return true;
      }
    }
    return false;
  }
  /**
opends/src/server/org/opends/server/replication/protocol/StartSessionMsg.java
@@ -26,9 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.zip.DataFormatException;
@@ -80,7 +77,7 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public StartSessionMsg(byte[] in, short version) throws DataFormatException
  StartSessionMsg(byte[] in, short version) throws DataFormatException
  {
    if (version <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
@@ -110,35 +107,21 @@
    this.safeDataLevel = safeDataLevel;
  }
  /**
   * Creates a new message with the given required parameters.
   * Assured mode is false.
   * @param status Status we are starting with
   * @param referralsURLs Referrals URLs to be used by peer DSs
   */
  public StartSessionMsg(ServerStatus status, Collection<String> referralsURLs)
  {
    this.referralsURLs.addAll(referralsURLs);
    this.status = status;
    this.assuredFlag = false;
  }
  // ============
  // Msg encoding
  // ============
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    if (reqProtocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    if (protocolVersion <= ProtocolVersion.REPLICATION_PROTOCOL_V3)
    {
      return getBytes_V23();
    }
    else
    {
      return getBytes_V45(reqProtocolVersion);
      return getBytes_V45(protocolVersion);
    }
  }
@@ -157,7 +140,9 @@
      writer.writeStartSequence();
      for (String url : referralsURLs)
      {
        writer.writeOctetString(url);
      }
      writer.writeEndSequence();
      writer.writeStartSequence();
@@ -193,57 +178,37 @@
     * <list of referrals urls>
     * (each referral url terminates with 0)
     */
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_START_SESSION);
    builder.append(status.getValue());
    builder.append(assuredFlag);
    builder.append(assuredMode.getValue());
    builder.append(safeDataLevel);
    try
    {
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
      /* Put the message type */
      oStream.write(MSG_TYPE_START_SESSION);
      // Put the status
      oStream.write(status.getValue());
      // Put the assured flag
      oStream.write(assuredFlag ? (byte) 1 : (byte) 0);
      // Put assured mode
      oStream.write(assuredMode.getValue());
      // Put safe data level
      oStream.write(safeDataLevel);
      // Put the referrals URLs
      if (referralsURLs.size() >= 1)
      {
        for (String url : referralsURLs)
        {
          byte[] byteArrayURL = url.getBytes("UTF-8");
          oStream.write(byteArrayURL);
          oStream.write(0);
        builder.append(url);
        }
      }
      return oStream.toByteArray();
    } catch (IOException e)
    {
      // never happens
      return null;
    }
    return builder.toByteArray();
  }
  // ============
  // Msg decoding
  // ============
  private void decode_V45(byte[] in, short version)
  throws DataFormatException
  private void decode_V45(byte[] in, short version) throws DataFormatException
  {
    ByteSequenceReader reader = ByteString.wrap(in).asReader();
    try
    {
      if (reader.get() != MSG_TYPE_START_SESSION)
        throw new DataFormatException("input is not a valid " +
            this.getClass().getCanonicalName());
      {
        throw new DataFormatException("input is not a valid "
            + getClass().getCanonicalName());
      }
      /*
      status = ServerStatus.valueOf(asn1Reader.readOctetString().byteAt(0));
@@ -279,8 +244,7 @@
        asn1Reader.readStartSequence();
        while (asn1Reader.hasNextElement())
        {
          String s = asn1Reader.readOctetStringAsString();
          this.eclIncludesForDeletes.add(s);
          this.eclIncludesForDeletes.add(asn1Reader.readOctetStringAsString());
        }
        asn1Reader.readEndSequence();
      }
@@ -296,8 +260,7 @@
    }
  }
  private void decode_V23(byte[] in)
  throws DataFormatException
  private void decode_V23(byte[] in) throws DataFormatException
  {
    /*
     * The message is stored in the form:
@@ -305,46 +268,22 @@
     * <list of referrals urls>
     * (each referral url terminates with 0)
     */
    try
    {
      /* first byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_START_SESSION)
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_START_SESSION)
      {
        throw new DataFormatException(
          "Input is not a valid " + this.getClass().getCanonicalName());
          "Input is not a valid " + getClass().getCanonicalName());
      }
      /* Read the status */
      status = ServerStatus.valueOf(in[1]);
    status = ServerStatus.valueOf(scanner.nextByte());
    assuredFlag = scanner.nextBoolean();
    assuredMode = AssuredMode.valueOf(scanner.nextByte());
    safeDataLevel = scanner.nextByte();
      /* Read the assured flag */
      assuredFlag = in[2] == 1;
      /* Read the assured mode */
      assuredMode = AssuredMode.valueOf(in[3]);
      /* Read the safe data level */
      safeDataLevel = in[4];
      /* Read the referrals URLs */
      int pos = 5;
      while (pos < in.length)
    while (!scanner.isEmpty())
      {
        /*
         * Read the next URL
         * first calculate the length then construct the string
         */
        int length = getNextLength(in, pos);
        referralsURLs.add(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
      }
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    } catch (IllegalArgumentException e)
    {
      throw new DataFormatException(e.getMessage());
      referralsURLs.add(scanner.nextString());
    }
  }
opends/src/server/org/opends/server/replication/protocol/TopologyMsg.java
@@ -26,9 +26,6 @@
 */
package org.opends.server.replication.protocol;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.*;
import java.util.zip.DataFormatException;
@@ -37,6 +34,8 @@
import org.opends.server.replication.common.RSInfo;
import org.opends.server.replication.common.ServerStatus;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
/**
 * This class defines a message that is sent:
 * - By a RS to the other RSs in the topology, containing:
@@ -68,86 +67,67 @@
   * @throws java.util.zip.DataFormatException If the byte array does not
   * contain a valid encoded form of the message.
   */
  public TopologyMsg(byte[] in, short version) throws DataFormatException
  TopologyMsg(byte[] in, short version) throws DataFormatException
  {
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_TOPOLOGY)
    {
      /* First byte is the type */
      if (in.length < 1 || in[0] != MSG_TYPE_TOPOLOGY)
      {
        throw new DataFormatException(
          "Input is not a valid " + getClass().getCanonicalName());
      throw new DataFormatException("Input is not a valid "
          + getClass().getCanonicalName());
      }
      int pos = 1;
      /* Read number of following DS info entries */
      byte nDsInfo = in[pos++];
      /* Read the DS info entries */
      Map<Integer, DSInfo> replicaInfos =
    // Read the DS info entries, first read number of them
    int nDsInfo = scanner.nextByte();
    final Map<Integer, DSInfo> replicaInfos =
          new HashMap<Integer, DSInfo>(Math.max(0, nDsInfo));
      while (nDsInfo > 0 && pos < in.length)
    while (nDsInfo > 0 && !scanner.isEmpty())
      {
        /* Read DS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int dsId = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read DS URL */
        String dsUrl;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
        {
          length = getNextLength(in, pos);
          dsUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
        }
        else
        {
          dsUrl = "";
      final DSInfo dsInfo = nextDSInfo(scanner, version);
      replicaInfos.put(dsInfo.getDsId(), dsInfo);
      nDsInfo--;
        }
        /* Read RS id */
        length = getNextLength(in, pos);
        serverIdString = new String(in, pos, length, "UTF-8");
        int rsId = Integer.valueOf(serverIdString);
        pos += length + 1;
    // Read the RS info entries
    int nRsInfo = scanner.nextByte();
    final List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
    while (nRsInfo > 0 && !scanner.isEmpty())
    {
      rsInfos.add(nextRSInfo(scanner, version));
      nRsInfo--;
    }
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
    this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
    this.rsInfos = Collections.unmodifiableList(rsInfos);
  }
        /* Read DS status */
        ServerStatus status = ServerStatus.valueOf(in[pos++]);
  private DSInfo nextDSInfo(ByteArrayScanner scanner, short version)
      throws DataFormatException
  {
    final int dsId = scanner.nextIntUTF8();
    final String dsUrl =
        version < REPLICATION_PROTOCOL_V6 ? "" : scanner.nextString();
    final int rsId = scanner.nextIntUTF8();
    final long generationId = scanner.nextLongUTF8();
    final ServerStatus status = ServerStatus.valueOf(scanner.nextByte());
    final boolean assuredFlag = scanner.nextBoolean();
    final AssuredMode assuredMode = AssuredMode.valueOf(scanner.nextByte());
    final byte safeDataLevel = scanner.nextByte();
    final byte groupId = scanner.nextByte();
        /* Read DS assured flag */
        boolean assuredFlag = in[pos++] == 1;
    final List<String> refUrls = new ArrayList<String>();
    scanner.nextStrings(refUrls);
        /* Read DS assured mode */
        AssuredMode assuredMode = AssuredMode.valueOf(in[pos++]);
        /* Read DS safe data level */
        byte safeDataLevel = in[pos++];
        /* Read DS group id */
        byte groupId = in[pos++];
        /* Read number of referrals URLs */
        List<String> refUrls = new ArrayList<String>();
        pos = readStrings(in, pos, refUrls);
        Set<String> attrs = new HashSet<String>();
        Set<String> delattrs = new HashSet<String>();
    final Set<String> attrs = new HashSet<String>();
    final Set<String> delattrs = new HashSet<String>();
        short protocolVersion = -1;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    if (version >= REPLICATION_PROTOCOL_V4)
        {
          pos = readStrings(in, pos, attrs);
      scanner.nextStrings(attrs);
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
      if (version >= REPLICATION_PROTOCOL_V5)
          {
            pos = readStrings(in, pos, delattrs);
        scanner.nextStrings(delattrs);
          }
          else
          {
@@ -155,82 +135,30 @@
            delattrs.addAll(attrs);
          }
          /* Read Protocol version */
          protocolVersion = in[pos++];
      protocolVersion = scanner.nextByte();
        }
        /* Now create DSInfo and store it */
        replicaInfos.put(dsId, new DSInfo(dsId, dsUrl, rsId, generationId,
            status, assuredFlag, assuredMode, safeDataLevel, groupId, refUrls,
            attrs, delattrs, protocolVersion));
        nDsInfo--;
    return new DSInfo(dsId, dsUrl, rsId, generationId, status, assuredFlag,
        assuredMode, safeDataLevel, groupId, refUrls, attrs, delattrs,
        protocolVersion);
      }
      /* Read number of following RS info entries */
      byte nRsInfo = in[pos++];
      /* Read the RS info entries */
      List<RSInfo> rsInfos = new ArrayList<RSInfo>(Math.max(0, nRsInfo));
      while (nRsInfo > 0 && pos < in.length)
  private RSInfo nextRSInfo(ByteArrayScanner scanner, short version)
      throws DataFormatException
      {
        /* Read RS id */
        int length = getNextLength(in, pos);
        String serverIdString = new String(in, pos, length, "UTF-8");
        int id = Integer.valueOf(serverIdString);
        pos += length + 1;
        /* Read the generation id */
        length = getNextLength(in, pos);
        long generationId = Long.valueOf(new String(in, pos, length, "UTF-8"));
        pos += length + 1;
        /* Read RS group id */
        byte groupId = in[pos++];
    final int rsId = scanner.nextIntUTF8();
    final long generationId = scanner.nextLongUTF8();
    final byte groupId = scanner.nextByte();
        int weight = 1;
        String serverUrl = null;
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
    if (version >= REPLICATION_PROTOCOL_V4)
        {
          length = getNextLength(in, pos);
          serverUrl = new String(in, pos, length, "UTF-8");
          pos += length + 1;
          /* Read RS weight */
          length = getNextLength(in, pos);
          weight = Integer.valueOf(new String(in, pos, length, "UTF-8"));
          pos += length + 1;
      serverUrl = scanner.nextString();
      weight = scanner.nextIntUTF8();
        }
        /* Now create RSInfo and store it */
        rsInfos.add(new RSInfo(id, serverUrl, generationId, groupId, weight));
        nRsInfo--;
      }
      this.replicaInfos = Collections.unmodifiableMap(replicaInfos);
      this.rsInfos = Collections.unmodifiableList(rsInfos);
    }
    catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  private int readStrings(byte[] in, int pos, Collection<String> outputCol)
      throws DataFormatException, UnsupportedEncodingException
  {
    byte nAttrs = in[pos++];
    byte nRead = 0;
    // Read all elements until expected number read
    while (nRead != nAttrs && pos < in.length)
    {
      int length = getNextLength(in, pos);
      outputCol.add(new String(in, pos, length, "UTF-8"));
      pos += length + 1;
      nRead++;
    }
    return pos;
    return new RSInfo(rsId, serverUrl, generationId, groupId, weight);
  }
  /**
@@ -272,122 +200,62 @@
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short version) throws UnsupportedEncodingException
  {
    try
  public byte[] getBytes(short version)
    {
      /**
       * Message has the following form:
       * <pdu type><number of following DSInfo entries>[<DSInfo>]*
       * <number of following RSInfo entries>[<RSInfo>]*
       */
      ByteArrayOutputStream oStream = new ByteArrayOutputStream();
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_TOPOLOGY);
      /* Put the message type */
      oStream.write(MSG_TYPE_TOPOLOGY);
      // Put number of following DS info entries
      oStream.write((byte) replicaInfos.size());
      // Put DS info
    // Put DS infos
    builder.append((byte) replicaInfos.size());
      for (DSInfo dsInfo : replicaInfos.values())
      {
        // Put DS id
        byte[] byteServerId =
          String.valueOf(dsInfo.getDsId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V6)
      builder.appendUTF8(dsInfo.getDsId());
      if (version >= REPLICATION_PROTOCOL_V6)
        {
          // Put DS URL
          oStream.write(dsInfo.getDsUrl().getBytes("UTF-8"));
          oStream.write(0);
        builder.append(dsInfo.getDsUrl());
        }
        // Put RS id
        byteServerId = String.valueOf(dsInfo.getRsId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        // Put the generation id
        oStream.write(String.valueOf(dsInfo.getGenerationId()).
            getBytes("UTF-8"));
        oStream.write(0);
        // Put DS status
        oStream.write(dsInfo.getStatus().getValue());
        // Put DS assured flag
        oStream.write(dsInfo.isAssured() ? (byte) 1 : (byte) 0);
        // Put DS assured mode
        oStream.write(dsInfo.getAssuredMode().getValue());
        // Put DS safe data level
        oStream.write(dsInfo.getSafeDataLevel());
        // Put DS group id
        oStream.write(dsInfo.getGroupId());
      builder.appendUTF8(dsInfo.getRsId());
      builder.appendUTF8(dsInfo.getGenerationId());
      builder.append(dsInfo.getStatus().getValue());
      builder.append(dsInfo.isAssured());
      builder.append(dsInfo.getAssuredMode().getValue());
      builder.append(dsInfo.getSafeDataLevel());
      builder.append(dsInfo.getGroupId());
        writeStrings(oStream, dsInfo.getRefUrls());
      builder.appendStrings(dsInfo.getRefUrls());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      if (version >= REPLICATION_PROTOCOL_V4)
        {
          // Put ECL includes
          writeStrings(oStream, dsInfo.getEclIncludes());
          if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V5)
        builder.appendStrings(dsInfo.getEclIncludes());
        if (version >= REPLICATION_PROTOCOL_V5)
          {
            writeStrings(oStream, dsInfo.getEclIncludesForDeletes());
          builder.appendStrings(dsInfo.getEclIncludesForDeletes());
          }
          oStream.write(dsInfo.getProtocolVersion());
        builder.append((byte) dsInfo.getProtocolVersion());
        }
      }
      // Put number of following RS info entries
      oStream.write((byte) rsInfos.size());
      // Put RS info
    // Put RS infos
    builder.append((byte) rsInfos.size());
      for (RSInfo rsInfo : rsInfos)
      {
        // Put RS id
        byte[] byteServerId =
          String.valueOf(rsInfo.getId()).getBytes("UTF-8");
        oStream.write(byteServerId);
        oStream.write(0);
        // Put the generation id
        oStream.write(String.valueOf(rsInfo.getGenerationId()).
          getBytes("UTF-8"));
        oStream.write(0);
        // Put RS group id
        oStream.write(rsInfo.getGroupId());
      builder.appendUTF8(rsInfo.getId());
      builder.appendUTF8(rsInfo.getGenerationId());
      builder.append(rsInfo.getGroupId());
        if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V4)
      if (version >= REPLICATION_PROTOCOL_V4)
        {
          // Put server URL
          oStream.write(rsInfo.getServerUrl().getBytes("UTF-8"));
          oStream.write(0);
          // Put RS weight
          oStream.write(String.valueOf(rsInfo.getWeight()).getBytes("UTF-8"));
          oStream.write(0);
        builder.append(rsInfo.getServerUrl());
        builder.appendUTF8(rsInfo.getWeight());
        }
      }
      return oStream.toByteArray();
    }
    catch (IOException e)
    {
      // never happens
      throw new RuntimeException(e);
    }
  }
  private void writeStrings(ByteArrayOutputStream oStream,
      Collection<String> col) throws IOException, UnsupportedEncodingException
  {
    // Put collection length as a byte
    oStream.write(col.size());
    for (String elem : col)
    {
      // Write the element and a 0 terminating byte
      oStream.write(elem.getBytes("UTF-8"));
      oStream.write(0);
    }
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
@@ -414,7 +282,7 @@
      + "CONNECTED RS SERVERS:"
      + "\n--------------------\n"
      + rsStr
      + (rsStr.equals("") ? "----------------------------\n" : "");
      + ("".equals(rsStr) ? "----------------------------\n" : "");
  }
  /**
opends/src/server/org/opends/server/replication/protocol/UpdateMsg.java
@@ -22,16 +22,17 @@
 *
 *
 *      Copyright 2006-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
/**
 * Abstract class that must be extended to define a message
 * used for sending Updates between servers.
@@ -39,42 +40,31 @@
public class UpdateMsg extends ReplicationMsg
                                    implements Comparable<UpdateMsg>
{
  /**
   * Protocol version.
   */
  /** Protocol version. */
  protected short protocolVersion;
  /**
   * The CSN of this update.
   */
  /** The CSN of this update. */
  protected CSN csn;
  /**
   * True when the update must use assured replication.
   */
  /** True when the update must use assured replication. */
  protected boolean assuredFlag = false;
  /**
   * When assuredFlag is true, defines the requested assured mode.
   */
  /** When assuredFlag is true, defines the requested assured mode. */
  protected AssuredMode assuredMode = AssuredMode.SAFE_DATA_MODE;
  /**
   * When assured mode is safe data, gives the requested level.
   */
  /** When assured mode is safe data, gives the requested level. */
  protected byte safeDataLevel = (byte)1;
  /**
   * The payload that must be encoded in this message.
   */
  private byte[] payload;
  /** The payload that must be encoded in this message. */
  private final byte[] payload;
  /**
   * Creates a new empty UpdateMsg.
   */
  protected UpdateMsg()
  {}
  {
    payload = null;
  }
  /**
   * Creates a new UpdateMsg with the given information.
@@ -85,25 +75,10 @@
   */
  UpdateMsg(byte[] bytes) throws DataFormatException
  {
    // Decode header
    int pos = decodeHeader(MSG_TYPE_GENERIC_UPDATE, bytes);
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    decodeHeader(MSG_TYPE_GENERIC_UPDATE, scanner);
    // Read the payload : all the remaining bytes but the terminating 0
    int length = bytes.length - pos;
    payload = new byte[length];
    try
    {
      System.arraycopy(bytes, pos, payload, 0, length);
    } catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (ArrayStoreException e)
    {
      throw new DataFormatException(e.getMessage());
    } catch (NullPointerException e)
    {
      throw new DataFormatException(e.getMessage());
    }
    payload = scanner.remainingBytes();
  }
  /**
@@ -152,9 +127,7 @@
    assuredFlag = assured;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object obj)
  {
@@ -162,18 +135,14 @@
        csn.equals(((UpdateMsg) obj).csn);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    return csn.hashCode();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int compareTo(UpdateMsg msg)
  {
@@ -243,103 +212,50 @@
   * Encode the common header for all the UpdateMsg. This uses the current
   * protocol version.
   *
   * @param type the type of UpdateMsg to encode.
   * @param additionalLength additional length needed to encode the remaining
   *                         part of the UpdateMsg.
   * @param version The ProtocolVersion to use when encoding.
   * @return a byte array containing the common header and enough space to
   *         encode the remaining bytes of the UpdateMsg as was specified
   *         by the additionalLength.
   *         (byte array length = common header length + additionalLength)
   * @throws UnsupportedEncodingException if UTF-8 is not supported.
   * @param msgType The type of UpdateMsg to encode.
   * @param protocolVersion The ProtocolVersion to use when encoding.
   * @return a byte array builder containing the common header
   */
  protected byte[] encodeHeader(byte type, int additionalLength, short version)
    throws UnsupportedEncodingException
  protected ByteArrayBuilder encodeHeader(byte msgType, short protocolVersion)
  {
    byte[] csnByte = getCSN().toString().getBytes("UTF-8");
    /* The message header is stored in the form :
     * <operation type><protocol version><CSN><assured>
     * <assured mode> <safe data level>
     * the length of result byte array is therefore :
     *   1 + 1 + CSN length + 1 + 1
     *   + 1 + 1 + additional_length
     */
    int length = 6 + csnByte.length + additionalLength;
    byte[] encodedMsg = new byte[length];
    // put the type of the operation
    encodedMsg[0] = type;
    // put the protocol version
    encodedMsg[1] = (byte)ProtocolVersion.getCurrentVersion();
    int pos = 2;
    // Put the CSN
    pos = addByteArray(csnByte, encodedMsg, pos);
    // Put the assured flag
    encodedMsg[pos++] = (assuredFlag ? (byte) 1 : 0);
    // Put the assured mode
    encodedMsg[pos++] = assuredMode.getValue();
    // Put the safe data level
    encodedMsg[pos++] = safeDataLevel;
    return encodedMsg;
    final ByteArrayBuilder builder =
        new ByteArrayBuilder(bytes(6) + csnsUTF8(1));
    builder.append(msgType);
    builder.append((byte) ProtocolVersion.getCurrentVersion());
    builder.appendUTF8(getCSN());
    builder.append(assuredFlag);
    builder.append(assuredMode.getValue());
    builder.append(safeDataLevel);
    return builder;
  }
  /**
   * Decode the Header part of this Update Message, and check its type.
   *
   * @param type The allowed type of this Update Message.
   * @param encodedMsg the encoded form of the UpdateMsg.
   * @return the position at which the remaining part of the message starts.
   * @throws DataFormatException if the encodedMsg does not contain a valid
   *         common header.
   * @param allowedType The allowed type of this Update Message.
   * @param scanner The encoded form of the UpdateMsg.
   * @throws DataFormatException
   *           if the scanner does not contain a valid common header.
   */
  protected int decodeHeader(byte type, byte[] encodedMsg)
  protected void decodeHeader(byte allowedType, ByteArrayScanner scanner)
                          throws DataFormatException
  {
    /* The message header is stored in the form :
     * <operation type><protocol version><CSN><assured>
     * <assured mode> <safe data level>
     */
    if (!(type == encodedMsg[0]))
    final byte msgType = scanner.nextByte();
    if (allowedType != msgType)
    {
      throw new DataFormatException("byte[] is not a valid update msg: "
        + encodedMsg[0]);
    // read the protocol version
    protocolVersion = encodedMsg[1];
    try
    {
      // Read the CSN
      int pos = 2;
      int length = getNextLength(encodedMsg, pos);
      String csnStr = new String(encodedMsg, pos, length, "UTF-8");
      pos += length + 1;
      csn = new CSN(csnStr);
      // Read the assured information
      assuredFlag = encodedMsg[pos++] == 1;
      // Read the assured mode
      assuredMode = AssuredMode.valueOf(encodedMsg[pos++]);
      // Read the safe data level
      safeDataLevel = encodedMsg[pos++];
      return pos;
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    } catch (IllegalArgumentException e)
    {
      throw new DataFormatException(e.getMessage());
          + msgType);
    }
    protocolVersion = scanner.nextByte();
    csn = scanner.nextCSNUTF8();
    assuredFlag = scanner.nextBoolean();
    assuredMode = AssuredMode.valueOf(scanner.nextByte());
    safeDataLevel = scanner.nextByte();
  }
  /**
@@ -347,10 +263,8 @@
   * protocol version.
   *
   * @return The encoded representation of this update message.
   * @throws UnsupportedEncodingException
   *           If the message could not be encoded.
   */
  public byte[] getBytes() throws UnsupportedEncodingException
  public byte[] getBytes()
  {
    return getBytes(ProtocolVersion.getCurrentVersion());
  }
@@ -364,20 +278,11 @@
   */
  @Override
  public byte[] getBytes(short protocolVersion)
      throws UnsupportedEncodingException
  {
    // Encode the header in a byte[] large enough to also contain the payload
    byte[] resultByteArray = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
        payload.length, ProtocolVersion.getCurrentVersion());
    int pos = resultByteArray.length - payload.length;
    // Add the payload
    for (int i = 0; i < payload.length; i++, pos++)
    {
      resultByteArray[pos] = payload[i];
    }
    return resultByteArray;
    final ByteArrayBuilder builder = encodeHeader(MSG_TYPE_GENERIC_UPDATE,
        ProtocolVersion.getCurrentVersion());
    builder.append(payload);
    return builder.toByteArray();
  }
  /**
opends/src/server/org/opends/server/replication/protocol/WindowMsg.java
@@ -22,14 +22,12 @@
 *
 *
 *      Copyright 2006-2008 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.zip.DataFormatException;
/**
 * This message is used by LDAP server or by Replication Servers to
 * update the send window of the remote entities.
@@ -43,7 +41,6 @@
{
  private final int numAck;
  /**
   * Create a new WindowMsg.
   *
@@ -63,63 +60,27 @@
   * @throws DataFormatException If the byte array does not contain a valid
   *                             encoded form of the WindowMsg.
   */
  public WindowMsg(byte[] in) throws DataFormatException
  WindowMsg(byte[] in) throws DataFormatException
  {
    /* The WindowMsg is encoded in the form :
     * <numAck>
     */
    try
    final ByteArrayScanner scanner = new ByteArrayScanner(in);
    final byte msgType = scanner.nextByte();
    if (msgType != MSG_TYPE_WINDOW)
    {
      /* first byte is the type */
      if (in[0] != MSG_TYPE_WINDOW)
        throw new DataFormatException("input is not a valid Window Message");
      int pos = 1;
      /*
       * read the number of acks contained in this message.
       * first calculate the length then construct the string
       */
      int length = getNextLength(in, pos);
      String numAckStr = new String(in, pos, length, "UTF-8");
      pos += length +1;
      numAck = Integer.parseInt(numAckStr);
    } catch (UnsupportedEncodingException e)
    {
      throw new DataFormatException("UTF-8 is not supported by this jvm.");
    }
  }
  /**
   * {@inheritDoc}
   */
    numAck = scanner.nextIntUTF8();
  }
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    /*
     * WindowMsg contains.
     * <numAck>
     */
    try {
      byte[] byteNumAck = String.valueOf(numAck).getBytes("UTF-8");
      int length = 1 + byteNumAck.length + 1;
      byte[] resultByteArray = new byte[length];
      /* put the type of the operation */
      resultByteArray[0] = MSG_TYPE_WINDOW;
      int pos = 1;
      pos = addByteArray(byteNumAck, resultByteArray, pos);
      return resultByteArray;
    final ByteArrayBuilder builder = new ByteArrayBuilder();
    builder.append(MSG_TYPE_WINDOW);
    builder.appendUTF8(numAck);
    return builder.toByteArray();
    }
    catch (UnsupportedEncodingException e)
    {
      return null;
    }
  }
  /**
   * Get the number of message acknowledged by the Window Message.
@@ -131,9 +92,7 @@
    return numAck;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
opends/src/server/org/opends/server/replication/server/NotAssuredUpdateMsg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2008-2009 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS.
 *      Portions Copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.server;
@@ -31,9 +31,7 @@
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.protocol.*;
/**
 * This is a facility class that is in fact an hack to optimize replication
@@ -48,28 +46,32 @@
 * when calling the isAssured() method.
 *
 */
public class NotAssuredUpdateMsg extends UpdateMsg
class NotAssuredUpdateMsg extends UpdateMsg
{
  // The real update message this message represents
  private UpdateMsg realUpdateMsg = null;
  /** The real update message this message represents */
  private final UpdateMsg realUpdateMsg;
  // V1 serialized form of the real message with assured flag set to false.
  // Ready to be sent.
  private byte[] realUpdateMsgNotAssuredBytesV1 = null;
  /**
   * V1 serialized form of the real message with assured flag set to false.
   * Ready to be sent.
   */
  private final byte[] realUpdateMsgNotAssuredBytesV1;
  // VLatest serialized form of the real message with assured flag set to false.
  // Ready to be sent.
  private byte[] realUpdateMsgNotAssuredBytesVLatest = null;
  /**
   * VLatest serialized form of the real message with assured flag set to false.
   * Ready to be sent.
   */
  private final byte[] realUpdateMsgNotAssuredBytesVLatest;
  /**
   * Creates a new empty UpdateMsg.
   * This class is only used by replication server code so constructor is not
   * public by security.
   *
   * @param updateMsg The real underlying update message this object represents.
   * @throws UnsupportedEncodingException  When the pre-encoding of the message
   *         failed because the UTF-8 encoding is not supported or the
   *         requested protocol version to use is not supported by this PDU.
   *
   */
  NotAssuredUpdateMsg(UpdateMsg updateMsg) throws UnsupportedEncodingException
  {
@@ -85,18 +87,7 @@
       * Get the encoding form of the real message then overwrite the assured
       * flag to always be false.
       */
      byte[] origBytes = realUpdateMsg.getBytes(
        ProtocolVersion.REPLICATION_PROTOCOL_V1);
      // Clone the byte array to be able to modify it without problems
      // (ModifyMsg messages for instance do not return a cloned version of
      // their byte array)
      byte[] bytes = new byte[origBytes.length];
      System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
      int maxLen = bytes.length;
      int pos;
      int nZeroFound = 0; // Number of 0 value found
      boolean found = false;
      byte[] bytes = getRealUpdateMsgBytes(ProtocolVersion.REPLICATION_PROTOCOL_V1);
      /* Look for assured flag position:
       * The message header is stored in the form :
@@ -107,23 +98,7 @@
       * See LDAPUpdateMsg.encodeHeader_V1() for more information
       */
      // Find end of CSN then end of dn
      for (pos = 1; pos < maxLen; pos++)
      {
        if (bytes[pos] == (byte) 0)
        {
          nZeroFound++;
          if (nZeroFound == 2) // 2 end of string to find
          {
            found = true;
            break;
          }
        }
      }
      if (!found)
        throw new UnsupportedEncodingException("Could not find end of CSN.");
      pos++;
      if (pos >= maxLen)
        throw new UnsupportedEncodingException("Reached end of packet.");
      int pos = findNthZeroByte(bytes, 1, 2);
      // Force assured flag to false
      bytes[pos] = 0;
@@ -135,16 +110,7 @@
       * Get the encoding form of the real message then overwrite the assured
       * flag to always be false.
       */
      origBytes = realUpdateMsg.getBytes(ProtocolVersion.getCurrentVersion());
      // Clone the byte array to be able to modify it without problems
      // (ModifyMsg messages for instance do not return a cloned version of
      // their byte array)
      bytes = new byte[origBytes.length];
      System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
      maxLen = bytes.length;
      nZeroFound = 0; // Number of 0 value found
      found = false;
      bytes = getRealUpdateMsgBytes(ProtocolVersion.getCurrentVersion());
      /* Look for assured flag position:
       * The message header is stored in the form :
@@ -156,48 +122,22 @@
       * See LDAPUpdateMsg.encodeHeader() for more information
       */
      // Find end of CSN then end of dn then end of uuid
      for (pos = 2; pos < maxLen; pos++)
      {
        if (bytes[pos] == (byte) 0)
        {
          nZeroFound++;
          if (nZeroFound == 3) // 3 end of string to find
          {
            found = true;
            break;
          }
        }
      }
      if (!found)
        throw new UnsupportedEncodingException("Could not find end of CSN.");
      pos++;
      if (pos >= maxLen)
        throw new UnsupportedEncodingException("Reached end of packet.");
      pos = findNthZeroByte(bytes, 2, 3);
      // Force assured flag to false
      bytes[pos] = 0;
      // Store computed VLATEST serialized form
      realUpdateMsgNotAssuredBytesVLatest = bytes;
    } else
    }
    else
    {
      realUpdateMsgNotAssuredBytesV1 = null;
      /**
       * Prepare VLATEST serialized form of the message:
       * Get the encoding form of the real message then overwrite the assured
       * flag to always be false.
       */
      byte[] origBytes = realUpdateMsg.getBytes(
          ProtocolVersion.getCurrentVersion());
      // Clone the byte array to be able to modify it without problems
      // (ModifyMsg messages for instance do not return a cloned version of
      // their byte array)
      byte[] bytes = new byte[origBytes.length];
      System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
      int maxLen = bytes.length;
      int pos;
      int nZeroFound = 0; // Number of 0 value found
      boolean found = false;
      byte[] bytes = getRealUpdateMsgBytes(ProtocolVersion.getCurrentVersion());
      // This is a generic update message
      /* Look for assured flag position:
@@ -210,23 +150,7 @@
       * See UpdateMsg.encodeHeader() for more  information
       */
      // Find end of CSN
      for (pos = 2; pos < maxLen; pos++)
      {
        if (bytes[pos] == (byte) 0)
        {
          nZeroFound++;
          if (nZeroFound == 1) // 1 end of string to find
          {
            found = true;
            break;
          }
        }
      }
      if (!found)
        throw new UnsupportedEncodingException("Could not find end of CSN.");
      pos++;
      if (pos >= maxLen)
        throw new UnsupportedEncodingException("Reached end of packet.");
      int pos = findNthZeroByte(bytes, 2, 1);
      // Force assured flag to false
      bytes[pos] = 0;
@@ -236,17 +160,52 @@
  }
  /**
   * {@inheritDoc}
   * Clones the byte array to be able to modify it without problems
   * (ModifyMsg messages for instance do not return a cloned version of
   * their byte array).
   */
  private byte[] getRealUpdateMsgBytes(final short protocolVersion)
  {
    byte[] origBytes = realUpdateMsg.getBytes(protocolVersion);
    byte[] bytes = new byte[origBytes.length];
    System.arraycopy(origBytes, 0, bytes, 0, origBytes.length);
    return bytes;
  }
  private int findNthZeroByte(byte[] bytes, int startPos, int nbToFind)
      throws UnsupportedEncodingException
  {
    final int maxLen = bytes.length;
    int nbZeroFound = 0; // Number of 0 values found
    for (int pos = startPos; pos < maxLen; pos++)
    {
      if (bytes[pos] == (byte) 0)
      {
        nbZeroFound++;
        if (nbZeroFound == nbToFind)
        {
          // nb of end of strings reached
          pos++;
          if (pos >= maxLen)
          {
            throw new UnsupportedEncodingException("Reached end of packet.");
          }
          return pos;
        }
      }
    }
    throw new UnsupportedEncodingException(
        "Could not find " + nbToFind + " zero bytes in byte array.");
  }
  /** {@inheritDoc} */
  @Override
  public CSN getCSN()
  {
    return realUpdateMsg.getCSN();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean isAssured()
  {
@@ -254,9 +213,7 @@
    return false;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void setAssured(boolean assured)
  {
@@ -264,78 +221,59 @@
    // and we do not want to change the original real update message settings
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public boolean equals(Object obj)
  {
    // Compare with the underlying real update message
    if (obj != null)
    {
      if (obj.getClass() != realUpdateMsg.getClass())
        return false;
      return realUpdateMsg.getCSN().
        equals(((UpdateMsg)obj).getCSN());
    }
    else
    if (obj == null)
    {
      return false;
    }
    return obj.getClass() == realUpdateMsg.getClass()
        && realUpdateMsg.getCSN().equals(((UpdateMsg) obj).getCSN());
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int hashCode()
  {
    return realUpdateMsg.hashCode();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int compareTo(UpdateMsg msg)
  {
    return realUpdateMsg.compareTo(msg);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short reqProtocolVersion)
    throws UnsupportedEncodingException
  public byte[] getBytes(short protocolVersion)
  {
    if (reqProtocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    if (protocolVersion == ProtocolVersion.REPLICATION_PROTOCOL_V1)
    {
      return realUpdateMsgNotAssuredBytesV1;
    else
    }
      return realUpdateMsgNotAssuredBytesVLatest;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public AssuredMode getAssuredMode()
  {
    return realUpdateMsg.getAssuredMode();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte getSafeDataLevel()
  {
    return realUpdateMsg.getSafeDataLevel();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void setAssuredMode(AssuredMode assuredMode)
  {
@@ -343,9 +281,7 @@
    // and we do not want to change the original real update message settings
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public void setSafeDataLevel(byte safeDataLevel)
  {
@@ -353,49 +289,38 @@
    // and we do not want to change the original real update message settings
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public short getVersion()
  {
    return realUpdateMsg.getVersion();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int size()
  {
    return realUpdateMsg.size();
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  protected byte[] encodeHeader(byte type, int additionalLength, short version)
    throws UnsupportedEncodingException
  protected ByteArrayBuilder encodeHeader(byte allowedType,
      short protocolVersion)
  {
    // Not called as only used by constructors using bytes
    return null;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public int decodeHeader(byte type, byte[] encodedMsg)
  protected void decodeHeader(byte msgType, ByteArrayScanner scanner)
                          throws DataFormatException
  {
    // Not called as only used by getBytes methods
    return -1;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getPayload()
  {
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -25,7 +25,6 @@
 */
package org.opends.server.replication.server.changelog.file;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
@@ -35,7 +34,6 @@
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.UpdateMsg;
@@ -48,11 +46,9 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.types.InitializationException;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
/**
 * Represents a replication server database for one server in the topology.
@@ -370,23 +366,13 @@
  /** Parser of records persisted in the ReplicaDB log. */
  private static class ReplicaDBParser implements RecordParser<CSN, UpdateMsg>
  {
    private static final DebugTracer TRACER = getTracer();
    @Override
    public ByteString encodeRecord(final Record<CSN, UpdateMsg> record)
    {
      final UpdateMsg message = record.getValue();
      try
      {
        return ByteString.wrap(message.getBytes());
      }
      catch (UnsupportedEncodingException e)
      {
        // should never happen
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
        return ByteString.empty();
      }
    }
    @Override
    public Record<CSN, UpdateMsg> decodeRecord(final ByteString data) throws DecodingException
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -34,6 +34,8 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
import org.opends.server.replication.protocol.ReplicationMsg;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
@@ -56,14 +58,14 @@
 * <p>
 * This is the only class that should have code using the BDB interfaces.
 */
public class ReplicationDB
class ReplicationDB
{
  private Database db;
  private ReplicationDbEnv dbEnv;
  private ReplicationServer replicationServer;
  private int serverId;
  private DN baseDN;
  private final ReplicationDbEnv dbEnv;
  private final ReplicationServer replicationServer;
  private final int serverId;
  private final DN baseDN;
  /**
   * The lock used to provide exclusive access to the thread that close the db
@@ -120,7 +122,7 @@
   * @param dbEnv The Db environment to use to create the db.
   * @throws ChangelogException If a database problem happened
   */
  public ReplicationDB(int serverId, DN baseDN,
  ReplicationDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbEnv)
      throws ChangelogException
  {
@@ -188,7 +190,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void addEntry(UpdateMsg change) throws ChangelogException
  void addEntry(UpdateMsg change) throws ChangelogException
  {
    dbCloseLock.readLock().lock();
    try
@@ -200,7 +202,9 @@
      }
      final DatabaseEntry key = createReplicationKey(change.getCSN());
      final DatabaseEntry data = new ReplicationData(change);
      // Always keep messages in the replication DB with the current protocol
      // version
      final DatabaseEntry data = new DatabaseEntry(change.getBytes());
      insertCounterRecordIfNeeded(change.getCSN());
      db.put(null, key, data);
@@ -256,7 +260,7 @@
  /**
   * Shutdown the database.
   */
  public void shutdown()
  void shutdown()
  {
    dbCloseLock.writeLock().lock();
    try
@@ -286,8 +290,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public ReplServerDBCursor openReadCursor(CSN startCSN)
      throws ChangelogException
  ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
  {
    return new ReplServerDBCursor(startCSN);
  }
@@ -301,7 +304,7 @@
   *
   * @return The ReplServerDBCursor.
   */
  public ReplServerDBCursor openDeleteCursor() throws ChangelogException
  ReplServerDBCursor openDeleteCursor() throws ChangelogException
  {
    return new ReplServerDBCursor();
  }
@@ -325,7 +328,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN readOldestCSN() throws ChangelogException
  CSN readOldestCSN() throws ChangelogException
  {
    dbCloseLock.readLock().lock();
@@ -381,7 +384,7 @@
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN readNewestCSN() throws ChangelogException
  CSN readNewestCSN() throws ChangelogException
  {
    dbCloseLock.readLock().lock();
@@ -432,93 +435,7 @@
    }
  }
  /**
   * Try to find in the DB, the CSN right before the one passed as a parameter.
   *
   * @param csn
   *          The CSN from which we start searching.
   * @return the CSN right before the one passed as a parameter. Can return null
   *         if there is none.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public CSN getPreviousCSN(CSN csn) throws ChangelogException
  {
    if (csn == null)
    {
      return null;
    }
    dbCloseLock.readLock().lock();
    Cursor cursor = null;
    try
    {
      // If the DB has been closed then return immediately.
      if (isDBClosed())
      {
        return null;
      }
      DatabaseEntry key = createReplicationKey(csn);
      DatabaseEntry data = new DatabaseEntry();
      cursor = db.openCursor(null, null);
      if (cursor.getSearchKeyRange(key, data, LockMode.DEFAULT) == SUCCESS)
      {
        // We can move close to the CSN.
        // Let's move to the previous change.
        if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
        // else, there was no change previous to our CSN.
      }
      else
      {
        // We could not move the cursor past to the CSN
        // Check if the last change is older than CSN
        if (cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS)
        {
          return getRegularRecord(cursor, key, data);
        }
      }
    }
    catch (DatabaseException e)
    {
      throw new ChangelogException(e);
    }
    finally
    {
      closeAndReleaseReadLock(cursor);
    }
    return null;
  }
  private CSN getRegularRecord(Cursor cursor, DatabaseEntry key,
      DatabaseEntry data) throws DatabaseException
  {
    final CSN csn = toCSN(key.getData());
    if (!isACounterRecord(csn))
    {
      return csn;
    }
    // There cannot be 2 counter record next to each other,
    // it is safe to return previous record which must exist
    if (cursor.getPrev(key, data, LockMode.DEFAULT) == SUCCESS)
    {
      return toCSN(key.getData());
    }
    // database only contain a counter record, which should not be possible
    // let's just say no CSN
    return null;
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
@@ -529,7 +446,7 @@
   * This Class implements a cursor that can be used to browse a
   * replicationServer database.
   */
  public class ReplServerDBCursor implements Closeable
  class ReplServerDBCursor implements Closeable
  {
    /**
     * The transaction that will protect the actions done with the cursor.
@@ -713,7 +630,7 @@
     * (per the Cursor documentation).
     * This should not be used in any other case.
     */
    public void abort()
    void abort()
    {
      synchronized (this)
      {
@@ -735,7 +652,7 @@
     * @throws ChangelogException
     *           In case of underlying database problem.
     */
    public CSN nextCSN() throws ChangelogException
    CSN nextCSN() throws ChangelogException
    {
      if (isClosed)
      {
@@ -761,7 +678,7 @@
     *
     * @return the next UpdateMsg.
     */
    public UpdateMsg next()
    UpdateMsg next()
    {
      if (isClosed)
      {
@@ -791,7 +708,8 @@
          {
            continue;
          }
          currentChange = ReplicationData.generateChange(data.getData());
          currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
              data.getData(), ProtocolVersion.getCurrentVersion());
        }
        catch (Exception e)
        {
@@ -806,7 +724,7 @@
           */
          Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
              .get(replicationServer.getServerId(),
                  (csn == null ? "" : csn.toString()),
                  (csn != null ? csn.toString() : ""),
                  e.getMessage());
          logError(message);
        }
@@ -819,7 +737,7 @@
     *
     * @throws ChangelogException In case of database problem.
     */
    public void delete() throws ChangelogException
    void delete() throws ChangelogException
    {
      if (isClosed)
      {
@@ -842,7 +760,7 @@
   *
   * @throws ChangelogException In case of database problem.
   */
  public void clear() throws ChangelogException
  void clear() throws ChangelogException
  {
    // The coming users will be blocked until the clear is done
    dbCloseLock.writeLock().lock();
@@ -912,7 +830,7 @@
   * Encode the provided counter value in a database entry.
   * @return The database entry with the counter value encoded inside.
   */
  static private DatabaseEntry encodeCounterValue(int value)
  private static DatabaseEntry encodeCounterValue(int value)
  {
    DatabaseEntry entry = new DatabaseEntry();
    entry.setData(getBytes(String.valueOf(value)));
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationData.java
File was deleted
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/common/ServerStateTest.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2013 ForgeRock AS
 *      Portions Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.common;
@@ -93,11 +93,6 @@
    String stringRep = serverState.toString();
    assertTrue(stringRep.contains(csn2.toString()));
    assertTrue(stringRep.contains(csn3.toString()));
    // Check getBytes
    byte[] b = serverState.getBytes();
    ServerState generatedServerState = new ServerState(b,0,b.length -1) ;
    assertEquals(b, generatedServerState.getBytes()) ;
  }
  /**
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
@@ -27,13 +27,22 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.zip.DataFormatException;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.types.ByteStringBuilder;
import org.testng.Assert;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
/**
 * Test for {@link ByteStringBuilder} and {@link ByteArrayScanner} classes.
 */
@@ -41,44 +50,194 @@
public class ByteArrayTest extends DirectoryServerTestCase
{
  private static final class IntegerRange implements Iterator<Object[]>
  {
    private int next;
    private final int endInclusive;
    public IntegerRange(int startInclusive, int endInclusive)
    {
      this.next = startInclusive;
      this.endInclusive = endInclusive;
    }
    @Override
    public boolean hasNext()
    {
      return next <= this.endInclusive;
    }
    @Override
    public Object[] next()
    {
      return new Object[] { next++ };
    }
    @Override
    public void remove() { /* unused */ }
  }
  @BeforeClass
  public void setup() throws Exception
  {
    TestCaseUtils.startFakeServer();
  }
  @AfterClass
  public void teardown() throws Exception
  {
    TestCaseUtils.shutdownFakeServer();
  }
  private final byte[] byteArray = new byte[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, };
  @Test
  public void testBuilderAppendMethodsAndScannerNextMethods() throws Exception
  {
    final boolean bo = true;
    final boolean boFalse = false;
    final boolean boTrue = true;
    final byte by = 80;
    final short sh = 42;
    final int i = sh + 1;
    final long l = i + 1;
    final String st = "Yay!";
    final String nullStr = null;
    final String str = "Yay!";
    final Collection<String> col = Arrays.asList("foo", "bar", "baz");
    final CSN csn = new CSN(42424242, 13, 42);
    final DN dn = DN.decode("dc=example,dc=com");
    final ServerState ss = new ServerState();
    ss.update(csn);
    byte[] bytes = new ByteArrayBuilder()
        .append(bo)
        .append(boTrue)
        .append(boFalse)
        .append(by)
        .append(sh)
        .append(i)
        .append(l)
        .append(st)
        .append(nullStr)
        .append(str)
        .appendStrings(col)
        .appendUTF8(i)
        .appendUTF8(l)
        .append(csn)
        .appendUTF8(csn)
        .append(dn)
        .appendZeroTerminated(byteArray)
        .append(byteArray)
        .append(ss)
        .toByteArray();
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    Assert.assertEquals(scanner.nextBoolean(), bo);
    Assert.assertEquals(scanner.nextByte(), by);
    Assert.assertEquals(scanner.nextShort(), sh);
    Assert.assertEquals(scanner.nextInt(), i);
    Assert.assertEquals(scanner.nextLong(), l);
    Assert.assertEquals(scanner.nextString(), st);
    Assert.assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
    Assert.assertEquals(scanner.nextIntUTF8(), i);
    Assert.assertEquals(scanner.nextLongUTF8(), l);
    Assert.assertEquals(scanner.nextCSN(), csn);
    Assert.assertEquals(scanner.nextCSNUTF8(), csn);
    Assert.assertTrue(scanner.isEmpty());
    assertFalse(scanner.isEmpty());
    assertEquals(scanner.nextBoolean(), boTrue);
    assertEquals(scanner.nextBoolean(), boFalse);
    assertEquals(scanner.nextByte(), by);
    assertEquals(scanner.nextShort(), sh);
    assertEquals(scanner.nextInt(), i);
    assertEquals(scanner.nextLong(), l);
    assertEquals(scanner.nextString(), nullStr);
    assertEquals(scanner.nextString(), str);
    assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
    assertEquals(scanner.nextIntUTF8(), i);
    assertEquals(scanner.nextLongUTF8(), l);
    assertEquals(scanner.nextCSN(), csn);
    assertEquals(scanner.nextCSNUTF8(), csn);
    assertEquals(scanner.nextDN(), dn);
    assertEquals(scanner.nextByteArray(byteArray.length), byteArray);
    scanner.skipZeroSeparator();
    assertEquals(scanner.nextByteArray(byteArray.length), byteArray);
    assertEquals(scanner.nextServerState().toString(), ss.toString());
    assertTrue(scanner.isEmpty());
  }
  @Test
  public void testByteArrayScanner_remainingBytes() throws Exception
  {
    final byte[] bytes = new ByteArrayBuilder().append(byteArray).toByteArray();
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    assertEquals(scanner.remainingBytes(), byteArray);
    assertTrue(scanner.isEmpty());
  }
  @Test
  public void testByteArrayScanner_remainingBytesZeroTerminated() throws Exception
  {
    final byte[] bytes =
        new ByteArrayBuilder().appendZeroTerminated(byteArray).toByteArray();
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    assertEquals(scanner.remainingBytesZeroTerminated(), byteArray);
    assertTrue(scanner.isEmpty());
  }
  @DataProvider
  public Iterator<Object[]> testCasesForNextMethodsWithEmptyByteArray()
  {
    return new IntegerRange(0, 7);
  }
  @Test(dataProvider = "testCasesForNextMethodsWithEmptyByteArray",
      expectedExceptions = DataFormatException.class)
  public void testByteArrayScanner_nextMethods_throwsExceptionWhenNoData(int testNumber) throws Exception
  {
    delegate(testNumber);
  }
  /**
   * TestNG does not like test methods with a return type other than void,
   * so used a delegate to simplify the code down below.
   */
  private Object delegate(int testNumber) throws DataFormatException
  {
    final ByteArrayScanner scanner = new ByteArrayScanner(new byte[0]);
    switch (testNumber)
    {
    case 0:
      return scanner.nextByte();
    case 1:
      return scanner.nextBoolean();
    case 2:
      return scanner.nextShort();
    case 3:
      return scanner.nextInt();
    case 4:
      return scanner.nextIntUTF8();
    case 5:
      return scanner.nextLong();
    case 6:
      return scanner.nextLongUTF8();
    case 7:
      return scanner.nextCSN();
    default:
      return null;
    }
  }
  @Test(expectedExceptions = IndexOutOfBoundsException.class)
  public void testByteArrayScanner_skipZeroSeparator_throwsExceptionWhenNoData() throws Exception
  {
    new ByteArrayScanner(new byte[0]).skipZeroSeparator();
  }
  @Test(expectedExceptions = DataFormatException.class)
  public void testByteArrayScanner_skipZeroSeparator_throwsExceptionWhenNoZeroSeparator() throws Exception
  {
    new ByteArrayScanner(new byte[] { 1 }).skipZeroSeparator();
  }
  @Test(expectedExceptions = DataFormatException.class)
  public void testByteArrayScanner_nextCSNUTF8_throwsExceptionWhenInvalidCSN() throws Exception
  {
    new ByteArrayScanner(new byte[] { 1, 0 }).nextCSNUTF8();
  }
  @Test(expectedExceptions = DataFormatException.class)
  public void testByteArrayScanner_nextDN_throwsExceptionWhenInvalidDN() throws Exception
  {
    final byte[] bytes = new ByteArrayBuilder().append("this is not a valid DN").toByteArray();
    new ByteArrayScanner(bytes).nextDN();
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ReplicationServerTest.java
@@ -38,6 +38,7 @@
import org.opends.server.core.ModifyDNOperationBasis;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.AssuredMode;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.ServerState;
@@ -894,9 +895,9 @@
      }
      // Send StartSessionMsg
      StartSessionMsg startSessionMsg =
        new StartSessionMsg(ServerStatus.NORMAL_STATUS,
        new ArrayList<String>());
      StartSessionMsg startSessionMsg = new StartSessionMsg(
          ServerStatus.NORMAL_STATUS, new ArrayList<String>(),
          false, AssuredMode.SAFE_DATA_MODE, (byte) 1);
      session.publish(startSessionMsg);
      // Read the TopologyMsg that should come back.