From 5168ba85e8f918b7acd57acc5f61300b08dd9fd9 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 10 Jul 2014 15:36:31 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3870) Replica offline messages should be synced with updates

---
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java                             |  126 ++++++++++++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                                       |    7 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java                                             |    2 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java                                          |   12 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java                                         |    7 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java                                           |  113 +++++++++++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java                                            |    1 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java |  129 ++++++++++++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java                                                |   30 ++
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java              |   11 +
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java                                              |    8 
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java            |   47 ++++
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                                    |   23 ++
 opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java    |   22 -
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java                                            |    9 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java                                              |    5 
 opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java                                                 |   22 +
 17 files changed, 540 insertions(+), 34 deletions(-)

diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 4b15777..1a871ce 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -2006,6 +2006,13 @@
     addOperation.setAttachment(SYNCHROCONTEXT, ctx);
   }
 
+  /** {@inheritDoc} */
+  @Override
+  public void publishReplicaOfflineMsg()
+  {
+    pendingChanges.putReplicaOfflineMsg();
+  }
+
   /**
    * Check if an operation must be synchronized.
    * Also update the list of pending changes and the server RUV
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
index e694ab4..9e7309f 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -29,6 +29,7 @@
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.protocol.LDAPUpdateMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.types.operation.PluginOperation;
 
 /**
@@ -39,7 +40,7 @@
 {
   private final CSN csn;
   private boolean committed;
-  private LDAPUpdateMsg msg;
+  private UpdateMsg msg;
   private final PluginOperation op;
   private ServerState dependencyState;
 
@@ -49,7 +50,7 @@
    * @param op the operation to use
    * @param msg the message to use (can be null for local operations)
    */
-  PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
+  PendingChange(CSN csn, PluginOperation op, UpdateMsg msg)
   {
     this.csn = csn;
     this.committed = false;
@@ -89,12 +90,27 @@
    * @return the message if operation was a replication operation
    * null if the operation was a local operation
    */
-  public LDAPUpdateMsg getMsg()
+  public UpdateMsg getMsg()
   {
     return msg;
   }
 
   /**
+   * Get the LDAPUpdateMsg associated to this PendingChange.
+   *
+   * @return the LDAPUpdateMsg if operation was a replication operation, null
+   *         otherwise
+   */
+  public LDAPUpdateMsg getLDAPUpdateMsg()
+  {
+    if (msg instanceof LDAPUpdateMsg)
+    {
+      return (LDAPUpdateMsg) msg;
+    }
+    return null;
+  }
+
+  /**
    * Set the message associated to the PendingChange.
    * @param msg the message
    */
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
index 42ee696..834cf0c 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -33,6 +33,8 @@
 import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.common.CSNGenerator;
 import org.opends.server.replication.protocol.LDAPUpdateMsg;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.service.ReplicationDomain;
 import org.opends.server.types.operation.PluginOperation;
 
@@ -136,6 +138,20 @@
   }
 
   /**
+   * Add a replica offline message to the pending list.
+   */
+  public synchronized void putReplicaOfflineMsg()
+  {
+    final CSN offlineCSN = csnGenerator.newCSN();
+    final PendingChange pendingChange =
+        new PendingChange(offlineCSN, null, new ReplicaOfflineMsg(offlineCSN));
+    pendingChange.setCommitted(true);
+
+    pendingChanges.put(offlineCSN, pendingChange);
+    pushCommittedChanges();
+  }
+
+  /**
    * Push all committed local changes to the replicationServer service.
    */
   synchronized void pushCommittedChanges()
@@ -152,20 +168,26 @@
     while (firstChange != null && firstChange.isCommitted())
     {
       final PluginOperation op = firstChange.getOp();
-      if (op != null && !op.isSynchronizationOperation())
+      final UpdateMsg msg = firstChange.getMsg();
+      if (msg instanceof LDAPUpdateMsg
+          && op != null
+          && !op.isSynchronizationOperation())
       {
-        final LDAPUpdateMsg updateMsg = firstChange.getMsg();
         if (!recoveringOldChanges)
         {
-          domain.publish(updateMsg);
+          domain.publish(msg);
         }
         else
         {
           // do not push updates until the RS catches up.
           // @see #setRecovering(boolean)
-          domain.getServerState().update(updateMsg.getCSN());
+          domain.getServerState().update(msg.getCSN());
         }
       }
+      else if (msg instanceof ReplicaOfflineMsg)
+      {
+        domain.publish(msg);
+      }
 
       // false warning: firstEntry will not be null if firstChange is not null
       pendingChanges.remove(firstEntry.getKey());
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index 5126be4..beb6d19 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -157,7 +157,7 @@
       if (change.dependenciesIsCovered(state))
       {
         dependentChanges.remove(change);
-        return change.getMsg();
+        return change.getLDAPUpdateMsg();
       }
     }
     return null;
@@ -208,7 +208,7 @@
     {
       if (pendingChange.getCSN().isOlderThan(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
         if (pendingMsg != null)
         {
           if (pendingMsg instanceof DeleteMsg)
@@ -300,7 +300,7 @@
     {
       if (pendingChange.getCSN().isOlderThan(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
         if (pendingMsg instanceof AddMsg)
         {
           // Check if the operation to be run is an addOperation on a same DN.
@@ -350,13 +350,13 @@
       return false;
     }
 
-    final DN targetDN = change.getMsg().getDN();
+    final DN targetDN = change.getLDAPUpdateMsg().getDN();
 
     for (PendingChange pendingChange : pendingChanges.values())
     {
       if (pendingChange.getCSN().isOlderThan(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
         if (pendingMsg != null)
         {
           if (pendingMsg instanceof DeleteMsg)
@@ -442,7 +442,7 @@
     {
       if (pendingChange.getCSN().isOlderThan(csn))
       {
-        final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+        final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
         if (pendingMsg != null)
         {
           if (pendingMsg instanceof DeleteMsg)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 1e6e2ba..9f75a01 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -99,7 +99,7 @@
   /**
    * The constant for the 8th version of the replication protocol.
    * <ul>
-   * <li>StopMsg now has a timestamp to communicate the replica stop time.</li>
+   * <li>New ReplicaOfflineMsg.</li>
    * </ul>
    */
   public static final short REPLICATION_PROTOCOL_V8 = 8;
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
new file mode 100644
index 0000000..a9ac7c7
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
@@ -0,0 +1,113 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.zip.DataFormatException;
+
+import org.opends.server.replication.common.CSN;
+
+import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
+
+/**
+ * Class that define messages sent by a replica (DS) to the replication server
+ * (RS) to let the RS know the date at which a replica went offline.
+ */
+public class ReplicaOfflineMsg extends UpdateMsg
+{
+
+  /**
+   * Constructor of a replica offline message providing the offline timestamp in
+   * a CSN.
+   *
+   * @param offlineCSN
+   *          the provided offline CSN
+   */
+  public ReplicaOfflineMsg(final CSN offlineCSN)
+  {
+    super(offlineCSN, new byte[0]);
+  }
+
+  /**
+   * Creates a message by deserializing it from the provided byte array.
+   *
+   * @param in
+   *          The provided byte array.
+   * @throws DataFormatException
+   *           When an error occurs during decoding .
+   */
+  public ReplicaOfflineMsg(byte[] in) throws DataFormatException
+  {
+    try
+    {
+      final ByteArrayScanner scanner = new ByteArrayScanner(in);
+      final byte msgType = scanner.nextByte();
+      if (msgType != MSG_TYPE_REPLICA_OFFLINE)
+      {
+        throw new DataFormatException("input is not a valid "
+            + getClass().getSimpleName() + " message: " + msgType);
+      }
+      protocolVersion = scanner.nextShort();
+      csn = scanner.nextCSN();
+
+      if (!scanner.isEmpty())
+      {
+        throw new DataFormatException(
+            "Did not expect to find more bytes to read for "
+                + getClass().getSimpleName());
+      }
+    }
+    catch (RuntimeException e)
+    {
+      // Index out of bounds, bad format, etc.
+      throw new DataFormatException("byte[] is not a valid "
+          + getClass().getSimpleName());
+    }
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public byte[] getBytes(short protocolVersion)
+  {
+    final ByteArrayBuilder builder = new ByteArrayBuilder(size());
+    builder.appendByte(MSG_TYPE_REPLICA_OFFLINE);
+    builder.appendShort(protocolVersion);
+    builder.appendCSN(csn);
+    return builder.toByteArray();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public int size()
+  {
+    return bytes(1) + shorts(1) + csns(1);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName() + " offlineCSN=" + csn;
+  }
+}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index d828ce3..959bcb6 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -85,6 +85,9 @@
   //   EntryMsg, InitializeRequestMsg, InitializeTargetMsg, ErrorMsg
   //   TopologyMsg
 
+  /** @since {@link ProtocolVersion#REPLICATION_PROTOCOL_V8} */
+  static final byte MSG_TYPE_REPLICA_OFFLINE = 37;
+
   // Adding a new type of message here probably requires to
   // change accordingly generateMsg method below
 
@@ -199,6 +202,8 @@
       return new StopMsg(buffer);
     case MSG_TYPE_INITIALIZE_RCV_ACK:
       return new InitializeRcvAckMsg(buffer);
+    case MSG_TYPE_REPLICA_OFFLINE:
+      return new ReplicaOfflineMsg(buffer);
     default:
       throw new DataFormatException("received message with unknown type");
     }
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 81c06fc..e756e03 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -228,7 +228,13 @@
       {
         final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */);
 
-        if (newMsg == null)
+        if (newMsg instanceof ReplicaOfflineMsg)
+        {
+          // and ReplicaOfflineMsg cannot be returned to a search on cn=changelog
+          // proceed as if it was never returned
+          continue;
+        }
+        else if (newMsg == null)
         { // in non blocking mode, null means no more messages
           return null;
         }
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3d6cbbe..1ae6953 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -492,6 +492,13 @@
   {
     try
     {
+      if (updateMsg instanceof ReplicaOfflineMsg)
+      {
+        final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
+        this.domainDB.replicaOffline(baseDN, offlineMsg.getCSN());
+        return true;
+      }
+
       if (this.domainDB.publishUpdateMsg(baseDN, updateMsg))
       {
         /*
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 0d98801..19317bb 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -697,18 +697,39 @@
       throws ChangelogException
   {
     final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
+    final ChangelogState state = dbEnv.readChangelogState();
     final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
     for (int serverId : serverIds)
     {
       // get the last already sent CSN from that server to get a cursor
       final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
-      cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
+      final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
+      final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
+      cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
     }
     // recycle exhausted cursors,
     // because client code will not manage the cursors itself
     return new CompositeDBCursor<Void>(cursors, true);
   }
 
+  private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
+      ServerState startAfterServerState)
+  {
+    final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
+    if (domain != null)
+    {
+      for (CSN offlineCSN : domain)
+      {
+        if (serverId == offlineCSN.getServerId()
+            && !startAfterServerState.cover(offlineCSN))
+        {
+          return offlineCSN;
+        }
+      }
+    }
+    return null;
+  }
+
   /** {@inheritDoc} */
   @Override
   public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
new file mode 100644
index 0000000..fb2364e
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
@@ -0,0 +1,126 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+/**
+ * Implementation of a DBCursor that decorates an existing DBCursor
+ * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted
+ * and the offline CSN is newer than the last returned update CSN.
+ */
+public class ReplicaOfflineCursor implements DBCursor<UpdateMsg>
+{
+  /** @NonNull */
+  private final DBCursor<UpdateMsg> cursor;
+  private ReplicaOfflineMsg replicaOfflineMsg;
+  /**
+   * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg}
+   */
+  private boolean returnReplicaOfflineMsg;
+
+  /**
+   * Creates a ReplicaOfflineCursor object with a cursor to decorate
+   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
+   *
+   * @param cursor
+   *          the non-null underlying cursor that needs to be exhausted before
+   *          we return a ReplicaOfflineMsg
+   * @param offlineCSN
+   *          The offline CSN from which to builder the
+   *          {@link ReplicaOfflineMsg} to return
+   */
+  public ReplicaOfflineCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN)
+  {
+    this.replicaOfflineMsg =
+        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null;
+    this.cursor = cursor;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public UpdateMsg getRecord()
+  {
+    return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean next() throws ChangelogException
+  {
+    if (returnReplicaOfflineMsg)
+    {
+      // already consumed, never return it again...
+      replicaOfflineMsg = null;
+      returnReplicaOfflineMsg = false;
+      // ...and verify if new changes have been added to the DB
+      // (cursors are automatically restarted)
+    }
+    final UpdateMsg lastUpdate = cursor.getRecord();
+    final boolean hasNext = cursor.next();
+    if (hasNext)
+    {
+      return true;
+    }
+    if (replicaOfflineMsg == null)
+    { // no ReplicaOfflineMsg to return
+      return false;
+    }
+
+    // replicaDB just happened to be exhausted now
+    if (lastUpdate != null
+        && replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN()))
+    {
+      // offlineCSN is outdated, never return it
+      replicaOfflineMsg = null;
+      return false;
+    }
+    returnReplicaOfflineMsg = true;
+    return true;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    cursor.close();
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return getClass().getSimpleName()
+        + " returnReplicaOfflineMsg=" + returnReplicaOfflineMsg
+        + " offlineCSN="
+        + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : null)
+        + " cursor=" + cursor.toString().split("", 2)[1];
+  }
+
+}
\ No newline at end of file
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c6b4149..2b88d91 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2777,6 +2777,7 @@
 
     synchronized (startStopLock)
     {
+      domain.publishReplicaOfflineMsg();
       shutdown = true;
       setConnectedRS(ConnectedRS.stopped());
       stopRSHeartBeatMonitoring();
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index ce9d714..985b34e 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -3417,6 +3417,15 @@
   }
 
   /**
+   * Publishes a replica offline message if all pending changes for current
+   * replica have been sent out.
+   */
+  public void publishReplicaOfflineMsg()
+  {
+    // Here to be overridden
+  }
+
+  /**
    * Publish information to the Replication Service (not assured mode).
    *
    * @param msg  The byte array containing the information that should
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index d3f3167..f0fd1e4 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -844,6 +844,17 @@
     assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
   }
 
+  @Test
+  public void replicaOfflineMsgTest() throws Exception
+  {
+    final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
+    final ReplicaOfflineMsg expectedMsg = new ReplicaOfflineMsg(csn);
+
+    final byte[] bytes = expectedMsg.getBytes(REPLICATION_PROTOCOL_V8);
+    ReplicaOfflineMsg decodedMsg = new ReplicaOfflineMsg(bytes);
+    assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
+  }
+
   /**
    * Test that WindowMsg encoding and decoding works
    * by checking that : msg == new WindowMsg(msg.getBytes()).
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index 1a933e8..a2a2644 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -29,7 +29,6 @@
 import java.util.Map;
 
 import org.opends.server.DirectoryServerTestCase;
-import org.opends.server.replication.common.CSN;
 import org.opends.server.replication.protocol.UpdateMsg;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -56,10 +55,10 @@
   @BeforeClass
   public void setupMsgs()
   {
-    msg1 = newUpdateMsg(1);
-    msg2 = newUpdateMsg(2);
-    msg3 = newUpdateMsg(3);
-    msg4 = newUpdateMsg(4);
+    msg1 = new FakeUpdateMsg(1);
+    msg2 = new FakeUpdateMsg(2);
+    msg3 = new FakeUpdateMsg(3);
+    msg4 = new FakeUpdateMsg(4);
   }
 
   @Test
@@ -134,19 +133,6 @@
         of(msg4, baseDN1));
   }
 
-  private UpdateMsg newUpdateMsg(final int t)
-  {
-    return new UpdateMsg(new CSN(t, t, t), new byte[t])
-    {
-      /** {@inheritDoc} */
-      @Override
-      public String toString()
-      {
-        return "UpdateMsg(" + t + ")";
-      }
-    };
-  }
-
   private CompositeDBCursor<String> newCompositeDBCursor(
       Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
   {
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java
new file mode 100644
index 0000000..9fa6d8d
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java
@@ -0,0 +1,47 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+
+@SuppressWarnings("javadoc")
+class FakeUpdateMsg extends UpdateMsg
+{
+  private final int t;
+
+  FakeUpdateMsg(int t)
+  {
+    super(new CSN(t, t, t), new byte[1]);
+    this.t = t;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    return "UpdateMsg(" + t + ")";
+  }
+}
\ No newline at end of file
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
new file mode 100644
index 0000000..1737f27
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
@@ -0,0 +1,129 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+
+/**
+ * Test the ReplicaOfflineCursor class.
+ */
+@SuppressWarnings("javadoc")
+public class ReplicaOfflineCursorTest extends ReplicationTestCase
+{
+
+  private int timestamp;
+  private DBCursor<UpdateMsg> delegateCursor;
+
+  @BeforeTest
+  public void init()
+  {
+    timestamp = 1;
+  }
+
+  @Test
+  public void cursorReturnsFalse() throws Exception
+  {
+    delegateCursor = new SequentialDBCursor();
+
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  @Test
+  public void cursorReturnsTrue() throws Exception
+  {
+    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+    delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isTrue();
+    assertThat(cursor.getRecord()).isSameAs(updateMsg);
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  @Test
+  public void cursorReturnsReplicaOfflineMsg() throws Exception
+  {
+    delegateCursor = new SequentialDBCursor();
+
+    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isTrue();
+    final UpdateMsg record = cursor.getRecord();
+    assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
+    assertThat(record.getCSN()).isEqualTo(offlineCSN);
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  @Test
+  public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
+  {
+    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+    delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+    final CSN offlineCSN = new CSN(timestamp++, 1, 1);
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isTrue();
+    assertThat(cursor.getRecord()).isSameAs(updateMsg);
+    assertThat(cursor.next()).isTrue();
+    final UpdateMsg record = cursor.getRecord();
+    assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
+    assertThat(record.getCSN()).isEqualTo(offlineCSN);
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+  @Test
+  public void cursorReturnsUpdateMsgThenNeverReturnsOutdatedReplicaOfflineMsg() throws Exception
+  {
+    final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
+
+    final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+    delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
+    assertThat(cursor.getRecord()).isNull();
+    assertThat(cursor.next()).isTrue();
+    assertThat(cursor.getRecord()).isSameAs(updateMsg);
+    assertThat(cursor.next()).isFalse();
+    assertThat(cursor.getRecord()).isNull();
+  }
+
+}

--
Gitblit v1.10.0