From dc20d17584703e0736ef3982bc0dc18b4d11bb37 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 13:41:47 +0000
Subject: [PATCH] OPENDJ-1441 (CR-4244) Persistent searches on external changelog do not return changes for new replicas and new domains

---
 /dev/null                                                                                                          |  126 ---------------
 opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                             |   54 +++++-
 opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java                       |    7 
 opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java                             |  171 +++++++++++++++++++++
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java |   19 +
 opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java                         |   48 +++++
 6 files changed, 274 insertions(+), 151 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 7025753..af4cbb8 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -29,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -49,7 +50,7 @@
 import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
 import org.opends.server.replication.server.changelog.je.DomainDBCursor;
 import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicaOfflineCursor;
+import org.opends.server.replication.server.changelog.je.ReplicaCursor;
 import org.opends.server.types.DN;
 import org.opends.server.types.DebugLogLevel;
 import org.opends.server.util.StaticUtils;
@@ -94,6 +95,8 @@
       new HashMap<DN, List<DomainDBCursor>>();
   private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
       new CopyOnWriteArrayList<MultiDomainDBCursor>();
+  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
+      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
   private ReplicationEnvironment replicationEnv;
   private final ReplicationServerCfg config;
   private final File dbDirectory;
@@ -714,16 +717,28 @@
   /** {@inheritDoc} */
   @Override
   public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
-      PositionStrategy positionStrategy) throws ChangelogException
+      final PositionStrategy positionStrategy) throws ChangelogException
   {
     final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
       final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
       final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
-      // TODO JNR if (offlineCSN != null) ??
-      // What about replicas that suddenly become offline?
-      return new ReplicaOfflineCursor(cursor, offlineCSN);
+      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
+      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+
+      synchronized (replicaCursors)
+      {
+        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
+        if (cursors == null)
+        {
+          cursors = new ArrayList<ReplicaCursor>();
+          replicaCursors.put(replicaID, cursors);
+        }
+        cursors.add(replicaCursor);
+      }
+
+      return replicaCursor;
     }
     return EMPTY_CURSOR_REPLICA_DB;
   }
@@ -748,6 +763,15 @@
         }
       }
     }
+    else if (cursor instanceof ReplicaCursor)
+    {
+      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
+      final List<ReplicaCursor> cursors = replicaCursors.get(replicaCursor.getReplicaID());
+      if (cursors != null)
+      {
+        cursors.remove(cursor);
+      }
+    }
   }
 
   /** {@inheritDoc} */
@@ -788,6 +812,7 @@
     {
       replicationEnv.notifyReplicaOnline(baseDN, serverId);
     }
+    updateCursorsWithOfflineCSN(baseDN, serverId, null);
   }
 
   /** {@inheritDoc} */
@@ -800,6 +825,19 @@
     {
       indexer.replicaOffline(baseDN, offlineCSN);
     }
+    updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
+  }
+
+  private void updateCursorsWithOfflineCSN(final DN baseDN, final int serverId, final CSN offlineCSN)
+  {
+    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
+    if (cursors != null)
+    {
+      for (ReplicaCursor cursor : cursors)
+      {
+        cursor.setOfflineCSN(offlineCSN);
+      }
+    }
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index d70e7a1..a15c25d 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -558,11 +558,10 @@
         /*
          * replica is not back online, Medium consistency point has gone past
          * its last offline time, and there are no more changes after the
-         * offline CSN in the cursor: remove everything known about it:
-         * cursor, offlineCSN from lastAliveCSN and remove all knowledge of
-         * this replica from the medium consistency RUV.
+         * offline CSN in the cursor: remove everything known about it
+         * (offlineCSN from lastAliveCSN and remove all knowledge of this replica
+         * from the medium consistency RUV).
          */
-        // TODO JNR how to close cursor for offline replica?
         lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
         mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
       }
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fdabd6f..90091c9 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -29,6 +29,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -90,6 +91,8 @@
       new HashMap<DN, List<DomainDBCursor>>();
   private final CopyOnWriteArrayList<MultiDomainDBCursor> registeredMultiDomainCursors =
       new CopyOnWriteArrayList<MultiDomainDBCursor>();
+  private final ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>> replicaCursors =
+      new ConcurrentSkipListMap<Pair<DN, Integer>, List<ReplicaCursor>>(Pair.COMPARATOR);
   private ReplicationDbEnv replicationEnv;
   private final ReplicationServerCfg config;
   private final File dbDirectory;
@@ -728,7 +731,7 @@
     return cursor;
   }
 
-  private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
+  private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
   {
     synchronized (registeredDomainCursors)
     {
@@ -759,21 +762,31 @@
 
   /** {@inheritDoc} */
   @Override
-  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
-      PositionStrategy positionStrategy) throws ChangelogException
-
+  public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
+      final PositionStrategy positionStrategy) throws ChangelogException
   {
     Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
         + " is not supported for the JE implementation fo changelog");
     final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
     if (replicaDB != null)
     {
-      final DBCursor<UpdateMsg> cursor =
-          replicaDB.generateCursorFrom(startCSN);
+      final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
       final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
-      // TODO JNR if (offlineCSN != null) ??
-      // What about replicas that suddenly become offline?
-      return new ReplicaOfflineCursor(cursor, offlineCSN);
+      final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
+      final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
+
+      synchronized (replicaCursors)
+      {
+        List<ReplicaCursor> cursors = replicaCursors.get(replicaID);
+        if (cursors == null)
+        {
+          cursors = new ArrayList<ReplicaCursor>();
+          replicaCursors.put(replicaID, cursors);
+        }
+        cursors.add(replicaCursor);
+      }
+
+      return replicaCursor;
     }
     return EMPTY_CURSOR_REPLICA_DB;
   }
@@ -798,6 +811,15 @@
         }
       }
     }
+    else if (cursor instanceof ReplicaCursor)
+    {
+      final ReplicaCursor replicaCursor = (ReplicaCursor) cursor;
+      final List<ReplicaCursor> cursors =  replicaCursors.get(replicaCursor.getReplicaID());
+      if (cursors != null)
+      {
+        cursors.remove(cursor);
+      }
+    }
   }
 
   /** {@inheritDoc} */
@@ -838,6 +860,7 @@
     {
       replicationEnv.notifyReplicaOnline(baseDN, serverId);
     }
+    updateCursorsWithOfflineCSN(baseDN, null);
   }
 
   /** {@inheritDoc} */
@@ -850,6 +873,19 @@
     {
       indexer.replicaOffline(baseDN, offlineCSN);
     }
+    updateCursorsWithOfflineCSN(baseDN, offlineCSN);
+  }
+
+  private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
+  {
+    final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
+    if (cursors != null && !cursors.isEmpty())
+    {
+      for (ReplicaCursor cursor : cursors)
+      {
+        cursor.setOfflineCSN(offlineCSN);
+      }
+    }
   }
 
   /**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java
new file mode 100644
index 0000000..1ede615
--- /dev/null
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursor.java
@@ -0,0 +1,171 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License").  You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ *      Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ *      Copyright 2014 ForgeRock AS
+ */
+package org.opends.server.replication.server.changelog.je;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.protocol.ReplicaOfflineMsg;
+import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.types.DN;
+
+import com.forgerock.opendj.util.Pair;
+
+/**
+ * {@link DBCursor} over a replica returning {@link UpdateMsg}s.
+ * <p>
+ * It decorates an existing {@link DBCursor} on a replicaDB and can possibly
+ * return replica offline messages when the decorated DBCursor is exhausted and
+ * the offline CSN is newer than the last returned update CSN.
+ */
+public class ReplicaCursor implements DBCursor<UpdateMsg>
+{
+  /** @NonNull */
+  private final DBCursor<UpdateMsg> cursor;
+  private final AtomicReference<ReplicaOfflineMsg> replicaOfflineMsg =
+      new AtomicReference<ReplicaOfflineMsg>();
+  private UpdateMsg currentRecord;
+
+  private final Pair<DN, Integer> replicaID;
+  private final ReplicationDomainDB domainDB;
+
+  /**
+   * Creates a ReplicaCursor object with a cursor to decorate
+   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
+   *
+   * @param cursor
+   *          the non-null underlying cursor that needs to be exhausted before
+   *          we return a ReplicaOfflineMsg
+   * @param offlineCSN
+   *          the offline CSN from which to builder the
+   *          {@link ReplicaOfflineMsg} to return
+   * @param replicaID
+   *          the baseDN => serverId pair to uniquely identify the replica
+   * @param domainDB
+   *          the DB for the provided replication domain
+   */
+  public ReplicaCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN,
+      Pair<DN, Integer> replicaID, ReplicationDomainDB domainDB)
+  {
+    this.cursor = cursor;
+    this.replicaID = replicaID;
+    this.domainDB = domainDB;
+    setOfflineCSN(offlineCSN);
+  }
+
+  /**
+   * Sets the offline CSN to be returned by this cursor.
+   *
+   * @param offlineCSN
+   *          The offline CSN to be returned by this cursor.
+   *          If null, it will unset any previous offlineCSN and never return a ReplicaOfflineMsg
+   */
+  public void setOfflineCSN(CSN offlineCSN)
+  {
+    this.replicaOfflineMsg.set(
+        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public UpdateMsg getRecord()
+  {
+    return currentRecord;
+  }
+
+  /**
+   * Returns the replica identifier that this cursor is associated to.
+   *
+   * @return the replica identifier that this cursor is associated to
+   */
+  public Pair<DN, Integer> getReplicaID()
+  {
+    return replicaID;
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public boolean next() throws ChangelogException
+  {
+    final ReplicaOfflineMsg offlineMsg1 = replicaOfflineMsg.get();
+    if (isReplicaOfflineMsgOutdated(offlineMsg1, currentRecord))
+    {
+      replicaOfflineMsg.compareAndSet(offlineMsg1, null);
+    }
+
+    // now verify if new changes have been added to the DB
+    // (cursors are automatically restarted)
+    final UpdateMsg lastUpdate = cursor.getRecord();
+    final boolean hasNext = cursor.next();
+    if (hasNext)
+    {
+      currentRecord = cursor.getRecord();
+      return true;
+    }
+
+    // replicaDB just happened to be exhausted now
+    final ReplicaOfflineMsg offlineMsg2 = replicaOfflineMsg.get();
+    if (isReplicaOfflineMsgOutdated(offlineMsg2, lastUpdate))
+    {
+      replicaOfflineMsg.compareAndSet(offlineMsg2, null);
+      currentRecord = null;
+      return false;
+    }
+    currentRecord = offlineMsg2;
+    return currentRecord != null;
+  }
+
+  /** It could also mean that the replica offline message has already been consumed. */
+  private boolean isReplicaOfflineMsgOutdated(
+      final ReplicaOfflineMsg offlineMsg, final UpdateMsg updateMsg)
+  {
+    return offlineMsg != null
+        && updateMsg != null
+        && offlineMsg.getCSN().isOlderThanOrEqualTo(updateMsg.getCSN());
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void close()
+  {
+    cursor.close();
+    domainDB.unregisterCursor(this);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString()
+  {
+    final ReplicaOfflineMsg msg = replicaOfflineMsg.get();
+    return getClass().getSimpleName()
+        + " currentRecord=" + currentRecord
+        + " offlineCSN=" + (msg != null ? msg.getCSN().toStringUI() : null)
+        + " cursor=" + cursor.toString().split("", 2)[1];
+  }
+
+}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
deleted file mode 100644
index fb2364e..0000000
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursor.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License").  You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- *      Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- *      Copyright 2014 ForgeRock AS
- */
-package org.opends.server.replication.server.changelog.je;
-
-import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.protocol.ReplicaOfflineMsg;
-import org.opends.server.replication.protocol.UpdateMsg;
-import org.opends.server.replication.server.changelog.api.ChangelogException;
-import org.opends.server.replication.server.changelog.api.DBCursor;
-
-/**
- * Implementation of a DBCursor that decorates an existing DBCursor
- * and returns a ReplicaOfflineMsg when the decorated DBCursor is exhausted
- * and the offline CSN is newer than the last returned update CSN.
- */
-public class ReplicaOfflineCursor implements DBCursor<UpdateMsg>
-{
-  /** @NonNull */
-  private final DBCursor<UpdateMsg> cursor;
-  private ReplicaOfflineMsg replicaOfflineMsg;
-  /**
-   * Whether calls to {@link #getRecord()} must return the {@link ReplicaOfflineMsg}
-   */
-  private boolean returnReplicaOfflineMsg;
-
-  /**
-   * Creates a ReplicaOfflineCursor object with a cursor to decorate
-   * and an offlineCSN to return as part of a ReplicaOfflineMsg.
-   *
-   * @param cursor
-   *          the non-null underlying cursor that needs to be exhausted before
-   *          we return a ReplicaOfflineMsg
-   * @param offlineCSN
-   *          The offline CSN from which to builder the
-   *          {@link ReplicaOfflineMsg} to return
-   */
-  public ReplicaOfflineCursor(DBCursor<UpdateMsg> cursor, CSN offlineCSN)
-  {
-    this.replicaOfflineMsg =
-        offlineCSN != null ? new ReplicaOfflineMsg(offlineCSN) : null;
-    this.cursor = cursor;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public UpdateMsg getRecord()
-  {
-    return returnReplicaOfflineMsg ? replicaOfflineMsg : cursor.getRecord();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public boolean next() throws ChangelogException
-  {
-    if (returnReplicaOfflineMsg)
-    {
-      // already consumed, never return it again...
-      replicaOfflineMsg = null;
-      returnReplicaOfflineMsg = false;
-      // ...and verify if new changes have been added to the DB
-      // (cursors are automatically restarted)
-    }
-    final UpdateMsg lastUpdate = cursor.getRecord();
-    final boolean hasNext = cursor.next();
-    if (hasNext)
-    {
-      return true;
-    }
-    if (replicaOfflineMsg == null)
-    { // no ReplicaOfflineMsg to return
-      return false;
-    }
-
-    // replicaDB just happened to be exhausted now
-    if (lastUpdate != null
-        && replicaOfflineMsg.getCSN().isOlderThanOrEqualTo(lastUpdate.getCSN()))
-    {
-      // offlineCSN is outdated, never return it
-      replicaOfflineMsg = null;
-      return false;
-    }
-    returnReplicaOfflineMsg = true;
-    return true;
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public void close()
-  {
-    cursor.close();
-  }
-
-  /** {@inheritDoc} */
-  @Override
-  public String toString()
-  {
-    return getClass().getSimpleName()
-        + " returnReplicaOfflineMsg=" + returnReplicaOfflineMsg
-        + " offlineCSN="
-        + (replicaOfflineMsg != null ? replicaOfflineMsg.getCSN().toStringUI() : null)
-        + " cursor=" + cursor.toString().split("", 2)[1];
-  }
-
-}
\ No newline at end of file
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
similarity index 85%
rename from opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
rename to opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
index 538b6de..a276dba 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaOfflineCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ReplicaCursorTest.java
@@ -35,10 +35,10 @@
 import static org.assertj.core.api.Assertions.*;
 
 /**
- * Test the ReplicaOfflineCursor class.
+ * Test the {@link ReplicaCursor} class.
  */
 @SuppressWarnings("javadoc")
-public class ReplicaOfflineCursorTest extends ReplicationTestCase
+public class ReplicaCursorTest extends ReplicationTestCase
 {
 
   private int timestamp;
@@ -55,7 +55,7 @@
   {
     delegateCursor = new SequentialDBCursor();
 
-    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null);
     assertThat(cursor.getRecord()).isNull();
     assertThat(cursor.next()).isFalse();
     assertThat(cursor.getRecord()).isNull();
@@ -67,7 +67,7 @@
     final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
     delegateCursor = new SequentialDBCursor(updateMsg);
 
-    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, null);
+    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, null);
     assertThat(cursor.getRecord()).isNull();
     assertThat(cursor.next()).isTrue();
     assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -81,7 +81,7 @@
     delegateCursor = new SequentialDBCursor();
 
     final CSN offlineCSN = new CSN(timestamp++, 1, 1);
-    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN);
     assertThat(cursor.getRecord()).isNull();
     assertThat(cursor.next()).isTrue();
     final UpdateMsg record = cursor.getRecord();
@@ -98,7 +98,7 @@
     delegateCursor = new SequentialDBCursor(updateMsg);
 
     final CSN offlineCSN = new CSN(timestamp++, 1, 1);
-    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, offlineCSN);
+    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, offlineCSN);
     assertThat(cursor.getRecord()).isNull();
     assertThat(cursor.next()).isTrue();
     assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -118,7 +118,7 @@
     final UpdateMsg updateMsg = new FakeUpdateMsg(timestamp++);
     delegateCursor = new SequentialDBCursor(updateMsg);
 
-    final ReplicaOfflineCursor cursor = new ReplicaOfflineCursor(delegateCursor, outdatedOfflineCSN);
+    final ReplicaCursor cursor = newReplicaCursor(delegateCursor, outdatedOfflineCSN);
     assertThat(cursor.getRecord()).isNull();
     assertThat(cursor.next()).isTrue();
     assertThat(cursor.getRecord()).isSameAs(updateMsg);
@@ -126,4 +126,9 @@
     assertThat(cursor.getRecord()).isNull();
   }
 
+  private ReplicaCursor newReplicaCursor(DBCursor<UpdateMsg> delegateCursor, CSN offlineCSN)
+  {
+    return new ReplicaCursor(delegateCursor, offlineCSN, null, null);
+  }
+
 }

--
Gitblit v1.10.0