From 0c32ce52234710c37d8b5b751dd16f536943e3c1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 20 Aug 2014 10:57:29 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4261) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog
---
opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 151 ++++++++++++++-------
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 16 +-
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 9
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java | 46 ++++--
opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 162 +++++++++++++---------
5 files changed, 235 insertions(+), 149 deletions(-)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 0b2de60..ceca244 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -37,7 +37,6 @@
import org.forgerock.i18n.LocalizableMessageBuilder;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
-import org.forgerock.util.Reject;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
@@ -578,7 +577,10 @@
{
firstException = e;
}
- else logger.traceException(e);
+ else
+ {
+ logger.traceException(e);
+ }
}
}
}
@@ -757,12 +759,10 @@
public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
final PositionStrategy positionStrategy) throws ChangelogException
{
- Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
- + " is not supported for the JE implementation fo changelog");
final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
+ final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
@@ -853,12 +853,12 @@
{
indexer.replicaOffline(baseDN, offlineCSN);
}
- updateCursorsWithOfflineCSN(baseDN, offlineCSN);
+ updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
}
- private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
+ private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN)
{
- final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
+ final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
if (cursors != null && !cursors.isEmpty())
{
for (ReplicaCursor cursor : cursors)
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 5efafee..3fd1878 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -41,6 +41,7 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
@@ -177,18 +178,20 @@
* Generate a new {@link DBCursor} that allows to browse the db managed by
* this ReplicaDB and starting at the position defined by a given CSN.
*
- * @param startAfterCSN
+ * @param startCSN
* The position where the cursor must start. If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @return a new {@link DBCursor} that allows to browse the db managed by this
* ReplicaDB and starting at the position defined by a given CSN.
* @throws ChangelogException
* if a database problem happened
*/
- public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
+ public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy)
throws ChangelogException
{
- return new JEReplicaDBCursor(db, startAfterCSN, this);
+ return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
}
/**
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 1e9e8f7..e3dc03e 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -32,18 +32,21 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
/**
* Berkeley DB JE implementation of {@link DBCursor}.
*
* \@NotThreadSafe
*/
-public class JEReplicaDBCursor implements DBCursor<UpdateMsg>
+class JEReplicaDBCursor implements DBCursor<UpdateMsg>
{
- private UpdateMsg currentChange;
- private ReplServerDBCursor cursor;
+ private final ReplicationDB db;
+ private final PositionStrategy positionStrategy;
private JEReplicaDB replicaDB;
- private ReplicationDB db;
private CSN lastNonNullCurrentCSN;
+ private ReplServerDBCursor cursor;
+ private UpdateMsg currentChange;
/**
* Creates a new {@link JEReplicaDBCursor}. All created cursor must be
@@ -51,20 +54,23 @@
*
* @param db
* The db where the cursor must be created.
- * @param startAfterCSN
+ * @param startCSN
* The CSN after which the cursor must start.If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @param replicaDB
* The associated JEReplicaDB.
* @throws ChangelogException
- * if a database problem happened.
+ * if a database problem happened.
*/
- public JEReplicaDBCursor(ReplicationDB db, CSN startAfterCSN,
+ public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy,
JEReplicaDB replicaDB) throws ChangelogException
{
this.db = db;
+ this.positionStrategy = positionStrategy;
this.replicaDB = replicaDB;
- this.lastNonNullCurrentCSN = startAfterCSN;
+ this.lastNonNullCurrentCSN = startCSN;
}
/** {@inheritDoc} */
@@ -78,9 +84,6 @@
@Override
public boolean next() throws ChangelogException
{
- final ReplServerDBCursor localCursor = cursor;
- currentChange = localCursor != null ? localCursor.next() : null;
-
if (currentChange == null)
{
synchronized (this)
@@ -91,10 +94,18 @@
// if following code is called while the cursor is closed.
// It is better to let the deadlock happen to help quickly identifying
// and fixing such issue with unit tests.
- cursor = db.openReadCursor(lastNonNullCurrentCSN);
- currentChange = cursor.next();
+ cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy);
}
}
+
+ // For ON_MATCHING_KEY, do not call next() if the cursor has just been initialized.
+ if (positionStrategy == ON_MATCHING_KEY && currentChange != null
+ || positionStrategy == AFTER_MATCHING_KEY)
+ {
+ cursor.next();
+ }
+ currentChange = cursor.getRecord();
+
if (currentChange != null)
{
lastNonNullCurrentCSN = currentChange.getCSN();
@@ -110,7 +121,7 @@
synchronized (this)
{
closeCursor();
- this.replicaDB = null;
+ replicaDB = null;
}
}
@@ -120,11 +131,12 @@
{
cursor.close();
cursor = null;
+ currentChange = null;
}
}
/**
- * Called by the Gc when the object is garbage collected. Release the internal
+ * Called by the GC when the object is garbage collected. Release the internal
* cursor in case the cursor was badly used and {@link #close()} was never
* called.
*/
@@ -138,7 +150,9 @@
@Override
public String toString()
{
- return getClass().getSimpleName() + " currentChange=" + currentChange
+ return getClass().getSimpleName()
+ + " positionStrategy=" + positionStrategy
+ + " currentChange=" + currentChange
+ " replicaDB=" + replicaDB;
}
}
diff --git a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index 27ea1a6..abbe49c 100644
--- a/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj-sdk/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,7 +26,6 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.io.Closeable;
import org.forgerock.i18n.slf4j.LocalizedLogger;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReadWriteLock;
@@ -39,6 +38,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -244,15 +245,18 @@
private DatabaseEntry createReplicationKey(CSN csn)
{
- DatabaseEntry key = new DatabaseEntry();
- try
+ final DatabaseEntry key = new DatabaseEntry();
+ if (csn != null)
{
- key.setData(csn.toString().getBytes("UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- // Should never happens, UTF-8 is always supported
- // TODO : add better logging
+ try
+ {
+ key.setData(csn.toString().getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // Should never happens, UTF-8 is always supported
+ // TODO : add better logging
+ }
}
return key;
}
@@ -285,13 +289,15 @@
* @param startCSN
* The CSN from which the cursor must start.If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @return The ReplServerDBCursor.
* @throws ChangelogException
* If a database problem happened
*/
- ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
+ ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
{
- return new ReplServerDBCursor(startCSN);
+ return new ReplServerDBCursor(startCSN, positionStrategy);
}
/**
@@ -445,7 +451,7 @@
* This Class implements a cursor that can be used to browse a
* replicationServer database.
*/
- class ReplServerDBCursor implements Closeable
+ class ReplServerDBCursor implements DBCursor<UpdateMsg>
{
/**
* The transaction that will protect the actions done with the cursor.
@@ -454,12 +460,14 @@
* <p>
* Will be set non null for a write cursor
*/
- private final Transaction txn;
private final Cursor cursor;
private final DatabaseEntry key;
private final DatabaseEntry data;
+ /** \@Null for read cursors, \@NotNull for deleting cursors. */
+ private final Transaction txn;
+ private UpdateMsg currentRecord;
- private boolean isClosed = false;
+ private boolean isClosed;
/**
* Creates a ReplServerDBCursor that can be used for browsing a
@@ -467,21 +475,15 @@
*
* @param startCSN
* The CSN from which the cursor must start.
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @throws ChangelogException
* When the startCSN does not exist.
*/
- private ReplServerDBCursor(CSN startCSN) throws ChangelogException
+ private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
{
- if (startCSN != null)
- {
- key = createReplicationKey(startCSN);
- }
- else
- {
- key = new DatabaseEntry();
- }
+ key = createReplicationKey(startCSN);
data = new DatabaseEntry();
-
txn = null;
// Take the lock. From now on, whatever error that happen in the life
@@ -515,18 +517,25 @@
return;
}
- // We can move close to the startCSN.
- // Let's create a cursor from that point.
- DatabaseEntry aKey = new DatabaseEntry();
- DatabaseEntry aData = new DatabaseEntry();
- if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
+ if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
{
- localCursor.close();
- localCursor = db.openCursor(txn, null);
+ // We can move close to the startCSN.
+ // Let's create a cursor from that point.
+ key.setData(null);
+ if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
+ {
+ localCursor.close();
+ localCursor = db.openCursor(txn, null);
+ }
}
}
cursor = localCursor;
cursorHeld = cursor != null;
+
+ if (key.getData() != null)
+ {
+ computeCurrentRecord();
+ }
}
catch (DatabaseException e)
{
@@ -604,6 +613,7 @@
return;
}
isClosed = true;
+ currentRecord = null;
}
closeAndReleaseReadLock(cursor);
@@ -658,6 +668,7 @@
return null;
}
+ currentRecord = null;
try
{
if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -672,60 +683,73 @@
}
}
- /**
- * Get the next UpdateMsg from this cursor.
- *
- * @return the next UpdateMsg.
- */
- UpdateMsg next()
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
{
if (isClosed)
{
- return null;
+ return false;
}
- UpdateMsg currentChange = null;
- while (currentChange == null)
+ currentRecord = null;
+ while (currentRecord == null)
{
try
{
if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
{
- return null;
+ return false;
}
}
catch (DatabaseException e)
{
- return null;
+ throw new ChangelogException(e);
}
-
- CSN csn = null;
- try
- {
- csn = toCSN(key.getData());
- if (isACounterRecord(csn))
- {
- continue;
- }
- currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
- data.getData(), ProtocolVersion.getCurrentVersion());
- }
- catch (Exception e)
- {
- /*
- * An error happening trying to convert the data from the
- * replicationServer database to an Update LocalizableMessage. This can only
- * happen if the database is corrupted. There is not much more that we
- * can do at this point except trying to continue with the next
- * record. In such case, it is therefore possible that we miss some
- * changes.
- * TODO : This should be handled by the repair functionality.
- */
- logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
- csn, e.getMessage());
- }
+ computeCurrentRecord();
}
- return currentChange;
+ return currentRecord != null;
+ }
+
+ private void computeCurrentRecord()
+ {
+ CSN csn = null;
+ try
+ {
+ csn = toCSN(key.getData());
+ if (isACounterRecord(csn))
+ {
+ return;
+ }
+ currentRecord = toRecord(data.getData());
+ }
+ catch (Exception e)
+ {
+ /*
+ * An error happening trying to convert the data from the
+ * replicationServer database to an Update Message. This can only
+ * happen if the database is corrupted. There is not much more that we
+ * can do at this point except trying to continue with the next
+ * record. In such case, it is therefore possible that we miss some
+ * changes.
+ * TODO : This should be handled by the repair functionality.
+ */
+ logger.error(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD, replicationServer.getServerId(),
+ csn, e.getMessage());
+ }
+ }
+
+ private UpdateMsg toRecord(final byte[] data) throws Exception
+ {
+ final short currentVersion = ProtocolVersion.getCurrentVersion();
+ return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return currentRecord;
}
/**
diff --git a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index 9e3670b..cc1be60 100644
--- a/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opendj-sdk/opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -28,7 +28,10 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.assertj.core.api.SoftAssertions;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.forgerock.opendj.config.server.ConfigException;
@@ -40,13 +43,16 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
@@ -81,11 +87,12 @@
void testTrim() throws Exception
{
ReplicationServer replicationServer = null;
+ JEReplicaDB replicaDB = null;
try
{
TestCaseUtils.startServer();
replicationServer = configureReplicationServer(100, 5000);
- final JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+ replicaDB = newReplicaDB(replicationServer);
CSN[] csns = newCSNs(1, 0, 5);
@@ -97,7 +104,7 @@
//--
// Iterator tests with changes persisted
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
- assertNotFound(replicaDB, csns[4]);
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
assertEquals(replicaDB.getOldestCSN(), csns[0]);
assertEquals(replicaDB.getNewestCSN(), csns[2]);
@@ -110,7 +117,7 @@
// Test cursor from existing CSN
assertFoundInOrder(replicaDB, csns[2], csns[3]);
assertFoundInOrder(replicaDB, csns[3]);
- assertNotFound(replicaDB, csns[4]);
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
@@ -133,6 +140,7 @@
}
finally
{
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -182,38 +190,34 @@
return;
}
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]);
- try
- {
- assertNull(cursor.getRecord());
- for (int i = 1; i < csns.length; i++)
- {
- final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
- assertTrue(cursor.next(), msg);
- assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
- }
- assertFalse(cursor.next());
- assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
- + ", Expected null");
- }
- finally
- {
- StaticUtils.close(cursor);
- }
+ assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
+ assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
}
- private void assertNotFound(JEReplicaDB replicaDB, CSN csn) throws Exception
+ private void assertFoundInOrder(JEReplicaDB replicaDB,
+ final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
{
- DBCursor<UpdateMsg> cursor = null;
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
try
{
- cursor = replicaDB.generateCursorFrom(csn);
- assertFalse(cursor.next());
- assertNull(cursor.getRecord());
+ assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
+
+ for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
+ {
+ final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).as(msg).isTrue();
+ softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
+ softly.assertAll();
+ }
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isFalse();
+ softly.assertThat(cursor.getRecord()).isNull();
+ softly.assertAll();
}
finally
{
- StaticUtils.close(cursor);
+ close(cursor);
}
}
@@ -226,11 +230,12 @@
void testClear() throws Exception
{
ReplicationServer replicationServer = null;
+ JEReplicaDB replicaDB = null;
try
{
TestCaseUtils.startServer();
replicationServer = configureReplicationServer(100, 5000);
- JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+ replicaDB = newReplicaDB(replicationServer);
CSN[] csns = newCSNs(1, 0, 3);
@@ -250,6 +255,7 @@
}
finally
{
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -258,7 +264,6 @@
public void testGenerateCursorFrom() throws Exception
{
ReplicationServer replicationServer = null;
- DBCursor<UpdateMsg> cursor = null;
JEReplicaDB replicaDB = null;
try
{
@@ -266,38 +271,69 @@
replicationServer = configureReplicationServer(100000, 10);
replicaDB = newReplicaDB(replicationServer);
- CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
- for (int i = 0; i < 5; i++)
+ final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
+ final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
+ csns2.remove(csns[3]);
+
+ for (CSN csn : csns2)
{
- if (i != 3)
- {
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
- }
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
}
- cursor = replicaDB.generateCursorFrom(csns[0]);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[1]);
- StaticUtils.close(cursor);
+ for (CSN csn : csns2)
+ {
+ assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
+ }
+ assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
- cursor = replicaDB.generateCursorFrom(csns[3]);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[4]);
- StaticUtils.close(cursor);
-
- cursor = replicaDB.generateCursorFrom(csns[4]);
- assertFalse(cursor.next());
- assertNull(cursor.getRecord());
+ for (int i = 0; i < csns2.size() - 1; i++)
+ {
+ assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
+ }
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
}
finally
{
- StaticUtils.close(cursor);
- if (replicaDB != null)
- replicaDB.shutdown();
+ shutdown(replicaDB);
remove(replicationServer);
}
}
+ private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
+ final PositionStrategy positionStrategy, final CSN expectedCSN)
+ throws ChangelogException
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ try
+ {
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isTrue();
+ softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
+ softly.assertAll();
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
+ private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN,
+ final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ try
+ {
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isFalse();
+ softly.assertThat(cursor.getRecord()).isNull();
+ softly.assertAll();
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
/**
* Test the logic that manages counter records in the JEReplicaDB in order to
* optimize the oldest and newest records in the replication changelog db.
@@ -395,13 +431,22 @@
}
finally
{
- if (replicaDB != null)
- replicaDB.shutdown();
+ shutdown(replicaDB);
if (dbEnv != null)
+ {
dbEnv.shutdown();
+ }
remove(replicationServer);
TestCaseUtils.deleteDirectory(testRoot);
}
}
+ private void shutdown(JEReplicaDB replicaDB)
+ {
+ if (replicaDB != null)
+ {
+ replicaDB.shutdown();
+ }
+ }
+
}
--
Gitblit v1.10.0