From dc20d17584703e0736ef3982bc0dc18b4d11bb37 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 13:41:47 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4244) Persistent searches on external changelog do not return changes for new replicas and new domains
---
/dev/null | 126 ---------------
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 54 +++++-
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 7
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java | 171 +++++++++++++++++++++
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java | 19 +
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 48 +++++
6 files changed, 274 insertions(+), 151 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 7025753..af4cbb8 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -49,7 +50,7 @@
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
+import org.opends.server.replication.server.changelog.je.ReplicaCursor;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
@@ -94,6 +95,8 @@
new HashMap<DN, List<DomainDBCursor>>();
private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
new CopyOnWriteArrayList<MultiDomainDBCursor>();
+ private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
+ new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
private ReplicationEnvironment replicationEnv;
private final ReplicationServerCfg config;
private final File dbDirectory;
@@ -714,16 +717,28 @@
/** {@inheritDoc} */
@Override
public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
- PositionStrategy positionStrategy) throws ChangelogException
+ final PositionStrategy positionStrategy) throws ChangelogException
{
final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
- // TODO JNR if (offlineCSN != null) ??
- // What about replicas that suddenly become offline?
- return new ReplicaOfflineCursor(cursor, offlineCSN);
+ final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
+ final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+
+ synchronized (replicaCursors)
+ {
+ List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
+ if (cursors == null)
+ {
+ cursors = new ArrayList<ReplicaCursor>();
+ replicaCursors.put(replicaID, cursors);
+ }
+ cursors.add(replicaCursor);
+ }
+
+ return replicaCursor;
}
return EMPTY_CURSOR_REPLICA_DB;
}
@@ -748,6 +763,15 @@
}
}
}
+ else if (cursor instanceof ReplicaCursor)
+ {
+ final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
+ final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
}
/** {@inheritDoc} */
@@ -788,6 +812,7 @@
{
replicationEnv.notifyReplicaOnline(baseDN, serverId);
}
+ updateCursorsWithOfflineCSN(baseDN, serverId, null);
}
/** {@inheritDoc} */
@@ -800,6 +825,19 @@
{
indexer.replicaOffline(baseDN, offlineCSN);
}
+ updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
+ }
+
+ private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
+ {
+ final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
+ if (cursors != null)
+ {
+ for (ReplicaCursor cursor : cursors)
+ {
+ cursor.setOfflineCSN(offlineCSN);
+ }
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index d70e7a1..a15c25d 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -558,11 +558,10 @@
/*
* replica is not back online, Medium consistency point has gone past
* its last offline time, and there are no more changes after the
- * offline CSN in the cursor: remove everything known about it:
- * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
- * this replica from the medium consistency RUV.
+ * offline CSN in the cursor: remove everything known about it
+ * (offlineCSN from lastAliveCSN and remove all knowledge of this replica
+ * from the medium consistency RUV).
*/
- // TODO JNR how to close cursor for offline replica?
lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fdabd6f..90091c9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -90,6 +91,8 @@
new HashMap<DN, List<DomainDBCursor>>();
private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
new CopyOnWriteArrayList<MultiDomainDBCursor>();
+ private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
+ new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
private ReplicationDbEnv replicationEnv;
private final ReplicationServerCfg config;
private final File dbDirectory;
@@ -728,7 +731,7 @@
return cursor;
}
- private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
+ private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
{
synchronized (registeredDomainCursors)
{
@@ -759,21 +762,31 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
- PositionStrategy positionStrategy) throws ChangelogException
-
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
+ " is not supported for the JE implementation fo changelog");
final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- final DBCursor<UpdateMsg> cursor =
- replicaDB.generateCursorFrom(startCSN);
+ final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
- // TODO JNR if (offlineCSN != null) ??
- // What about replicas that suddenly become offline?
- return new ReplicaOfflineCursor(cursor, offlineCSN);
+ final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
+ final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+
+ synchronized (replicaCursors)
+ {
+ List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
+ if (cursors == null)
+ {
+ cursors = new ArrayList<ReplicaCursor>();
+ replicaCursors.put(replicaID, cursors);
+ }
+ cursors.add(replicaCursor);
+ }
+
+ return replicaCursor;
}
return EMPTY_CURSOR_REPLICA_DB;
}
@@ -798,6 +811,15 @@
}
}
}
+ else if (cursor instanceof ReplicaCursor)
+ {
+ final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
+ final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+ if (cursors != null)
+ {
+ cursors.remove(cursor);
+ }
+ }
}
/** {@inheritDoc} */
@@ -838,6 +860,7 @@
{
replicationEnv.notifyReplicaOnline(baseDN, serverId);
}
+ updateCursorsWithOfflineCSN(baseDN, null);
}
/** {@inheritDoc} */
@@ -850,6 +873,19 @@
{
indexer.replicaOffline(baseDN, offlineCSN);
}
+ updateCursorsWithOfflineCSN(baseDN, offlineCSN);
+ }
+
+ private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
+ {
+ final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
+ if (cursors != null && !cursors.isEmpty())
+ {
+ for (ReplicaCursor cursor : cursors)
+ {
+ cursor.setOfflineCSN(offlineCSN);
+ }
+ }
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java
new file mode 100644
index 0000000..1ede615
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java
@@ -0,0 +1,171 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * {@link DBCursor} over a replica returning {@link UpdateMsg}s.
+ * <p>
+ * It decorates an existing {@link DBCursor} on a replicaDB and can possibly
+ * return replica offline messages when the decorated DBCursor is exhausted and
+ * the offline CSN is newer than the last returned update CSN.
+ */
+public class ReplicaCursor implements DBCursor<UpdateMsg>
+{
+ /** @NonNull */
+ private final DBCursor<UpdateMsg> cursor;
+ private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg =
+ new AtomicReference<ReplicaOfflineMsg>();
+ private UpdateMsg currentRecord;
+
+ private final Pair<DN, Integer> replicaID;
+ private final ReplicationDomainDB domainDB;
+
+ /**
+ * Creates a ReplicaCursor object with a cursor to decorate
+ * and an offlineCSN to return as part of a ReplicaOfflineMsg.
+ *
+ * @param cursor
+ * the non-null underlying cursor that needs to be exhausted before
+ * we return a ReplicaOfflineMsg
+ * @param offlineCSN
+ * the offline CSN from which to builder the
+ * {@link ReplicaOfflineMsg} to return
+ * @param replicaID
+ * the baseDN => serverId pair to uniquely identify the replica
+ * @param domainDB
+ * the DB for the provided replication domain
+ */
+ public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN,
+ Pair<DN, Integer> replicaID, ReplicationDomainDB domainDB)
+ {
+ this.cursor = cursor;
+ this.replicaID = replicaID;
+ this.domainDB = domainDB;
+ setOfflineCSN(offlineCSN);
+ }
+
+ /**
+ * Sets the offline CSN to be returned by this cursor.
+ *
+ * @param offlineCSN
+ * The offline CSN to be returned by this cursor.
+ * If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg
+ */
+ public void setOfflineCSN(CSN offlineCSN)
+ {
+ this.replicaOfflineMsg.set(
+ offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return currentRecord;
+ }
+
+ /**
+ * Returns the replica identifier that this cursor is associated to.
+ *
+ * @return the replica identifier that this cursor is associated to
+ */
+ public Pair<DN, Integer> getReplicaID()
+ {
+ return replicaID;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
+ {
+ final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get();
+ if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord))
+ {
+ replicaOfflineMsg.compareAndSet(offlineMsg1, null);
+ }
+
+ // now verify if new changes have been added to the DB
+ // (cursors are automatically restarted)
+ final UpdateMsg lastUpdate = cursor.getRecord();
+ final boolean hasNext = cursor.next();
+ if (hasNext)
+ {
+ currentRecord = cursor.getRecord();
+ return true;
+ }
+
+ // replicaDB just happened to be exhausted now
+ final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get();
+ if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate))
+ {
+ replicaOfflineMsg.compareAndSet(offlineMsg2, null);
+ currentRecord = null;
+ return false;
+ }
+ currentRecord = offlineMsg2;
+ return currentRecord != null;
+ }
+
+ /** It could also mean that the replica offline message has already been consumed. */
+ private boolean isReplicaOfflineMsgOutdated(
+ final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg)
+ {
+ return offlineMsg != null
+ && updateMsg != null
+ && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close()
+ {
+ cursor.close();
+ domainDB.unregisterCursor(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ final ReplicaOfflineMsg msg = replicaOfflineMsg.get();
+ return getClass().getSimpleName()
+ + " currentRecord=" + currentRecord
+ + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null)
+ + " cursor=" + cursor.toString().split("", 2)[1];
+ }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
deleted file mode 100644
index fb2364e..0000000
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- * Copyright 2014 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.protocol.ReplicaOfflineMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-
-/**
- * Implementation of a DBCursor that decorates an existing DBCursor
- * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted
- * and the offline CSN is newer than the last returned update CSN.
- */
-public class ReplicaOfflineCursor implements DBCursor<UpdateMsg>
-{
- /** @NonNull */
- private final DBCursor<UpdateMsg> cursor;
- private ReplicaOfflineMsg replicaOfflineMsg;
- /**
- * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg}
- */
- private boolean returnReplicaOfflineMsg;
-
- /**
- * Creates a ReplicaOfflineCursor object with a cursor to decorate
- * and an offlineCSN to return as part of a ReplicaOfflineMsg.
- *
- * @param cursor
- * the non-null underlying cursor that needs to be exhausted before
- * we return a ReplicaOfflineMsg
- * @param offlineCSN
- * The offline CSN from which to builder the
- * {@link ReplicaOfflineMsg} to return
- */
- public ReplicaOfflineCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN)
- {
- this.replicaOfflineMsg =
- offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null;
- this.cursor = cursor;
- }
-
- /** {@inheritDoc} */
- @Override
- public UpdateMsg getRecord()
- {
- return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord();
- }
-
- /** {@inheritDoc} */
- @Override
- public boolean next() throws ChangelogException
- {
- if (returnReplicaOfflineMsg)
- {
- // already consumed, never return it again...
- replicaOfflineMsg = null;
- returnReplicaOfflineMsg = false;
- // ...and verify if new changes have been added to the DB
- // (cursors are automatically restarted)
- }
- final UpdateMsg lastUpdate = cursor.getRecord();
- final boolean hasNext = cursor.next();
- if (hasNext)
- {
- return true;
- }
- if (replicaOfflineMsg == null)
- { // no ReplicaOfflineMsg to return
- return false;
- }
-
- // replicaDB just happened to be exhausted now
- if (lastUpdate != null
- && replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN()))
- {
- // offlineCSN is outdated, never return it
- replicaOfflineMsg = null;
- return false;
- }
- returnReplicaOfflineMsg = true;
- return true;
- }
-
- /** {@inheritDoc} */
- @Override
- public void close()
- {
- cursor.close();
- }
-
- /** {@inheritDoc} */
- @Override
- public String toString()
- {
- return getClass().getSimpleName()
- + " returnReplicaOfflineMsg=" + returnReplicaOfflineMsg
- + " offlineCSN="
- + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : null)
- + " cursor=" + cursor.toString().split("", 2)[1];
- }
-
-}
\ No newline at end of file
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
similarity index 85%
rename from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
rename to opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
index 538b6de..a276dba 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
@@ -35,10 +35,10 @@
import static org.assertj.core.api.Assertions.*;
/**
- * Test the ReplicaOfflineCursor class.
+ * Test the {@link ReplicaCursor} class.
*/
@SuppressWarnings("javadoc")
-public class ReplicaOfflineCursorTest extends ReplicationTestCase
+public class ReplicaCursorTest extends ReplicationTestCase
{
private int timestamp;
@@ -55,7 +55,7 @@
{
delegateCursor = new SequentialDBCursor();
- final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+ final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null);
assertThat(cursor.getRecord()).isNull();
assertThat(cursor.next()).isFalse();
assertThat(cursor.getRecord()).isNull();
@@ -67,7 +67,7 @@
final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
delegateCursor = new SequentialDBCursor(updateMsg);
- final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+ final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null);
assertThat(cursor.getRecord()).isNull();
assertThat(cursor.next()).isTrue();
assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -81,7 +81,7 @@
delegateCursor = new SequentialDBCursor();
final CSN offlineCSN = new CSN(timestamp++, 1, 1);
- final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+ final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN);
assertThat(cursor.getRecord()).isNull();
assertThat(cursor.next()).isTrue();
final UpdateMsg record = cursor.getRecord();
@@ -98,7 +98,7 @@
delegateCursor = new SequentialDBCursor(updateMsg);
final CSN offlineCSN = new CSN(timestamp++, 1, 1);
- final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+ final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN);
assertThat(cursor.getRecord()).isNull();
assertThat(cursor.next()).isTrue();
assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -118,7 +118,7 @@
final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
delegateCursor = new SequentialDBCursor(updateMsg);
- final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
+ final ReplicaCursor cursor = newReplicaCursor(delegateCursor, outdatedOfflineCSN);
assertThat(cursor.getRecord()).isNull();
assertThat(cursor.next()).isTrue();
assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -126,4 +126,9 @@
assertThat(cursor.getRecord()).isNull();
}
+ private ReplicaCursor newReplicaCursor(DBCursor<UpdateMsg> delegateCursor, CSN offlineCSN)
+ {
+ return new ReplicaCursor(delegateCursor, offlineCSN, null, null);
+ }
+
}
--
Gitblit v1.10.0