From 5168ba85e8f918b7acd57acc5f61300b08dd9fd9 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 10 Jul 2014 15:36:31 +0000
Subject: [PATCH] OPENDJ-1453 (CR-3870) Replica offline messages should be synced with updates
---
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java | 126 ++++++++++++
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 7
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java | 2
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java | 12
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java | 7
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java | 113 +++++++++++
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java | 1
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java | 129 ++++++++++++
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java | 30 ++
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java | 11 +
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java | 8
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java | 47 ++++
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 23 ++
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 22 -
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java | 9
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java | 5
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java | 22 +
17 files changed, 540 insertions(+), 34 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
index 4b15777..1a871ce 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -2006,6 +2006,13 @@
addOperation.setAttachment(SYNCHROCONTEXT, ctx);
}
+ /** {@inheritDoc} */
+ @Override
+ public void publishReplicaOfflineMsg()
+ {
+ pendingChanges.putReplicaOfflineMsg();
+ }
+
/**
* Check if an operation must be synchronized.
* Also update the list of pending changes and the server RUV
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
index e694ab4..9e7309f 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -29,6 +29,7 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.types.operation.PluginOperation;
/**
@@ -39,7 +40,7 @@
{
private final CSN csn;
private boolean committed;
- private LDAPUpdateMsg msg;
+ private UpdateMsg msg;
private final PluginOperation op;
private ServerState dependencyState;
@@ -49,7 +50,7 @@
* @param op the operation to use
* @param msg the message to use (can be null for local operations)
*/
- PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg)
+ PendingChange(CSN csn, PluginOperation op, UpdateMsg msg)
{
this.csn = csn;
this.committed = false;
@@ -89,12 +90,27 @@
* @return the message if operation was a replication operation
* null if the operation was a local operation
*/
- public LDAPUpdateMsg getMsg()
+ public UpdateMsg getMsg()
{
return msg;
}
/**
+ * Get the LDAPUpdateMsg associated to this PendingChange.
+ *
+ * @return the LDAPUpdateMsg if operation was a replication operation, null
+ * otherwise
+ */
+ public LDAPUpdateMsg getLDAPUpdateMsg()
+ {
+ if (msg instanceof LDAPUpdateMsg)
+ {
+ return (LDAPUpdateMsg) msg;
+ }
+ return null;
+ }
+
+ /**
* Set the message associated to the PendingChange.
* @param msg the message
*/
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
index 42ee696..834cf0c 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -33,6 +33,8 @@
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.protocol.LDAPUpdateMsg;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.service.ReplicationDomain;
import org.opends.server.types.operation.PluginOperation;
@@ -136,6 +138,20 @@
}
/**
+ * Add a replica offline message to the pending list.
+ */
+ public synchronized void putReplicaOfflineMsg()
+ {
+ final CSN offlineCSN = csnGenerator.newCSN();
+ final PendingChange pendingChange =
+ new PendingChange(offlineCSN, null, new ReplicaOfflineMsg(offlineCSN));
+ pendingChange.setCommitted(true);
+
+ pendingChanges.put(offlineCSN, pendingChange);
+ pushCommittedChanges();
+ }
+
+ /**
* Push all committed local changes to the replicationServer service.
*/
synchronized void pushCommittedChanges()
@@ -152,20 +168,26 @@
while (firstChange != null && firstChange.isCommitted())
{
final PluginOperation op = firstChange.getOp();
- if (op != null && !op.isSynchronizationOperation())
+ final UpdateMsg msg = firstChange.getMsg();
+ if (msg instanceof LDAPUpdateMsg
+ && op != null
+ && !op.isSynchronizationOperation())
{
- final LDAPUpdateMsg updateMsg = firstChange.getMsg();
if (!recoveringOldChanges)
{
- domain.publish(updateMsg);
+ domain.publish(msg);
}
else
{
// do not push updates until the RS catches up.
// @see #setRecovering(boolean)
- domain.getServerState().update(updateMsg.getCSN());
+ domain.getServerState().update(msg.getCSN());
}
}
+ else if (msg instanceof ReplicaOfflineMsg)
+ {
+ domain.publish(msg);
+ }
// false warning: firstEntry will not be null if firstChange is not null
pendingChanges.remove(firstEntry.getKey());
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
index 5126be4..beb6d19 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -157,7 +157,7 @@
if (change.dependenciesIsCovered(state))
{
dependentChanges.remove(change);
- return change.getMsg();
+ return change.getLDAPUpdateMsg();
}
}
return null;
@@ -208,7 +208,7 @@
{
if (pendingChange.getCSN().isOlderThan(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
if (pendingMsg != null)
{
if (pendingMsg instanceof DeleteMsg)
@@ -300,7 +300,7 @@
{
if (pendingChange.getCSN().isOlderThan(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
if (pendingMsg instanceof AddMsg)
{
// Check if the operation to be run is an addOperation on a same DN.
@@ -350,13 +350,13 @@
return false;
}
- final DN targetDN = change.getMsg().getDN();
+ final DN targetDN = change.getLDAPUpdateMsg().getDN();
for (PendingChange pendingChange : pendingChanges.values())
{
if (pendingChange.getCSN().isOlderThan(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
if (pendingMsg != null)
{
if (pendingMsg instanceof DeleteMsg)
@@ -442,7 +442,7 @@
{
if (pendingChange.getCSN().isOlderThan(csn))
{
- final LDAPUpdateMsg pendingMsg = pendingChange.getMsg();
+ final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg();
if (pendingMsg != null)
{
if (pendingMsg instanceof DeleteMsg)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
index 1e6e2ba..9f75a01 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -99,7 +99,7 @@
/**
* The constant for the 8th version of the replication protocol.
* <ul>
- * <li>StopMsg now has a timestamp to communicate the replica stop time.</li>
+ * <li>New ReplicaOfflineMsg.</li>
* </ul>
*/
public static final short REPLICATION_PROTOCOL_V8 = 8;
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
new file mode 100644
index 0000000..a9ac7c7
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
@@ -0,0 +1,113 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.protocol;
+
+import java.util.zip.DataFormatException;
+
+import org.opends.server.replication.common.CSN;
+
+import static org.opends.server.replication.protocol.ByteArrayBuilder.*;
+
+/**
+ * Class that define messages sent by a replica (DS) to the replication server
+ * (RS) to let the RS know the date at which a replica went offline.
+ */
+public class ReplicaOfflineMsg extends UpdateMsg
+{
+
+ /**
+ * Constructor of a replica offline message providing the offline timestamp in
+ * a CSN.
+ *
+ * @param offlineCSN
+ * the provided offline CSN
+ */
+ public ReplicaOfflineMsg(final CSN offlineCSN)
+ {
+ super(offlineCSN, new byte[0]);
+ }
+
+ /**
+ * Creates a message by deserializing it from the provided byte array.
+ *
+ * @param in
+ * The provided byte array.
+ * @throws DataFormatException
+ * When an error occurs during decoding .
+ */
+ public ReplicaOfflineMsg(byte[] in) throws DataFormatException
+ {
+ try
+ {
+ final ByteArrayScanner scanner = new ByteArrayScanner(in);
+ final byte msgType = scanner.nextByte();
+ if (msgType != MSG_TYPE_REPLICA_OFFLINE)
+ {
+ throw new DataFormatException("input is not a valid "
+ + getClass().getSimpleName() + " message: " + msgType);
+ }
+ protocolVersion = scanner.nextShort();
+ csn = scanner.nextCSN();
+
+ if (!scanner.isEmpty())
+ {
+ throw new DataFormatException(
+ "Did not expect to find more bytes to read for "
+ + getClass().getSimpleName());
+ }
+ }
+ catch (RuntimeException e)
+ {
+ // Index out of bounds, bad format, etc.
+ throw new DataFormatException("byte[] is not a valid "
+ + getClass().getSimpleName());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public byte[] getBytes(short protocolVersion)
+ {
+ final ByteArrayBuilder builder = new ByteArrayBuilder(size());
+ builder.appendByte(MSG_TYPE_REPLICA_OFFLINE);
+ builder.appendShort(protocolVersion);
+ builder.appendCSN(csn);
+ return builder.toByteArray();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int size()
+ {
+ return bytes(1) + shorts(1) + csns(1);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName() + " offlineCSN=" + csn;
+ }
+}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
index d828ce3..959bcb6 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -85,6 +85,9 @@
// EntryMsg, InitializeRequestMsg, InitializeTargetMsg, ErrorMsg
// TopologyMsg
+ /** @since {@link ProtocolVersion#REPLICATION_PROTOCOL_V8} */
+ static final byte MSG_TYPE_REPLICA_OFFLINE = 37;
+
// Adding a new type of message here probably requires to
// change accordingly generateMsg method below
@@ -199,6 +202,8 @@
return new StopMsg(buffer);
case MSG_TYPE_INITIALIZE_RCV_ACK:
return new InitializeRcvAckMsg(buffer);
+ case MSG_TYPE_REPLICA_OFFLINE:
+ return new ReplicaOfflineMsg(buffer);
default:
throw new DataFormatException("received message with unknown type");
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 81c06fc..e756e03 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -228,7 +228,13 @@
{
final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */);
- if (newMsg == null)
+ if (newMsg instanceof ReplicaOfflineMsg)
+ {
+ // and ReplicaOfflineMsg cannot be returned to a search on cn=changelog
+ // proceed as if it was never returned
+ continue;
+ }
+ else if (newMsg == null)
{ // in non blocking mode, null means no more messages
return null;
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 3d6cbbe..1ae6953 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -492,6 +492,13 @@
{
try
{
+ if (updateMsg instanceof ReplicaOfflineMsg)
+ {
+ final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg;
+ this.domainDB.replicaOffline(baseDN, offlineMsg.getCSN());
+ return true;
+ }
+
if (this.domainDB.publishUpdateMsg(baseDN, updateMsg))
{
/*
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 0d98801..19317bb 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -697,18 +697,39 @@
throws ChangelogException
{
final Set<Integer> serverIds = getDomainMap(baseDN).keySet();
+ final ChangelogState state = dbEnv.readChangelogState();
final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size());
for (int serverId : serverIds)
{
// get the last already sent CSN from that server to get a cursor
final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null;
- cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null);
+ final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN);
+ final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState);
+ cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null);
}
// recycle exhausted cursors,
// because client code will not manage the cursors itself
return new CompositeDBCursor<Void>(cursors, true);
}
+ private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId,
+ ServerState startAfterServerState)
+ {
+ final List<CSN> domain = state.getOfflineReplicas().get(baseDN);
+ if (domain != null)
+ {
+ for (CSN offlineCSN : domain)
+ {
+ if (serverId == offlineCSN.getServerId()
+ && !startAfterServerState.cover(offlineCSN))
+ {
+ return offlineCSN;
+ }
+ }
+ }
+ return null;
+ }
+
/** {@inheritDoc} */
@Override
public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
new file mode 100644
index 0000000..fb2364e
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
@@ -0,0 +1,126 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+
+/**
+ * Implementation of a DBCursor that decorates an existing DBCursor
+ * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted
+ * and the offline CSN is newer than the last returned update CSN.
+ */
+public class ReplicaOfflineCursor implements DBCursor<UpdateMsg>
+{
+ /** @NonNull */
+ private final DBCursor<UpdateMsg> cursor;
+ private ReplicaOfflineMsg replicaOfflineMsg;
+ /**
+ * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg}
+ */
+ private boolean returnReplicaOfflineMsg;
+
+ /**
+ * Creates a ReplicaOfflineCursor object with a cursor to decorate
+ * and an offlineCSN to return as part of a ReplicaOfflineMsg.
+ *
+ * @param cursor
+ * the non-null underlying cursor that needs to be exhausted before
+ * we return a ReplicaOfflineMsg
+ * @param offlineCSN
+ * The offline CSN from which to builder the
+ * {@link ReplicaOfflineMsg} to return
+ */
+ public ReplicaOfflineCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN)
+ {
+ this.replicaOfflineMsg =
+ offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null;
+ this.cursor = cursor;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ if (returnReplicaOfflineMsg)
+ {
+ // already consumed, never return it again...
+ replicaOfflineMsg = null;
+ returnReplicaOfflineMsg = false;
+ // ...and verify if new changes have been added to the DB
+ // (cursors are automatically restarted)
+ }
+ final UpdateMsg lastUpdate = cursor.getRecord();
+ final boolean hasNext = cursor.next();
+ if (hasNext)
+ {
+ return true;
+ }
+ if (replicaOfflineMsg == null)
+ { // no ReplicaOfflineMsg to return
+ return false;
+ }
+
+ // replicaDB just happened to be exhausted now
+ if (lastUpdate != null
+ && replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN()))
+ {
+ // offlineCSN is outdated, never return it
+ replicaOfflineMsg = null;
+ return false;
+ }
+ returnReplicaOfflineMsg = true;
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ cursor.close();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName()
+ + " returnReplicaOfflineMsg=" + returnReplicaOfflineMsg
+ + " offlineCSN="
+ + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : null)
+ + " cursor=" + cursor.toString().split("", 2)[1];
+ }
+
+}
\ No newline at end of file
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
index c6b4149..2b88d91 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2777,6 +2777,7 @@
synchronized (startStopLock)
{
+ domain.publishReplicaOfflineMsg();
shutdown = true;
setConnectedRS(ConnectedRS.stopped());
stopRSHeartBeatMonitoring();
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
index ce9d714..985b34e 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -3417,6 +3417,15 @@
}
/**
+ * Publishes a replica offline message if all pending changes for current
+ * replica have been sent out.
+ */
+ public void publishReplicaOfflineMsg()
+ {
+ // Here to be overridden
+ }
+
+ /**
* Publish information to the Replication Service (not assured mode).
*
* @param msg The byte array containing the information that should
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
index d3f3167..f0fd1e4 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -844,6 +844,17 @@
assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
}
+ @Test
+ public void replicaOfflineMsgTest() throws Exception
+ {
+ final CSN csn = new CSN(System.currentTimeMillis(), 0, 42);
+ final ReplicaOfflineMsg expectedMsg = new ReplicaOfflineMsg(csn);
+
+ final byte[] bytes = expectedMsg.getBytes(REPLICATION_PROTOCOL_V8);
+ ReplicaOfflineMsg decodedMsg = new ReplicaOfflineMsg(bytes);
+ assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN());
+ }
+
/**
* Test that WindowMsg encoding and decoding works
* by checking that : msg == new WindowMsg(msg.getBytes()).
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index 1a933e8..a2a2644 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -29,7 +29,6 @@
import java.util.Map;
import org.opends.server.DirectoryServerTestCase;
-import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
@@ -56,10 +55,10 @@
@BeforeClass
public void setupMsgs()
{
- msg1 = newUpdateMsg(1);
- msg2 = newUpdateMsg(2);
- msg3 = newUpdateMsg(3);
- msg4 = newUpdateMsg(4);
+ msg1 = new FakeUpdateMsg(1);
+ msg2 = new FakeUpdateMsg(2);
+ msg3 = new FakeUpdateMsg(3);
+ msg4 = new FakeUpdateMsg(4);
}
@Test
@@ -134,19 +133,6 @@
of(msg4, baseDN1));
}
- private UpdateMsg newUpdateMsg(final int t)
- {
- return new UpdateMsg(new CSN(t, t, t), new byte[t])
- {
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return "UpdateMsg(" + t + ")";
- }
- };
- }
-
private CompositeDBCursor<String> newCompositeDBCursor(
Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception
{
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java
new file mode 100644
index 0000000..9fa6d8d
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java
@@ -0,0 +1,47 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.UpdateMsg;
+
+@SuppressWarnings("javadoc")
+class FakeUpdateMsg extends UpdateMsg
+{
+ private final int t;
+
+ FakeUpdateMsg(int t)
+ {
+ super(new CSN(t, t, t), new byte[1]);
+ this.t = t;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ return "UpdateMsg(" + t + ")";
+ }
+}
\ No newline at end of file
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
new file mode 100644
index 0000000..1737f27
--- /dev/null
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
@@ -0,0 +1,129 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import org.opends.server.replication.ReplicationTestCase;
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import static org.assertj.core.api.Assertions.*;
+
+/**
+ * Test the ReplicaOfflineCursor class.
+ */
+@SuppressWarnings("javadoc")
+public class ReplicaOfflineCursorTest extends ReplicationTestCase
+{
+
+ private int timestamp;
+ private DBCursor<UpdateMsg> delegateCursor;
+
+ @BeforeTest
+ public void init()
+ {
+ timestamp = 1;
+ }
+
+ @Test
+ public void cursorReturnsFalse() throws Exception
+ {
+ delegateCursor = new SequentialDBCursor();
+
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ @Test
+ public void cursorReturnsTrue() throws Exception
+ {
+ final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+ delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isTrue();
+ assertThat(cursor.getRecord()).isSameAs(updateMsg);
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ @Test
+ public void cursorReturnsReplicaOfflineMsg() throws Exception
+ {
+ delegateCursor = new SequentialDBCursor();
+
+ final CSN offlineCSN = new CSN(timestamp++, 1, 1);
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isTrue();
+ final UpdateMsg record = cursor.getRecord();
+ assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
+ assertThat(record.getCSN()).isEqualTo(offlineCSN);
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ @Test
+ public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception
+ {
+ final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+ delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+ final CSN offlineCSN = new CSN(timestamp++, 1, 1);
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isTrue();
+ assertThat(cursor.getRecord()).isSameAs(updateMsg);
+ assertThat(cursor.next()).isTrue();
+ final UpdateMsg record = cursor.getRecord();
+ assertThat(record).isInstanceOf(ReplicaOfflineMsg.class);
+ assertThat(record.getCSN()).isEqualTo(offlineCSN);
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+ @Test
+ public void cursorReturnsUpdateMsgThenNeverReturnsOutdatedReplicaOfflineMsg() throws Exception
+ {
+ final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1);
+
+ final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
+ delegateCursor = new SequentialDBCursor(null, updateMsg);
+
+ final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
+ assertThat(cursor.getRecord()).isNull();
+ assertThat(cursor.next()).isTrue();
+ assertThat(cursor.getRecord()).isSameAs(updateMsg);
+ assertThat(cursor.next()).isFalse();
+ assertThat(cursor.getRecord()).isNull();
+ }
+
+}
--
Gitblit v1.10.0