From ab3cac04319c920ba14be59ea874e6e35f730655 Mon Sep 17 00:00:00 2001
From: Nicolas Capponi <nicolas.capponi@forgerock.com>
Date: Mon, 21 Jul 2014 17:06:28 +0000
Subject: [PATCH] Checkpoint commit for OPENDJ-1206 : Create a new ReplicationBackend/ChangelogBackend to support cn=changelog CR-4053
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 33 ++-
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java | 27 +++
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 3
opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java | 64 +++++---
opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java | 21 ++-
opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java | 13 +
opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java | 10 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java | 1
opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java | 32 ++--
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 14 +
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java | 16 +
opends/src/server/org/opends/server/replication/server/changelog/file/Log.java | 63 +++++---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 14 +
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 19 +-
opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java | 23 +++
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java | 5
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java | 13 +
opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java | 32 +++-
18 files changed, 265 insertions(+), 138 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 434fa85..6dd0fa4 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -56,6 +56,7 @@
import static org.opends.server.replication.common.ServerStatus.*;
import static org.opends.server.replication.common.StatusMachineEvent.*;
import static org.opends.server.replication.protocol.ProtocolVersion.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -1361,12 +1362,12 @@
* @return a non null {@link DBCursor} going from oldest to newest CSN
* @throws ChangelogException
* If a database problem happened
- * @see ReplicationDomainDB#getCursorFrom(DN, ServerState)
+ * @see ReplicationDomainDB#getCursorFrom(DN, ServerState, PositionStrategy)
*/
public DBCursor<UpdateMsg> getCursorFrom(ServerState startAfterServerState)
throws ChangelogException
{
- return domainDB.getCursorFrom(baseDN, startAfterServerState);
+ return domainDB.getCursorFrom(baseDN, startAfterServerState, AFTER_MATCHING_KEY);
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
index ad658e4..1fe0714 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/DBCursor.java
@@ -60,6 +60,29 @@
{
/**
+ * Represents a cursor key matching strategy, which allow to choose if only
+ * the exact key must be found or if any key equals or higher should match.
+ */
+ public enum KeyMatchingStrategy {
+ /** matches only if the exact key is found. */
+ EQUAL_TO_KEY,
+ /** matches if the key or a greater key is found. */
+ GREATER_THAN_OR_EQUAL_TO_KEY
+ }
+
+ /**
+ * Represents a cursor positioning strategy, which allow to choose if the start point
+ * corresponds to the record at the provided key or the record just after the provided
+ * key.
+ */
+ public enum PositionStrategy {
+ /** start point is on the matching key. */
+ ON_MATCHING_KEY,
+ /** start point is after the matching key. */
+ AFTER_MATCHING_KEY
+ }
+
+ /**
* Getter for the current record.
*
* @return The current record.
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
index 56b0509..e275020 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicationDomainDB.java
@@ -29,6 +29,7 @@
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
import org.opends.server.types.DN;
@@ -92,30 +93,33 @@
void removeDomain(DN baseDN) throws ChangelogException;
/**
- * Generates a {@link DBCursor} across all the domains starting after the
+ * Generates a {@link DBCursor} across all the domains starting at or after the
* provided {@link MultiDomainServerState} for each domain.
* <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
* cursor.
*
- * @param startAfterState
+ * @param startState
* Starting point for each domain cursor. If any {@link ServerState}
* for a domain is null, then start from the oldest CSN for each
* replicaDBs
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
- * @see #getCursorFrom(DN, ServerState)
+ * @see #getCursorFrom(DN, ServerState, PositionStrategy)
*/
- public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startAfterState)
+ public MultiDomainDBCursor getCursorFrom(MultiDomainServerState startState, PositionStrategy positionStrategy)
throws ChangelogException;
// serverId methods
/**
* Generates a {@link DBCursor} across all the replicaDBs for the specified
- * replication domain starting after the provided {@link ServerState} for each
+ * replication domain starting at or after the provided {@link ServerState} for each
* replicaDBs.
* <p>
* When the cursor is not used anymore, client code MUST call the
@@ -124,21 +128,24 @@
*
* @param baseDN
* the replication domain baseDN
- * @param startAfterState
+ * @param startState
* Starting point for each ReplicaDB cursor. If any CSN for a
* replicaDB is null, then start from the oldest CSN for this
* replicaDB
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
- * @see #getCursorFrom(DN, int, CSN)
+ * @see #getCursorFrom(DN, int, CSN, PositionStrategy)
*/
- DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startAfterState)
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, ServerState startState, PositionStrategy positionStrategy)
throws ChangelogException;
/**
* Generates a {@link DBCursor} for one replicaDB for the specified
- * replication domain and serverId starting after the provided {@link CSN}.
+ * replication domain and serverId starting at or after the provided {@link CSN}.
* <p>
* When the cursor is not used anymore, client code MUST call the
* {@link DBCursor#close()} method to free the resources and locks used by the
@@ -148,14 +155,17 @@
* the replication domain baseDN of the replicaDB
* @param serverId
* the serverId of the replicaDB
- * @param startAfterCSN
+ * @param startCSN
* Starting point for the ReplicaDB cursor. If the CSN is null, then
* start from the oldest CSN for this replicaDB
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
* @return a non null {@link DBCursor}
* @throws ChangelogException
* If a database problem happened
*/
- DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startAfterCSN)
+ DBCursor<UpdateMsg> getCursorFrom(DN baseDN, int serverId, CSN startCSN, PositionStrategy positionStrategy)
throws ChangelogException;
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
index e5cf15e..7bb2ce0 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/BlockLogReader.java
@@ -26,6 +26,8 @@
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import java.io.Closeable;
import java.io.EOFException;
@@ -33,7 +35,10 @@
import java.io.IOException;
import java.io.RandomAccessFile;
+import org.forgerock.util.Reject;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
import org.opends.server.util.StaticUtils;
@@ -135,31 +140,35 @@
}
/**
- * Position the reader to the record corresponding to the provided key or to
- * the nearest key (the lowest key higher than the provided key), and returns
- * the last record read.
+ * Position the reader to the record corresponding to the provided key and
+ * matching and positioning strategies. Returns the last record read.
*
* @param key
* Key to use as a start position. Key must not be {@code null}.
- * @param findNextRecord
- * If {@code true}, start position is the lowest key that is higher
- * than the provided key, otherwise start position is the provided
- * key.
+ * @param matchStrategy
+ * The key matching strategy.
+ * @param positionStrategy
+ * The positioning strategy.
* @return The pair (key_found, last_record_read). key_found is a boolean
- * indicating if reader is successfully positioned to the key or the
- * nearest key. last_record_read is the last record that was read.
- * When key_found is equals to {@code false}, then last_record_read is
- * always {@code null}. When key_found is equals to {@code true},
- * last_record_read can be valued or be {@code null}
+ * indicating if reader is successfully positioned. last_record_read
+ * is the last record that was read. When key_found is equals to
+ * {@code false}, then last_record_read is always {@code null}. When
+ * key_found is equals to {@code true}, last_record_read can be valued
+ * or be {@code null}
* @throws ChangelogException
* If an error occurs when seeking the key.
*/
- public Pair<Boolean, Record<K,V>> seekToRecord(final K key, final boolean findNextRecord) throws ChangelogException
+ public Pair<Boolean, Record<K,V>> seekToRecord(
+ final K key,
+ final KeyMatchingStrategy matchStrategy,
+ final PositionStrategy positionStrategy)
+ throws ChangelogException
{
+ Reject.checkNotNull(key);
final long markerPosition = searchClosestBlockStartToKey(key);
if (markerPosition >= 0)
{
- return positionToKeySequentially(markerPosition, key, findNextRecord);
+ return positionToKeySequentially(markerPosition, key, matchStrategy, positionStrategy);
}
return Pair.of(false, null);
}
@@ -440,39 +449,42 @@
/**
* Position to provided key, starting from provided block start position and
- * reading sequentially until key is found.
+ * reading sequentially until key is found according to matching and
+ * positioning strategies.
*
* @param blockStartPosition
* Position of read pointer in the file, expected to be the start of
* a block where a record offset is written.
* @param key
* The key to find.
- * @param findNextRecord
- * If {@code true}, position at the end of this method is the lowest
- * key that is higher than the provided key, otherwise position is
- * the provided key.
+ * @param matchStrategy
+ * The key matching strategy.
+ * @param positionStrategy
+ * The positioning strategy.
* @return The pair ({@code true}, last record read) if reader is successfully
- * positioned to the key or the nearest key (last record may be null
- * if end of file is reached), ({@code false}, null) otherwise.
+ * positioned (last record may be null if end of file is reached), (
+ * {@code false}, null) otherwise.
* @throws ChangelogException
- * If an error occurs.
+ * If an error occurs.
*/
Pair<Boolean, Record<K,V>> positionToKeySequentially(
final long blockStartPosition,
final K key,
- final boolean findNextRecord)
+ final KeyMatchingStrategy matchStrategy,
+ final PositionStrategy positionStrategy)
throws ChangelogException {
Record<K,V> record = readRecord(blockStartPosition);
do {
if (record != null)
{
final int keysComparison = record.getKey().compareTo(key);
- final boolean matches = findNextRecord ? keysComparison >= 0 : record.getKey().equals(key);
+ final boolean matches = (matchStrategy == EQUAL_TO_KEY && keysComparison == 0)
+ || (matchStrategy == GREATER_THAN_OR_EQUAL_TO_KEY && keysComparison >= 0);
if (matches)
{
- if (findNextRecord && keysComparison == 0)
+ if (positionStrategy == AFTER_MATCHING_KEY && keysComparison == 0)
{
- // skip key in order to position on lowest higher key
+ // skip matching key
record = readRecord();
}
return Pair.of(true, record);
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
index 65a95fc..3b98f9c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileChangelogDB.java
@@ -46,6 +46,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.ChangeNumberIndexer;
import org.opends.server.replication.server.changelog.je.DomainDBCursor;
import org.opends.server.replication.server.changelog.je.MultiDomainDBCursor;
@@ -60,6 +61,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -122,7 +124,7 @@
private final AtomicBoolean shutdown = new AtomicBoolean();
static final DBCursor<UpdateMsg> EMPTY_CURSOR_REPLICA_DB =
- new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null);
+ new FileReplicaDBCursor(new Log.EmptyLogCursor<CSN, UpdateMsg>(), null, AFTER_MATCHING_KEY);
/**
* Creates a new changelog DB.
@@ -658,37 +660,38 @@
/** {@inheritDoc} */
@Override
- public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
- final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
registeredMultiDomainCursors.add(cursor);
for (DN baseDN : domainToReplicaDBs.keySet())
{
- cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
+ cursor.addDomain(baseDN, startState.getServerState(baseDN));
}
return cursor;
}
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
- throws ChangelogException
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
- final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
for (int serverId : getDomainMap(baseDN).keySet())
{
// get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+ final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
cursor.addReplicaDB(serverId, lastCSN);
}
return cursor;
}
- private DomainDBCursor newDomainDBCursor(final DN baseDN)
+ private DomainDBCursor newDomainDBCursor(final DN baseDN, final PositionStrategy positionStrategy)
{
synchronized (registeredDomainCursors)
{
- final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
if (cursors == null)
{
@@ -715,15 +718,14 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
- throws ChangelogException
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
+ PositionStrategy positionStrategy) throws ChangelogException
{
final FileReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- final DBCursor<UpdateMsg> cursor =
- replicaDB.generateCursorFrom(startAfterCSN);
- final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+ final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
// TODO JNR if (offlineCSN != null) ??
// What about replicas that suddenly become offline?
return new ReplicaOfflineCursor(cursor, offlineCSN);
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 4e6cba9..995eca0 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
@@ -203,21 +204,25 @@
}
/**
- * Returns a cursor that allows to retrieve the update messages from this DB,
- * starting at the position defined by the smallest CSN that is strictly
- * higher than the provided CSN.
+ * Returns a cursor that allows to retrieve the update messages from this DB.
+ * The starting position is defined by the provided CSN and cursor
+ * positioning strategy.
*
- * @param startAfterCSN
+ * @param startCSN
* The position where the cursor must start. If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * Cursor position strategy, which allow to choose if cursor must
+ * start from the provided CSN or just after the provided CSN.
* @return a new {@link DBCursor} to retreive update messages.
* @throws ChangelogException
* if a database problem happened
*/
- DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN) throws ChangelogException
+ DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy)
+ throws ChangelogException
{
- RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startAfterCSN);
- return new FileReplicaDBCursor(cursor, startAfterCSN);
+ RepositionableCursor<CSN, UpdateMsg> cursor = log.getNearestCursor(startCSN, positionStrategy);
+ return new FileReplicaDBCursor(cursor, startCSN, positionStrategy);
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
index c17755f..d48b18e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBCursor.java
@@ -25,6 +25,9 @@
*/
package org.opends.server.replication.server.changelog.file;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.changelog.api.ChangelogException;
@@ -58,7 +61,6 @@
*/
class FileReplicaDBCursor implements DBCursor<UpdateMsg>
{
-
/** The underlying cursor. */
private final RepositionableCursor<CSN, UpdateMsg> cursor;
@@ -68,18 +70,27 @@
/** The CSN to re-start with in case the cursor is exhausted. */
private CSN lastNonNullCurrentCSN;
+ private PositionStrategy positionStrategy;
+
/**
* Creates the cursor from provided log cursor and start CSN.
*
* @param cursor
* The underlying log cursor to read log.
- * @param startAfterCSN
+ * @param startCSN
* The CSN to use as a start point (excluded from cursor, the lowest
* CSN higher than this CSN is used as the real start point).
+ * @param positionStrategy
+ * Cursor position strategy, which allow to choose if cursor must
+ * start from the provided CSN or just after the provided CSN.
*/
- FileReplicaDBCursor(RepositionableCursor<CSN, UpdateMsg> cursor, CSN startAfterCSN) {
+ FileReplicaDBCursor(
+ final RepositionableCursor<CSN, UpdateMsg> cursor,
+ final CSN startCSN,
+ final PositionStrategy positionStrategy) {
this.cursor = cursor;
- this.lastNonNullCurrentCSN = startAfterCSN;
+ this.lastNonNullCurrentCSN = startCSN;
+ this.positionStrategy = positionStrategy;
}
/** {@inheritDoc} */
@@ -96,19 +107,23 @@
if (cursor.next())
{
nextRecord = cursor.getRecord();
- if (nextRecord.getKey().compareTo(lastNonNullCurrentCSN) > 0)
+ final int nextCSNCompare = nextRecord.getKey().compareTo(lastNonNullCurrentCSN);
+ if (nextCSNCompare > 0 || (nextCSNCompare == 0 && positionStrategy == ON_MATCHING_KEY))
{
+ // start CSN is found, switch to position strategy that always find the next
lastNonNullCurrentCSN = nextRecord.getKey();
+ positionStrategy = AFTER_MATCHING_KEY;
return true;
}
}
+ // either cursor is exhausted or we still have not reached the start CSN
return nextWhenCursorIsExhaustedOrNotCorrectlyPositionned();
}
/** Re-initialize the cursor after the last non null CSN. */
private boolean nextWhenCursorIsExhaustedOrNotCorrectlyPositionned() throws ChangelogException
{
- final boolean found = cursor.positionTo(lastNonNullCurrentCSN, true);
+ final boolean found = cursor.positionTo(lastNonNullCurrentCSN, GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
if (found && cursor.next())
{
nextRecord = cursor.getRecord();
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
index 16225d9..bff1def 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/Log.java
@@ -26,6 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import static org.opends.messages.ReplicationMessages.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.Closeable;
@@ -52,6 +53,8 @@
import org.opends.server.loggers.ErrorLogger;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.LogFile.LogFileCursor;
import org.opends.server.util.StaticUtils;
@@ -439,7 +442,7 @@
return new EmptyLogCursor<K, V>();
}
cursor = new LogCursor<K, V>(this);
- cursor.positionTo(null, false);
+ cursor.positionTo(null, null, null);
registerCursor(cursor);
return cursor;
}
@@ -467,28 +470,35 @@
*/
public RepositionableCursor<K, V> getCursor(final K key) throws ChangelogException
{
- return getCursor(key, false);
+ return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, null);
}
/**
- * Returns a cursor that allows to retrieve the records from this log,
- * starting at the position defined by the smallest key that is higher than
- * the provided key.
+ * Returns a cursor that allows to retrieve the records from this log.
+ * The starting position is defined by the provided key and cursor
+ * positioning strategy.
*
* @param key
* Key to use as a start position for the cursor. If key is
* {@code null}, cursor will point at the first record of the log.
+ * @param positionStrategy
+ * The cursor positioning strategy.
* @return a cursor on the log records, which is never {@code null}
* @throws ChangelogException
* If the cursor can't be created.
*/
- public RepositionableCursor<K, V> getNearestCursor(final K key) throws ChangelogException
+ public RepositionableCursor<K, V> getNearestCursor(final K key, PositionStrategy positionStrategy)
+ throws ChangelogException
{
- return getCursor(key, true);
+ return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, positionStrategy);
}
- /** Returns a cursor starting from a key, using the strategy corresponding to provided boolean. */
- private RepositionableCursor<K, V> getCursor(final K key, boolean findNearest) throws ChangelogException
+ /**
+ * Returns a cursor starting from a key, using the provided matching and
+ * position strategies for the cursor.
+ */
+ private RepositionableCursor<K, V> getCursor(final K key, final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
if (key == null)
{
@@ -503,9 +513,9 @@
return new EmptyLogCursor<K, V>();
}
cursor = new LogCursor<K, V>(this);
- final boolean isFound = cursor.positionTo(key, findNearest);
- // for nearest case, it is ok if the target is not found
- if (isFound || findNearest)
+ final boolean isFound = cursor.positionTo(key, matchingStrategy, positionStrategy);
+ // When not matching the exact key, it is ok if the target is not found
+ if (isFound || matchingStrategy == GREATER_THAN_OR_EQUAL_TO_KEY)
{
registerCursor(cursor);
return cursor;
@@ -936,22 +946,23 @@
static interface RepositionableCursor<K extends Comparable<K>, V> extends DBCursor<Record<K, V>>
{
/**
- * Position the cursor to the record corresponding to the provided key or to
- * the nearest key (the lowest key higher than the provided key).
+ * Position the cursor to the record corresponding to the provided key and
+ * provided matching and positioning strategies.
*
* @param key
* Key to use as a start position for the cursor. If key is
* {@code null}, use the key of the first record instead.
- * @param findNearest
- * If {@code true}, start position is the lowest key that is higher
- * than the provided key, otherwise start position is the provided
- * key.
- * @return {@code true} if cursor is successfully positionned to the key or
- * the nearest key, {@code false} otherwise.
+ * @param matchStrategy
+ * The cursor key matching strategy.
+ * @param positionStrategy
+ * The cursor positioning strategy.
+ * @return {@code true} if cursor is successfully positioned, or
+ * {@code false} otherwise.
* @throws ChangelogException
* If an error occurs when positioning cursor.
*/
- boolean positionTo(K key, boolean findNearest) throws ChangelogException;
+ boolean positionTo(K key, KeyMatchingStrategy matchStrategy, PositionStrategy positionStrategy)
+ throws ChangelogException;
}
/**
@@ -1039,7 +1050,11 @@
/** {@inheritDoc} */
@Override
- public boolean positionTo(final K key, final boolean findNearest) throws ChangelogException
+ public boolean positionTo(
+ final K key,
+ final KeyMatchingStrategy matchStrategy,
+ final PositionStrategy positionStrategy)
+ throws ChangelogException
{
if (actAsEmptyCursor)
{
@@ -1053,7 +1068,7 @@
{
switchToLogFile(logFile);
}
- return (key == null) ? true : currentCursor.positionTo(key, findNearest);
+ return (key == null) ? true : currentCursor.positionTo(key, matchStrategy, positionStrategy);
}
finally
{
@@ -1128,7 +1143,7 @@
/** {@inheritDoc} */
@Override
- public boolean positionTo(K key, boolean returnLowestHigher) throws ChangelogException
+ public boolean positionTo(K key, KeyMatchingStrategy match, PositionStrategy pos) throws ChangelogException
{
return false;
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
index ff041ea..5f66cc2 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/LogFile.java
@@ -28,6 +28,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import java.io.BufferedWriter;
import java.io.Closeable;
@@ -41,6 +42,8 @@
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.file.Log.RepositionableCursor;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
@@ -335,7 +338,7 @@
*/
LogFileCursor<K, V> getCursor(final K key) throws ChangelogException
{
- return getCursor(key, false);
+ return getCursor(key, KeyMatchingStrategy.EQUAL_TO_KEY, PositionStrategy.ON_MATCHING_KEY);
}
/**
@@ -352,12 +355,15 @@
*/
LogFileCursor<K, V> getNearestCursor(final K key) throws ChangelogException
{
- return getCursor(key, true);
+ return getCursor(key, KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
}
/** Returns a cursor starting from a key, using the strategy corresponding to provided indicator. */
- private LogFileCursor<K, V> getCursor(final K key, boolean findNearest)
- throws ChangelogException
+ private LogFileCursor<K, V> getCursor(
+ final K key,
+ final KeyMatchingStrategy matchingStrategy,
+ final PositionStrategy positionStrategy)
+ throws ChangelogException
{
if (key == null)
{
@@ -367,7 +373,7 @@
try
{
cursor = new LogFileCursor<K, V>(this);
- cursor.positionTo(key, findNearest);
+ cursor.positionTo(key, matchingStrategy, positionStrategy);
// if target is not found, cursor is positioned at end of stream
return cursor;
}
@@ -628,8 +634,9 @@
/** {@inheritDoc} */
@Override
- public boolean positionTo(final K key, boolean findNearest) throws ChangelogException {
- final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, findNearest);
+ public boolean positionTo(final K key, final KeyMatchingStrategy match, final PositionStrategy pos)
+ throws ChangelogException {
+ final Pair<Boolean, Record<K, V>> result = reader.seekToRecord(key, match, pos);
final boolean found = result.getFirst();
initialRecord = found ? result.getSecond() : null;
return found;
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 8a9390d..36b3d5b 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -47,6 +47,7 @@
import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
@@ -353,7 +354,7 @@
}
}
- nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV);
+ nextChangeForInsertDBCursor = domainDB.getCursorFrom(mediumConsistencyRUV, PositionStrategy.AFTER_MATCHING_KEY);
nextChangeForInsertDBCursor.next();
if (newestRecord != null)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
index c249a78..26c40f7 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/DomainDBCursor.java
@@ -54,6 +54,8 @@
*/
private static final CSN NULL_CSN = new CSN(0, 0, 0);
+ private final PositionStrategy positionStrategy;
+
/**
* Builds a DomainDBCursor instance.
*
@@ -61,11 +63,15 @@
* the replication domain baseDN of this cursor
* @param domainDB
* the DB for the provided replication domain
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
*/
- public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB)
+ public DomainDBCursor(DN baseDN, ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
{
this.baseDN = baseDN;
this.domainDB = domainDB;
+ this.positionStrategy = positionStrategy;
}
/**
@@ -102,8 +108,9 @@
final Entry<Integer, CSN> pair = iter.next();
final int serverId = pair.getKey();
final CSN csn = pair.getValue();
- final CSN startAfterCSN = !NULL_CSN.equals(csn) ? csn : null;
- final DBCursor<UpdateMsg> cursor = domainDB.getCursorFrom(baseDN, serverId, startAfterCSN);
+ final CSN startCSN = !NULL_CSN.equals(csn) ? csn : null;
+ final DBCursor<UpdateMsg> cursor =
+ domainDB.getCursorFrom(baseDN, serverId, startCSN, positionStrategy);
addCursor(cursor, null);
iter.remove();
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index ce8b030..469347f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -33,6 +33,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import org.forgerock.util.Reject;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
@@ -46,6 +47,7 @@
import org.opends.server.replication.server.ChangelogState;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
@@ -56,6 +58,7 @@
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
/**
@@ -704,37 +707,38 @@
/** {@inheritDoc} */
@Override
- public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startAfterState) throws ChangelogException
+ public MultiDomainDBCursor getCursorFrom(final MultiDomainServerState startState,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
- final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this);
+ final MultiDomainDBCursor cursor = new MultiDomainDBCursor(this, positionStrategy);
registeredMultiDomainCursors.add(cursor);
for (DN baseDN : domainToReplicaDBs.keySet())
{
- cursor.addDomain(baseDN, startAfterState.getServerState(baseDN));
+ cursor.addDomain(baseDN, startState.getServerState(baseDN));
}
return cursor;
}
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startAfterState)
- throws ChangelogException
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final ServerState startState,
+ final PositionStrategy positionStrategy) throws ChangelogException
{
- final DomainDBCursor cursor = newDomainDBCursor(baseDN);
+ final DomainDBCursor cursor = newDomainDBCursor(baseDN, positionStrategy);
for (int serverId : getDomainMap(baseDN).keySet())
{
// get the last already sent CSN from that server to get a cursor
- final CSN lastCSN = startAfterState != null ? startAfterState.getCSN(serverId) : null;
+ final CSN lastCSN = startState != null ? startState.getCSN(serverId) : null;
cursor.addReplicaDB(serverId, lastCSN);
}
return cursor;
}
- private DomainDBCursor newDomainDBCursor(final DN baseDN)
+ private DomainDBCursor newDomainDBCursor(final DN baseDN, PositionStrategy positionStrategy)
{
synchronized (registeredDomainCursors)
{
- final DomainDBCursor cursor = new DomainDBCursor(baseDN, this);
+ final DomainDBCursor cursor = new DomainDBCursor(baseDN, this, positionStrategy);
List<DomainDBCursor> cursors = registeredDomainCursors.get(baseDN);
if (cursors == null)
{
@@ -761,15 +765,18 @@
/** {@inheritDoc} */
@Override
- public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startAfterCSN)
- throws ChangelogException
+ public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, CSN startCSN,
+ PositionStrategy positionStrategy) throws ChangelogException
+
{
+ Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
+ + " is not supported for the JE implementation fo changelog");
final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
final DBCursor<UpdateMsg> cursor =
- replicaDB.generateCursorFrom(startAfterCSN);
- final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startAfterCSN);
+ replicaDB.generateCursorFrom(startCSN);
+ final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
// TODO JNR if (offlineCSN != null) ??
// What about replicas that suddenly become offline?
return new ReplicaOfflineCursor(cursor, offlineCSN);
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
index cf88679..a7f067f 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/MultiDomainDBCursor.java
@@ -50,15 +50,21 @@
private final ConcurrentSkipListSet<DN> removeDomains =
new ConcurrentSkipListSet<DN>();
+ private final PositionStrategy positionStrategy;
+
/**
* Builds a MultiDomainDBCursor instance.
*
* @param domainDB
* the replication domain management DB
+ * @param positionStrategy
+ * Cursor position strategy, which allow to indicates at which
+ * exact position the cursor must start
*/
- public MultiDomainDBCursor(ReplicationDomainDB domainDB)
+ public MultiDomainDBCursor(ReplicationDomainDB domainDB, PositionStrategy positionStrategy)
{
this.domainDB = domainDB;
+ this.positionStrategy = positionStrategy;
}
/**
@@ -86,7 +92,7 @@
final Entry<DN, ServerState> entry = iter.next();
final DN baseDN = entry.getKey();
final ServerState serverState = entry.getValue();
- final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState);
+ final DBCursor<UpdateMsg> domainDBCursor = domainDB.getCursorFrom(baseDN, serverState, positionStrategy);
addCursor(domainDBCursor, baseDN);
iter.remove();
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
index 7561613..2a7f6a3 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/BlockLogReaderWriterTest.java
@@ -26,6 +26,8 @@
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.replication.server.changelog.file.BlockLogReader.*;
import java.io.File;
@@ -38,6 +40,8 @@
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor.KeyMatchingStrategy;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteSequenceReader;
import org.opends.server.types.ByteString;
import org.opends.server.types.ByteStringBuilder;
@@ -203,7 +207,11 @@
try
{
reader = newReader(blockSize);
- Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, findNearest);
+ KeyMatchingStrategy matchStrategy =
+ findNearest ? KeyMatchingStrategy.GREATER_THAN_OR_EQUAL_TO_KEY : KeyMatchingStrategy.EQUAL_TO_KEY;
+ PositionStrategy posStrategy =
+ findNearest ? PositionStrategy.AFTER_MATCHING_KEY : PositionStrategy.ON_MATCHING_KEY;
+ Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, matchStrategy, posStrategy);
assertThat(result.getFirst()).isEqualTo(expectedFound);
assertThat(result.getSecond()).isEqualTo(expectedRecord);
@@ -331,7 +339,8 @@
for (Integer key : keysToSeek)
{
final long ts = System.nanoTime();
- Pair<Boolean, Record<Integer, Integer>> result = reader.seekToRecord(key, false);
+ Pair<Boolean, Record<Integer, Integer>> result =
+ reader.seekToRecord(key, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
final long te = System.nanoTime() - ts;
if (te < minTime) minTime = te;
if (te > maxTime) maxTime = te;
@@ -354,7 +363,8 @@
for (Integer val : keysToSeek)
{
long ts = System.nanoTime();
- Pair<Boolean, Record<Integer, Integer>> result = reader.positionToKeySequentially(0, val, false);
+ Pair<Boolean, Record<Integer, Integer>> result =
+ reader.positionToKeySequentially(0, val, GREATER_THAN_OR_EQUAL_TO_KEY, AFTER_MATCHING_KEY);
assertThat(result.getSecond()).isEqualTo(Record.from(val, val));
long te = System.nanoTime() - ts;
if (te < minTime) minTime = te;
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index e08e6d2..b2b5f27 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -52,6 +53,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
* Test the FileReplicaDB class
@@ -193,17 +195,17 @@
}
waitChangesArePersisted(replicaDB, 4);
- cursor = replicaDB.generateCursorFrom(csns[0]);
+ cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY);
assertTrue(cursor.next());
assertEquals(cursor.getRecord().getCSN(), csns[1]);
StaticUtils.close(cursor);
- cursor = replicaDB.generateCursorFrom(csns[3]);
+ cursor = replicaDB.generateCursorFrom(csns[3], AFTER_MATCHING_KEY);
assertTrue(cursor.next());
assertEquals(cursor.getRecord().getCSN(), csns[4]);
StaticUtils.close(cursor);
- cursor = replicaDB.generateCursorFrom(csns[4]);
+ cursor = replicaDB.generateCursorFrom(csns[4], AFTER_MATCHING_KEY);
assertFalse(cursor.next());
assertNull(cursor.getRecord());
}
@@ -242,7 +244,7 @@
CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
- cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey]);
+ cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], PositionStrategy.AFTER_MATCHING_KEY);
assertFalse(cursor.next());
int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
@@ -547,7 +549,7 @@
return;
}
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]);
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY);
try
{
// Cursor points to a null record initially
@@ -574,7 +576,7 @@
DBCursor<UpdateMsg> cursor = null;
try
{
- cursor = replicaDB.generateCursorFrom(csn);
+ cursor = replicaDB.generateCursorFrom(csn, AFTER_MATCHING_KEY);
assertFalse(cursor.next());
assertNull(cursor.getRecord());
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
index 1824aaa..e7e753d 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/LogTest.java
@@ -26,6 +26,7 @@
package org.opends.server.replication.server.changelog.file;
import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.replication.server.changelog.file.LogFileTest.*;
import java.io.File;
@@ -150,14 +151,14 @@
DBCursor<Record<String, String>> cursor1 = null, cursor2 = null, cursor3 = null;
try {
// this key is the first key of the log file "key1_key2.log"
- cursor1 = log.getNearestCursor("key001");
+ cursor1 = log.getNearestCursor("key001", AFTER_MATCHING_KEY);
assertThatCursorCanBeFullyReadFromStart(cursor1, 2, 10);
// this key is the last key of the log file "key3_key4.log"
- cursor2 = log.getNearestCursor("key004");
+ cursor2 = log.getNearestCursor("key004", AFTER_MATCHING_KEY);
assertThatCursorCanBeFullyReadFromStart(cursor2, 5, 10);
- cursor3 = log.getNearestCursor("key009");
+ cursor3 = log.getNearestCursor("key009", AFTER_MATCHING_KEY);
assertThatCursorCanBeFullyReadFromStart(cursor3, 10, 10);
}
finally {
@@ -171,7 +172,7 @@
Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
DBCursor<Record<String, String>> cursor = null;
try {
- cursor = log.getNearestCursor("key010");
+ cursor = log.getNearestCursor("key010", AFTER_MATCHING_KEY);
// lowest higher key does not exist
assertThatCursorIsExhausted(cursor);
@@ -188,7 +189,7 @@
DBCursor<Record<String, String>> cursor = null;
try {
// key is between key005 and key006
- cursor = log.getNearestCursor("key005000");
+ cursor = log.getNearestCursor("key005000", AFTER_MATCHING_KEY);
assertThatCursorCanBeFullyReadFromStart(cursor, 6, 10);
}
@@ -203,7 +204,7 @@
Log<String, String> log = openLog(LogFileTest.RECORD_PARSER);
DBCursor<Record<String, String>> cursor = null;
try {
- cursor = log.getNearestCursor(null);
+ cursor = log.getNearestCursor(null, null);
assertThatCursorCanBeFullyReadFromStart(cursor, 1, 10);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 5258d28..b71bf56 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -44,6 +44,7 @@
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
import org.opends.server.replication.server.changelog.api.ChangelogDB;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.api.ReplicationDomainDB;
import org.opends.server.types.DN;
import org.testng.annotations.*;
@@ -53,6 +54,7 @@
import static org.assertj.core.api.Assertions.*;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
/**
* Test for ChangeNumberIndexer class. All dependencies to the changelog DB
@@ -157,7 +159,7 @@
{
MockitoAnnotations.initMocks(this);
- multiDomainCursor = new MultiDomainDBCursor(domainDB);
+ multiDomainCursor = new MultiDomainDBCursor(domainDB, AFTER_MATCHING_KEY);
initialState = new ChangelogState();
initialCookie = new MultiDomainServerState();
replicaDBCursors = new HashMap<Pair<DN, Integer>, SequentialDBCursor>();
@@ -166,8 +168,8 @@
when(changelogDB.getChangeNumberIndexDB()).thenReturn(cnIndexDB);
when(changelogDB.getReplicationDomainDB()).thenReturn(domainDB);
- when(domainDB.getCursorFrom(any(MultiDomainServerState.class))).thenReturn(
- multiDomainCursor);
+ when(domainDB.getCursorFrom(any(MultiDomainServerState.class), eq(AFTER_MATCHING_KEY)))
+ .thenReturn(multiDomainCursor);
}
@AfterMethod
@@ -595,15 +597,15 @@
DomainDBCursor domainDBCursor = domainDBCursors.get(baseDN);
if (domainDBCursor == null)
{
- domainDBCursor = new DomainDBCursor(baseDN, domainDB);
+ domainDBCursor = new DomainDBCursor(baseDN, domainDB, AFTER_MATCHING_KEY);
domainDBCursors.put(baseDN, domainDBCursor);
multiDomainCursor.addDomain(baseDN, null);
- when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class)))
+ when(domainDB.getCursorFrom(eq(baseDN), any(ServerState.class), eq(AFTER_MATCHING_KEY)))
.thenReturn(domainDBCursor);
}
domainDBCursor.addReplicaDB(serverId, null);
- when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class)))
+ when(domainDB.getCursorFrom(eq(baseDN), eq(serverId), any(CSN.class), eq(AFTER_MATCHING_KEY)))
.thenReturn(replicaDBCursor);
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
index da8bcd0..e5b022a 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/CompositeDBCursorTest.java
@@ -187,6 +187,7 @@
of(msg4, baseDN1));
}
+ @Test
public void recycleTwoElementsCursorsLongerExhaustion() throws Exception
{
final CompositeDBCursor<String> compCursor = newCompositeDBCursor(
--
Gitblit v1.10.0