From de36fa06856d8d04652401bb24e49c3259aef154 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 30 Apr 2014 10:26:42 +0000
Subject: [PATCH] OPENDJ-1259 (CR-3443) Make the Medium Consistency Point support replicas temporarily leaving the topology

---
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                   |   11 
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java                                |  142 ++++
 opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java                               |    8 
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                             |   51 +
 opends/src/server/org/opends/server/replication/protocol/StopMsg.java                                                    |   13 
 opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java                                           |  348 +++++++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java                      |   84 +++
 opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java                                           |  281 +++++++++++
 opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java                                            |   53 +
 opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java                                     |  108 ++-
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java    |  156 +++--
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java |   15 
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java             |   69 +
 opends/src/server/org/opends/server/replication/server/ServerHandler.java                                                |   15 
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                                      |   22 
 opends/src/server/org/opends/server/replication/server/ChangelogState.java                                               |   37 +
 opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java                            |    4 
 opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java                                  |   39 +
 18 files changed, 1,255 insertions(+), 201 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java b/opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
new file mode 100644
index 0000000..9431c35
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/ByteArrayBuilder.java
@@ -0,0 +1,348 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.io.UnsupportedEncodingException;
+import java.util.Collection;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.types.ByteStringBuilder;
+
+/**
+ * Byte array builder class encodes data into byte arrays to send messages over
+ * the replication protocol. Built on top of {@link ByteStringBuilder}, it
+ * isolates the latter against legacy type conversions from the replication
+ * protocol. It exposes a fluent API.
+ *
+ * @see ByteArrayScanner ByteArrayScanner class that decodes messages built with
+ *      current class.
+ */
+public class ByteArrayBuilder
+{
+
+  /** This is the null byte, also known as zero byte. */
+  public static final byte NULL_BYTE = 0;
+  private final ByteStringBuilder builder;
+
+  /**
+   * Constructs a ByteArrayBuilder.
+   */
+  public ByteArrayBuilder()
+  {
+    builder = new ByteStringBuilder();
+  }
+
+  /**
+   * Constructs a ByteArrayBuilder.
+   *
+   * @param capacity
+   *          the capacity of the underlying ByteStringBuilder
+   */
+  public ByteArrayBuilder(int capacity)
+  {
+    builder = new ByteStringBuilder(capacity);
+  }
+
+  /**
+   * Append a boolean to this ByteArrayBuilder.
+   *
+   * @param b
+   *          the boolean to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder append(boolean b)
+  {
+    append((byte) (b ? 1 : 0));
+    return this;
+  }
+
+  /**
+   * Append a byte to this ByteArrayBuilder.
+   *
+   * @param b
+   *          the byte to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder append(byte b)
+  {
+    builder.append(b);
+    return this;
+  }
+
+  /**
+   * Append a short to this ByteArrayBuilder.
+   *
+   * @param s
+   *          the short to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder append(short s)
+  {
+    builder.append(s);
+    return this;
+  }
+
+  /**
+   * Append an int to this ByteArrayBuilder.
+   *
+   * @param i
+   *          the long to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder append(int i)
+  {
+    builder.append(i);
+    return this;
+  }
+
+  /**
+   * Append a long to this ByteArrayBuilder.
+   *
+   * @param l
+   *          the long to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder append(long l)
+  {
+    builder.append(l);
+    return this;
+  }
+
+  /**
+   * Append an int to this ByteArrayBuilder by converting it to a String then
+   * encoding that string to a UTF-8 byte array.
+   *
+   * @param i
+   *          the int to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder appendUTF8(int i)
+  {
+    return append(Integer.toString(i));
+  }
+
+  /**
+   * Append a long to this ByteArrayBuilder by converting it to a String then
+   * encoding that string to a UTF-8 byte array.
+   *
+   * @param l
+   *          the long to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder appendUTF8(long l)
+  {
+    return append(Long.toString(l));
+  }
+
+  /**
+   * Append a Collection of Strings to this ByteArrayBuilder.
+   *
+   * @param col
+   *          the Collection of Strings to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder appendStrings(Collection<String> col)
+  {
+    append(col.size());
+    for (String s : col)
+    {
+      append(s);
+    }
+    return this;
+  }
+
+  /**
+   * Append a String to this ByteArrayBuilder.
+   *
+   * @param s
+   *          the String to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder append(String s)
+  {
+    try
+    {
+      append(s.getBytes("UTF-8"));
+    }
+    catch (UnsupportedEncodingException e)
+    {
+      throw new RuntimeException("Should never happen", e);
+    }
+    return this;
+  }
+
+  /**
+   * Append a CSN to this ByteArrayBuilder.
+   *
+   * @param csn
+   *          the CSN to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder append(CSN csn)
+  {
+    csn.toByteString(builder);
+    return this;
+  }
+
+  /**
+   * Append a CSN to this ByteArrayBuilder by converting it to a String then
+   * encoding that string to a UTF-8 byte array.
+   *
+   * @param csn
+   *          the CSN to append.
+   * @return this ByteArrayBuilder
+   */
+  public ByteArrayBuilder appendUTF8(CSN csn)
+  {
+    append(csn.toString());
+    return this;
+  }
+
+  private ByteArrayBuilder append(byte[] sBytes)
+  {
+    for (byte b : sBytes)
+    {
+      append(b);
+    }
+    append((byte) 0); // zero separator
+    return this;
+  }
+
+  /**
+   * Converts the content of this ByteStringBuilder to a byte array.
+   *
+   * @return the content of this ByteStringBuilder converted to a byte array.
+   */
+  public byte[] toByteArray()
+  {
+    return builder.toByteArray();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return builder.toString();
+  }
+
+  /**
+   * Helper method that returns the number of bytes that would be used by the
+   * boolean fields when appended to a ByteArrayBuilder.
+   *
+   * @param nbFields
+   *          the number of boolean fields that will be appended to a
+   *          ByteArrayBuilder
+   * @return the number of bytes occupied by the appended boolean fields.
+   */
+  public static int booleans(int nbFields)
+  {
+    return nbFields;
+  }
+
+  /**
+   * Helper method that returns the number of bytes that would be used by the
+   * byte fields when appended to a ByteArrayBuilder.
+   *
+   * @param nbFields
+   *          the number of byte fields that will be appended to a
+   *          ByteArrayBuilder
+   * @return the number of bytes occupied by the appended byte fields.
+   */
+  public static int bytes(int nbFields)
+  {
+    return nbFields;
+  }
+
+  /**
+   * Helper method that returns the number of bytes that would be used by the
+   * short fields when appended to a ByteArrayBuilder.
+   *
+   * @param nbFields
+   *          the number of short fields that will be appended to a
+   *          ByteArrayBuilder
+   * @return the number of bytes occupied by the appended short fields.
+   */
+  public static int shorts(int nbFields)
+  {
+    return 2 * nbFields;
+  }
+
+  /**
+   * Helper method that returns the number of bytes that would be used by the
+   * int fields when appended to a ByteArrayBuilder.
+   *
+   * @param nbFields
+   *          the number of int fields that will be appended to a
+   *          ByteArrayBuilder
+   * @return the number of bytes occupied by the appended int fields.
+   */
+  public static int ints(int nbFields)
+  {
+    return 4 * nbFields;
+  }
+
+  /**
+   * Helper method that returns the number of bytes that would be used by the
+   * long fields when appended to a ByteArrayBuilder.
+   *
+   * @param nbFields
+   *          the number of long fields that will be appended to a
+   *          ByteArrayBuilder
+   * @return the number of bytes occupied by the appended long fields.
+   */
+  public static int longs(int nbFields)
+  {
+    return 8 * nbFields;
+  }
+
+  /**
+   * Helper method that returns the number of bytes that would be used by the
+   * CSN fields when appended to a ByteArrayBuilder.
+   *
+   * @param nbFields
+   *          the number of CSN fields that will be appended to a
+   *          ByteArrayBuilder
+   * @return the number of bytes occupied by the appended CSN fields.
+   */
+  public static int csns(int nbFields)
+  {
+    return CSN.BYTE_ENCODING_LENGTH * nbFields;
+  }
+
+  /**
+   * Helper method that returns the number of bytes that would be used by the
+   * CSN fields encoded as a UTF8 string when appended to a ByteArrayBuilder.
+   *
+   * @param nbFields
+   *          the number of CSN fields that will be appended to a
+   *          ByteArrayBuilder
+   * @return the number of bytes occupied by the appended legacy-encoded CSN
+   *         fields.
+   */
+  public static int csnsUTF8(int nbFields)
+  {
+    return CSN.STRING_ENCODING_LENGTH * nbFields + 1 /* null byte */;
+  }
+}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java b/opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
new file mode 100644
index 0000000..374bf0b
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/protocol/ByteArrayScanner.java
@@ -0,0 +1,281 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.Collection;
+import java.util.zip.DataFormatException;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.types.ByteSequenceReader;
+import org.opends.server.types.ByteString;
+
+/**
+ * Byte array scanner class helps decode data from byte arrays received via
+ * messages over the replication protocol. Built on top of
+ * {@link ByteSequenceReader}, it isolates the latter against legacy type
+ * conversions from the replication protocol.
+ *
+ * @see ByteArrayBuilder ByteArrayBuilder class that encodes messages read with
+ *      current class.
+ */
+public class ByteArrayScanner
+{
+
+  private final ByteSequenceReader bytes;
+
+  /**
+   * Builds a ByteArrayScanner object that will read from the supplied byte
+   * array.
+   *
+   * @param bytes
+   *          the byte array input that will be read from
+   */
+  public ByteArrayScanner(byte[] bytes)
+  {
+    this.bytes = ByteString.wrap(bytes).asReader();
+  }
+
+  /**
+   * Reads the next boolean.
+   *
+   * @return the next boolean
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public boolean nextBoolean() throws DataFormatException
+  {
+    return nextByte() != 0;
+  }
+
+  /**
+   * Reads the next byte.
+   *
+   * @return the next byte
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public byte nextByte() throws DataFormatException
+  {
+    try
+    {
+      return bytes.get();
+    }
+    catch (IndexOutOfBoundsException e)
+    {
+      throw new DataFormatException(e.getMessage());
+    }
+  }
+
+  /**
+   * Reads the next short.
+   *
+   * @return the next short
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public short nextShort() throws DataFormatException
+  {
+    try
+    {
+      return bytes.getShort();
+    }
+    catch (IndexOutOfBoundsException e)
+    {
+      throw new DataFormatException(e.getMessage());
+    }
+  }
+
+  /**
+   * Reads the next int.
+   *
+   * @return the next int
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public int nextInt() throws DataFormatException
+  {
+    try
+    {
+      return bytes.getInt();
+    }
+    catch (IndexOutOfBoundsException e)
+    {
+      throw new DataFormatException(e.getMessage());
+    }
+  }
+
+  /**
+   * Reads the next long.
+   *
+   * @return the next long
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public long nextLong() throws DataFormatException
+  {
+    try
+    {
+      return bytes.getLong();
+    }
+    catch (IndexOutOfBoundsException e)
+    {
+      throw new DataFormatException(e.getMessage());
+    }
+  }
+
+  /**
+   * Reads the next int that was encoded as a UTF8 string.
+   *
+   * @return the next int that was encoded as a UTF8 string.
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public int nextIntUTF8() throws DataFormatException
+  {
+    return Integer.valueOf(nextString());
+  }
+
+  /**
+   * Reads the next long that was encoded as a UTF8 string.
+   *
+   * @return the next long that was encoded as a UTF8 string.
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public long nextLongUTF8() throws DataFormatException
+  {
+    return Long.valueOf(nextString());
+  }
+
+  /**
+   * Reads the next UTF8-encoded string.
+   *
+   * @return the next UTF8-encoded string.
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public String nextString() throws DataFormatException
+  {
+    try
+    {
+      final String s = bytes.getString(findZeroSeparator());
+      bytes.skip(1); // skip the zero separator
+      return s;
+    }
+    catch (IndexOutOfBoundsException e)
+    {
+      throw new DataFormatException(e.getMessage());
+    }
+  }
+
+  private int findZeroSeparator() throws DataFormatException
+  {
+    int offset = 0;
+    final int remaining = bytes.remaining();
+    while (bytes.peek(offset) != 0 && offset < remaining)
+    {
+      offset++;
+    }
+    if (offset == remaining)
+    {
+      throw new DataFormatException("No more data to read from");
+    }
+    return offset;
+  }
+
+  /**
+   * Reads the next UTF8-encoded strings in the provided collection.
+   *
+   * @param output
+   *          the collection where to add the next UTF8-encoded strings
+   * @param <TCol>
+   *          the collection's concrete type
+   * @return the provided collection where the next UTF8-encoded strings have
+   *         been added.
+   * @throws DataFormatException
+   *           if no more data can be read from the input
+   */
+  public <TCol extends Collection<String>> TCol nextStrings(TCol output)
+      throws DataFormatException
+  {
+    final int colSize = nextInt();
+    for (int i = 0; i < colSize; i++)
+    {
+      output.add(nextString());
+    }
+    return output;
+  }
+
+  /**
+   * Reads the next CSN.
+   *
+   * @return the next CSN.
+   * @throws DataFormatException
+   *           if CSN was incorrectly encoded or no more data can be read from
+   *           the input
+   */
+  public CSN nextCSN() throws DataFormatException
+  {
+    try
+    {
+      return CSN.valueOf(bytes.getByteSequence(CSN.BYTE_ENCODING_LENGTH));
+    }
+    catch (IndexOutOfBoundsException e)
+    {
+      throw new DataFormatException(e.getMessage());
+    }
+  }
+
+  /**
+   * Reads the next CSN that was encoded as a UTF8 string.
+   *
+   * @return the next CSN that was encoded as a UTF8 string.
+   * @throws DataFormatException
+   *           if legacy CSN was incorrectly encoded or no more data can be read
+   *           from the input
+   */
+  public CSN nextCSNUTF8() throws DataFormatException
+  {
+    try
+    {
+      return CSN.valueOf(nextString());
+    }
+    catch (IndexOutOfBoundsException e)
+    {
+      throw new DataFormatException(e.getMessage());
+    }
+  }
+
+  /**
+   * Returns whether the scanner has more bytes to consume.
+   *
+   * @return true if the scanner has more bytes to consume, false otherwise.
+   */
+  public boolean isEmpty()
+  {
+    return bytes.remaining() == 0;
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
index 51abace..853262c 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ChangeTimeHeartbeatMsg.java
@@ -29,9 +29,9 @@
 import java.util.zip.DataFormatException;
 
 import org.opends.server.replication.common.CSN;
-import org.opends.server.types.ByteSequenceReader;
-import org.opends.server.types.ByteString;
-import org.opends.server.types.ByteStringBuilder;
+
+import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
+import static org.opends.server.replication.protocol.ProtocolVersion.*;
 
 /**
  * Class that define messages sent by a replication domain (DS) to the
@@ -39,21 +39,48 @@
  */
 public class ChangeTimeHeartbeatMsg extends ReplicationMsg
 {
+  private static final byte NORMAL_HEARTBEAT = 0;
+  private static final byte REPLICA_OFFLINE_HEARTBEAT = 1;
+
   /**
    * The CSN containing the change time.
    */
   private final CSN csn;
+  /**
+   * The CSN containing the change time.
+   */
+  private final byte eventType;
+
+  private ChangeTimeHeartbeatMsg(CSN csn, byte eventType)
+  {
+    this.csn = csn;
+    this.eventType = eventType;
+  }
 
   /**
-   * Constructor of a Change Time Heartbeat message providing the change time
-   * value in a CSN.
+   * Factory method that builds a change time heartbeat message providing the
+   * change time value in a CSN.
    *
    * @param csn
    *          The provided CSN.
+   * @return a new ChangeTimeHeartbeatMsg
    */
-  public ChangeTimeHeartbeatMsg(CSN csn)
+  public static ChangeTimeHeartbeatMsg heartbeatMsg(CSN csn)
   {
-    this.csn = csn;
+    return new ChangeTimeHeartbeatMsg(csn, NORMAL_HEARTBEAT);
+  }
+
+  /**
+   * Factory method that builds a change time heartbeat message for a replica
+   * going offline.
+   *
+   * @param offlineCSN
+   *          the serverId and timestamp of the replica going offline
+   * @return a new ChangeTimeHeartbeatMsg
+   */
+  public static ChangeTimeHeartbeatMsg replicaOfflineMsg(CSN offlineCSN)
+  {
+    return new ChangeTimeHeartbeatMsg(offlineCSN, REPLICA_OFFLINE_HEARTBEAT);
   }
 
   /**
@@ -67,6 +94,17 @@
   }
 
   /**
+   * Returns whether this is a replica offline message.
+   *
+   * @return true if this is a replica offline message, false if this is a
+   *         regular heartbeat message.
+   */
+  public boolean isReplicaOfflineMsg()
+  {
+    return eventType == REPLICA_OFFLINE_HEARTBEAT;
+  }
+
+  /**
    * Creates a message from a provided byte array.
    *
    * @param in
@@ -79,32 +117,31 @@
   public ChangeTimeHeartbeatMsg(byte[] in, short version)
       throws DataFormatException
   {
-    final ByteSequenceReader reader = ByteString.wrap(in).asReader();
     try
     {
-      if (reader.get() != MSG_TYPE_CT_HEARTBEAT)
+      final ByteArrayScanner scanner = new ByteArrayScanner(in);
+      final byte msgType = scanner.nextByte();
+      if (msgType != MSG_TYPE_CT_HEARTBEAT)
       {
-        // Throw better exception below.
-        throw new IllegalArgumentException();
+        throw new DataFormatException("input is not a valid "
+            + getClass().getSimpleName() + " message: " + msgType);
       }
 
-      if (version >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
-      {
-        csn = CSN.valueOf(reader.getByteSequence(CSN.BYTE_ENCODING_LENGTH));
-      }
-      else
-      {
-        csn = CSN.valueOf(reader.getString(CSN.STRING_ENCODING_LENGTH));
-        reader.get(); // Read trailing 0 byte.
-      }
+      csn = version >= REPLICATION_PROTOCOL_V7
+          ? scanner.nextCSN()
+          : scanner.nextCSNUTF8();
+      eventType = version >= REPLICATION_PROTOCOL_V8
+          ? scanner.nextByte()
+          : NORMAL_HEARTBEAT;
 
-      if (reader.remaining() > 0)
+      if (!scanner.isEmpty())
       {
-        // Throw better exception below.
-        throw new IllegalArgumentException();
+        throw new DataFormatException(
+            "Did not expect to find more bytes to read for "
+            + getClass().getSimpleName() + " message.");
       }
     }
-    catch (Exception e)
+    catch (RuntimeException e)
     {
       // Index out of bounds, bad format, etc.
       throw new DataFormatException("byte[] is not a valid CT_HEARTBEAT msg");
@@ -115,24 +152,22 @@
   @Override
   public byte[] getBytes(short protocolVersion)
   {
-    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V7)
+    if (protocolVersion < ProtocolVersion.REPLICATION_PROTOCOL_V7)
     {
-      final ByteStringBuilder builder = new ByteStringBuilder(
-          CSN.BYTE_ENCODING_LENGTH + 1 /* type + csn */);
+      ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csnsUTF8(1));
       builder.append(MSG_TYPE_CT_HEARTBEAT);
-      csn.toByteString(builder);
+      builder.appendUTF8(csn);
       return builder.toByteArray();
     }
-    else
+
+    final ByteArrayBuilder builder = new ByteArrayBuilder(bytes(1) + csns(1));
+    builder.append(MSG_TYPE_CT_HEARTBEAT);
+    builder.append(csn);
+    if (protocolVersion >= ProtocolVersion.REPLICATION_PROTOCOL_V8)
     {
-      final ByteStringBuilder builder = new ByteStringBuilder(
-          CSN.STRING_ENCODING_LENGTH + 2 /* type + csn str + nul */);
-      builder.append(MSG_TYPE_CT_HEARTBEAT);
-      builder.append(csn.toString());
-      builder.append((byte) 0); // For compatibility with earlier protocol
-                                // versions.
-      return builder.toByteArray();
+      builder.append(eventType);
     }
+    return builder.toByteArray();
   }
 
   /** {@inheritDoc} */
@@ -141,4 +176,5 @@
   {
     return getClass().getSimpleName() + ", csn=" + csn.toStringUI();
   }
+
 }
diff --git a/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 9eb2fc4..1e6e2ba 100644
--- a/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opends/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -22,11 +22,10 @@
  *
  *
  *      Copyright 2006-2010 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.protocol;
 
-
 /**
  * The version utility class for the replication protocol.
  */
@@ -43,52 +42,72 @@
   public static final short REPLICATION_PROTOCOL_V1_REAL = 49;
   /**
    * The constant for the second version of the replication protocol.
-   * Add fields in the header for assured replication.
+   * <ul>
+   * <li>Add fields in the header for assured replication.</li>
+   * </ul>
    */
   public static final short REPLICATION_PROTOCOL_V2 = 2;
 
   /**
    * The constant for the 3rd version of the replication protocol.
-   * Add messages for remote ECL : not used as of today.
+   * <ul>
+   * <li>Add messages for remote ECL : not used as of today.</li>
+   * </ul>
    */
   public static final short REPLICATION_PROTOCOL_V3 = 3;
 
   /**
    * The constant for the 4th version of the replication protocol.
-   * - Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
-   *   ECL entry attributes.
-   * - Modified algorithm for choosing a RS to connect to: introduction of a
-   *   ReplicationServerDSMsg message.
-   *   -> also added of the server URL in RSInfo of TopologyMsg
-   * - Introduction of a StopMsg for proper connections ending.
-   * - Initialization failover/flow control
+   * <ul>
+   * <li>Add to the body of the ADD/MOD/MODDN/DEL msgs, a list of attribute for
+   * ECL entry attributes.</li>
+   * <li>Modified algorithm for choosing a RS to connect to: introduction of a
+   * ReplicationServerDSMsg message.</li>
+   * <li>also added of the server URL in RSInfo of TopologyMsg</li>
+   * <li>Introduction of a StopMsg for proper connections ending.</li>
+   * <li>Initialization failover/flow control</li>
+   * </ul>
    */
   public static final short REPLICATION_PROTOCOL_V4 = 4;
 
   /**
    * The constant for the 5th version of the replication protocol.
-   * - Add support for wild-cards in change log included attributes
-   * - Add support for specifying additional included attributes for deletes
-   * - See OPENDJ-194.
+   * <ul>
+   * <li>Add support for wild-cards in change log included attributes</li>
+   * <li>Add support for specifying additional included attributes for deletes</li>
+   * <li>See OPENDJ-194.</li>
+   * </ul>
    */
   public static final short REPLICATION_PROTOCOL_V5 = 5;
 
   /**
    * The constant for the 6th version of the replication protocol.
-   * - include DS local URL in the DSInfo of TopologyMsg.
+   * <ul>
+   * <li>include DS local URL in the DSInfo of TopologyMsg.</li>
+   * </ul>
    */
   public static final short REPLICATION_PROTOCOL_V6 = 6;
 
   /**
    * The constant for the 7th version of the replication protocol.
-   * - compact encoding for length, CSNs, and server IDs.
+   * <ul>
+   * <li>compact encoding for length, CSNs, and server IDs.</li>
+   * </ul>
    */
   public static final short REPLICATION_PROTOCOL_V7 = 7;
 
   /**
+   * The constant for the 8th version of the replication protocol.
+   * <ul>
+   * <li>StopMsg now has a timestamp to communicate the replica stop time.</li>
+   * </ul>
+   */
+  public static final short REPLICATION_PROTOCOL_V8 = 8;
+
+  /**
    * The replication protocol version used by the instance of RS/DS in this VM.
    */
-  private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V7;
+  private static final short CURRENT_VERSION = REPLICATION_PROTOCOL_V8;
 
   /**
    * Gets the current version of the replication protocol.
diff --git a/opends/src/server/org/opends/server/replication/protocol/StopMsg.java b/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
index 30228b3..c03dbf0 100644
--- a/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
+++ b/opends/src/server/org/opends/server/replication/protocol/StopMsg.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions copyright 2013 ForgeRock AS.
+ *      Portions copyright 2013-2014 ForgeRock AS.
  */
 package org.opends.server.replication.protocol;
 
@@ -56,9 +56,7 @@
         in[0]);
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
   public byte[] getBytes(short protocolVersion)
   {
@@ -67,4 +65,11 @@
         MSG_TYPE_STOP
       };
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName();
+  }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ChangelogState.java b/opends/src/server/org/opends/server/replication/server/ChangelogState.java
index a454a35..a0cb04b 100644
--- a/opends/src/server/org/opends/server/replication/server/ChangelogState.java
+++ b/opends/src/server/org/opends/server/replication/server/ChangelogState.java
@@ -21,7 +21,7 @@
  * CDDL HEADER END
  *
  *
- *      Copyright 2013 ForgeRock AS
+ *      Copyright 2013-2014 ForgeRock AS
  */
 package org.opends.server.replication.server;
 
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.opends.server.replication.common.CSN;
 import org.opends.server.types.DN;
 
 /**
@@ -49,6 +50,8 @@
   private final Map<DN, Long> domainToGenerationId = new HashMap<DN, Long>();
   private final Map<DN, List<Integer>> domainToServerIds =
       new HashMap<DN, List<Integer>>();
+  private final Map<DN, List<CSN>> offlineReplicas =
+      new HashMap<DN, List<CSN>>();
 
   /**
    * Sets the generationId for the supplied replication domain.
@@ -83,6 +86,25 @@
   }
 
   /**
+   * Adds the following replica information to the offline list.
+   *
+   * @param baseDN
+   *          the baseDN of the offline replica
+   * @param offlineCSN
+   *          the CSN (serverId + timestamp) of the offline replica
+   */
+  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+  {
+    List<CSN> offlineCSNs = offlineReplicas.get(baseDN);
+    if (offlineCSNs == null)
+    {
+      offlineCSNs = new LinkedList<CSN>();
+      offlineReplicas.put(baseDN, offlineCSNs);
+    }
+    offlineCSNs.add(offlineCSN);
+  }
+
+  /**
    * Returns the Map of domainBaseDN => generationId.
    *
    * @return a Map of domainBaseDN => generationId
@@ -102,11 +124,22 @@
     return domainToServerIds;
   }
 
+  /**
+   * Returns the Map of domainBaseDN => List&lt;offlineCSN&gt;.
+   *
+   * @return a Map of domainBaseDN => List&lt;offlineCSN&gt;.
+   */
+  public Map<DN, List<CSN>> getOfflineReplicas()
+  {
+    return offlineReplicas;
+  }
+
   /** {@inheritDoc} */
   @Override
   public String toString()
   {
     return "domainToGenerationId=" + domainToGenerationId
-        + ", domainToServerIds=" + domainToServerIds;
+        + ", domainToServerIds=" + domainToServerIds
+        + ", offlineReplicas=" + offlineReplicas;
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index e2fddc1..8462ce6 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2530,11 +2530,29 @@
    * value received, and forwarding the message to the other RSes.
    * @param senderHandler The handler for the server that sent the heartbeat.
    * @param msg The message to process.
+   * @throws DirectoryException
+   *           if a problem occurs
    */
   void processChangeTimeHeartbeatMsg(ServerHandler senderHandler,
-      ChangeTimeHeartbeatMsg msg)
+      ChangeTimeHeartbeatMsg msg) throws DirectoryException
   {
-    domainDB.replicaHeartbeat(baseDN, msg.getCSN());
+    try
+    {
+      if (msg.isReplicaOfflineMsg())
+      {
+        domainDB.replicaOffline(baseDN, msg.getCSN());
+      }
+      else
+      {
+        domainDB.replicaHeartbeat(baseDN, msg.getCSN());
+      }
+    }
+    catch (ChangelogException e)
+    {
+      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e
+          .getMessageObject(), e);
+    }
+
     if (senderHandler.isDataServer())
     {
       /*
diff --git a/opends/src/server/org/opends/server/replication/server/ServerHandler.java b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
index e35ddbc..291dcdb 100644
--- a/opends/src/server/org/opends/server/replication/server/ServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ServerHandler.java
@@ -839,14 +839,19 @@
   /**
    * Processes a change time heartbeat msg.
    *
-   * @param msg The message to be processed.
+   * @param msg
+   *          The message to be processed.
+   * @throws DirectoryException
+   *           When an exception is raised.
    */
-  void process(ChangeTimeHeartbeatMsg msg)
+  void process(ChangeTimeHeartbeatMsg msg) throws DirectoryException
   {
     if (debugEnabled())
+    {
       TRACER.debugInfo("In "
           + replicationServerDomain.getLocalRSMonitorInstanceName() + " "
           + this + " processes received msg:\n" + msg);
+    }
     replicationServerDomain.processChangeTimeHeartbeatMsg(this, msg);
   }
 
@@ -865,9 +870,9 @@
       // lets update the LDAP server with out current window size and hope
       // that everything will work better in the future.
       // TODO also log an error message.
-      WindowMsg msg = new WindowMsg(rcvWindow);
-      session.publish(msg);
-    } else
+      session.publish(new WindowMsg(rcvWindow));
+    }
+    else
     {
       // Both the LDAP server and the replication server believes that the
       // window is closed. Lets check the flowcontrol in case we
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 2f2583e..d5bdf8f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -217,6 +217,8 @@
    *          the replication domain baseDN
    * @param offlineCSN
    *          The CSN (serverId and timestamp) for the replica's going offline
+   * @throws ChangelogException
+   *           If a database problem happened
    */
-  void replicaOffline(DN baseDN, CSN offlineCSN);
+  void replicaOffline(DN baseDN, CSN offlineCSN) throws ChangelogException;
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 3b44c84..5ee48e5 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -103,7 +103,8 @@
 
   /**
    * Holds the last time each replica was seen alive, whether via updates or
-   * heartbeats received. Data is held for each serverId cross domain.
+   * heartbeat notifications, or offline notifications. Data is held for each
+   * serverId cross domain.
    * <p>
    * Updates are persistent and stored in the replicaDBs, heartbeats are
    * transient and are easily constructed on normal operations.
@@ -221,6 +222,7 @@
    * @return true if the provided baseDN is enabled for the external changelog,
    *         false if the provided baseDN is disabled for the external changelog
    *         or unknown to multimaster replication.
+   * @see MultimasterReplication#isECLEnabledDomain(DN)
    */
   protected boolean isECLEnabledDomain(DN baseDN)
   {
@@ -351,6 +353,20 @@
       nextChangeForInsertDBCursor.next();
     }
 
+    for (Entry<DN, List<CSN>> entry : changelogState.getOfflineReplicas()
+        .entrySet())
+    {
+      final DN baseDN = entry.getKey();
+      final List<CSN> offlineCSNs = entry.getValue();
+      for (CSN offlineCSN : offlineCSNs)
+      {
+        if (isECLEnabledDomain(baseDN))
+        {
+          replicasOffline.update(baseDN, offlineCSN);
+        }
+      }
+    }
+
     // this will not be used any more. Discard for garbage collection.
     this.changelogState = null;
   }
@@ -554,20 +570,33 @@
   }
 
   private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
-      final DN mcBaseDN)
+      final DN mcBaseDN) throws ChangelogException
   {
     // update, so it becomes the previous cookie for the next change
     mediumConsistencyRUV.update(mcBaseDN, mcCSN);
     mediumConsistency = Pair.of(mcBaseDN, mcCSN);
-    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcCSN.getServerId());
-    if (offlineCSN != null
-        && offlineCSN.isOlderThan(mcCSN)
-        // If no new updates has been seen for this replica
-        && lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN))
+    final int mcServerId = mcCSN.getServerId();
+    final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
+    final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
+    if (offlineCSN != null)
     {
-      removeCursor(mcBaseDN, mcCSN);
-      replicasOffline.removeCSN(mcBaseDN, offlineCSN);
-      mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+      if (lastAliveCSN != null && offlineCSN.isOlderThan(lastAliveCSN))
+      {
+        // replica is back online, we can forget the last time it was offline
+        replicasOffline.removeCSN(mcBaseDN, offlineCSN);
+      }
+      else if (offlineCSN.isOlderThan(mcCSN))
+      {
+        /*
+         * replica is not back online and Medium consistency point has gone past
+         * its last offline time: remove everything known about it: cursor,
+         * offlineCSN from lastAliveCSN and remove all knowledge of this replica
+         * from the medium consistency RUV.
+         */
+        removeCursor(mcBaseDN, mcCSN);
+        lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
+        mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
+      }
     }
   }
 
@@ -587,6 +616,7 @@
   }
 
   private void removeCursor(final DN baseDN, final CSN csn)
+      throws ChangelogException
   {
     for (Entry<DN, Map<Integer, DBCursor<UpdateMsg>>> entry1
         : allCursors.entrySet())
@@ -601,6 +631,7 @@
           {
             iter.remove();
             StaticUtils.close(entry2.getValue());
+            resetNextChangeForInsertDBCursor();
             return;
           }
         }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index dd337e5..e23f4cb 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -826,13 +826,14 @@
   /** {@inheritDoc} */
   @Override
   public void replicaOffline(DN baseDN, CSN offlineCSN)
+      throws ChangelogException
   {
+    dbEnv.addOfflineReplica(baseDN, offlineCSN);
     final ChangeNumberIndexer indexer = cnIndexer.get();
     if (indexer != null)
     {
       indexer.replicaOffline(baseDN, offlineCSN);
     }
-    // TODO save this state in the changelogStateDB?
   }
 
   /**
@@ -942,5 +943,13 @@
       // wait a bit before purging more
       return DEFAULT_SLEEP;
     }
+
+    /** {@inheritDoc} */
+    @Override
+    public void initiateShutdown()
+    {
+      super.initiateShutdown();
+      this.interrupt(); // wake up the purger thread for faster shutdown
+    }
   }
 }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 577fc39..27b0175 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -98,7 +98,11 @@
       synchronized (this)
       {
         closeCursor();
-        // previously exhausted cursor must be able to reinitialize themselves
+        // Previously exhausted cursor must be able to reinitialize themselves.
+        // There is a risk of readLock never being unlocked
+        // if following code is called while the cursor is closed.
+        // It is better to let the deadlock happen to help quickly identifying
+        // and fixing such issue with unit tests.
         cursor = db.openReadCursor(lastNonNullCurrentCSN);
         currentChange = cursor.next();
         if (currentChange != null)
@@ -131,7 +135,7 @@
   }
 
   /**
-   * Called by the Gc when the object is garbage collected Release the internal
+   * Called by the Gc when the object is garbage collected. Release the internal
    * cursor in case the cursor was badly used and {@link #close()} was never
    * called.
    */
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
index e4d7283..c259985 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnv.java
@@ -38,9 +38,12 @@
 import org.opends.messages.Message;
 import org.opends.messages.MessageBuilder;
 import org.opends.server.loggers.debug.DebugTracer;
+import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.ReplicationServer;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.types.ByteString;
+import org.opends.server.types.ByteStringBuilder;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
 
@@ -67,6 +70,7 @@
   private ReplicationServer replicationServer;
   private final AtomicBoolean isShuttingDown = new AtomicBoolean(false);
   private static final String GENERATION_ID_TAG = "GENID";
+  private static final String OFFLINE_TAG = "OFFLINE";
   private static final String FIELD_SEPARATOR = " ";
   /** The tracer object for the debug logger. */
   private static final DebugTracer TRACER = getTracer();
@@ -268,6 +272,19 @@
           }
           result.setDomainGenerationId(baseDN, generationId);
         }
+        else if (prefix.equals(OFFLINE_TAG))
+        {
+          final String[] str = stringKey.split(FIELD_SEPARATOR, 3);
+          final int serverId = toInt(str[1]);
+          final DN baseDN = DN.decode(str[2]);
+          long timestamp = ByteString.wrap(entry.getValue()).asReader().getLong();
+          if (debugEnabled())
+          {
+            debug("has read replica offline: baseDN=" + baseDN + " serverId="
+                + serverId);
+          }
+          result.addOfflineReplica(baseDN, new CSN(timestamp, 0, serverId));
+        }
         else
         {
           final String[] str = stringData.split(FIELD_SEPARATOR, 2);
@@ -275,7 +292,8 @@
           final DN baseDN = DN.decode(str[1]);
           if (debugEnabled())
           {
-            debug("has read replica: baseDN=" + baseDN + " serverId=" + serverId);
+            debug("has read replica: baseDN=" + baseDN + " serverId="
+                + serverId);
           }
           result.addServerIdToDomain(serverId, baseDN);
         }
@@ -416,7 +434,7 @@
       // Opens the DB for the changes received from this server on this domain.
       final Database replicaDB = openDatabase(replicaEntry.getKey());
 
-      putInChangelogStateDBIfNotExist(replicaEntry);
+      putInChangelogStateDBIfNotExist(toByteArray(replicaEntry));
       putInChangelogStateDBIfNotExist(toGenIdEntry(baseDN, generationId));
       return replicaDB;
     }
@@ -436,7 +454,7 @@
    *          the replica's serverId
    * @return a database entry for the replica
    */
-  Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
+  static Entry<String, String> toReplicaEntry(DN baseDN, int serverId)
   {
     final String key = serverId + FIELD_SEPARATOR + baseDN.toNormalizedString();
     return new SimpleImmutableEntry<String, String>(key, key);
@@ -452,30 +470,65 @@
    *          the domain's generationId
    * @return a database entry for the generationId
    */
-  Entry<String, String> toGenIdEntry(DN baseDN, long generationId)
+  static Entry<byte[], byte[]> toGenIdEntry(DN baseDN, long generationId)
   {
     final String normDn = baseDN.toNormalizedString();
     final String key = GENERATION_ID_TAG + FIELD_SEPARATOR + normDn;
     final String data = GENERATION_ID_TAG + FIELD_SEPARATOR + generationId
         + FIELD_SEPARATOR + normDn;
-    return new SimpleImmutableEntry<String, String>(key, data);
+    return new SimpleImmutableEntry<byte[], byte[]>(toBytes(key),toBytes(data));
   }
 
-  private void putInChangelogStateDBIfNotExist(Entry<String, String> entry)
-      throws RuntimeException
+  /**
+   * Converts an Entry&lt;String, String&gt; to an Entry&lt;byte[], byte[]&gt;.
+   *
+   * @param entry
+   *          the entry to convert
+   * @return the converted entry
+   */
+  static Entry<byte[], byte[]> toByteArray(Entry<String, String> entry)
   {
-    DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
+    return new SimpleImmutableEntry<byte[], byte[]>(
+        toBytes(entry.getKey()),
+        toBytes(entry.getValue()));
+  }
+
+  /**
+   * Return an entry to store in the changelog state database representing the
+   * time a replica went offline.
+   *
+   * @param baseDN
+   *          the replica's baseDN
+   * @param offlineCSN
+   *          the replica's serverId and offline timestamp
+   * @return a database entry representing the time a replica went offline
+   */
+  static Entry<byte[], byte[]> toReplicaOfflineEntry(DN baseDN, CSN offlineCSN)
+  {
+    final byte[] key =
+        toBytes(OFFLINE_TAG + FIELD_SEPARATOR + offlineCSN.getServerId()
+            + FIELD_SEPARATOR + baseDN.toNormalizedString());
+    final ByteStringBuilder data = new ByteStringBuilder(8); // store a long
+    data.append(offlineCSN.getTime());
+    return new SimpleImmutableEntry<byte[], byte[]>(key, data.toByteArray());
+  }
+
+  private void putInChangelogStateDBIfNotExist(Entry<byte[], byte[]> entry)
+      throws ChangelogException, RuntimeException
+  {
+    DatabaseEntry key = new DatabaseEntry(entry.getKey());
     DatabaseEntry data = new DatabaseEntry();
     if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == NOTFOUND)
     {
       Transaction txn = dbEnvironment.beginTransaction(null, null);
       try
       {
-        data.setData(toBytes(entry.getValue()));
+        data.setData(entry.getValue());
         if (debugEnabled())
         {
           debug("putting record in the changelogstate Db key=["
-              + entry.getKey() + "] value=[" + entry.getValue() + "]");
+              + toString(entry.getKey()) + "] value=["
+              + toString(entry.getValue()) + "]");
         }
         changelogStateDb.put(txn, key, data);
         txn.commit(Durability.COMMIT_WRITE_NO_SYNC);
@@ -585,11 +638,11 @@
    */
   public void clearServerId(DN baseDN, int serverId) throws ChangelogException
   {
-    deleteFromChangelogStateDB(toReplicaEntry(baseDN, serverId),
+    deleteFromChangelogStateDB(toByteArray(toReplicaEntry(baseDN, serverId)),
         "clearServerId(baseDN=" + baseDN + " , serverId=" + serverId + ")");
   }
 
-  private void deleteFromChangelogStateDB(Entry<String, ?> entry,
+  private void deleteFromChangelogStateDB(Entry<byte[], ?> entry,
       String methodInvocation) throws ChangelogException
   {
     if (debugEnabled())
@@ -599,7 +652,7 @@
 
     try
     {
-      final DatabaseEntry key = new DatabaseEntry(toBytes(entry.getKey()));
+      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
       final DatabaseEntry data = new DatabaseEntry();
       if (changelogStateDb.get(null, key, data, LockMode.DEFAULT) == SUCCESS)
       {
@@ -620,18 +673,65 @@
           throw dbe;
         }
       }
-      else
+      else if (debugEnabled())
       {
-        if (debugEnabled())
-        {
-          debug(methodInvocation + " failed: key=[ " + entry.getKey()
-              + "] not found");
-        }
+        debug(methodInvocation + " failed: key not found");
       }
     }
-    catch (RuntimeException dbe)
+    catch (RuntimeException e)
     {
-      throw new ChangelogException(dbe);
+      if (debugEnabled())
+      {
+        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
+      }
+      throw new ChangelogException(e);
+    }
+  }
+
+  /**
+   * Add the information about an offline replica to the changelog state DB.
+   *
+   * @param baseDN
+   *          the domain of the offline replica
+   * @param offlineCSN
+   *          the offline replica serverId and offline timestamp
+   * @throws ChangelogException
+   *           if a database problem occurred
+   */
+  public void addOfflineReplica(DN baseDN, CSN offlineCSN)
+      throws ChangelogException
+  {
+    // just overwrite any older entry as it is assumed a newly received offline
+    // CSN is newer than the previous one
+    putInChangelogStateDB(toReplicaOfflineEntry(baseDN, offlineCSN),
+        "replicaOffline(baseDN=" + baseDN + ", offlineCSN=" + offlineCSN + ")");
+  }
+
+  private void putInChangelogStateDB(Entry<byte[], byte[]> entry,
+      String methodInvocation) throws ChangelogException
+  {
+    if (debugEnabled())
+    {
+      debug(methodInvocation + " starting");
+    }
+
+    try
+    {
+      final DatabaseEntry key = new DatabaseEntry(entry.getKey());
+      final DatabaseEntry data = new DatabaseEntry(entry.getValue());
+      changelogStateDb.put(null, key, data);
+      if (debugEnabled())
+      {
+        debug(methodInvocation + " succeeded");
+      }
+    }
+    catch (RuntimeException e)
+    {
+      if (debugEnabled())
+      {
+        debug(methodInvocation + " error: " + stackTraceToSingleLineString(e));
+      }
+      throw new ChangelogException(e);
     }
   }
 
diff --git a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
index faeab2b..6b090d9 100644
--- a/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
+++ b/opends/src/server/org/opends/server/replication/service/CTHeartbeatPublisherThread.java
@@ -22,7 +22,7 @@
  *
  *
  *      Copyright 2009 Sun Microsystems, Inc.
- *      Portions Copyright 2011-2013 ForgeRock AS
+ *      Portions Copyright 2011-2014 ForgeRock AS
  */
 package org.opends.server.replication.service;
 
@@ -35,7 +35,6 @@
 import org.opends.server.replication.protocol.Session;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
-import org.opends.server.util.TimeThread;
 
 import static org.opends.server.loggers.debug.DebugLogger.*;
 
@@ -76,15 +75,15 @@
    * @param session The session on which heartbeats are to be sent.
    * @param heartbeatInterval The interval between heartbeats sent
    *                          (in milliseconds).
-   * @param serverId2 The serverId of the sender domain.
+   * @param serverId The serverId of the sender domain.
    */
   public CTHeartbeatPublisherThread(String threadName, Session session,
-                  long heartbeatInterval, int serverId2)
+                  long heartbeatInterval, int serverId)
   {
     super(threadName);
     this.session = session;
     this.heartbeatInterval = heartbeatInterval;
-    this.serverId = serverId2;
+    this.serverId = serverId;
   }
 
   /**
@@ -93,6 +92,7 @@
   @Override
   public void run()
   {
+    long lastHeartbeatTime = 0;
     try
     {
       if (debugEnabled())
@@ -103,13 +103,12 @@
 
       while (!shutdown)
       {
-        long now = System.currentTimeMillis();
-        final CSN csn = new CSN(TimeThread.getTime(), 0, serverId);
-        ChangeTimeHeartbeatMsg ctHeartbeatMsg = new ChangeTimeHeartbeatMsg(csn);
-
+        final long now = System.currentTimeMillis();
         if (now > session.getLastPublishTime() + heartbeatInterval)
         {
-          session.publish(ctHeartbeatMsg);
+          final CSN csn = new CSN(now, 0, serverId);
+          session.publish(ChangeTimeHeartbeatMsg.heartbeatMsg(csn));
+          lastHeartbeatTime = csn.getTime();
         }
 
         long sleepTime = session.getLastPublishTime() + heartbeatInterval - now;
@@ -138,6 +137,26 @@
           }
         }
       }
+
+      if (shutdown)
+      {
+        /*
+         * Shortcoming: this thread is restarted each time the DS reconnects,
+         * e.g. during load balancing. This is not that much of a problem
+         * because the ChangeNumberIndexer tolerates receiving replica offline
+         * heartbeats and then receiving messages back again.
+         */
+        /*
+         * However, during shutdown we need to be sure that all pending client
+         * operations have either completed or have been aborted before shutting
+         * down replication. Otherwise, the medium consistency will move forward
+         * without knowing about these changes.
+         */
+        final long now = System.currentTimeMillis();
+        final int seqNum = lastHeartbeatTime == now ? 1 : 0;
+        final CSN offlineCSN = new CSN(now, seqNum, serverId);
+        session.publish(ChangeTimeHeartbeatMsg.replicaOfflineMsg(offlineCSN));
+      }
     }
     catch (IOException e)
     {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
new file mode 100644
index 0000000..21133fc
--- /dev/null
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/ByteArrayTest.java
@@ -0,0 +1,84 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.opends.server.DirectoryServerTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.types.ByteStringBuilder;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+/**
+ * Test for {@link ByteStringBuilder} and {@link ByteArrayScanner} classes.
+ */
+@SuppressWarnings("javadoc")
+public class ByteArrayTest extends DirectoryServerTestCase
+{
+
+  @Test
+  public void testBuilderAppendMethodsAndScannerNextMethods() throws Exception
+  {
+    final boolean bo = true;
+    final byte by = 80;
+    final short sh = 42;
+    final int i = sh + 1;
+    final long l = i + 1;
+    final String st = "Yay!";
+    final Collection<String> col = Arrays.asList("foo", "bar", "baz");
+    final CSN csn = new CSN(42424242, 13, 42);
+
+    byte[] bytes = new ByteArrayBuilder()
+        .append(bo)
+        .append(by)
+        .append(sh)
+        .append(i)
+        .append(l)
+        .append(st)
+        .appendStrings(col)
+        .appendUTF8(i)
+        .appendUTF8(l)
+        .append(csn)
+        .appendUTF8(csn)
+        .toByteArray();
+
+    final ByteArrayScanner scanner = new ByteArrayScanner(bytes);
+    Assert.assertEquals(scanner.nextBoolean(), bo);
+    Assert.assertEquals(scanner.nextByte(), by);
+    Assert.assertEquals(scanner.nextShort(), sh);
+    Assert.assertEquals(scanner.nextInt(), i);
+    Assert.assertEquals(scanner.nextLong(), l);
+    Assert.assertEquals(scanner.nextString(), st);
+    Assert.assertEquals(scanner.nextStrings(new ArrayList<String>()), col);
+    Assert.assertEquals(scanner.nextIntUTF8(), i);
+    Assert.assertEquals(scanner.nextLongUTF8(), l);
+    Assert.assertEquals(scanner.nextCSN(), csn);
+    Assert.assertEquals(scanner.nextCSNUTF8(), csn);
+    Assert.assertTrue(scanner.isEmpty());
+  }
+}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index 32b843c..470db97 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -51,6 +51,7 @@
 import static org.opends.server.TestCaseUtils.*;
 import static org.opends.server.replication.protocol.OperationContext.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
+import static org.opends.server.util.StaticUtils.*;
 import static org.testng.Assert.*;
 
 /**
@@ -107,10 +108,8 @@
     List<Modification> mods4 = new ArrayList<Modification>();
     for (int i = 0; i < 10; i++)
     {
-      Attribute attr = Attributes.create("description", "string"
-          + String.valueOf(i));
-      Modification mod = new Modification(ModificationType.ADD, attr);
-      mods4.add(mod);
+      Attribute attr = Attributes.create("description", "string" + i);
+      mods4.add(new Modification(ModificationType.ADD, attr));
     }
 
     Attribute attr5 = Attributes.create("namingcontexts", TEST_ROOT_DN_STRING);
@@ -156,14 +155,14 @@
     msg.setAssuredMode(assuredMode);
     msg.setSafeDataLevel(safeDataLevel);
 
-    // Set ECL entry inlcuded attributes
+    // Set ECL entry included attributes
     if (entryAttrList != null)
     {
       msg.setEclIncludes(entryAttrList);
     }
 
     ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
-        msg.getBytes(), ProtocolVersion.getCurrentVersion());
+        msg.getBytes(), getCurrentVersion());
 
     // Test that generated attributes match original attributes.
     assertEquals(generatedMsg.isAssured(), isAssured);
@@ -218,7 +217,7 @@
 
     // Check equals
     ModifyMsg generatedMsg = (ModifyMsg) ReplicationMsg.generateMsg(
-        msg.getBytes(), ProtocolVersion.REPLICATION_PROTOCOL_V1);
+        msg.getBytes(), REPLICATION_PROTOCOL_V1);
     assertFalse(msg.equals(null));
     assertFalse(msg.equals(new Object()));
 
@@ -302,7 +301,7 @@
     }
     msg.setInitiatorsName("johnny h");
     DeleteMsg generatedMsg = (DeleteMsg) ReplicationMsg.generateMsg(
-        msg.getBytes(), ProtocolVersion.getCurrentVersion());
+        msg.getBytes(), getCurrentVersion());
 
     assertEquals(msg.toString(), generatedMsg.toString());
     assertEquals(msg.getInitiatorsName(), generatedMsg.getInitiatorsName());
@@ -397,8 +396,8 @@
       msg.setEclIncludes(entryAttrList);
     }
 
-    ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg
-        .generateMsg(msg.getBytes(), ProtocolVersion.getCurrentVersion());
+    ModifyDNMsg generatedMsg = (ModifyDNMsg) ReplicationMsg.generateMsg(
+        msg.getBytes(), getCurrentVersion());
 
     // Test that generated attributes match original attributes.
     assertEquals(generatedMsg.isAssured(), isAssured);
@@ -476,8 +475,8 @@
       msg.setEclIncludes(entryAttrList);
     }
 
-    AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(msg
-        .getBytes(), ProtocolVersion.getCurrentVersion());
+    AddMsg generatedMsg = (AddMsg) ReplicationMsg.generateMsg(
+        msg.getBytes(), getCurrentVersion());
     assertEquals(generatedMsg.getBytes(), msg.getBytes());
     assertEquals(generatedMsg.toString(), msg.toString());
 
@@ -828,6 +827,33 @@
     new StopMsg(msg.getBytes(getCurrentVersion()));
   }
 
+  @Test
+  public void changeTimeHeartbeatMsgTest() throws Exception
+  {
+    final short v1 = REPLICATION_PROTOCOL_V1;
+    final short v7 = REPLICATION_PROTOCOL_V7;
+    final short v8 = REPLICATION_PROTOCOL_V8;
+
+    final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
+    final ChangeTimeHeartbeatMsg heartbeatMsg = ChangeTimeHeartbeatMsg.heartbeatMsg(csn);
+    assertCTHearbeatMsg(heartbeatMsg, v1, false);
+    assertCTHearbeatMsg(heartbeatMsg, v7, false);
+    assertCTHearbeatMsg(heartbeatMsg, v8, false);
+
+    final ChangeTimeHeartbeatMsg offlineMsg = ChangeTimeHeartbeatMsg.replicaOfflineMsg(csn);
+    assertCTHearbeatMsg(offlineMsg, v1, false);
+    assertCTHearbeatMsg(offlineMsg, v7, false);
+    assertCTHearbeatMsg(offlineMsg, v8, true);
+  }
+
+  private void assertCTHearbeatMsg(ChangeTimeHeartbeatMsg heartbeatMsg,
+      short version, boolean expected) throws DataFormatException
+  {
+    final byte[] bytes = heartbeatMsg.getBytes(version);
+    ChangeTimeHeartbeatMsg decodedMsg = new ChangeTimeHeartbeatMsg(bytes, version);
+    assertEquals(decodedMsg.isReplicaOfflineMsg(), expected);
+  }
+
   /**
    * Test that WindowMsg encoding and decoding works
    * by checking that : msg == new WindowMsg(msg.getBytes()).
@@ -913,8 +939,7 @@
       throws Exception
   {
     TopologyMsg msg = new TopologyMsg(dsList, rsList);
-    TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()),
-        ProtocolVersion.getCurrentVersion());
+    TopologyMsg newMsg = new TopologyMsg(msg.getBytes(getCurrentVersion()), getCurrentVersion());
     assertEquals(msg.getReplicaInfos(), newMsg.getReplicaInfos());
     assertEquals(msg.getRsInfos(), newMsg.getRsInfos());
   }
@@ -1091,16 +1116,14 @@
     msg.setServerState(sid3, s3, now+3, false);
 
     byte[] b = msg.getBytes(getCurrentVersion());
-    MonitorMsg newMsg = new MonitorMsg(b, ProtocolVersion.getCurrentVersion());
+    MonitorMsg newMsg = new MonitorMsg(b, getCurrentVersion());
 
     assertEquals(rsState, msg.getReplServerDbState());
     assertEquals(newMsg.getReplServerDbState().toString(),
         msg.getReplServerDbState().toString());
 
-    Iterator<Integer> it = newMsg.ldapIterator();
-    while (it.hasNext())
+    for (int sid : toIterable(newMsg.ldapIterator()))
     {
-      int sid = it.next();
       ServerState s = newMsg.getLDAPServerState(sid);
       if (sid == sid1)
       {
@@ -1118,10 +1141,8 @@
       }
     }
 
-    Iterator<Integer> it2 = newMsg.rsIterator();
-    while (it2.hasNext())
+    for (int sid : toIterable(newMsg.rsIterator()))
     {
-      int sid = it2.next();
       ServerState s = newMsg.getRSServerState(sid);
       if (sid == sid3)
       {
@@ -1380,7 +1401,7 @@
       encodemsg += (t4 - t31);
 
       // getBytes
-      byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
+      byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
       t5 = System.nanoTime();
       getbytes += (t5 - t4);
 
@@ -1455,7 +1476,7 @@
       encodemsg += (t4 - t31);
 
       // getBytes
-      byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
+      byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
       t5 = System.nanoTime();
       getbytes += (t5 - t4);
 
@@ -1529,7 +1550,7 @@
       encodemsg += (t4 - t31);
 
       // getBytes
-      byte[] bytes = generatedMsg.getBytes(ProtocolVersion.getCurrentVersion());
+      byte[] bytes = generatedMsg.getBytes(getCurrentVersion());
       t5 = System.nanoTime();
       getbytes += (t5 - t4);
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 62fe230..9b472ed 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -387,9 +387,18 @@
 
     final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
     publishUpdateMsg(msg4);
-    // MCP moved forward after receiving update from serverId1
+    // MCP moves forward after receiving update from serverId1
     // (last replica in the domain)
     assertExternalChangelogContent(msg1, msg2, msg4);
+
+    // serverId2 comes online again
+    final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId2, 5);
+    publishUpdateMsg(msg5);
+    // MCP does not move until it knows what happens to serverId1
+    assertExternalChangelogContent(msg1, msg2, msg4);
+    sendHeartbeat(BASE_DN1, serverId1, 6);
+    // MCP moves forward
+    assertExternalChangelogContent(msg1, msg2, msg4, msg5);
   }
 
   private void addReplica(DN baseDN, int serverId) throws Exception
@@ -417,11 +426,13 @@
     waitForWaitingState(cnIndexer);
   }
 
-  private void stopCNIndexer()
+  private void stopCNIndexer() throws Exception
   {
     if (cnIndexer != null)
     {
       cnIndexer.initiateShutdown();
+      cnIndexer.interrupt();
+      cnIndexer.join();
     }
   }
 
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
index 72fc472..6aac87b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicationDbEnvTest.java
@@ -31,6 +31,7 @@
 
 import org.opends.server.DirectoryServerTestCase;
 import org.opends.server.TestCaseUtils;
+import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.server.ChangelogState;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.types.DN;
@@ -43,6 +44,7 @@
 import com.sleepycat.je.Environment;
 
 import static java.util.Arrays.*;
+import static java.util.Collections.*;
 
 import static org.assertj.core.api.Assertions.*;
 import static org.opends.server.replication.server.changelog.je.ReplicationDbEnv.*;
@@ -51,78 +53,104 @@
 public class ReplicationDbEnvTest extends DirectoryServerTestCase
 {
 
-	/**
-	 * Bypass heavyweight setup.
-	 */
-	private final class TestableReplicationDbEnv extends ReplicationDbEnv
-	{
-		private TestableReplicationDbEnv() throws ChangelogException
-		{
-			super(null, null);
-		}
+  /**
+   * Bypass heavyweight setup.
+   */
+  private final class TestableReplicationDbEnv extends ReplicationDbEnv
+  {
+    private TestableReplicationDbEnv() throws ChangelogException
+    {
+      super(null, null);
+    }
 
-		@Override
-		protected Environment openJEEnvironment(String path)
-		{
-			return null;
-		}
+    @Override
+    protected Environment openJEEnvironment(String path)
+    {
+      return null;
+    }
 
-		@Override
-		protected Database openDatabase(String databaseName)
-				throws ChangelogException, RuntimeException
-		{
-			return null;
-		}
-	}
+    @Override
+    protected Database openDatabase(String databaseName)
+        throws ChangelogException, RuntimeException
+    {
+      return null;
+    }
+  }
 
-	@BeforeClass
-	public void setup() throws Exception
-	{
-		TestCaseUtils.startFakeServer();
-	}
+  @BeforeClass
+  public void setup() throws Exception
+  {
+    TestCaseUtils.startFakeServer();
+  }
 
-	@AfterClass
-	public void teardown()
-	{
-		TestCaseUtils.shutdownFakeServer();
-	}
+  @AfterClass
+  public void teardown()
+  {
+    TestCaseUtils.shutdownFakeServer();
+  }
 
-	@DataProvider
-	public Object[][] changelogStateDataProvider() throws Exception
-	{
-		return new Object[][] {
-			{ DN.decode("dc=example,dc=com"), 524157415, asList(42, 346) },
-			// test with a space in the baseDN (space is the field separator in the DB)
-			{ DN.decode("cn=admin data"), 524157415, asList(42, 346) },
-	  };
-	}
+  @DataProvider
+  public Object[][] changelogStateDataProvider() throws Exception
+  {
+    final int genId = 524157415;
+    final int id1 = 42;
+    final int id2 = 346;
+    final int t1 = 1956245524;
+    return new Object[][] {
+      { DN.decode("dc=example,dc=com"), genId, EMPTY_LIST, EMPTY_LIST },
+      { DN.decode("dc=example,dc=com"), genId, asList(id1, id2),
+        asList(new CSN(id2, 0, t1)) },
+      // test with a space in the baseDN (space is the field separator in the
+      // DB)
+      { DN.decode("cn=admin data"), genId, asList(id1, id2), EMPTY_LIST }, };
+  }
 
-	@Test(dataProvider = "changelogStateDataProvider")
-	public void encodeDecodeChangelogState(DN baseDN, long generationId,
-			List<Integer> serverIds) throws Exception
-	{
-		final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
+  @Test(dataProvider = "changelogStateDataProvider")
+  public void encodeDecodeChangelogState(DN baseDN, long generationId,
+      List<Integer> replicas, List<CSN> offlineReplicas) throws Exception
+  {
+    final ReplicationDbEnv changelogStateDB = new TestableReplicationDbEnv();
 
-		// encode data
-		final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>();
-		put(wholeState, changelogStateDB.toGenIdEntry(baseDN, generationId));
-		for (Integer serverId : serverIds)
-		{
-			put(wholeState, changelogStateDB.toReplicaEntry(baseDN, serverId));
-		}
+    // encode data
+    final Map<byte[], byte[]> wholeState = new LinkedHashMap<byte[], byte[]>();
+    put(wholeState, toGenIdEntry(baseDN, generationId));
+    for (Integer serverId : replicas)
+    {
+      put(wholeState, toByteArray(toReplicaEntry(baseDN, serverId)));
+    }
+    for (CSN offlineCSN : offlineReplicas)
+    {
+      put(wholeState, toReplicaOfflineEntry(baseDN, offlineCSN));
+    }
 
-		// decode data
-		final ChangelogState state =
-				changelogStateDB.decodeChangelogState(wholeState);
-		assertThat(state.getDomainToGenerationId()).containsExactly(
-				entry(baseDN, generationId));
-		assertThat(state.getDomainToServerIds()).containsExactly(
-				entry(baseDN, serverIds));
-	}
+    // decode data
+    final ChangelogState state =
+        changelogStateDB.decodeChangelogState(wholeState);
+    assertThat(state.getDomainToGenerationId()).containsExactly(
+        entry(baseDN, generationId));
+    if (!replicas.isEmpty())
+    {
+      assertThat(state.getDomainToServerIds()).containsExactly(
+          entry(baseDN, replicas));
+    }
+    else
+    {
+      assertThat(state.getDomainToServerIds()).isEmpty();
+    }
+    if (!offlineReplicas.isEmpty())
+    {
+      assertThat(state.getOfflineReplicas()).containsExactly(
+          entry(baseDN, offlineReplicas));
+    }
+    else
+    {
+      assertThat(state.getOfflineReplicas()).isEmpty();
+    }
+  }
 
-	private void put(Map<byte[], byte[]> map, Entry<String, String> entry)
-	{
-		map.put(toBytes(entry.getKey()), toBytes(entry.getValue()));
-	}
+  private void put(Map<byte[], byte[]> map, Entry<byte[], byte[]> entry)
+  {
+    map.put(entry.getKey(), entry.getValue());
+  }
 
 }

--
Gitblit v1.10.0