opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/LDAPReplicationDomain.java
@@ -2006,6 +2006,13 @@ addOperation.setAttachment(SYNCHROCONTEXT, ctx); } /** {@inheritDoc} */ @Override public void publishReplicaOfflineMsg() { pendingChanges.putReplicaOfflineMsg(); } /** * Check if an operation must be synchronized. * Also update the list of pending changes and the server RUV opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChange.java
@@ -29,6 +29,7 @@ import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.LDAPUpdateMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.types.operation.PluginOperation; /** @@ -39,7 +40,7 @@ { private final CSN csn; private boolean committed; private LDAPUpdateMsg msg; private UpdateMsg msg; private final PluginOperation op; private ServerState dependencyState; @@ -49,7 +50,7 @@ * @param op the operation to use * @param msg the message to use (can be null for local operations) */ PendingChange(CSN csn, PluginOperation op, LDAPUpdateMsg msg) PendingChange(CSN csn, PluginOperation op, UpdateMsg msg) { this.csn = csn; this.committed = false; @@ -89,12 +90,27 @@ * @return the message if operation was a replication operation * null if the operation was a local operation */ public LDAPUpdateMsg getMsg() public UpdateMsg getMsg() { return msg; } /** * Get the LDAPUpdateMsg associated to this PendingChange. * * @return the LDAPUpdateMsg if operation was a replication operation, null * otherwise */ public LDAPUpdateMsg getLDAPUpdateMsg() { if (msg instanceof LDAPUpdateMsg) { return (LDAPUpdateMsg) msg; } return null; } /** * Set the message associated to the PendingChange. * @param msg the message */ opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/PendingChanges.java
@@ -33,6 +33,8 @@ import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.CSNGenerator; import org.opends.server.replication.protocol.LDAPUpdateMsg; import org.opends.server.replication.protocol.ReplicaOfflineMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.service.ReplicationDomain; import org.opends.server.types.operation.PluginOperation; @@ -136,6 +138,20 @@ } /** * Add a replica offline message to the pending list. */ public synchronized void putReplicaOfflineMsg() { final CSN offlineCSN = csnGenerator.newCSN(); final PendingChange pendingChange = new PendingChange(offlineCSN, null, new ReplicaOfflineMsg(offlineCSN)); pendingChange.setCommitted(true); pendingChanges.put(offlineCSN, pendingChange); pushCommittedChanges(); } /** * Push all committed local changes to the replicationServer service. */ synchronized void pushCommittedChanges() @@ -152,20 +168,26 @@ while (firstChange != null && firstChange.isCommitted()) { final PluginOperation op = firstChange.getOp(); if (op != null && !op.isSynchronizationOperation()) final UpdateMsg msg = firstChange.getMsg(); if (msg instanceof LDAPUpdateMsg && op != null && !op.isSynchronizationOperation()) { final LDAPUpdateMsg updateMsg = firstChange.getMsg(); if (!recoveringOldChanges) { domain.publish(updateMsg); domain.publish(msg); } else { // do not push updates until the RS catches up. // @see #setRecovering(boolean) domain.getServerState().update(updateMsg.getCSN()); domain.getServerState().update(msg.getCSN()); } } else if (msg instanceof ReplicaOfflineMsg) { domain.publish(msg); } // false warning: firstEntry will not be null if firstChange is not null pendingChanges.remove(firstEntry.getKey()); opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/plugin/RemotePendingChanges.java
@@ -157,7 +157,7 @@ if (change.dependenciesIsCovered(state)) { dependentChanges.remove(change); return change.getMsg(); return change.getLDAPUpdateMsg(); } } return null; @@ -208,7 +208,7 @@ { if (pendingChange.getCSN().isOlderThan(csn)) { final LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) @@ -300,7 +300,7 @@ { if (pendingChange.getCSN().isOlderThan(csn)) { final LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); if (pendingMsg instanceof AddMsg) { // Check if the operation to be run is an addOperation on a same DN. @@ -350,13 +350,13 @@ return false; } final DN targetDN = change.getMsg().getDN(); final DN targetDN = change.getLDAPUpdateMsg().getDN(); for (PendingChange pendingChange : pendingChanges.values()) { if (pendingChange.getCSN().isOlderThan(csn)) { final LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) @@ -442,7 +442,7 @@ { if (pendingChange.getCSN().isOlderThan(csn)) { final LDAPUpdateMsg pendingMsg = pendingChange.getMsg(); final LDAPUpdateMsg pendingMsg = pendingChange.getLDAPUpdateMsg(); if (pendingMsg != null) { if (pendingMsg instanceof DeleteMsg) opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ProtocolVersion.java
@@ -99,7 +99,7 @@ /** * The constant for the 8th version of the replication protocol. * <ul> * <li>StopMsg now has a timestamp to communicate the replica stop time.</li> * <li>New ReplicaOfflineMsg.</li> * </ul> */ public static final short REPLICATION_PROTOCOL_V8 = 8; opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicaOfflineMsg.java
// New file @@ -0,0 +1,113 @@
/*
 * CDDL HEADER START
 *
 * The contents of this file are subject to the terms of the
 * Common Development and Distribution License, Version 1.0 only
 * (the "License"). You may not use this file except in compliance
 * with the License.
 *
 * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
 * or http://forgerock.org/license/CDDLv1.0.html.
 * See the License for the specific language governing permissions
 * and limitations under the License.
 *
 * When distributing Covered Code, include this CDDL HEADER in each
 * file and include the License file at legal-notices/CDDLv1_0.txt.
 * If applicable, add the following below this CDDL HEADER, with the
 * fields enclosed by brackets "[]" replaced with your own identifying
 * information:
 *      Portions Copyright [yyyy] [name of copyright owner]
 *
 * CDDL HEADER END
 *
 *
 *      Copyright 2014 ForgeRock AS
 */
package org.opends.server.replication.protocol;

import java.util.zip.DataFormatException;

import org.opends.server.replication.common.CSN;

import static org.opends.server.replication.protocol.ByteArrayBuilder.*;

/**
 * Class that defines messages sent by a replica (DS) to the replication server
 * (RS) to let the RS know the date at which a replica went offline.
 * <p>
 * The serialized form is: one message type byte
 * ({@code MSG_TYPE_REPLICA_OFFLINE}), the protocol version as a short, then
 * the offline CSN. The message carries no payload.
 */
public class ReplicaOfflineMsg extends UpdateMsg
{

  /**
   * Constructor of a replica offline message providing the offline timestamp in
   * a CSN.
   *
   * @param offlineCSN
   *          the provided offline CSN
   */
  public ReplicaOfflineMsg(final CSN offlineCSN)
  {
    // this message has no payload, hence the empty byte array
    super(offlineCSN, new byte[0]);
  }

  /**
   * Creates a message by deserializing it from the provided byte array.
   *
   * @param in
   *          The provided byte array.
   * @throws DataFormatException
   *           When an error occurs during decoding: wrong message type,
   *           trailing bytes after the CSN, or any runtime failure raised
   *           while scanning the array (reported as the cause).
   */
  public ReplicaOfflineMsg(byte[] in) throws DataFormatException
  {
    try
    {
      final ByteArrayScanner scanner = new ByteArrayScanner(in);
      final byte msgType = scanner.nextByte();
      if (msgType != MSG_TYPE_REPLICA_OFFLINE)
      {
        throw new DataFormatException("input is not a valid "
            + getClass().getSimpleName() + " message: " + msgType);
      }
      protocolVersion = scanner.nextShort();
      csn = scanner.nextCSN();

      // the CSN must be the last field of the message
      if (!scanner.isEmpty())
      {
        throw new DataFormatException(
            "Did not expect to find more bytes to read for "
            + getClass().getSimpleName());
      }
    }
    catch (RuntimeException e)
    {
      // Index out of bounds, bad format, etc.
      // DataFormatException has no (String, Throwable) constructor, so the
      // root cause is attached with initCause() to keep it in stack traces.
      final DataFormatException dfe = new DataFormatException(
          "byte[] is not a valid " + getClass().getSimpleName());
      dfe.initCause(e);
      throw dfe;
    }
  }

  /**
   * Serializes this message as: message type byte, the supplied protocol
   * version, then the CSN.
   *
   * @param protocolVersion
   *          the protocol version written into the serialized form
   * @return the serialized message
   */
  @Override
  public byte[] getBytes(short protocolVersion)
  {
    final ByteArrayBuilder builder = new ByteArrayBuilder(size());
    builder.appendByte(MSG_TYPE_REPLICA_OFFLINE);
    builder.appendShort(protocolVersion);
    builder.appendCSN(csn);
    return builder.toByteArray();
  }

  /**
   * Returns the size used to pre-size the serialization buffer: one byte,
   * one short and one CSN.
   *
   * @return the size of the serialized form
   */
  @Override
  public int size()
  {
    return bytes(1) + shorts(1) + csns(1);
  }

  /** {@inheritDoc} */
  @Override
  public String toString()
  {
    return getClass().getSimpleName() + " offlineCSN=" + csn;
  }
}
// opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/protocol/ReplicationMsg.java
@@ -85,6 +85,9 @@ // EntryMsg, InitializeRequestMsg, InitializeTargetMsg, ErrorMsg // TopologyMsg /** @since {@link ProtocolVersion#REPLICATION_PROTOCOL_V8} */ static final byte MSG_TYPE_REPLICA_OFFLINE = 37; // Adding a new type of message here probably requires to // change accordingly generateMsg method below @@ -199,6 +202,8 @@ return new StopMsg(buffer); case MSG_TYPE_INITIALIZE_RCV_ACK: return new InitializeRcvAckMsg(buffer); case MSG_TYPE_REPLICA_OFFLINE: return new ReplicaOfflineMsg(buffer); default: throw new DataFormatException("received message with unknown type"); } opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -228,7 +228,13 @@ { final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */); if (newMsg == null) if (newMsg instanceof ReplicaOfflineMsg) { // and ReplicaOfflineMsg cannot be returned to a search on cn=changelog // proceed as if it was never returned continue; } else if (newMsg == null) { // in non blocking mode, null means no more messages return null; } opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -492,6 +492,13 @@ { try { if (updateMsg instanceof ReplicaOfflineMsg) { final ReplicaOfflineMsg offlineMsg = (ReplicaOfflineMsg) updateMsg; this.domainDB.replicaOffline(baseDN, offlineMsg.getCSN()); return true; } if (this.domainDB.publishUpdateMsg(baseDN, updateMsg)) { /* opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -697,18 +697,39 @@ throws ChangelogException { final Set<Integer> serverIds = getDomainMap(baseDN).keySet(); final ChangelogState state = dbEnv.readChangelogState(); final Map<DBCursor<UpdateMsg>, Void> cursors = new HashMap<DBCursor<UpdateMsg>, Void>(serverIds.size()); for (int serverId : serverIds) { // get the last already sent CSN from that server to get a cursor final CSN lastCSN = startAfterServerState != null ? startAfterServerState.getCSN(serverId) : null; cursors.put(getCursorFrom(baseDN, serverId, lastCSN), null); final DBCursor<UpdateMsg> replicaDBCursor = getCursorFrom(baseDN, serverId, lastCSN); final CSN offlineCSN = getOfflineCSN(state, baseDN, serverId, startAfterServerState); cursors.put(new ReplicaOfflineCursor(replicaDBCursor, offlineCSN), null); } // recycle exhausted cursors, // because client code will not manage the cursors itself return new CompositeDBCursor<Void>(cursors, true); } private CSN getOfflineCSN(final ChangelogState state, DN baseDN, int serverId, ServerState startAfterServerState) { final List<CSN> domain = state.getOfflineReplicas().get(baseDN); if (domain != null) { for (CSN offlineCSN : domain) { if (serverId == offlineCSN.getServerId() && !startAfterServerState.cover(offlineCSN)) { return offlineCSN; } } } return null; } /** {@inheritDoc} */ @Override public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN) opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
New file @@ -0,0 +1,126 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.ReplicaOfflineMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; /** * Implementation of a DBCursor that decorates an existing DBCursor * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted * and the offline CSN is newer than the last returned update CSN. */ public class ReplicaOfflineCursor implements DBCursor<UpdateMsg> { /** @NonNull */ private final DBCursor<UpdateMsg> cursor; private ReplicaOfflineMsg replicaOfflineMsg; /** * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg} */ private boolean returnReplicaOfflineMsg; /** * Creates a ReplicaOfflineCursor object with a cursor to decorate * and an offlineCSN to return as part of a ReplicaOfflineMsg. 
* * @param cursor * the non-null underlying cursor that needs to be exhausted before * we return a ReplicaOfflineMsg * @param offlineCSN * The offline CSN from which to builder the * {@link ReplicaOfflineMsg} to return */ public ReplicaOfflineCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN) { this.replicaOfflineMsg = offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null; this.cursor = cursor; } /** {@inheritDoc} */ @Override public UpdateMsg getRecord() { return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord(); } /** {@inheritDoc} */ @Override public boolean next() throws ChangelogException { if (returnReplicaOfflineMsg) { // already consumed, never return it again... replicaOfflineMsg = null; returnReplicaOfflineMsg = false; // ...and verify if new changes have been added to the DB // (cursors are automatically restarted) } final UpdateMsg lastUpdate = cursor.getRecord(); final boolean hasNext = cursor.next(); if (hasNext) { return true; } if (replicaOfflineMsg == null) { // no ReplicaOfflineMsg to return return false; } // replicaDB just happened to be exhausted now if (lastUpdate != null && replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN())) { // offlineCSN is outdated, never return it replicaOfflineMsg = null; return false; } returnReplicaOfflineMsg = true; return true; } /** {@inheritDoc} */ @Override public void close() { cursor.close(); } /** {@inheritDoc} */ @Override public String toString() { return getClass().getSimpleName() + " returnReplicaOfflineMsg=" + returnReplicaOfflineMsg + " offlineCSN=" + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : null) + " cursor=" + cursor.toString().split("", 2)[1]; } } opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationBroker.java
@@ -2777,6 +2777,7 @@ synchronized (startStopLock) { domain.publishReplicaOfflineMsg(); shutdown = true; setConnectedRS(ConnectedRS.stopped()); stopRSHeartBeatMonitoring(); opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/service/ReplicationDomain.java
@@ -3417,6 +3417,15 @@ } /** * Publishes a replica offline message if all pending changes for current * replica have been sent out. */ public void publishReplicaOfflineMsg() { // Here to be overridden } /** * Publish information to the Replication Service (not assured mode). * * @param msg The byte array containing the information that should opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/protocol/SynchronizationMsgTest.java
@@ -844,6 +844,17 @@ assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN()); } @Test public void replicaOfflineMsgTest() throws Exception { final CSN csn = new CSN(System.currentTimeMillis(), 0, 42); final ReplicaOfflineMsg expectedMsg = new ReplicaOfflineMsg(csn); final byte[] bytes = expectedMsg.getBytes(REPLICATION_PROTOCOL_V8); ReplicaOfflineMsg decodedMsg = new ReplicaOfflineMsg(bytes); assertEquals(decodedMsg.getCSN(), expectedMsg.getCSN()); } /** * Test that WindowMsg encoding and decoding works * by checking that : msg == new WindowMsg(msg.getBytes()). opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -29,7 +29,6 @@ import java.util.Map; import org.opends.server.DirectoryServerTestCase; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; @@ -56,10 +55,10 @@ @BeforeClass public void setupMsgs() { msg1 = newUpdateMsg(1); msg2 = newUpdateMsg(2); msg3 = newUpdateMsg(3); msg4 = newUpdateMsg(4); msg1 = new FakeUpdateMsg(1); msg2 = new FakeUpdateMsg(2); msg3 = new FakeUpdateMsg(3); msg4 = new FakeUpdateMsg(4); } @Test @@ -134,19 +133,6 @@ of(msg4, baseDN1)); } private UpdateMsg newUpdateMsg(final int t) { return new UpdateMsg(new CSN(t, t, t), new byte[t]) { /** {@inheritDoc} */ @Override public String toString() { return "UpdateMsg(" + t + ")"; } }; } private CompositeDBCursor<String> newCompositeDBCursor( Pair<? extends DBCursor<UpdateMsg>, String>... pairs) throws Exception { opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/FakeUpdateMsg.java
New file @@ -0,0 +1,47 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; @SuppressWarnings("javadoc") class FakeUpdateMsg extends UpdateMsg { private final int t; FakeUpdateMsg(int t) { super(new CSN(t, t, t), new byte[1]); this.t = t; } /** {@inheritDoc} */ @Override public String toString() { return "UpdateMsg(" + t + ")"; } } opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
New file @@ -0,0 +1,129 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.ReplicaOfflineMsg; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.changelog.api.DBCursor; import org.testng.annotations.BeforeTest; import org.testng.annotations.Test; import static org.assertj.core.api.Assertions.*; /** * Test the ReplicaOfflineCursor class. 
*/ @SuppressWarnings("javadoc") public class ReplicaOfflineCursorTest extends ReplicationTestCase { private int timestamp; private DBCursor<UpdateMsg> delegateCursor; @BeforeTest public void init() { timestamp = 1; } @Test public void cursorReturnsFalse() throws Exception { delegateCursor = new SequentialDBCursor(); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isFalse(); assertThat(cursor.getRecord()).isNull(); } @Test public void cursorReturnsTrue() throws Exception { final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++); delegateCursor = new SequentialDBCursor(null, updateMsg); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isTrue(); assertThat(cursor.getRecord()).isSameAs(updateMsg); assertThat(cursor.next()).isFalse(); assertThat(cursor.getRecord()).isNull(); } @Test public void cursorReturnsReplicaOfflineMsg() throws Exception { delegateCursor = new SequentialDBCursor(); final CSN offlineCSN = new CSN(timestamp++, 1, 1); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isTrue(); final UpdateMsg record = cursor.getRecord(); assertThat(record).isInstanceOf(ReplicaOfflineMsg.class); assertThat(record.getCSN()).isEqualTo(offlineCSN); assertThat(cursor.next()).isFalse(); assertThat(cursor.getRecord()).isNull(); } @Test public void cursorReturnsUpdateMsgThenReplicaOfflineMsg() throws Exception { final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++); delegateCursor = new SequentialDBCursor(null, updateMsg); final CSN offlineCSN = new CSN(timestamp++, 1, 1); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isTrue(); 
assertThat(cursor.getRecord()).isSameAs(updateMsg); assertThat(cursor.next()).isTrue(); final UpdateMsg record = cursor.getRecord(); assertThat(record).isInstanceOf(ReplicaOfflineMsg.class); assertThat(record.getCSN()).isEqualTo(offlineCSN); assertThat(cursor.next()).isFalse(); assertThat(cursor.getRecord()).isNull(); } @Test public void cursorReturnsUpdateMsgThenNeverReturnsOutdatedReplicaOfflineMsg() throws Exception { final CSN outdatedOfflineCSN = new CSN(timestamp++, 1, 1); final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++); delegateCursor = new SequentialDBCursor(null, updateMsg); final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN); assertThat(cursor.getRecord()).isNull(); assertThat(cursor.next()).isTrue(); assertThat(cursor.getRecord()).isSameAs(updateMsg); assertThat(cursor.next()).isFalse(); assertThat(cursor.getRecord()).isNull(); } }