mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.26.2014 de36fa06856d8d04652401bb24e49c3259aef154
opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
New file
@@ -0,0 +1,348 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.io.UnsupportedEncodingException;
import java.util.Collection;
import org.opends.server.replication.common.CSN;
import org.opends.server.types.ByteStringBuilder;
/**
 * Byte array builder class encodes data into byte arrays to send messages over
 * the replication protocol. Built on top of {@link ByteStringBuilder}, it
 * isolates the latter against legacy type conversions from the replication
 * protocol. It exposes a fluent API.
 *
 * @see ByteArrayScanner ByteArrayScanner class that decodes messages built with
 *      current class.
 */
public class ByteArrayBuilder
{
  /** This is the null byte, also known as zero byte. */
  public static final byte NULL_BYTE = 0;
  private final ByteStringBuilder builder;
  /**
   * Constructs a ByteArrayBuilder.
   */
  public ByteArrayBuilder()
  {
    builder = new ByteStringBuilder();
  }
  /**
   * Constructs a ByteArrayBuilder.
   *
   * @param capacity
   *          the capacity of the underlying ByteStringBuilder
   */
  public ByteArrayBuilder(int capacity)
  {
    builder = new ByteStringBuilder(capacity);
  }
  /**
   * Append a boolean to this ByteArrayBuilder.
   *
   * @param b
   *          the boolean to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(boolean b)
  {
    append((byte) (b ? 1 : 0));
    return this;
  }
  /**
   * Append a byte to this ByteArrayBuilder.
   *
   * @param b
   *          the byte to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(byte b)
  {
    builder.append(b);
    return this;
  }
  /**
   * Append a short to this ByteArrayBuilder.
   *
   * @param s
   *          the short to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(short s)
  {
    builder.append(s);
    return this;
  }
  /**
   * Append an int to this ByteArrayBuilder.
   *
   * @param i
   *          the long to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(int i)
  {
    builder.append(i);
    return this;
  }
  /**
   * Append a long to this ByteArrayBuilder.
   *
   * @param l
   *          the long to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(long l)
  {
    builder.append(l);
    return this;
  }
  /**
   * Append an int to this ByteArrayBuilder by converting it to a String then
   * encoding that string to a UTF-8 byte array.
   *
   * @param i
   *          the int to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder appendUTF8(int i)
  {
    return append(Integer.toString(i));
  }
  /**
   * Append a long to this ByteArrayBuilder by converting it to a String then
   * encoding that string to a UTF-8 byte array.
   *
   * @param l
   *          the long to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder appendUTF8(long l)
  {
    return append(Long.toString(l));
  }
  /**
   * Append a Collection of Strings to this ByteArrayBuilder.
   *
   * @param col
   *          the Collection of Strings to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder appendStrings(Collection<String> col)
  {
    append(col.size());
    for (String s : col)
    {
      append(s);
    }
    return this;
  }
  /**
   * Append a String to this ByteArrayBuilder.
   *
   * @param s
   *          the String to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(String s)
  {
    try
    {
      append(s.getBytes("UTF-8"));
    }
    catch (UnsupportedEncodingException e)
    {
      throw new RuntimeException("Should never happen", e);
    }
    return this;
  }
  /**
   * Append a CSN to this ByteArrayBuilder.
   *
   * @param csn
   *          the CSN to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder append(CSN csn)
  {
    csn.toByteString(builder);
    return this;
  }
  /**
   * Append a CSN to this ByteArrayBuilder by converting it to a String then
   * encoding that string to a UTF-8 byte array.
   *
   * @param csn
   *          the CSN to append.
   * @return this ByteArrayBuilder
   */
  public ByteArrayBuilder appendUTF8(CSN csn)
  {
    append(csn.toString());
    return this;
  }
  private ByteArrayBuilder append(byte[] sBytes)
  {
    for (byte b : sBytes)
    {
      append(b);
    }
    append((byte) 0); // zero separator
    return this;
  }
  /**
   * Converts the content of this ByteStringBuilder to a byte array.
   *
   * @return the content of this ByteStringBuilder converted to a byte array.
   */
  public byte[] toByteArray()
  {
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return builder.toString();
  }
  /**
   * Helper method that returns the number of bytes that would be used by the
   * boolean fields when appended to a ByteArrayBuilder.
   *
   * @param nbFields
   *          the number of boolean fields that will be appended to a
   *          ByteArrayBuilder
   * @return the number of bytes occupied by the appended boolean fields.
   */
  public static int booleans(int nbFields)
  {
    return nbFields;
  }
  /**
   * Helper method that returns the number of bytes that would be used by the
   * byte fields when appended to a ByteArrayBuilder.
   *
   * @param nbFields
   *          the number of byte fields that will be appended to a
   *          ByteArrayBuilder
   * @return the number of bytes occupied by the appended byte fields.
   */
  public static int bytes(int nbFields)
  {
    return nbFields;
  }
  /**
   * Helper method that returns the number of bytes that would be used by the
   * short fields when appended to a ByteArrayBuilder.
   *
   * @param nbFields
   *          the number of short fields that will be appended to a
   *          ByteArrayBuilder
   * @return the number of bytes occupied by the appended short fields.
   */
  public static int shorts(int nbFields)
  {
    return 2 * nbFields;
  }
  /**
   * Helper method that returns the number of bytes that would be used by the
   * int fields when appended to a ByteArrayBuilder.
   *
   * @param nbFields
   *          the number of int fields that will be appended to a
   *          ByteArrayBuilder
   * @return the number of bytes occupied by the appended int fields.
   */
  public static int ints(int nbFields)
  {
    return 4 * nbFields;
  }
  /**
   * Helper method that returns the number of bytes that would be used by the
   * long fields when appended to a ByteArrayBuilder.
   *
   * @param nbFields
   *          the number of long fields that will be appended to a
   *          ByteArrayBuilder
   * @return the number of bytes occupied by the appended long fields.
   */
  public static int longs(int nbFields)
  {
    return 8 * nbFields;
  }
  /**
   * Helper method that returns the number of bytes that would be used by the
   * CSN fields when appended to a ByteArrayBuilder.
   *
   * @param nbFields
   *          the number of CSN fields that will be appended to a
   *          ByteArrayBuilder
   * @return the number of bytes occupied by the appended CSN fields.
   */
  public static int csns(int nbFields)
  {
    return CSN.BYTE_ENCODING_LENGTH * nbFields;
  }
  /**
   * Helper method that returns the number of bytes that would be used by the
   * CSN fields encoded as a UTF8 string when appended to a ByteArrayBuilder.
   *
   * @param nbFields
   *          the number of CSN fields that will be appended to a
   *          ByteArrayBuilder
   * @return the number of bytes occupied by the appended legacy-encoded CSN
   *         fields.
   */
  public static int csnsUTF8(int nbFields)
  {
    return CSN.STRING_ENCODING_LENGTH * nbFields + 1 /* null byte */;
  }
}
opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
New file
@@ -0,0 +1,281 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.util.Collection;
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.CSN;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
/**
 * Byte array scanner class helps decode data from byte arrays received via
 * messages over the replication protocol. Built on top of
 * {@link ByteSequenceReader}, it isolates the latter against legacy type
 * conversions from the replication protocol.
 *
 * @see ByteArrayBuilder ByteArrayBuilder class that encodes messages read with
 *      current class.
 */
public class ByteArrayScanner
{
  private final ByteSequenceReader bytes;
  /**
   * Builds a ByteArrayScanner object that will read from the supplied byte
   * array.
   *
   * @param bytes
   *          the byte array input that will be read from
   */
  public ByteArrayScanner(byte[] bytes)
  {
    this.bytes = ByteString.wrap(bytes).asReader();
  }
  /**
   * Reads the next boolean.
   *
   * @return the next boolean
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public boolean nextBoolean() throws DataFormatException
  {
    return nextByte() != 0;
  }
  /**
   * Reads the next byte.
   *
   * @return the next byte
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public byte nextByte() throws DataFormatException
  {
    try
    {
      return bytes.get();
    }
    catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    }
  }
  /**
   * Reads the next short.
   *
   * @return the next short
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public short nextShort() throws DataFormatException
  {
    try
    {
      return bytes.getShort();
    }
    catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    }
  }
  /**
   * Reads the next int.
   *
   * @return the next int
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public int nextInt() throws DataFormatException
  {
    try
    {
      return bytes.getInt();
    }
    catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    }
  }
  /**
   * Reads the next long.
   *
   * @return the next long
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public long nextLong() throws DataFormatException
  {
    try
    {
      return bytes.getLong();
    }
    catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    }
  }
  /**
   * Reads the next int that was encoded as a UTF8 string.
   *
   * @return the next int that was encoded as a UTF8 string.
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public int nextIntUTF8() throws DataFormatException
  {
    return Integer.valueOf(nextString());
  }
  /**
   * Reads the next long that was encoded as a UTF8 string.
   *
   * @return the next long that was encoded as a UTF8 string.
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public long nextLongUTF8() throws DataFormatException
  {
    return Long.valueOf(nextString());
  }
  /**
   * Reads the next UTF8-encoded string.
   *
   * @return the next UTF8-encoded string.
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public String nextString() throws DataFormatException
  {
    try
    {
      final String s = bytes.getString(findZeroSeparator());
      bytes.skip(1); // skip the zero separator
      return s;
    }
    catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    }
  }
  private int findZeroSeparator() throws DataFormatException
  {
    int offset = 0;
    final int remaining = bytes.remaining();
    while (bytes.peek(offset) != 0 && offset < remaining)
    {
      offset++;
    }
    if (offset == remaining)
    {
      throw new DataFormatException("No more data to read from");
    }
    return offset;
  }
  /**
   * Reads the next UTF8-encoded strings in the provided collection.
   *
   * @param output
   *          the collection where to add the next UTF8-encoded strings
   * @param <TCol>
   *          the collection's concrete type
   * @return the provided collection where the next UTF8-encoded strings have
   *         been added.
   * @throws DataFormatException
   *           if no more data can be read from the input
   */
  public <TCol extends Collection<String>> TCol nextStrings(TCol output)
      throws DataFormatException
  {
    final int colSize = nextInt();
    for (int i = 0; i < colSize; i++)
    {
      output.add(nextString());
    }
    return output;
  }
  /**
   * Reads the next CSN.
   *
   * @return the next CSN.
   * @throws DataFormatException
   *           if CSN was incorrectly encoded or no more data can be read from
   *           the input
   */
  public CSN nextCSN() throws DataFormatException
  {
    try
    {
      return CSN.valueOf(bytes.getByteSequence(CSN.BYTE_ENCODING_LENGTH));
    }
    catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    }
  }
  /**
   * Reads the next CSN that was encoded as a UTF8 string.
   *
   * @return the next CSN that was encoded as a UTF8 string.
   * @throws DataFormatException
   *           if legacy CSN was incorrectly encoded or no more data can be read
   *           from the input
   */
  public CSN nextCSNUTF8() throws DataFormatException
  {
    try
    {
      return CSN.valueOf(nextString());
    }
    catch (IndexOutOfBoundsException e)
    {
      throw new DataFormatException(e.getMessage());
    }
  }
  /**
   * Returns whether the scanner has more bytes to consume.
   *
   * @return true if the scanner has more bytes to consume, false otherwise.
   */
  public boolean isEmpty()
  {
    return bytes.remaining() == 0;
  }
}
opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -29,9 +29,9 @@
import java.util.zip.DataFormatException;
import org.opends.server.replication.common.CSN;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
/**
 * Class that define messages sent by a replication domain (DS) to the
@@ -39,21 +39,48 @@
 */
public class ChangeTimeHeartbeatMsg extends ReplicationMsg
{
  private static final byte NORMAL_HEARTBEAT = 0;
  private static final byte REPLICA_OFFLINE_HEARTBEAT = 1;
  /**
   * The CSN containing the change time.
   */
  private final CSN csn;
  /**
   * The CSN containing the change time.
   */
  private final byte eventType;
  private ChangeTimeHeartbeatMsg(CSN csn, byte eventType)
  {
    this.csn = csn;
    this.eventType = eventType;
  }
  /**
   * Constructor of a Change Time Heartbeat message providing the change time
   * value in a CSN.
   * Factory method that builds a change time heartbeat message providing the
   * change time value in a CSN.
   *
   * @param csn
   *          The provided CSN.
   * @return a new ChangeTimeHeartbeatMsg
   */
  public ChangeTimeHeartbeatMsg(CSN csn)
  public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn)
  {
    this.csn = csn;
    return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT);
  }
  /**
   * Factory method that builds a change time heartbeat message for a replica
   * going offline.
   *
   * @param offlineCSN
   *          the serverId and timestamp of the replica going offline
   * @return a new ChangeTimeHeartbeatMsg
   */
  public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN)
  {
    return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT);
  }
  /**
@@ -67,6 +94,17 @@
  }
  /**
   * Returns whether this is a replica offline message.
   *
   * @return true if this is a replica offline message, false if this is a
   *         regular heartbeat message.
   */
  public boolean isReplicaOfflineMsg()
  {
    return eventType == REPLICA_OFFLINE_HEARTBEAT;
  }
  /**
   * Creates a message from a provided byte array.
   *
   * @param in
@@ -79,32 +117,31 @@
  public ChangeTimeHeartbeatMsg(byte[] in, short version)
      throws DataFormatException
  {
    final ByteSequenceReader reader = ByteString.wrap(in).asReader();
    try
    {
      if (reader.get() != MSG_TYPE_CT_HEARTBEAT)
      final ByteArrayScanner scanner = new ByteArrayScanner(in);
      final byte msgType = scanner.nextByte();
      if (msgType != MSG_TYPE_CT_HEARTBEAT)
      {
        // Throw better exception below.
        throw new IllegalArgumentException();
        throw new DataFormatException("input is not a valid "
            + getClass().getSimpleName() + " message: " + msgType);
      }
      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
      {
        csn = CSN.valueOf(reader.getByteSequence(CSN.BYTE_ENCODING_LENGTH));
      }
      else
      {
        csn = CSN.valueOf(reader.getString(CSN.STRING_ENCODING_LENGTH));
        reader.get(); // Read trailing 0 byte.
      }
      csn = version >= REPLICATION_PROTOCOL_V7
          ? scanner.nextCSN()
          : scanner.nextCSNUTF8();
      eventType = version >= REPLICATION_PROTOCOL_V8
          ? scanner.nextByte()
          : NORMAL_HEARTBEAT;
      if (reader.remaining() > 0)
      if (!scanner.isEmpty())
      {
        // Throw better exception below.
        throw new IllegalArgumentException();
        throw new DataFormatException(
            "Did not expect to find more bytes to read for "
            + getClass().getSimpleName() + " message.");
      }
    }
    catch (Exception e)
    catch (RuntimeException e)
    {
      // Index out of bounds, bad format, etc.
      throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
@@ -115,24 +152,22 @@
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V7)
    {
      final ByteStringBuilder builder = new ByteStringBuilder(
          CSN.BYTE_ENCODING_LENGTH + 1 /* type + csn */);
      ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csnsUTF8(1));
      builder.append(MSG_TYPE_CT_HEARTBEAT);
      csn.toByteString(builder);
      builder.appendUTF8(csn);
      return builder.toByteArray();
    }
    else
    final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1));
    builder.append(MSG_TYPE_CT_HEARTBEAT);
    builder.append(csn);
    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8)
    {
      final ByteStringBuilder builder = new ByteStringBuilder(
          CSN.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */);
      builder.append(MSG_TYPE_CT_HEARTBEAT);
      builder.append(csn.toString());
      builder.append((byte) 0); // For compatibility with earlier protocol
                                // versions.
      return builder.toByteArray();
      builder.append(eventType);
    }
    return builder.toByteArray();
  }
  /** {@inheritDoc} */
@@ -141,4 +176,5 @@
  {
    return getClass().getSimpleName() + ", csn=" + csn.toStringUI();
  }
}
opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -22,11 +22,10 @@
 *
 *
 *      Copyright 2006-2010 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
/**
 * The version utility class for the replication protocol.
 */
@@ -43,52 +42,72 @@
  public static final short REPLICATION_PROTOCOL_V1_REAL = 49;
  /**
   * The constant for the second version of the replication protocol.
   * Add fields in the header for assured replication.
   * <ul>
   * <li>Add fields in the header for assured replication.</li>
   * </ul>
   */
  public static final short REPLICATION_PROTOCOL_V2 = 2;
  /**
   * The constant for the 3rd version of the replication protocol.
   * Add messages for remote ECL : not used as of today.
   * <ul>
   * <li>Add messages for remote ECL : not used as of today.</li>
   * </ul>
   */
  public static final short REPLICATION_PROTOCOL_V3 = 3;
  /**
   * The constant for the 4th version of the replication protocol.
   * - Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
   *   ECL entry attributes.
   * - Modified algorithm for choosing a RS to connect to: introduction of a
   *   ReplicationServerDSMsg message.
   *   -> also added of the server URL in RSInfo of TopologyMsg
   * - Introduction of a StopMsg for proper connections ending.
   * - Initialization failover/flow control
   * <ul>
   * <li>Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
   * ECL entry attributes.</li>
   * <li>Modified algorithm for choosing a RS to connect to: introduction of a
   * ReplicationServerDSMsg message.</li>
   * <li>also added of the server URL in RSInfo of TopologyMsg</li>
   * <li>Introduction of a StopMsg for proper connections ending.</li>
   * <li>Initialization failover/flow control</li>
   * </ul>
   */
  public static final short REPLICATION_PROTOCOL_V4 = 4;
  /**
   * The constant for the 5th version of the replication protocol.
   * - Add support for wild-cards in change log included attributes
   * - Add support for specifying additional included attributes for deletes
   * - See OPENDJ-194.
   * <ul>
   * <li>Add support for wild-cards in change log included attributes</li>
   * <li>Add support for specifying additional included attributes for deletes</li>
   * <li>See OPENDJ-194.</li>
   * </ul>
   */
  public static final short REPLICATION_PROTOCOL_V5 = 5;
  /**
   * The constant for the 6th version of the replication protocol.
   * - include DS local URL in the DSInfo of TopologyMsg.
   * <ul>
   * <li>include DS local URL in the DSInfo of TopologyMsg.</li>
   * </ul>
   */
  public static final short REPLICATION_PROTOCOL_V6 = 6;
  /**
   * The constant for the 7th version of the replication protocol.
   * - compact encoding for length, CSNs, and server IDs.
   * <ul>
   * <li>compact encoding for length, CSNs, and server IDs.</li>
   * </ul>
   */
  public static final short REPLICATION_PROTOCOL_V7 = 7;
  /**
   * The constant for the 8th version of the replication protocol.
   * <ul>
   * <li>StopMsg now has a timestamp to communicate the replica stop time.</li>
   * </ul>
   */
  public static final short REPLICATION_PROTOCOL_V8 = 8;
  /**
   * The replication protocol version used by the instance of RS/DS in this VM.
   */
  private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7;
  private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V8;
  /**
   * Gets the current version of the replication protocol.
opends/src/server/org/opends/server/replication/protocol/StopMsg.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions copyright 2013 ForgeRock AS.
 *      Portions copyright 2013-2014 ForgeRock AS.
 */
package org.opends.server.replication.protocol;
@@ -56,9 +56,7 @@
        in[0]);
  }
  /**
   * {@inheritDoc}
   */
  /** {@inheritDoc} */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
@@ -67,4 +65,11 @@
        MSG_TYPE_STOP
      };
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName();
  }
}
opends/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -21,7 +21,7 @@
 * CDDL HEADER END
 *
 *
 *      Copyright 2013 ForgeRock AS
 *      Copyright 2013-2014 ForgeRock AS
 */
package org.opends.server.replication.server;
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import org.opends.server.replication.common.CSN;
import org.opends.server.types.DN;
/**
@@ -49,6 +50,8 @@
  private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>();
  private final Map<DN, List<Integer>> domainToServerIds =
      new HashMap<DN, List<Integer>>();
  private final Map<DN, List<CSN>> offlineReplicas =
      new HashMap<DN, List<CSN>>();
  /**
   * Sets the generationId for the supplied replication domain.
@@ -83,6 +86,25 @@
  }
  /**
   * Adds the following replica information to the offline list.
   *
   * @param baseDN
   *          the baseDN of the offline replica
   * @param offlineCSN
   *          the CSN (serverId + timestamp) of the offline replica
   */
  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
  {
    List<CSN> offlineCSNs = offlineReplicas.get(baseDN);
    if (offlineCSNs == null)
    {
      offlineCSNs = new LinkedList<CSN>();
      offlineReplicas.put(baseDN, offlineCSNs);
    }
    offlineCSNs.add(offlineCSN);
  }
  /**
   * Returns the Map of domainBaseDN => generationId.
   *
   * @return a Map of domainBaseDN => generationId
@@ -102,11 +124,22 @@
    return domainToServerIds;
  }
  /**
   * Returns the Map of domainBaseDN => List&lt;offlineCSN&gt;.
   *
   * @return a Map of domainBaseDN => List&lt;offlineCSN&gt;.
   */
  public Map<DN, List<CSN>> getOfflineReplicas()
  {
    return offlineReplicas;
  }
  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return "domainToGenerationId=" + domainToGenerationId
        + ", domainToServerIds=" + domainToServerIds;
        + ", domainToServerIds=" + domainToServerIds
        + ", offlineReplicas=" + offlineReplicas;
  }
}
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2530,11 +2530,29 @@
   * value received, and forwarding the message to the other RSes.
   * @param senderHandler The handler for the server that sent the heartbeat.
   * @param msg The message to process.
   * @throws DirectoryException
   *           if a problem occurs
   */
  void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
      ChangeTimeHeartbeatMsg msg)
      ChangeTimeHeartbeatMsg msg) throws DirectoryException
  {
    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
    try
    {
      if (msg.isReplicaOfflineMsg())
      {
        domainDB.replicaOffline(baseDN, msg.getCSN());
      }
      else
      {
        domainDB.replicaHeartbeat(baseDN, msg.getCSN());
      }
    }
    catch (ChangelogException e)
    {
      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e
          .getMessageObject(), e);
    }
    if (senderHandler.isDataServer())
    {
      /*
opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -839,14 +839,19 @@
  /**
   * Processes a change time heartbeat msg.
   *
   * @param msg The message to be processed.
   * @param msg
   *          The message to be processed.
   * @throws DirectoryException
   *           When an exception is raised.
   */
  void process(ChangeTimeHeartbeatMsg msg)
  void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException
  {
    if (debugEnabled())
    {
      TRACER.debugInfo("In "
          + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
          + this + " processes received msg:\n" + msg);
    }
    replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
  }
@@ -865,9 +870,9 @@
      // lets update the LDAP server with out current window size and hope
      // that everything will work better in the future.
      // TODO also log an error message.
      WindowMsg msg = new WindowMsg(rcvWindow);
      session.publish(msg);
    } else
      session.publish(new WindowMsg(rcvWindow));
    }
    else
    {
      // Both the LDAP server and the replication server believes that the
      // window is closed. Lets check the flowcontrol in case we
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -217,6 +217,8 @@
   *          the replication domain baseDN
   * @param offlineCSN
   *          The CSN (serverId and timestamp) for the replica's going offline
   * @throws ChangelogException
   *           If a database problem happened
   */
  void replicaOffline(DN baseDN, CSN offlineCSN);
  void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
}
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -103,7 +103,8 @@
  /**
   * Holds the last time each replica was seen alive, whether via updates or
   * heartbeats received. Data is held for each serverId cross domain.
   * heartbeat notifications, or offline notifications. Data is held for each
   * serverId cross domain.
   * <p>
   * Updates are persistent and stored in the replicaDBs, heartbeats are
   * transient and are easily constructed on normal operations.
@@ -221,6 +222,7 @@
   * @return true if the provided baseDN is enabled for the external changelog,
   *         false if the provided baseDN is disabled for the external changelog
   *         or unknown to multimaster replication.
   * @see MultimasterReplication#isECLEnabledDomain(DN)
   */
  protected boolean isECLEnabledDomain(DN baseDN)
  {
@@ -351,6 +353,20 @@
      nextChangeForInsertDBCursor.next();
    }
    for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
        .entrySet())
    {
      final DN baseDN = entry.getKey();
      final List<CSN> offlineCSNs = entry.getValue();
      for (CSN offlineCSN : offlineCSNs)
      {
        if (isECLEnabledDomain(baseDN))
        {
          replicasOffline.update(baseDN, offlineCSN);
        }
      }
    }
    // this will not be used any more. Discard for garbage collection.
    this.changelogState = null;
  }
@@ -554,20 +570,33 @@
  }
  private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
      final DN mcBaseDN)
      final DN mcBaseDN) throws ChangelogException
  {
    // update, so it becomes the previous cookie for the next change
    mediumConsistencyRUV.update(mcBaseDN, mcCSN);
    mediumConsistency = Pair.of(mcBaseDN, mcCSN);
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
    if (offlineCSN != null
        && offlineCSN.isOlderThan(mcCSN)
        // If no new updates has been seen for this replica
        && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
    final int mcServerId = mcCSN.getServerId();
    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
    if (offlineCSN != null)
    {
      removeCursor(mcBaseDN, mcCSN);
      replicasOffline.removeCSN(mcBaseDN, offlineCSN);
      mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
      {
        // replica is back online, we can forget the last time it was offline
        replicasOffline.removeCSN(mcBaseDN, offlineCSN);
      }
      else if (offlineCSN.isOlderThan(mcCSN))
      {
        /*
         * replica is not back online and Medium consistency point has gone past
         * its last offline time: remove everything known about it: cursor,
         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
         * from the medium consistency RUV.
         */
        removeCursor(mcBaseDN, mcCSN);
        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
      }
    }
  }
@@ -587,6 +616,7 @@
  }
  private void removeCursor(final DN baseDN, final CSN csn)
      throws ChangelogException
  {
    for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
        : allCursors.entrySet())
@@ -601,6 +631,7 @@
          {
            iter.remove();
            StaticUtils.close(entry2.getValue());
            resetNextChangeForInsertDBCursor();
            return;
          }
        }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -826,13 +826,14 @@
  /** {@inheritDoc} */
  @Override
  public void replicaOffline(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    dbEnv.addOfflineReplica(baseDN, offlineCSN);
    final ChangeNumberIndexer indexer = cnIndexer.get();
    if (indexer != null)
    {
      indexer.replicaOffline(baseDN, offlineCSN);
    }
    // TODO save this state in the changelogStateDB?
  }
  /**
@@ -942,5 +943,13 @@
      // wait a bit before purging more
      return DEFAULT_SLEEP;
    }
    /** {@inheritDoc} */
    @Override
    public void initiateShutdown()
    {
      super.initiateShutdown();
      this.interrupt(); // wake up the purger thread for faster shutdown
    }
  }
}
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -98,7 +98,11 @@
      synchronized (this)
      {
        closeCursor();
        // previously exhausted cursor must be able to reinitialize themselves
        // Previously exhausted cursor must be able to reinitialize themselves.
        // There is a risk of readLock never being unlocked
        // if following code is called while the cursor is closed.
        // It is better to let the deadlock happen to help quickly identifying
        // and fixing such issue with unit tests.
        cursor = db.openReadCursor(lastNonNullCurrentCSN);
        currentChange = cursor.next();
        if (currentChange != null)
@@ -131,7 +135,7 @@
  }
  /**
   * Called by the Gc when the object is garbage collected Release the internal
   * Called by the Gc when the object is garbage collected. Release the internal
   * cursor in case the cursor was badly used and {@link #close()} was never
   * called.
   */
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -38,9 +38,12 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -67,6 +70,7 @@
  private ReplicationServer replicationServer;
  private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
  private static final String GENERATION_ID_TAG = "GENID";
  private static final String OFFLINE_TAG = "OFFLINE";
  private static final String FIELD_SEPARATOR = " ";
  /** The tracer object for the debug logger. */
  private static final DebugTracer TRACER = getTracer();
@@ -268,6 +272,19 @@
          }
          result.setDomainGenerationId(baseDN, generationId);
        }
        else if (prefix.equals(OFFLINE_TAG))
        {
          final String[] str = stringKey.split(FIELD_SEPARATOR, 3);
          final int serverId = toInt(str[1]);
          final DN baseDN = DN.decode(str[2]);
          long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong();
          if (debugEnabled())
          {
            debug("has read replica offline: baseDN=" + baseDN + " serverId="
                + serverId);
          }
          result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId));
        }
        else
        {
          final String[] str = stringData.split(FIELD_SEPARATOR, 2);
@@ -275,7 +292,8 @@
          final DN baseDN = DN.decode(str[1]);
          if (debugEnabled())
          {
            debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
            debug("has read replica: baseDN=" + baseDN + " serverId="
                + serverId);
          }
          result.addServerIdToDomain(serverId, baseDN);
        }
@@ -416,7 +434,7 @@
      // Opens the DB for the changes received from this server on this domain.
      final Database replicaDB = openDatabase(replicaEntry.getKey());
      putInChangelogStateDBIfNotExist(replicaEntry);
      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
      putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
      return replicaDB;
    }
@@ -436,7 +454,7 @@
   *          the replica's serverId
   * @return a database entry for the replica
   */
  Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
  static Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
  {
    final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
    return new SimpleImmutableEntry<String, String>(key, key);
@@ -452,30 +470,65 @@
   *          the domain's generationId
   * @return a database entry for the generationId
   */
  Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
  static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId)
  {
    final String normDn = baseDN.toNormalizedString();
    final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
    final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
        + FIELD_SEPARATOR + normDn;
    return new SimpleImmutableEntry<String, String>(key, data);
    return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data));
  }
  private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
      throws RuntimeException
  /**
   * Converts an Entry&lt;String, String&gt; to an Entry&lt;byte[], byte[]&gt;.
   *
   * @param entry
   *          the entry to convert
   * @return the converted entry
   */
  static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
  {
    DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
    return new SimpleImmutableEntry<byte[], byte[]>(
        toBytes(entry.getKey()),
        toBytes(entry.getValue()));
  }
  /**
   * Return an entry to store in the changelog state database representing the
   * time a replica went offline.
   *
   * @param baseDN
   *          the replica's baseDN
   * @param offlineCSN
   *          the replica's serverId and offline timestamp
   * @return a database entry representing the time a replica went offline
   */
  static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
  {
    final byte[] key =
        toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
            + FIELD_SEPARATOR + baseDN.toNormalizedString());
    final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
    data.append(offlineCSN.getTime());
    return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
  }
  private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
      throws ChangelogException, RuntimeException
  {
    DatabaseEntry key = new DatabaseEntry(entry.getKey());
    DatabaseEntry data = new DatabaseEntry();
    if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
    {
      Transaction txn = dbEnvironment.beginTransaction(null, null);
      try
      {
        data.setData(toBytes(entry.getValue()));
        data.setData(entry.getValue());
        if (debugEnabled())
        {
          debug("putting record in the changelogstate Db key=["
              + entry.getKey() + "] value=[" + entry.getValue() + "]");
              + toString(entry.getKey()) + "] value=["
              + toString(entry.getValue()) + "]");
        }
        changelogStateDb.put(txn, key, data);
        txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
@@ -585,11 +638,11 @@
   */
  public void clearServerId(DN baseDN, int serverId) throws ChangelogException
  {
    deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
        "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
  }
  private void deleteFromChangelogStateDB(Entry<String, ?> entry,
  private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
      String methodInvocation) throws ChangelogException
  {
    if (debugEnabled())
@@ -599,7 +652,7 @@
    try
    {
      final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
      final DatabaseEntry data = new DatabaseEntry();
      if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
      {
@@ -620,18 +673,65 @@
          throw dbe;
        }
      }
      else
      else if (debugEnabled())
      {
        if (debugEnabled())
        {
          debug(methodInvocation + " failed: key=[ " + entry.getKey()
              + "] not found");
        }
        debug(methodInvocation + " failed: key not found");
      }
    }
    catch (RuntimeException dbe)
    catch (RuntimeException e)
    {
      throw new ChangelogException(dbe);
      if (debugEnabled())
      {
        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
      }
      throw new ChangelogException(e);
    }
  }
  /**
   * Add the information about an offline replica to the changelog state DB.
   *
   * @param baseDN
   *          the domain of the offline replica
   * @param offlineCSN
   *          the offline replica serverId and offline timestamp
   * @throws ChangelogException
   *           if a database problem occurred
   */
  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
      throws ChangelogException
  {
    // just overwrite any older entry as it is assumed a newly received offline
    // CSN is newer than the previous one
    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
  }
  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
      String methodInvocation) throws ChangelogException
  {
    if (debugEnabled())
    {
      debug(methodInvocation + " starting");
    }
    try
    {
      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
      final DatabaseEntry data = new DatabaseEntry(entry.getValue());
      changelogStateDb.put(null, key, data);
      if (debugEnabled())
      {
        debug(methodInvocation + " succeeded");
      }
    }
    catch (RuntimeException e)
    {
      if (debugEnabled())
      {
        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
      }
      throw new ChangelogException(e);
    }
  }
opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -22,7 +22,7 @@
 *
 *
 *      Copyright 2009 Sun Microsystems, Inc.
 *      Portions Copyright 2011-2013 ForgeRock AS
 *      Portions Copyright 2011-2014 ForgeRock AS
 */
package org.opends.server.replication.service;
@@ -35,7 +35,6 @@
import org.opends.server.replication.protocol.Session;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
import org.opends.server.util.TimeThread;
import static org.opends.server.loggers.debug.DebugLogger.*;
@@ -76,15 +75,15 @@
   * @param session The session on which heartbeats are to be sent.
   * @param heartbeatInterval The interval between heartbeats sent
   *                          (in milliseconds).
   * @param serverId2 The serverId of the sender domain.
   * @param serverId The serverId of the sender domain.
   */
  public CTHeartbeatPublisherThread(String threadName, Session session,
                  long heartbeatInterval, int serverId2)
                  long heartbeatInterval, int serverId)
  {
    super(threadName);
    this.session = session;
    this.heartbeatInterval = heartbeatInterval;
    this.serverId = serverId2;
    this.serverId = serverId;
  }
  /**
@@ -93,6 +92,7 @@
  @Override
  public void run()
  {
    long lastHeartbeatTime = 0;
    try
    {
      if (debugEnabled())
@@ -103,13 +103,12 @@
      while (!shutdown)
      {
        long now = System.currentTimeMillis();
        final CSN csn = new CSN(TimeThread.getTime(), 0, serverId);
        ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
        final long now = System.currentTimeMillis();
        if (now > session.getLastPublishTime() + heartbeatInterval)
        {
          session.publish(ctHeartbeatMsg);
          final CSN csn = new CSN(now, 0, serverId);
          session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
          lastHeartbeatTime = csn.getTime();
        }
        long sleepTime = session.getLastPublishTime() + heartbeatInterval - now;
@@ -138,6 +137,26 @@
          }
        }
      }
      if (shutdown)
      {
        /*
         * Shortcoming: this thread is restarted each time the DS reconnects,
         * e.g. during load balancing. This is not that much of a problem
         * because the ChangeNumberIndexer tolerates receiving replica offline
         * heartbeats and then receiving messages back again.
         */
        /*
         * However, during shutdown we need to be sure that all pending client
         * operations have either completed or have been aborted before shutting
         * down replication. Otherwise, the medium consistency will move forward
         * without knowing about these changes.
         */
        final long now = System.currentTimeMillis();
        final int seqNum = lastHeartbeatTime == now ? 1 : 0;
        final CSN offlineCSN = new CSN(now, seqNum, serverId);
        session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
      }
    }
    catch (IOException e)
    {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
New file
@@ -0,0 +1,84 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License").  You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.types.ByteStringBuilder;
import org.testng.Assert;
import org.testng.annotations.Test;
/**
 * Test for {@link ByteStringBuilder} and {@link ByteArrayScanner} classes.
 */
@SuppressWarnings("javadoc")
public class ByteArrayTest extends DirectoryServerTestCase
{
  @Test
  public void testBuilderAppendMethodsAndScannerNextMethods() throws Exception
  {
    final boolean bo = true;
    final byte by = 80;
    final short sh = 42;
    final int i = sh + 1;
    final long l = i + 1;
    final String st = "Yay!";
    final Collection<String> col = Arrays.asList("foo", "bar", "baz");
    final CSN csn = new CSN(42424242, 13, 42);
    byte[] bytes = new ByteArrayBuilder()
        .append(bo)
        .append(by)
        .append(sh)
        .append(i)
        .append(l)
        .append(st)
        .appendStrings(col)
        .appendUTF8(i)
        .appendUTF8(l)
        .append(csn)
        .appendUTF8(csn)
        .toByteArray();
    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
    Assert.assertEquals(scanner.nextBoolean(), bo);
    Assert.assertEquals(scanner.nextByte(), by);
    Assert.assertEquals(scanner.nextShort(), sh);
    Assert.assertEquals(scanner.nextInt(), i);
    Assert.assertEquals(scanner.nextLong(), l);
    Assert.assertEquals(scanner.nextString(), st);
    Assert.assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
    Assert.assertEquals(scanner.nextIntUTF8(), i);
    Assert.assertEquals(scanner.nextLongUTF8(), l);
    Assert.assertEquals(scanner.nextCSN(), csn);
    Assert.assertEquals(scanner.nextCSNUTF8(), csn);
    Assert.assertTrue(scanner.isEmpty());
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -51,6 +51,7 @@
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.replication.protocol.OperationContext.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
@@ -107,10 +108,8 @@
    List<Modification> mods4 = new ArrayList<Modification>();
    for (int i = 0; i < 10; i++)
    {
      Attribute attr = Attributes.create("description", "string"
          + String.valueOf(i));
      Modification mod = new Modification(ModificationType.ADD, attr);
      mods4.add(mod);
      Attribute attr = Attributes.create("description", "string" + i);
      mods4.add(new Modification(ModificationType.ADD, attr));
    }
    Attribute attr5 = Attributes.create("namingcontexts", TEST_ROOT_DN_STRING);
@@ -156,14 +155,14 @@
    msg.setAssuredMode(assuredMode);
    msg.setSafeDataLevel(safeDataLevel);
    // Set ECL entry inlcuded attributes
    // Set ECL entry included attributes
    if (entryAttrList != null)
    {
      msg.setEclIncludes(entryAttrList);
    }
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.getCurrentVersion());
        msg.getBytes(), getCurrentVersion());
    // Test that generated attributes match original attributes.
    assertEquals(generatedMsg.isAssured(), isAssured);
@@ -218,7 +217,7 @@
    // Check equals
    ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
        msg.getBytes(), REPLICATION_PROTOCOL_V1);
    assertFalse(msg.equals(null));
    assertFalse(msg.equals(new Object()));
@@ -302,7 +301,7 @@
    }
    msg.setInitiatorsName("johnny h");
    DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), ProtocolVersion.getCurrentVersion());
        msg.getBytes(), getCurrentVersion());
    assertEquals(msg.toString(), generatedMsg.toString());
    assertEquals(msg.getInitiatorsName(), generatedMsg.getInitiatorsName());
@@ -397,8 +396,8 @@
      msg.setEclIncludes(entryAttrList);
    }
    ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg
        .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion());
    ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), getCurrentVersion());
    // Test that generated attributes match original attributes.
    assertEquals(generatedMsg.isAssured(), isAssured);
@@ -476,8 +475,8 @@
      msg.setEclIncludes(entryAttrList);
    }
    AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg
        .getBytes(), ProtocolVersion.getCurrentVersion());
    AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(
        msg.getBytes(), getCurrentVersion());
    assertEquals(generatedMsg.getBytes(), msg.getBytes());
    assertEquals(generatedMsg.toString(), msg.toString());
@@ -828,6 +827,33 @@
    new StopMsg(msg.getBytes(getCurrentVersion()));
  }
  @Test
  public void changeTimeHeartbeatMsgTest() throws Exception
  {
    final short v1 = REPLICATION_PROTOCOL_V1;
    final short v7 = REPLICATION_PROTOCOL_V7;
    final short v8 = REPLICATION_PROTOCOL_V8;
    final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
    final ChangeTimeHeartbeatMsg heartbeatMsg = ChangeTimeHeartbeatMsg.heartbeatMsg(csn);
    assertCTHearbeatMsg(heartbeatMsg, v1, false);
    assertCTHearbeatMsg(heartbeatMsg, v7, false);
    assertCTHearbeatMsg(heartbeatMsg, v8, false);
    final ChangeTimeHeartbeatMsg offlineMsg = ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn);
    assertCTHearbeatMsg(offlineMsg, v1, false);
    assertCTHearbeatMsg(offlineMsg, v7, false);
    assertCTHearbeatMsg(offlineMsg, v8, true);
  }
  private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg,
      short version, boolean expected) throws DataFormatException
  {
    final byte[] bytes = heartbeatMsg.getBytes(version);
    ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version);
    assertEquals(decodedMsg.isReplicaOfflineMsg(), expected);
  }
  /**
   * Test that WindowMsg encoding and decoding works
   * by checking that : msg == new WindowMsg(msg.getBytes()).
@@ -913,8 +939,7 @@
      throws Exception
  {
    TopologyMsg msg = new TopologyMsg(dsList, rsList);
    TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()),
        ProtocolVersion.getCurrentVersion());
    TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()), getCurrentVersion());
    assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos());
    assertEquals(msg.getRsInfos(), newMsg.getRsInfos());
  }
@@ -1091,16 +1116,14 @@
    msg.setServerState(sid3, s3, now+3, false);
    byte[] b = msg.getBytes(getCurrentVersion());
    MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
    MonitorMsg newMsg = new MonitorMsg(b, getCurrentVersion());
    assertEquals(rsState, msg.getReplServerDbState());
    assertEquals(newMsg.getReplServerDbState().toString(),
        msg.getReplServerDbState().toString());
    Iterator<Integer> it = newMsg.ldapIterator();
    while (it.hasNext())
    for (int sid : toIterable(newMsg.ldapIterator()))
    {
      int sid = it.next();
      ServerState s = newMsg.getLDAPServerState(sid);
      if (sid == sid1)
      {
@@ -1118,10 +1141,8 @@
      }
    }
    Iterator<Integer> it2 = newMsg.rsIterator();
    while (it2.hasNext())
    for (int sid : toIterable(newMsg.rsIterator()))
    {
      int sid = it2.next();
      ServerState s = newMsg.getRSServerState(sid);
      if (sid == sid3)
      {
@@ -1380,7 +1401,7 @@
      encodemsg += (t4 - t31);
      // getBytes
      byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
      byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
      t5 = System.nanoTime();
      getbytes += (t5 - t4);
@@ -1455,7 +1476,7 @@
      encodemsg += (t4 - t31);
      // getBytes
      byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
      byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
      t5 = System.nanoTime();
      getbytes += (t5 - t4);
@@ -1529,7 +1550,7 @@
      encodemsg += (t4 - t31);
      // getBytes
      byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
      byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
      t5 = System.nanoTime();
      getbytes += (t5 - t4);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -387,9 +387,18 @@
    final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
    publishUpdateMsg(msg4);
    // MCP moved forward after receiving update from serverId1
    // MCP moves forward after receiving update from serverId1
    // (last replica in the domain)
    assertExternalChangelogContent(msg1, msg2, msg4);
    // serverId2 comes online again
    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
    publishUpdateMsg(msg5);
    // MCP does not move until it knows what happens to serverId1
    assertExternalChangelogContent(msg1, msg2, msg4);
    sendHeartbeat(BASE_DN1, serverId1, 6);
    // MCP moves forward
    assertExternalChangelogContent(msg1, msg2, msg4, msg5);
  }
  private void addReplica(DN baseDN, int serverId) throws Exception
@@ -417,11 +426,13 @@
    waitForWaitingState(cnIndexer);
  }
  private void stopCNIndexer()
  private void stopCNIndexer() throws Exception
  {
    if (cnIndexer != null)
    {
      cnIndexer.initiateShutdown();
      cnIndexer.interrupt();
      cnIndexer.join();
    }
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
@@ -31,6 +31,7 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.types.DN;
@@ -43,6 +44,7 @@
import com.sleepycat.je.Environment;
import static java.util.Arrays.*;
import static java.util.Collections.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.je.ReplicationDbEnv.*;
@@ -51,78 +53,104 @@
public class ReplicationDbEnvTest extends DirectoryServerTestCase
{
   /**
    * Bypass heavyweight setup.
    */
   private final class TestableReplicationDbEnv extends ReplicationDbEnv
   {
      private TestableReplicationDbEnv() throws ChangelogException
      {
         super(null, null);
      }
  /**
   * Bypass heavyweight setup.
   */
  private final class TestableReplicationDbEnv extends ReplicationDbEnv
  {
    private TestableReplicationDbEnv() throws ChangelogException
    {
      super(null, null);
    }
      @Override
      protected Environment openJEEnvironment(String path)
      {
         return null;
      }
    @Override
    protected Environment openJEEnvironment(String path)
    {
      return null;
    }
      @Override
      protected Database openDatabase(String databaseName)
            throws ChangelogException, RuntimeException
      {
         return null;
      }
   }
    @Override
    protected Database openDatabase(String databaseName)
        throws ChangelogException, RuntimeException
    {
      return null;
    }
  }
   @BeforeClass
   public void setup() throws Exception
   {
      TestCaseUtils.startFakeServer();
   }
  @BeforeClass
  public void setup() throws Exception
  {
    TestCaseUtils.startFakeServer();
  }
   @AfterClass
   public void teardown()
   {
      TestCaseUtils.shutdownFakeServer();
   }
  @AfterClass
  public void teardown()
  {
    TestCaseUtils.shutdownFakeServer();
  }
   @DataProvider
   public Object[][] changelogStateDataProvider() throws Exception
   {
      return new Object[][] {
         { DN.decode("dc=example,dc=com"), 524157415, asList(42, 346) },
         // test with a space in the baseDN (space is the field separator in the DB)
         { DN.decode("cn=admin data"), 524157415, asList(42, 346) },
     };
   }
  @DataProvider
  public Object[][] changelogStateDataProvider() throws Exception
  {
    final int genId = 524157415;
    final int id1 = 42;
    final int id2 = 346;
    final int t1 = 1956245524;
    return new Object[][] {
      { DN.decode("dc=example,dc=com"), genId, EMPTY_LIST, EMPTY_LIST },
      { DN.decode("dc=example,dc=com"), genId, asList(id1, id2),
        asList(new CSN(id2, 0, t1)) },
      // test with a space in the baseDN (space is the field separator in the
      // DB)
      { DN.decode("cn=admin data"), genId, asList(id1, id2), EMPTY_LIST }, };
  }
   @Test(dataProvider = "changelogStateDataProvider")
   public void encodeDecodeChangelogState(DN baseDN, long generationId,
         List<Integer> serverIds) throws Exception
   {
      final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
  @Test(dataProvider = "changelogStateDataProvider")
  public void encodeDecodeChangelogState(DN baseDN, long generationId,
      List<Integer> replicas, List<CSN> offlineReplicas) throws Exception
  {
    final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
      // encode data
      final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>();
      put(wholeState, changelogStateDB.toGenIdEntry(baseDN, generationId));
      for (Integer serverId : serverIds)
      {
         put(wholeState, changelogStateDB.toReplicaEntry(baseDN, serverId));
      }
    // encode data
    final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>();
    put(wholeState, toGenIdEntry(baseDN, generationId));
    for (Integer serverId : replicas)
    {
      put(wholeState, toByteArray(toReplicaEntry(baseDN, serverId)));
    }
    for (CSN offlineCSN : offlineReplicas)
    {
      put(wholeState, toReplicaOfflineEntry(baseDN, offlineCSN));
    }
      // decode data
      final ChangelogState state =
            changelogStateDB.decodeChangelogState(wholeState);
      assertThat(state.getDomainToGenerationId()).containsExactly(
            entry(baseDN, generationId));
      assertThat(state.getDomainToServerIds()).containsExactly(
            entry(baseDN, serverIds));
   }
    // decode data
    final ChangelogState state =
        changelogStateDB.decodeChangelogState(wholeState);
    assertThat(state.getDomainToGenerationId()).containsExactly(
        entry(baseDN, generationId));
    if (!replicas.isEmpty())
    {
      assertThat(state.getDomainToServerIds()).containsExactly(
          entry(baseDN, replicas));
    }
    else
    {
      assertThat(state.getDomainToServerIds()).isEmpty();
    }
    if (!offlineReplicas.isEmpty())
    {
      assertThat(state.getOfflineReplicas()).containsExactly(
          entry(baseDN, offlineReplicas));
    }
    else
    {
      assertThat(state.getOfflineReplicas()).isEmpty();
    }
  }
   private void put(Map<byte[], byte[]> map, Entry<String, String> entry)
   {
      map.put(toBytes(entry.getKey()), toBytes(entry.getValue()));
   }
  private void put(Map<byte[], byte[]> map, Entry<byte[], byte[]> entry)
  {
    map.put(entry.getKey(), entry.getValue());
  }
}