From f28eee1ba07554be73881e1df417494b3968ea85 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 22 Sep 2014 14:55:35 +0000
Subject: [PATCH] OPENDJ-1444 CR-4537 Remove previous cookie from storage of ChangeNumberIndexDB
---
opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 5
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java | 48 ++
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java | 44 --
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 185 ++++++++--
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 157 ++++----
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java | 17
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java | 4
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 73 ++-
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java | 19
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 143 ++++++-
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 11
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexRecord.java | 33 -
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 23
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java | 25
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 3
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 159 ++------
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java | 31 +
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java | 7
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 15
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java | 36 +
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java | 23 +
21 files changed, 646 insertions(+), 415 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 749b0e1..bca5601 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -53,6 +53,7 @@
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
@@ -1348,12 +1349,12 @@
* @return a non null {@link DBCursor} going from oldest to newest CSN
* @throws ChangelogException
* If a database problem happened
- * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy)
+ * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy)
*/
public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
throws ChangelogException
{
- return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY);
+ return domainDB.getCursorFrom(baseDN, startAfterServerState, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
}
/**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexRecord.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexRecord.java
index f9d532e..a4218cf 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexRecord.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexRecord.java
@@ -21,7 +21,7 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.api;
@@ -38,8 +38,6 @@
/** This is the key used to store this record. */
private final long changeNumber;
- /** This is used on startup to recover the medium consistency point. */
- private final String previousCookie;
/** The baseDN where the change happened. */
private final DN baseDN;
/** The CSN of the change. */
@@ -50,36 +48,30 @@
*
* @param changeNumber
* the change number
- * @param previousCookie
- * the previous cookie
* @param baseDN
* the baseDN
* @param csn
* the replication CSN field
*/
- public ChangeNumberIndexRecord(long changeNumber, String previousCookie,
- DN baseDN, CSN csn)
+ public ChangeNumberIndexRecord(long changeNumber, DN baseDN, CSN csn)
{
this.changeNumber = changeNumber;
- this.previousCookie = previousCookie;
this.baseDN = baseDN;
this.csn = csn;
}
/**
* Builds an instance of this class, with changeNumber equal to 0.
- *
- * @param previousCookie
- * the previous cookie
* @param baseDN
* the baseDN
* @param csn
* the replication CSN field
- * @see #ChangeNumberIndexRecord(long, String, DN, CSN)
+ *
+ * @see #ChangeNumberIndexRecord(long, DN, CSN)
*/
- public ChangeNumberIndexRecord(String previousCookie, DN baseDN, CSN csn)
+ public ChangeNumberIndexRecord(DN baseDN, CSN csn)
{
- this(0, previousCookie, baseDN, csn);
+ this(0, baseDN, csn);
}
/**
@@ -112,21 +104,10 @@
return changeNumber;
}
- /**
- * Get the previous cookie field.
- *
- * @return the previous cookie.
- */
- public String getPreviousCookie()
- {
- return previousCookie;
- }
-
/** {@inheritDoc} */
@Override
public String toString()
{
- return "changeNumber=" + changeNumber + " csn=" + csn + " baseDN=" + baseDN
- + " previousCookie=" + previousCookie;
+ return "changeNumber=" + changeNumber + " csn=" + csn + " baseDN=" + baseDN;
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
index 1fe0714..585a82f 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -52,6 +52,38 @@
* }
* </pre>
*
+ * A cursor can be initialised from a key, using a {@code KeyMatchingStrategy} and
+ * a {@code PositionStrategy}, to determine the exact starting position.
+ * <p>
+ * Let's call Kp the highest key lower than K and Kn the lowest key higher
+ * than K : Kp < K < Kn
+ * <ul>
+ * <li>When using EQUAL_TO_KEY on key K :
+ * <ul>
+ * <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log),
+ * otherwise it is empty</li>
+ * <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if K exists in log),
+ * otherwise it is empty</li>
+ * </ul>
+ * </li>
+ * <li>When using LESS_THAN_OR_EQUAL_TO_KEY on key K :
+ * <ul>
+ * <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log)
+ * or else Kp (if Kp exists in log), otherwise it is empty</li>
+ * <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if Kp or K exist in log),
+ * otherwise it is empty</li>
+ * </ul>
+ * </li>
+ * <li>When using GREATER_THAN_OR_EQUAL_TO_KEY on key K :
+ * <ul>
+ * <li>with ON_MATCHING_KEY, cursor is positioned on key K (if K exists in log)
+ * or else Kn (if Kn exists in log), otherwise it is empty</li>
+ * <li>with AFTER_MATCHING_KEY, cursor is positioned on key Kn (if K or Kn exist in log),
+ * otherwise it is empty</li>
+ * </ul>
+ * </li>
+ * </ul>
+ *
* @param <T>
* type of the record being returned
* \@NotThreadSafe
@@ -61,9 +93,11 @@
/**
* Represents a cursor key matching strategy, which allow to choose if only
- * the exact key must be found or if any key equals or higher should match.
+ * the exact key must be found or if any key equal or lower/higher should match.
*/
public enum KeyMatchingStrategy {
+ /** matches if the key or a lower key is found. */
+ LESS_THAN_OR_EQUAL_TO_KEY,
/** matches only if the exact key is found. */
EQUAL_TO_KEY,
/** matches if the key or a greater key is found. */
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 8970417..348a10b 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -31,6 +31,7 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
@@ -38,6 +39,15 @@
/**
* This interface allows to query or control the replication domain database(s)
* (composed of one or more ReplicaDBs) and query/update each ReplicaDB.
+ * <p>
+ * In particular, the {@code getCursorFom()} methods allow to obtain a cursor at any level:
+ * <ul>
+ * <li>Across all the domains, provided a {@link MultiDomainServerState}</li>
+ * <li>Across all replicaDBs of a domain, provided a {@link ServerState}</li>
+ * <li>On one replica DB for a domain and serverId, provided a CSN</li>
+ * </ul>
+ * The cursor starting point is specified by providing a key, a {@link KeyMatchingStrategy} and
+ * a {@link PositionStrategy}.
*/
public interface ReplicationDomainDB
{
@@ -95,8 +105,9 @@
void removeDomain(DN baseDN) throws ChangelogException;
/**
- * Generates a {@link DBCursor} across all the domains starting at or after the
- * provided {@link MultiDomainServerState} for each domain.
+ * Generates a {@link DBCursor} across all the domains starting before, at or
+ * after the provided {@link MultiDomainServerState} for each domain,
+ * depending on the provided matching and positioning strategies.
* <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
@@ -106,21 +117,22 @@
* Starting point for each domain cursor. If any {@link ServerState}
* for a domain is null, then start from the oldest CSN for each
* replicaDBs
+ * @param matchingStrategy
+ * Cursor key matching strategy
* @param positionStrategy
- * Cursor position strategy, which allow to indicates at which
- * exact position the cursor must start
+ * Cursor position strategy
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
- * @see #getCursorFrom(DN, ServerState, PositionStrategy)
+ * @see #getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy)
*/
- public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
- throws ChangelogException;
+ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy) throws ChangelogException;
/**
- * Generates a {@link DBCursor} across all the domains starting at or after
- * the provided {@link MultiDomainServerState} for each domain, excluding a
- * provided set of domain DNs.
+ * Generates a {@link DBCursor} across all the domains starting before, at or
+ * after the provided {@link MultiDomainServerState} for each domain,
+ * excluding a provided set of domain DNs.
* <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
@@ -130,25 +142,25 @@
* Starting point for each domain cursor. If any {@link ServerState}
* for a domain is null, then start from the oldest CSN for each
* replicaDBs
+ * @param matchingStrategy
+ * Cursor key matching strategy
* @param positionStrategy
- * Cursor position strategy, which allow to indicates at which exact
- * position the cursor must start
+ * Cursor position strategy
* @param excludedDomainDns
* Every domain appearing in this set is excluded from the cursor
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
- * @see #getCursorFrom(DN, ServerState, PositionStrategy)
+ * @see #getCursorFrom(DN, ServerState, KeyMatchingStrategy, PositionStrategy)
*/
- public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy,
- Set<DN> excludedDomainDns) throws ChangelogException;
-
- // serverId methods
+ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy, Set<DN> excludedDomainDns) throws ChangelogException;
/**
* Generates a {@link DBCursor} across all the replicaDBs for the specified
- * replication domain starting at or after the provided {@link ServerState} for each
- * replicaDBs.
+ * replication domain starting before, at or after the provided
+ * {@link ServerState} for each replicaDB, depending on the provided matching
+ * and positioning strategies.
* <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
@@ -160,20 +172,22 @@
* Starting point for each ReplicaDB cursor. If any CSN for a
* replicaDB is null, then start from the oldest CSN for this
* replicaDB
+ * @param matchingStrategy
+ * Cursor key matching strategy
* @param positionStrategy
- * Cursor position strategy, which allow to indicates at which
- * exact position the cursor must start
+ * Cursor position strategy
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
- * @see #getCursorFrom(DN, int, CSN, PositionStrategy)
+ * @see #getCursorFrom(DN, int, CSN, KeyMatchingStrategy, PositionStrategy)
*/
- DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy)
- throws ChangelogException;
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy) throws ChangelogException;
/**
* Generates a {@link DBCursor} for one replicaDB for the specified
- * replication domain and serverId starting at or after the provided {@link CSN}.
+ * replication domain and serverId starting beofre, at or after the provided
+ * {@link CSN}, depending on the provided matching and positioning strategies.
* <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
@@ -186,15 +200,16 @@
* @param startCSN
* Starting point for the ReplicaDB cursor. If the CSN is null, then
* start from the oldest CSN for this replicaDB
+ * @param matchingStrategy
+ * Cursor key matching strategy
* @param positionStrategy
- * Cursor position strategy, which allow to indicates at which
- * exact position the cursor must start
+ * Cursor position strategy
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
*/
- DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy)
- throws ChangelogException;
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy) throws ChangelogException;
/**
* Unregisters the provided cursor from this replication domain.
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 26a0bfe..d735b38 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -37,6 +37,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
@@ -93,7 +94,8 @@
/** {@inheritDoc} */
@Override
public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
- PositionStrategy positionStrategy) throws ChangelogException
+ KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
+ throws ChangelogException
{
throw new RuntimeException("Not implemented");
}
@@ -101,8 +103,8 @@
/** {@inheritDoc} */
@Override
public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState,
- PositionStrategy positionStrategy, Set<DN> excludedDomainDns)
- throws ChangelogException
+ KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy,
+ Set<DN> excludedDomainDns) throws ChangelogException
{
throw new RuntimeException("Not implemented");
}
@@ -110,7 +112,8 @@
/** {@inheritDoc} */
@Override
public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState,
- PositionStrategy positionStrategy) throws ChangelogException
+ KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
+ throws ChangelogException
{
throw new RuntimeException("Not implemented");
}
@@ -118,8 +121,8 @@
/** {@inheritDoc} */
@Override
public DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId,
- CSN startCSN, PositionStrategy positionStrategy)
- throws ChangelogException
+ CSN startCSN, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy) throws ChangelogException
{
throw new RuntimeException("Not implemented");
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index ac97e20..d29c707 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -45,14 +45,18 @@
import org.opends.server.types.DirectoryException;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
* Thread responsible for inserting replicated changes into the ChangeNumber
- * Index DB (CNIndexDB for short). Only changes older than the medium
- * consistency point are inserted in the CNIndexDB. As a consequence this class
- * is also responsible for maintaining the medium consistency point.
+ * Index DB (CNIndexDB for short).
+ * <p>
+ * Only changes older than the medium consistency point are inserted in the
+ * CNIndexDB. As a consequence this class is also responsible for maintaining
+ * the medium consistency point (indirectly through an
+ * {@code ECLMultiDomainDBCursor}).
*/
public class ChangeNumberIndexer extends DirectoryThread
{
@@ -75,27 +79,10 @@
/*
* The following MultiDomainServerState fields must be thread safe, because
* 1) initialization can happen while the replication server starts receiving
- * updates 2) many updates can happen concurrently.
+ * updates
+ * 2) many updates can happen concurrently.
*/
/**
- * Holds the cross domain medium consistency Replication Update Vector for the
- * current replication server, also known as the previous cookie.
- * <p>
- * Stores the value of the cookie before the change currently processed is
- * inserted in the DB. After insert, it is updated with the CSN of the change
- * currently processed (thus becoming the "current" cookie just before the
- * change is returned.
- * <p>
- * Note: This object is only updated by changes/updates.
- *
- * @see <a href=
- * "https://wikis.forgerock.org/confluence/display/OPENDJ/OpenDJ+Domain+Names"
- * >OpenDJ Domain Names - medium consistency RUV</a>
- */
- private final MultiDomainServerState mediumConsistencyRUV =
- new MultiDomainServerState();
-
- /**
* Holds the last time each replica was seen alive, whether via updates or
* heartbeat notifications, or offline notifications. Data is held for each
* serverId cross domain.
@@ -105,11 +92,10 @@
* <p>
* Note: This object is updated by both heartbeats and changes/updates.
*/
- private final MultiDomainServerState lastAliveCSNs =
- new MultiDomainServerState();
+ private final MultiDomainServerState lastAliveCSNs = new MultiDomainServerState();
+
/** Note: This object is updated by replica offline messages. */
- private final MultiDomainServerState replicasOffline =
- new MultiDomainServerState();
+ private final MultiDomainServerState replicasOffline = new MultiDomainServerState();
/**
* Cursor across all the replicaDBs for all the replication domains. It is
@@ -312,25 +298,50 @@
}
/**
- * Restores in memory data needed to build the CNIndexDB, including the medium
- * consistency point.
+ * Restores in memory data needed to build the CNIndexDB. In particular,
+ * initializes the changes cursor to the medium consistency point.
*/
private void initialize() throws ChangelogException, DirectoryException
{
- final ChangeNumberIndexRecord newestRecord =
- changelogDB.getChangeNumberIndexDB().getNewestRecord();
+ final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+
+ initializeLastAliveCSNs(domainDB);
+ initializeNextChangeCursor(domainDB);
+ initializeOfflineReplicas();
+
+ // this will not be used any more. Discard for garbage collection.
+ this.changelogState = null;
+ }
+
+ private void initializeNextChangeCursor(final ReplicationDomainDB domainDB) throws ChangelogException
+ {
+ final MultiDomainServerState cookieWithNewestCSN = getCookieInitializedWithNewestCSN();
+
+ MultiDomainDBCursor cursorInitializedToMediumConsistencyPoint =
+ domainDB.getCursorFrom(cookieWithNewestCSN, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
+
+ nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate, cursorInitializedToMediumConsistencyPoint);
+ nextChangeForInsertDBCursor.next();
+ }
+
+ /** Returns a cookie initialised with the newest CSN for each replica. */
+ private MultiDomainServerState getCookieInitializedWithNewestCSN() throws ChangelogException
+ {
+ final ChangeNumberIndexRecord newestRecord = changelogDB.getChangeNumberIndexDB().getNewestRecord();
+ final MultiDomainServerState cookieWithNewestCSN = new MultiDomainServerState();
if (newestRecord != null)
{
- // restore the mediumConsistencyRUV from DB
- mediumConsistencyRUV.update(
- new MultiDomainServerState(newestRecord.getPreviousCookie()));
- // Do not update with the newestRecord CSN
- // as it will be used for a sanity check later in the same method
+ final CSN newestCsn = newestRecord.getCSN();
+ for (DN baseDN : changelogState.getDomainToServerIds().keySet())
+ {
+ cookieWithNewestCSN.update(baseDN, newestCsn);
+ }
}
+ return cookieWithNewestCSN;
+ }
- // initialize the DB cursor and the last seen updates
- // to ensure the medium consistency CSN can move forward
- final ReplicationDomainDB domainDB = changelogDB.getReplicationDomainDB();
+ private void initializeLastAliveCSNs(final ReplicationDomainDB domainDB)
+ {
for (Entry<DN, Set<Integer>> entry : changelogState.getDomainToServerIds().entrySet())
{
final DN baseDN = entry.getKey();
@@ -349,34 +360,10 @@
lastAliveCSNs.update(baseDN, latestKnownState);
}
}
+ }
- nextChangeForInsertDBCursor = new ECLMultiDomainDBCursor(predicate,
- domainDB.getCursorFrom(mediumConsistencyRUV, AFTER_MATCHING_KEY));
- nextChangeForInsertDBCursor.next();
-
- if (newestRecord != null)
- {
- // restore the "previousCookie" state before shutdown
- UpdateMsg record = nextChangeForInsertDBCursor.getRecord();
- if (record instanceof ReplicaOfflineMsg)
- {
- // ignore: replica offline messages are never stored in the CNIndexDB
- nextChangeForInsertDBCursor.next();
- record = nextChangeForInsertDBCursor.getRecord();
- }
-
- // sanity check: ensure that when initializing the cursors at the previous
- // cookie, the next change we find is the newest record in the CNIndexDB
- if (!record.getCSN().equals(newestRecord.getCSN()))
- {
- throw new ChangelogException(ERR_CHANGE_NUMBER_INDEXER_INCONSISTENT_CSN_READ.get(
- newestRecord.getCSN().toStringUI(), record.getCSN().toStringUI()));
- }
- // Now we can update the mediumConsistencyRUV
- mediumConsistencyRUV.update(newestRecord.getBaseDN(), record.getCSN());
- nextChangeForInsertDBCursor.next();
- }
-
+ private void initializeOfflineReplicas()
+ {
final MultiDomainServerState offlineReplicas = changelogState.getOfflineReplicas();
for (DN baseDN : offlineReplicas)
{
@@ -391,9 +378,6 @@
}
}
}
-
- // this will not be used any more. Discard for garbage collection.
- this.changelogState = null;
}
private CSN oldestPossibleCSN(int serverId)
@@ -488,10 +472,10 @@
// OK, the oldest change is older than the medium consistency point
// let's publish it to the CNIndexDB.
- final String previousCookie = mediumConsistencyRUV.toString();
- final ChangeNumberIndexRecord record =
- new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
- changelogDB.getChangeNumberIndexDB().addRecord(record);
+ final long changeNumber = changelogDB.getChangeNumberIndexDB()
+ .addRecord(new ChangeNumberIndexRecord(baseDN, csn));
+ MultiDomainServerState cookie = nextChangeForInsertDBCursor.toCookie();
+ notifyEntryAddedToChangelog(baseDN, changeNumber, cookie, msg);
moveForwardMediumConsistencyPoint(csn, baseDN);
}
catch (InterruptedException ignored)
@@ -521,6 +505,28 @@
}
/**
+ * Notifies the {@link ChangelogBackend} that a new entry has been added.
+ *
+ * @param baseDN
+ * the baseDN of the newly added entry.
+ * @param changeNumber
+ * the change number of the newly added entry. It will be greater
+ * than zero for entries added to the change number index and less
+ * than or equal to zero for entries added to any replica DB
+ * @param cookie
+ * the cookie of the newly added entry. This is only meaningful for
+ * entries added to the change number index
+ * @param msg
+ * the update message of the newly added entry
+ * @throws ChangelogException
+ * If a problem occurs while notifying of the newly added entry.
+ */
+ protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
+ MultiDomainServerState cookie, UpdateMsg msg) throws ChangelogException
+ {
+ }
+
+ /**
* Nothing can be done about it.
* <p>
* Rely on the DirectoryThread uncaught exceptions handler for logging error +
@@ -534,12 +540,8 @@
getClass().getSimpleName(), stackTraceToSingleLineString(e));
}
- private void moveForwardMediumConsistencyPoint(final CSN mcCSN,
- final DN mcBaseDN) throws ChangelogException
+ private void moveForwardMediumConsistencyPoint(final CSN mcCSN, final DN mcBaseDN) throws ChangelogException
{
- // update, so it becomes the previous cookie for the next change
- mediumConsistencyRUV.update(mcBaseDN, mcCSN);
-
final int mcServerId = mcCSN.getServerId();
final CSN offlineCSN = replicasOffline.getCSN(mcBaseDN, mcServerId);
final CSN lastAliveCSN = lastAliveCSNs.getCSN(mcBaseDN, mcServerId);
@@ -560,7 +562,6 @@
* from the medium consistency RUV).
*/
lastAliveCSNs.removeCSN(mcBaseDN, offlineCSN);
- mediumConsistencyRUV.removeCSN(mcBaseDN, offlineCSN);
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
index 748619c..bcc1338 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursor.java
@@ -34,6 +34,8 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.util.StaticUtils;
+import com.forgerock.opendj.util.Pair;
+
/**
* {@link DBCursor} implementation that iterates across a Collection of
* {@link DBCursor}s, advancing from the oldest to the newest change cross all
@@ -214,6 +216,35 @@
return null;
}
+ /**
+ * Returns a snapshot of this cursor.
+ *
+ * @return a list of (Data, UpdateMsg) pairs representing the state of the
+ * cursor. In each pair, the data or the update message may be
+ * {@code null}, but at least one of them is non-null.
+ */
+ public List<Pair<Data, UpdateMsg>> getSnapshot()
+ {
+ final List<Pair<Data, UpdateMsg>> snapshot = new ArrayList<Pair<Data, UpdateMsg>>();
+ for (Entry<DBCursor<UpdateMsg>, Data> entry : cursors.entrySet())
+ {
+ final UpdateMsg updateMsg = entry.getKey().getRecord();
+ final Data data = entry.getValue();
+ if (updateMsg != null || data != null)
+ {
+ snapshot.add(Pair.of(data, updateMsg));
+ }
+ }
+ for (Data data : exhaustedCursors.values())
+ {
+ if (data != null)
+ {
+ snapshot.add(Pair.of(data, (UpdateMsg) null));
+ }
+ }
+ return snapshot;
+ }
+
/** {@inheritDoc} */
@Override
public void close()
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
index 780b19b..28acc78 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -46,14 +46,14 @@
private final DN baseDN;
private final ReplicationDomainDB domainDB;
- private final ConcurrentSkipListMap<Integer, CSN> newReplicas =
- new ConcurrentSkipListMap<Integer, CSN>();
+ private final ConcurrentSkipListMap<Integer, CSN> newReplicas = new ConcurrentSkipListMap<Integer, CSN>();
/**
* Replaces null CSNs in ConcurrentSkipListMap that does not support null values.
*/
private static final CSN NULL_CSN = new CSN(0, 0, 0);
private final PositionStrategy positionStrategy;
+ private final KeyMatchingStrategy matchingStrategy;
/**
* Builds a DomainDBCursor instance.
@@ -62,14 +62,19 @@
* the replication domain baseDN of this cursor
* @param domainDB
* the DB for the provided replication domain
+ * @param matchingStrategy
+ * Cursor key matching strategy, which allow to indicates how key is
+ * matched
* @param positionStrategy
- * Cursor position strategy, which allow to indicates at which
- * exact position the cursor must start
+ * Cursor position strategy, which allow to indicates at which exact
+ * position the cursor must start
*/
- public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
+ public DomainDBCursor(final DN baseDN, final ReplicationDomainDB domainDB, final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy)
{
this.baseDN = baseDN;
this.domainDB = domainDB;
+ this.matchingStrategy = matchingStrategy;
this.positionStrategy = positionStrategy;
}
@@ -89,13 +94,13 @@
*
* @param serverId
* the serverId of the replica
- * @param startAfterCSN
- * the CSN after which to start iterating
+ * @param startCSN
+ * the CSN to use as a starting point
*/
- public void addReplicaDB(int serverId, CSN startAfterCSN)
+ public void addReplicaDB(int serverId, CSN startCSN)
{
// only keep the oldest CSN that will be the new cursor's starting point
- newReplicas.putIfAbsent(serverId, startAfterCSN != null ? startAfterCSN : NULL_CSN);
+ newReplicas.putIfAbsent(serverId, startCSN != null ? startCSN : NULL_CSN);
}
/** {@inheritDoc} */
@@ -109,7 +114,7 @@
final CSN csn = pair.getValue();
final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null;
final DBCursor<UpdateMsg> cursor =
- domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy);
+ domainDB.getCursorFrom(baseDN, serverId, startCSN, matchingStrategy, positionStrategy);
addCursor(cursor, null);
iter.remove();
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
index 0f027ba..6274d2f 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNDB.java
@@ -88,9 +88,7 @@
{
final long changeNumber = record.getChangeNumber();
DatabaseEntry key = new ReplicationDraftCNKey(changeNumber);
- DatabaseEntry data = new DraftCNData(changeNumber,
- record.getPreviousCookie(), record.getBaseDN().toNormalizedString(),
- record.getCSN());
+ DatabaseEntry data = new DraftCNData(changeNumber, record.getBaseDN().toNormalizedString(), record.getCSN());
// Use a transaction so that we can override durability.
Transaction txn = null;
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java
index 634ec3c..0abdddc 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/DraftCNData.java
@@ -45,6 +45,8 @@
{
private static final String FIELD_SEPARATOR = "!";
+ private static final String EMPTY_STRING_PREVIOUS_COOKIE = "";
+
private static final long serialVersionUID = 1L;
private long changeNumber;
@@ -55,19 +57,17 @@
*
* @param changeNumber
* the change number
- * @param previousCookie
- * The previous cookie
* @param baseDN
* The baseDN (domain DN)
* @param csn
* The replication CSN
*/
- public DraftCNData(long changeNumber, String previousCookie, String baseDN,
- CSN csn)
+ public DraftCNData(long changeNumber, String baseDN, CSN csn)
{
this.changeNumber = changeNumber;
- String record =
- previousCookie + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn;
+ // Although the previous cookie is not used any more, we need
+ // to keep it in database for compatibility with previous versions
+ String record = EMPTY_STRING_PREVIOUS_COOKIE + FIELD_SEPARATOR + baseDN + FIELD_SEPARATOR + csn;
setData(getBytes(record));
}
@@ -102,11 +102,14 @@
{
try
{
+ // Although the previous cookie is not used any more, we need
+ // to keep it in database for compatibility with previous versions
String stringData = new String(data, "UTF-8");
String[] str = stringData.split(FIELD_SEPARATOR, 3);
+ // str[0] contains previous cookie and is ignored
final DN baseDN = DN.valueOf(str[1]);
final CSN csn = new CSN(str[2]);
- return new ChangeNumberIndexRecord(changeNumber, str[0], baseDN, csn);
+ return new ChangeNumberIndexRecord(changeNumber, baseDN, csn);
}
catch (UnsupportedEncodingException e)
{
@@ -130,7 +133,9 @@
public ChangeNumberIndexRecord getRecord() throws ChangelogException
{
if (record == null)
+ {
record = decodeData(changeNumber, getData());
+ }
return record;
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
index 97a1806..daf442a 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursor.java
@@ -24,11 +24,17 @@
*/
package org.opends.server.replication.server.changelog.je;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
+import com.forgerock.opendj.util.Pair;
+
/**
* Multi domain DB cursor that only returns updates for the domains which have
* been enabled for the external changelog.
@@ -110,4 +116,46 @@
{
return getClass().getSimpleName() + " cursor=[" + cursor + ']';
}
+
+ /**
+ * Returns a snapshot of this cursor.
+ *
+ * @return a list of (DN, UpdateMsg) pairs, containing all base DNs enabled
+ * for the external changelog. The update message may be {@code null}.
+ */
+ List<Pair<DN, UpdateMsg>> getSnapshot()
+ {
+ final List<Pair<DN, UpdateMsg>> snapshot = cursor.getSnapshot();
+ final List<Pair<DN, UpdateMsg>> eclSnapshot = new ArrayList<Pair<DN,UpdateMsg>>();
+ for (Pair<DN, UpdateMsg> pair : snapshot)
+ {
+ DN baseDN = pair.getFirst();
+ if (predicate.isECLEnabledDomain(baseDN))
+ {
+ eclSnapshot.add(pair);
+ }
+ }
+ return eclSnapshot;
+ }
+
+ /**
+ * Returns the cookie corresponding to the state of this cursor.
+ *
+ * @return a valid cookie taking into account only the base DNs enabled for
+ * the external changelog
+ */
+ public MultiDomainServerState toCookie()
+ {
+ List<Pair<DN, UpdateMsg>> snapshot = getSnapshot();
+ MultiDomainServerState cookie = new MultiDomainServerState();
+ for (Pair<DN, UpdateMsg> pair : snapshot)
+ {
+ // only put base DNs where a CSN is available in the cookie
+ if (pair.getSecond() != null)
+ {
+ cookie.update(pair.getFirst(), pair.getSecond().getCSN());
+ }
+ }
+ return cookie;
+ }
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index 6bf1929..9b07a74 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -117,8 +117,7 @@
{
long changeNumber = nextChangeNumber();
final ChangeNumberIndexRecord newRecord =
- new ChangeNumberIndexRecord(changeNumber, record.getPreviousCookie(),
- record.getBaseDN(), record.getCSN());
+ new ChangeNumberIndexRecord(changeNumber, record.getBaseDN(), record.getCSN());
db.addRecord(newRecord);
newestChangeNumber = changeNumber;
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 2504200..3d770b3 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -699,18 +700,19 @@
/** {@inheritDoc} */
@Override
public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
- final PositionStrategy positionStrategy) throws ChangelogException
+ final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
{
final Set<DN> excludedDomainDns = Collections.emptySet();
- return getCursorFrom(startState, positionStrategy, excludedDomainDns);
+ return getCursorFrom(startState, matchingStrategy, positionStrategy, excludedDomainDns);
}
/** {@inheritDoc} */
@Override
public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
- final PositionStrategy positionStrategy, final Set<DN> excludedDomainDns) throws ChangelogException
+ final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy,
+ final Set<DN> excludedDomainDns) throws ChangelogException
{
- final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, matchingStrategy, positionStrategy);
registeredMultiDomainCursors.add(cursor);
for (DN baseDN : domainToReplicaDBs.keySet())
{
@@ -724,9 +726,9 @@
/** {@inheritDoc} */
@Override
public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
- final PositionStrategy positionStrategy) throws ChangelogException
+ final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
{
- final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN, matchingStrategy, positionStrategy);
for (int serverId : getDomainMap(baseDN).keySet())
{
// get the last already sent CSN from that server to get a cursor
@@ -736,11 +738,12 @@
return cursor;
}
- private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
+ private DomainDBCursor newDomainDBCursor(final DN baseDN, final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy)
{
synchronized (registeredDomainCursors)
{
- final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, matchingStrategy, positionStrategy);
List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
if (cursors == null)
{
@@ -768,12 +771,12 @@
/** {@inheritDoc} */
@Override
public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
- final PositionStrategy positionStrategy) throws ChangelogException
+ final KeyMatchingStrategy matchingStrategy, final PositionStrategy positionStrategy) throws ChangelogException
{
final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, matchingStrategy, positionStrategy);
final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 13cdf02..867e76a 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.types.Attribute;
@@ -182,17 +183,19 @@
* @param startCSN
* The position where the cursor must start. If null, start from the
* oldest CSN
+ * @param matchingStrategy
+ * Cursor key matching strategy
* @param positionStrategy
- * indicates at which exact position the cursor must start
+ * Cursor position strategy
* @return a new {@link DBCursor} that allows to browse the db managed by this
* ReplicaDB and starting at the position defined by a given CSN.
* @throws ChangelogException
* if a database problem happened
*/
- DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy)
- throws ChangelogException
+ DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
- return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
+ return new JEReplicaDBCursor(db, startCSN, matchingStrategy, positionStrategy, this);
}
/**
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index e3dc03e..c759d67 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -32,6 +32,7 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
@@ -42,8 +43,10 @@
class JEReplicaDBCursor implements DBCursor<UpdateMsg>
{
private final ReplicationDB db;
- private final PositionStrategy positionStrategy;
+ private PositionStrategy positionStrategy;
+ private KeyMatchingStrategy matchingStrategy;
private JEReplicaDB replicaDB;
+ private final CSN startCSN;
private CSN lastNonNullCurrentCSN;
private ReplServerDBCursor cursor;
private UpdateMsg currentChange;
@@ -57,19 +60,23 @@
* @param startCSN
* The CSN after which the cursor must start.If null, start from the
* oldest CSN
+ * @param matchingStrategy
+ * Cursor key matching strategy
* @param positionStrategy
- * indicates at which exact position the cursor must start
+ * Cursor position strategy
* @param replicaDB
* The associated JEReplicaDB.
* @throws ChangelogException
* if a database problem happened.
*/
- public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy,
- JEReplicaDB replicaDB) throws ChangelogException
+ public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy, JEReplicaDB replicaDB) throws ChangelogException
{
this.db = db;
+ this.matchingStrategy = matchingStrategy;
this.positionStrategy = positionStrategy;
this.replicaDB = replicaDB;
+ this.startCSN = startCSN;
this.lastNonNullCurrentCSN = startCSN;
}
@@ -94,7 +101,13 @@
// if following code is called while the cursor is closed.
// It is better to let the deadlock happen to help quickly identifying
// and fixing such issue with unit tests.
- cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy);
+ if (lastNonNullCurrentCSN != startCSN)
+ {
+ // re-initialize to further CSN, take care to use appropriate strategies
+ matchingStrategy = GREATER_THAN_OR_EQUAL_TO_KEY;
+ positionStrategy = AFTER_MATCHING_KEY;
+ }
+ cursor = db.openReadCursor(lastNonNullCurrentCSN, matchingStrategy, positionStrategy);
}
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
index 690e551..f15cf48 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -47,6 +47,8 @@
private final ConcurrentSkipListMap<DN, ServerState> newDomains =
new ConcurrentSkipListMap<DN, ServerState>();
+ private final KeyMatchingStrategy matchingStrategy;
+
private final PositionStrategy positionStrategy;
/**
@@ -54,13 +56,16 @@
*
* @param domainDB
* the replication domain management DB
+ * @param matchingStrategy
+ * Cursor key matching strategy
* @param positionStrategy
- * Cursor position strategy, which allow to indicates at which
- * exact position the cursor must start
+ * Cursor position strategy
*/
- public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
+ public MultiDomainDBCursor(final ReplicationDomainDB domainDB, final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy)
{
this.domainDB = domainDB;
+ this.matchingStrategy = matchingStrategy;
this.positionStrategy = positionStrategy;
}
@@ -75,8 +80,7 @@
*/
public void addDomain(DN baseDN, ServerState startAfterState)
{
- newDomains.put(baseDN,
- startAfterState != null ? startAfterState : new ServerState());
+ newDomains.put(baseDN, startAfterState != null ? startAfterState : new ServerState());
}
/** {@inheritDoc} */
@@ -89,7 +93,8 @@
final Entry<DN, ServerState> entry = iter.next();
final DN baseDN = entry.getKey();
final ServerState serverState = entry.getValue();
- final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy);
+ final DBCursor<UpdateMsg> domainDBCursor =
+ domainDB.getCursorFrom(baseDN, serverState, matchingStrategy, positionStrategy);
addCursor(domainDBCursor, baseDN);
iter.remove();
}
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index abbe49c..e5935c3 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -39,6 +39,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -49,6 +50,8 @@
import static com.sleepycat.je.OperationStatus.*;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -289,15 +292,18 @@
* @param startCSN
* The CSN from which the cursor must start.If null, start from the
* oldest CSN
+ * @param matchingStrategy
+ * Cursor key matching strategy
* @param positionStrategy
- * indicates at which exact position the cursor must start
+ * Cursor position strategy
* @return The ReplServerDBCursor.
* @throws ChangelogException
* If a database problem happened
*/
- ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
+ ReplServerDBCursor openReadCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy) throws ChangelogException
{
- return new ReplServerDBCursor(startCSN, positionStrategy);
+ return new ReplServerDBCursor(startCSN, matchingStrategy, positionStrategy);
}
/**
@@ -447,6 +453,31 @@
return serverId + " " + baseDN.toNormalizedString();
}
+ /** Hold a cursor and an indicator of wether the cursor should be considered as empty. */
+ private static class CursorWithEmptyIndicator
+ {
+ private Cursor cursor;
+ private boolean isEmpty;
+
+ private CursorWithEmptyIndicator(Cursor localCursor, boolean isEmpty)
+ {
+ this.cursor = localCursor;
+ this.isEmpty = isEmpty;
+ }
+
+ /** Creates cursor considered as empty. */
+ static CursorWithEmptyIndicator createEmpty(Cursor cursor)
+ {
+ return new CursorWithEmptyIndicator(cursor, true);
+ }
+
+ /** Creates cursor considered as non-empty. */
+ static CursorWithEmptyIndicator createNonEmpty(Cursor cursor)
+ {
+ return new CursorWithEmptyIndicator(cursor, false);
+ }
+ }
+
/**
* This Class implements a cursor that can be used to browse a
* replicationServer database.
@@ -460,7 +491,7 @@
* <p>
* Will be set non null for a write cursor
*/
- private final Cursor cursor;
+ private Cursor cursor;
private final DatabaseEntry key;
private final DatabaseEntry data;
/** \@Null for read cursors, \@NotNull for deleting cursors. */
@@ -475,12 +506,16 @@
*
* @param startCSN
* The CSN from which the cursor must start.
+ * @param matchingStrategy
+ * Cursor key matching strategy, which allow to indicates how key
+ * is matched
* @param positionStrategy
* indicates at which exact position the cursor must start
* @throws ChangelogException
* When the startCSN does not exist.
*/
- private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
+ private ReplServerDBCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy, PositionStrategy positionStrategy)
+ throws ChangelogException
{
key = createReplicationKey(startCSN);
data = new DatabaseEntry();
@@ -491,8 +526,7 @@
// unlock it when throwing an exception.
dbCloseLock.readLock().lock();
- boolean cursorHeld = false;
- Cursor localCursor = null;
+ CursorWithEmptyIndicator maybeEmptyCursor = null;
try
{
// If the DB has been closed then create empty cursor.
@@ -503,35 +537,15 @@
return;
}
- localCursor = db.openCursor(txn, null);
- if (startCSN != null
- && localCursor.getSearchKey(key, data, LockMode.DEFAULT) != SUCCESS)
+ maybeEmptyCursor = generateCursor(startCSN, matchingStrategy, positionStrategy);
+ if (maybeEmptyCursor.isEmpty)
{
- // We could not move the cursor to the expected startCSN
- if (localCursor.getSearchKeyRange(key, data, DEFAULT) != SUCCESS)
- {
- // We could not even move the cursor close to it
- // => return empty cursor
- isClosed = true;
- cursor = null;
- return;
- }
-
- if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
- {
- // We can move close to the startCSN.
- // Let's create a cursor from that point.
- key.setData(null);
- if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
- {
- localCursor.close();
- localCursor = db.openCursor(txn, null);
- }
- }
+ isClosed = true;
+ cursor = null;
+ return;
}
- cursor = localCursor;
- cursorHeld = cursor != null;
+ cursor = maybeEmptyCursor.cursor;
if (key.getData() != null)
{
computeCurrentRecord();
@@ -543,13 +557,72 @@
}
finally
{
- if (!cursorHeld)
+ if (maybeEmptyCursor != null && maybeEmptyCursor.isEmpty)
{
- closeAndReleaseReadLock(localCursor);
+ closeAndReleaseReadLock(maybeEmptyCursor.cursor);
}
}
}
+ /** Generate a possibly empty cursor with the provided start CSN and strategies. */
+ private CursorWithEmptyIndicator generateCursor(CSN startCSN, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy)
+ {
+ Cursor cursor = db.openCursor(txn, null);
+ boolean isCsnFound = startCSN == null || cursor.getSearchKey(key, data, LockMode.DEFAULT) == SUCCESS;
+ if (!isCsnFound)
+ {
+ if (matchingStrategy == EQUAL_TO_KEY)
+ {
+ return CursorWithEmptyIndicator.createEmpty(cursor);
+ }
+
+ boolean isGreaterCsnFound = cursor.getSearchKeyRange(key, data, DEFAULT) == SUCCESS;
+ if (isGreaterCsnFound)
+ {
+ if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && positionStrategy == AFTER_MATCHING_KEY)
+ {
+ // Move backward so that the first call to next() points to this greater csn
+ key.setData(null);
+ if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
+ {
+ // Edge case: we're at the beginning of the database
+ cursor.close();
+ cursor = db.openCursor(txn, null);
+ }
+ }
+ else if (matchingStrategy == LESS_THAN_OR_EQUAL_TO_KEY)
+ {
+ // Move backward to point on the lower csn
+ key.setData(null);
+ if (cursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
+ {
+ // Edge case: we're at the beginning of the log, there is no lower csn
+ return CursorWithEmptyIndicator.createEmpty(cursor);
+ }
+ }
+ }
+ else
+ {
+ if (matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
+ {
+ // There is no greater csn
+ return CursorWithEmptyIndicator.createEmpty(cursor);
+ }
+ // LESS_THAN_OR_EQUAL_TO_KEY case : the lower csn is the highest csn available
+ key.setData(null);
+ boolean isLastKeyFound = cursor.getLast(key, data, LockMode.DEFAULT) == SUCCESS;
+ if (!isLastKeyFound)
+ {
+ // Edge case: empty database
+ cursor.close();
+ cursor = db.openCursor(txn, null);
+ }
+ }
+ }
+ return CursorWithEmptyIndicator.createNonEmpty(cursor);
+ }
+
private ReplServerDBCursor() throws ChangelogException
{
key = new DatabaseEntry();
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index fd8f893..e415e36 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -45,6 +45,7 @@
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.testng.annotations.*;
@@ -54,6 +55,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
@@ -138,7 +140,6 @@
private Map<DN, ServerState> domainNewestCSNs;
private ECLEnabledDomainPredicate predicate;
private ChangeNumberIndexer cnIndexer;
- private MultiDomainServerState initialCookie;
@BeforeClass
public static void classSetup() throws Exception
@@ -160,17 +161,16 @@
{
MockitoAnnotations.initMocks(this);
- multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY);
+ multiDomainCursor = new MultiDomainDBCursor(domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
initialState = new ChangelogState();
- initialCookie = new MultiDomainServerState();
replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
domainDBCursors = new HashMap<DN, DomainDBCursor>();
domainNewestCSNs = new HashMap<DN, ServerState>();
when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
- when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY)))
- .thenReturn(multiDomainCursor);
+ when(domainDB.getCursorFrom(any(MultiDomainServerState.class),
+ eq(LESS_THAN_OR_EQUAL_TO_KEY), eq(AFTER_MATCHING_KEY))).thenReturn(multiDomainCursor);
}
@AfterMethod
@@ -179,18 +179,18 @@
stopCNIndexer();
}
- private static final String EMPTY_DB_NO_DS = "emptyDBNoDS";
+ private static final String NO_DS = "noDS";
@Test
- public void emptyDBNoDS() throws Exception
+ public void noDS() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
startCNIndexer();
assertExternalChangelogContent();
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBOneDS() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void oneDS() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -202,23 +202,8 @@
assertExternalChangelogContent(msg1);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void nonEmptyDBOneDS() throws Exception
- {
- eclEnabledDomains = Arrays.asList(BASE_DN1);
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
- addReplica(BASE_DN1, serverId1);
- setCNIndexDBInitialRecords(msg1);
- startCNIndexer();
- assertExternalChangelogContent();
-
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId1, 2);
- publishUpdateMsg(msg2);
- assertExternalChangelogContent(msg2);
- }
-
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSs() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSs() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -236,8 +221,8 @@
assertExternalChangelogContent(msg1);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsDifferentDomains() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsDifferentDomains() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1, BASE_DN2);
addReplica(BASE_DN1, serverId1);
@@ -272,8 +257,8 @@
* CompositeDBCursor currentRecord == Upd2.<li>
* </ol>
*/
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsDoesNotLoseChanges() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsDoesNotLoseChanges() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -301,34 +286,8 @@
assertExternalChangelogContent(msg1, msg2, msg3);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void nonEmptyDBTwoDSs() throws Exception
- {
- eclEnabledDomains = Arrays.asList(BASE_DN1);
- final ReplicatedUpdateMsg msg1 = msg(BASE_DN1, serverId1, 1);
- final ReplicatedUpdateMsg msg2 = msg(BASE_DN1, serverId2, 2);
- addReplica(BASE_DN1, serverId1);
- addReplica(BASE_DN1, serverId2);
- setCNIndexDBInitialRecords(msg1, msg2);
- startCNIndexer();
- assertExternalChangelogContent();
-
- final ReplicatedUpdateMsg msg3 = msg(BASE_DN1, serverId2, 3);
- final ReplicatedUpdateMsg msg4 = msg(BASE_DN1, serverId1, 4);
- publishUpdateMsg(msg3, msg4);
- assertExternalChangelogContent(msg3);
-
- final ReplicatedUpdateMsg msg5 = msg(BASE_DN1, serverId1, 5);
- publishUpdateMsg(msg5);
- assertExternalChangelogContent(msg3);
-
- final ReplicatedUpdateMsg msg6 = msg(BASE_DN1, serverId2, 6);
- publishUpdateMsg(msg6);
- assertExternalChangelogContent(msg3, msg4, msg5);
- }
-
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsOneSendsNoUpdatesForSomeTime() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsOneSendsNoUpdatesForSomeTime() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -345,8 +304,8 @@
assertExternalChangelogContent(msg1Sid2, msg2Sid1);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBThreeDSsOneIsNotECLEnabledDomain() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void threeDSsOneIsNotECLEnabledDomain() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(ADMIN_DATA_DN, serverId1);
@@ -367,8 +326,8 @@
assertExternalChangelogContent(msg2);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBOneInitialDSAnotherDSJoining() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void oneInitialDSAnotherDSJoining() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -389,8 +348,8 @@
assertExternalChangelogContent(msg1, msg2);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBOneInitialDSAnotherDSJoining2() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void oneInitialDSAnotherDSJoining2() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -409,8 +368,8 @@
assertExternalChangelogContent(msg1, msg2);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsOneSendingHeartbeats() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsOneSendingHeartbeats() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -427,8 +386,8 @@
assertExternalChangelogContent(msg1, msg2);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsOneGoingOffline() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsOneGoingOffline() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -461,8 +420,8 @@
assertExternalChangelogContent(msg1, msg2, msg4, msg5);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsOneInitiallyOffline() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsOneInitiallyOffline() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -495,8 +454,8 @@
* <li>RS starts</li>
* </ol>
*/
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsOneInitiallyWithChangesThenOffline() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsOneInitiallyWithChangesThenOffline() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -540,8 +499,8 @@
* <li>RS starts</li>
* </ol>
*/
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsOneInitiallyPersistedOfflineThenChanges() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -564,8 +523,8 @@
assertExternalChangelogContent(msg2, msg3, msg4);
}
- @Test(dependsOnMethods = { EMPTY_DB_NO_DS })
- public void emptyDBTwoDSsOneKilled() throws Exception
+ @Test(dependsOnMethods = { NO_DS })
+ public void twoDSsOneKilled() throws Exception
{
eclEnabledDomains = Arrays.asList(BASE_DN1);
addReplica(BASE_DN1, serverId1);
@@ -598,16 +557,16 @@
DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
if (domainDBCursor == null)
{
- domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY);
+ domainDBCursor = new DomainDBCursor(baseDN, domainDB, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
domainDBCursors.put(baseDN, domainDBCursor);
multiDomainCursor.addDomain(baseDN, null);
- when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY)))
- .thenReturn(domainDBCursor);
+ when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(LESS_THAN_OR_EQUAL_TO_KEY),
+ eq(AFTER_MATCHING_KEY))).thenReturn(domainDBCursor);
}
domainDBCursor.addReplicaDB(serverId, null);
- when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY)))
- .thenReturn(replicaDBCursor);
+ when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(LESS_THAN_OR_EQUAL_TO_KEY),
+ eq(AFTER_MATCHING_KEY))).thenReturn(replicaDBCursor);
}
when(domainDB.getDomainNewestCSNs(baseDN)).thenReturn(
@@ -636,7 +595,16 @@
return eclEnabledDomains.contains(baseDN);
}
};
- cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate);
+ cnIndexer = new ChangeNumberIndexer(changelogDB, initialState, predicate)
+ {
+ /** {@inheritDoc} */
+ @Override
+ protected void notifyEntryAddedToChangelog(DN baseDN, long changeNumber,
+ MultiDomainServerState previousCookie, UpdateMsg msg) throws ChangelogException
+ {
+ // avoid problems with ChangelogBackend initialization
+ }
+ };
cnIndexer.start();
waitForWaitingState(cnIndexer);
}
@@ -661,28 +629,6 @@
return new ReplicatedUpdateMsg(baseDN, new CSN(0, 0, serverId), true);
}
- private void setCNIndexDBInitialRecords(ReplicatedUpdateMsg... msgs) throws Exception
- {
- // Initialize the previous cookie that will be used to compare the records
- // added to the CNIndexDB at the end of this test
- for (int i = 0; i < msgs.length; i++)
- {
- ReplicatedUpdateMsg msg = msgs[i];
- if (i + 1 == msgs.length)
- {
- final ReplicatedUpdateMsg newestMsg = msg;
- final DN baseDN = newestMsg.getBaseDN();
- final CSN csn = newestMsg.getCSN();
- when(cnIndexDB.getNewestRecord()).thenReturn(
- new ChangeNumberIndexRecord(initialCookie.toString(), baseDN, csn));
- final SequentialDBCursor cursor =
- replicaDBCursors.get(Pair.of(baseDN, csn.getServerId()));
- cursor.add(newestMsg);
- }
- initialCookie.update(msg.getBaseDN(), msg.getCSN());
- }
- }
-
private void publishUpdateMsg(ReplicatedUpdateMsg... msgs) throws Exception
{
for (ReplicatedUpdateMsg msg : msgs)
@@ -758,9 +704,6 @@
verify(cnIndexDB, atLeast(0)).addRecord(arg.capture());
final List<ChangeNumberIndexRecord> allValues = arg.getAllValues();
- // clone initial state to avoid modifying it
- final MultiDomainServerState previousCookie =
- new MultiDomainServerState(initialCookie.toString());
// check it was not called more than expected
String desc1 = "actual was:<" + allValues + ">, but expected was:<" + Arrays.toString(expectedMsgs) + ">";
assertThat(allValues).as(desc1).hasSize(expectedMsgs.length);
@@ -772,8 +715,6 @@
String desc2 = "actual was:<" + record + ">, but expected was:<" + expectedMsg + ">";
assertThat(record.getBaseDN()).as(desc2).isEqualTo(expectedMsg.getBaseDN());
assertThat(record.getCSN()).as(desc2).isEqualTo(expectedMsg.getCSN());
- assertThat(record.getPreviousCookie()).as(desc2).isEqualTo(previousCookie.toString());
- previousCookie.update(expectedMsg.getBaseDN(), expectedMsg.getCSN());
}
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java
index 73c3d12..d033f9a 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ECLMultiDomainDBCursorTest.java
@@ -40,9 +40,10 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Mockito.*;
-import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
@SuppressWarnings("javadoc")
public class ECLMultiDomainDBCursorTest extends DirectoryServerTestCase
@@ -67,7 +68,7 @@
{
TestCaseUtils.startFakeServer();
MockitoAnnotations.initMocks(this);
- multiDomainCursor = new MultiDomainDBCursor(domainDB, ON_MATCHING_KEY);
+ multiDomainCursor = new MultiDomainDBCursor(domainDB, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY);
eclCursor = new ECLMultiDomainDBCursor(predicate, multiDomainCursor);
}
@@ -164,7 +165,7 @@
private void addDomainCursorToCursor(DN baseDN, SequentialDBCursor cursor) throws ChangelogException
{
final ServerState state = new ServerState();
- when(domainDB.getCursorFrom(baseDN, state, ON_MATCHING_KEY)).thenReturn(cursor);
+ when(domainDB.getCursorFrom(baseDN, state, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY)).thenReturn(cursor);
multiDomainCursor.addDomain(baseDN, state);
}
}
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 3f03539..2c954ec 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -58,7 +58,6 @@
{
private final MultiDomainServerState previousCookie =
new MultiDomainServerState();
- private final List<String> cookies = new ArrayList<String>();
private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation;
@@ -74,13 +73,6 @@
setReplicationDBImplementation(previousDBImpl);
}
- @BeforeMethod
- public void clearCookie()
- {
- previousCookie.clear();
- cookies.clear();
- }
-
/**
* This test makes basic operations of a JEChangeNumberIndexDB:
* <ol>
@@ -124,11 +116,11 @@
try
{
assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0));
+ assertEqualTo(cursor.getRecord(), csns[0], baseDN1);
assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1));
+ assertEqualTo(cursor.getRecord(), csns[1], baseDN2);
assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2));
+ assertEqualTo(cursor.getRecord(), csns[2], baseDN3);
assertFalse(cursor.next());
}
finally
@@ -154,19 +146,13 @@
private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
{
- final String cookie = previousCookie.toString();
- cookies.add(cookie);
- final long changeNumber = cnIndexDB.addRecord(
- new ChangeNumberIndexRecord(cookie, baseDN, csn));
- previousCookie.update(baseDN, csn);
- return changeNumber;
+ return cnIndexDB.addRecord(new ChangeNumberIndexRecord(baseDN, csn));
}
- private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie)
+ private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN)
{
assertEquals(record.getCSN(), csn);
assertEquals(record.getBaseDN(), baseDN);
- assertEquals(record.getPreviousCookie(), cookie);
}
private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
@@ -220,10 +206,6 @@
assertEquals(cnIndexDB.count(), 3, "Db count");
assertFalse(cnIndexDB.isEmpty());
- assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0));
- assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1));
- assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2));
-
DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
@@ -262,7 +244,6 @@
final ChangeNumberIndexRecord newest = cnIndexDB.getNewestRecord();
assertEquals(oldest.getChangeNumber(), newestChangeNumber);
assertEquals(oldest.getChangeNumber(), newest.getChangeNumber());
- assertEquals(oldest.getPreviousCookie(), newest.getPreviousCookie());
assertEquals(oldest.getBaseDN(), newest.getBaseDN());
assertEquals(oldest.getCSN(), newest.getCSN());
}
@@ -277,21 +258,6 @@
return new ReplicationServer(cfg);
}
- private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,
- long changeNumber) throws Exception
- {
- DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(changeNumber);
- try
- {
- cursor.next();
- return cursor.getRecord().getPreviousCookie();
- }
- finally
- {
- StaticUtils.close(cursor);
- }
- }
-
private void assertCursorReadsInOrder(DBCursor<ChangeNumberIndexRecord> cursor,
long... cns) throws ChangelogException
{
diff --git a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index 28ee16e..1295cac 100644
--- a/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -30,6 +30,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.List;
import org.assertj.core.api.SoftAssertions;
import org.opends.server.TestCaseUtils;
@@ -46,13 +47,17 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
+import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
@@ -68,6 +73,8 @@
private DN TEST_ROOT_DN;
private static final ReplicationDBImplementation previousDBImpl = replicationDbImplementation;
+ private ReplicationServer replicationServer;
+ private JEReplicaDB replicaDB;
@BeforeClass
public void setDBImpl()
@@ -99,47 +106,131 @@
TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING);
}
- @Test
- public void testGenerateCursorFrom() throws Exception
+ @DataProvider
+ Object[][] cursorData()
{
- ReplicationServer replicationServer = null;
- JEReplicaDB replicaDB = null;
+ // create 7 csns
+ final CSN[] sevenCsns = generateCSNs(1, System.currentTimeMillis(), 7);
+ CSN beforeCsn = sevenCsns[0];
+ CSN middleCsn = sevenCsns[3]; // will be between csns[1] and csns[2]
+ CSN afterCsn = sevenCsns[6];
+
+ // but use only 4 of them for update msg
+ // beforeCsn, middleCsn and afterCsn are not used
+ // in order to test cursor generation from a key not present in the log (before, in the middle, after)
+ final List<CSN> usedCsns = new ArrayList<CSN>(Arrays.asList(sevenCsns));
+ usedCsns.remove(beforeCsn);
+ usedCsns.remove(middleCsn);
+ usedCsns.remove(afterCsn);
+ final CSN[] csns = usedCsns.toArray(new CSN[4]);
+
+ return new Object[][] {
+ // equal matching
+ { csns, beforeCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+ { csns, csns[0], EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
+ { csns, csns[1], EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
+ { csns, middleCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+ { csns, csns[2], EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
+ { csns, csns[3], EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
+ { csns, afterCsn, EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+
+ { csns, beforeCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { csns, csns[0], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
+ { csns, csns[1], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
+ { csns, middleCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { csns, csns[2], EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
+ { csns, csns[3], EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { csns, afterCsn, EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+
+ // less than or equal matching
+ { csns, beforeCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+ { csns, csns[0], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
+ { csns, csns[1], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
+ { csns, middleCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
+ { csns, csns[2], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
+ { csns, csns[3], LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
+ { csns, afterCsn, LESS_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
+
+ { csns, beforeCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { csns, csns[0], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
+ { csns, csns[1], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
+ { csns, middleCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
+ { csns, csns[2], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
+ { csns, csns[3], LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { csns, afterCsn, LESS_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+
+ // greater than or equal matching
+ { csns, beforeCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
+ { csns, csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 0, 3 },
+ { csns, csns[1], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 1, 3 },
+ { csns, middleCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
+ { csns, csns[2], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 2, 3 },
+ { csns, csns[3], GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, 3, 3 },
+ { csns, afterCsn, GREATER_THAN_OR_EQUAL_TO_KEY, ON_MATCHING_KEY, -1, -1 },
+
+ { csns, beforeCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 0, 3 },
+ { csns, csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 1, 3 },
+ { csns, csns[1], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
+ { csns, middleCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 2, 3 },
+ { csns, csns[2], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, 3, 3 },
+ { csns, csns[3], GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { csns, afterCsn, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY, -1, -1 },
+ { null, null, null, null, -1, -1 } // stop line
+ };
+ }
+
+ /**
+ * Test the cursor with all acceptable strategies combination.
+ * Creation of a replication server is costly so it is created only once on first test and cleaned after the
+ * last test using the stop line in data to do so.
+ */
+ @Test(dataProvider="cursorData")
+ public void testGenerateCursor(CSN[] csns, CSN startCsn, KeyMatchingStrategy matchingStrategy,
+ PositionStrategy positionStrategy, int startIndex, int endIndex) throws Exception
+ {
+ DBCursor<UpdateMsg> cursor = null;
try
{
- TestCaseUtils.startServer();
- replicationServer = configureReplicationServer(100000, 10);
- replicaDB = newReplicaDB(replicationServer);
-
- final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
- final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
- csns2.remove(csns[3]);
-
- for (CSN csn : csns2)
+ if (replicationServer == null)
{
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
+ // initialize only once
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100000, 10);
+ replicaDB = newReplicaDB(replicationServer);
+ for (CSN csn : csns)
+ {
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
+ }
+ }
+ if (csns == null)
+ {
+ return; // stop line, time to clean replication artefacts
}
- for (CSN csn : csns2)
+ cursor = replicaDB.generateCursorFrom(startCsn, matchingStrategy, positionStrategy);
+ if (startIndex != -1)
{
- assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
+ assertThatCursorCanBeFullyReadFromStart(cursor, csns, startIndex, endIndex);
}
- assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
-
- for (int i = 0; i < csns2.size() - 1; i++)
+ else
{
- assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
+ assertThatCursorIsExhausted(cursor);
}
- assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
}
finally
{
- shutdown(replicaDB);
- remove(replicationServer);
+ close(cursor);
+ if (csns == null)
+ {
+ // stop line, stop and remove replication
+ shutdown(replicaDB);
+ remove(replicationServer);
+ }
}
}
@Test
- void testTrim() throws Exception
+ public void testTrim() throws Exception
{
ReplicationServer replicationServer = null;
JEReplicaDB replicaDB = null;
@@ -288,28 +379,42 @@
}
}
- private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
- final PositionStrategy positionStrategy, final CSN expectedCSN)
- throws ChangelogException
+ private void advanceCursorUpTo(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, int endIndex)
+ throws Exception
{
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
- try
+ for (int i = startIndex; i <= endIndex; i++)
{
- final SoftAssertions softly = new SoftAssertions();
- softly.assertThat(cursor.next()).isTrue();
- softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
- softly.assertAll();
+ assertThat(cursor.next()).as("next() value when i=" + i).isTrue();
+ assertThat(cursor.getRecord().getCSN()).isEqualTo(csns[i]);
}
- finally
- {
- close(cursor);
- }
+ }
+
+ private void assertThatCursorIsExhausted(DBCursor<UpdateMsg> cursor) throws Exception
+ {
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isFalse();
+ softly.assertThat(cursor.getRecord()).isNull();
+ softly.assertAll();
+ }
+
+ private void assertThatCursorCanBeFullyRead(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex, int endIndex)
+ throws Exception
+ {
+ advanceCursorUpTo(cursor, csns, startIndex, endIndex);
+ assertThatCursorIsExhausted(cursor);
+ }
+
+ private void assertThatCursorCanBeFullyReadFromStart(DBCursor<UpdateMsg> cursor, CSN[] csns, int startIndex,
+ int endIndex) throws Exception
+ {
+ assertThat(cursor.getRecord()).isNull();
+ assertThatCursorCanBeFullyRead(cursor, csns, startIndex, endIndex);
}
private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN,
final PositionStrategy positionStrategy) throws ChangelogException
{
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
try
{
final SoftAssertions softly = new SoftAssertions();
@@ -328,7 +433,7 @@
* optimize the oldest and newest records in the replication changelog db.
*/
@Test(groups = { "opendj-256" })
- void testGetOldestNewestCSNs() throws Exception
+ public void testGetOldestNewestCSNs() throws Exception
{
// It's worth testing with 2 different setting for counterRecord
// - a counter record is put every 10 Update msg in the db - just a unit
@@ -451,7 +556,7 @@
private void assertFoundInOrder(JEReplicaDB replicaDB,
final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
{
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
try
{
assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
--
Gitblit v1.10.0