From 8a0ef37a4b2c5d28b2bc9c91e90c4201768b97ea Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 19 Aug 2014 15:43:59 +0000
Subject: [PATCH] OPENDJ-1206 (CR-4261) Create a new ReplicationBackend/ChangelogBackend to support cn=changelog
---
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 19 +-
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 11 +
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java | 46 ++++--
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 151 ++++++++++++++-------
opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 168 +++++++++++++----------
5 files changed, 241 insertions(+), 154 deletions(-)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index 90091c9..5b3cca6 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -34,7 +34,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
-import org.forgerock.util.Reject;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.opends.server.api.DirectoryThread;
@@ -183,7 +182,9 @@
catch (Exception e)
{
if (debugEnabled())
+ {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
final MessageBuilder mb = new MessageBuilder(e.getLocalizedMessage()).append(" ")
.append(String.valueOf(dbDirectory));
@@ -585,7 +586,9 @@
firstException = e;
}
else if (debugEnabled())
+ {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
}
}
@@ -687,7 +690,9 @@
catch (Exception e)
{
if (debugEnabled())
+ {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
logError(ERR_CHANGENUMBER_DATABASE.get(e.getLocalizedMessage()));
}
}
@@ -765,12 +770,10 @@
public DBCursor<UpdateMsg> getCursorFrom(final DN baseDN, final int serverId, final CSN startCSN,
final PositionStrategy positionStrategy) throws ChangelogException
{
- Reject.ifTrue(positionStrategy == PositionStrategy.ON_MATCHING_KEY, "The position strategy ON_MATCHING_KEY"
- + " is not supported for the JE implementation fo changelog");
final JEReplicaDB replicaDB = getReplicaDB(baseDN, serverId);
if (replicaDB != null)
{
- final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN);
+ final DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
final CSN offlineCSN = getOfflineCSN(baseDN, serverId, startCSN);
final Pair<DN, Integer> replicaID = Pair.of(baseDN, serverId);
final ReplicaCursor replicaCursor = new ReplicaCursor(cursor, offlineCSN, replicaID, this);
@@ -860,7 +863,7 @@
{
replicationEnv.notifyReplicaOnline(baseDN, serverId);
}
- updateCursorsWithOfflineCSN(baseDN, null);
+ updateCursorsWithOfflineCSN(baseDN, serverId, null);
}
/** {@inheritDoc} */
@@ -873,12 +876,12 @@
{
indexer.replicaOffline(baseDN, offlineCSN);
}
- updateCursorsWithOfflineCSN(baseDN, offlineCSN);
+ updateCursorsWithOfflineCSN(baseDN, offlineCSN.getServerId(), offlineCSN);
}
- private void updateCursorsWithOfflineCSN(final DN baseDN, final CSN offlineCSN)
+ private void updateCursorsWithOfflineCSN(final DN baseDN, int serverId, final CSN offlineCSN)
{
- final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, offlineCSN));
+ final List<ReplicaCursor> cursors = replicaCursors.get(Pair.of(baseDN, serverId));
if (cursors != null && !cursors.isEmpty())
{
for (ReplicaCursor cursor : cursors)
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index 6d93ca0..bb9a22f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -41,7 +41,8 @@
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
-import org.opends.server.replication.server.changelog.je.ReplicationDB.*;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
+import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
import org.opends.server.types.Attribute;
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
@@ -178,18 +179,20 @@
* Generate a new {@link DBCursor} that allows to browse the db managed by
* this ReplicaDB and starting at the position defined by a given CSN.
*
- * @param startAfterCSN
+ * @param startCSN
* The position where the cursor must start. If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @return a new {@link DBCursor} that allows to browse the db managed by this
* ReplicaDB and starting at the position defined by a given CSN.
* @throws ChangelogException
* if a database problem happened
*/
- public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
+ public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy)
throws ChangelogException
{
- return new JEReplicaDBCursor(db, startAfterCSN, this);
+ return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
}
/**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 1e9e8f7..e3dc03e 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -32,18 +32,21 @@
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.je.ReplicationDB.ReplServerDBCursor;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+
/**
* Berkeley DB JE implementation of {@link DBCursor}.
*
* \@NotThreadSafe
*/
-public class JEReplicaDBCursor implements DBCursor<UpdateMsg>
+class JEReplicaDBCursor implements DBCursor<UpdateMsg>
{
- private UpdateMsg currentChange;
- private ReplServerDBCursor cursor;
+ private final ReplicationDB db;
+ private final PositionStrategy positionStrategy;
private JEReplicaDB replicaDB;
- private ReplicationDB db;
private CSN lastNonNullCurrentCSN;
+ private ReplServerDBCursor cursor;
+ private UpdateMsg currentChange;
/**
* Creates a new {@link JEReplicaDBCursor}. All created cursor must be
@@ -51,20 +54,23 @@
*
* @param db
* The db where the cursor must be created.
- * @param startAfterCSN
+ * @param startCSN
* The CSN after which the cursor must start.If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @param replicaDB
* The associated JEReplicaDB.
* @throws ChangelogException
- * if a database problem happened.
+ * if a database problem happened.
*/
- public JEReplicaDBCursor(ReplicationDB db, CSN startAfterCSN,
+ public JEReplicaDBCursor(ReplicationDB db, CSN startCSN, PositionStrategy positionStrategy,
JEReplicaDB replicaDB) throws ChangelogException
{
this.db = db;
+ this.positionStrategy = positionStrategy;
this.replicaDB = replicaDB;
- this.lastNonNullCurrentCSN = startAfterCSN;
+ this.lastNonNullCurrentCSN = startCSN;
}
/** {@inheritDoc} */
@@ -78,9 +84,6 @@
@Override
public boolean next() throws ChangelogException
{
- final ReplServerDBCursor localCursor = cursor;
- currentChange = localCursor != null ? localCursor.next() : null;
-
if (currentChange == null)
{
synchronized (this)
@@ -91,10 +94,18 @@
// if following code is called while the cursor is closed.
// It is better to let the deadlock happen to help quickly identifying
// and fixing such issue with unit tests.
- cursor = db.openReadCursor(lastNonNullCurrentCSN);
- currentChange = cursor.next();
+ cursor = db.openReadCursor(lastNonNullCurrentCSN, positionStrategy);
}
}
+
+ // For ON_MATCHING_KEY, do not call next() if the cursor has just been initialized.
+ if (positionStrategy == ON_MATCHING_KEY && currentChange != null
+ || positionStrategy == AFTER_MATCHING_KEY)
+ {
+ cursor.next();
+ }
+ currentChange = cursor.getRecord();
+
if (currentChange != null)
{
lastNonNullCurrentCSN = currentChange.getCSN();
@@ -110,7 +121,7 @@
synchronized (this)
{
closeCursor();
- this.replicaDB = null;
+ replicaDB = null;
}
}
@@ -120,11 +131,12 @@
{
cursor.close();
cursor = null;
+ currentChange = null;
}
}
/**
- * Called by the Gc when the object is garbage collected. Release the internal
+ * Called by the GC when the object is garbage collected. Release the internal
* cursor in case the cursor was badly used and {@link #close()} was never
* called.
*/
@@ -138,7 +150,9 @@
@Override
public String toString()
{
- return getClass().getSimpleName() + " currentChange=" + currentChange
+ return getClass().getSimpleName()
+ + " positionStrategy=" + positionStrategy
+ + " currentChange=" + currentChange
+ " replicaDB=" + replicaDB;
}
}
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index afc94f0..c04ffef 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -26,12 +26,10 @@
*/
package org.opends.server.replication.server.changelog.je;
-import java.io.Closeable;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.protocol.ProtocolVersion;
@@ -40,6 +38,8 @@
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.ChangelogException;
+import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
@@ -244,15 +244,18 @@
private DatabaseEntry createReplicationKey(CSN csn)
{
- DatabaseEntry key = new DatabaseEntry();
- try
+ final DatabaseEntry key = new DatabaseEntry();
+ if (csn != null)
{
- key.setData(csn.toString().getBytes("UTF-8"));
- }
- catch (UnsupportedEncodingException e)
- {
- // Should never happens, UTF-8 is always supported
- // TODO : add better logging
+ try
+ {
+ key.setData(csn.toString().getBytes("UTF-8"));
+ }
+ catch (UnsupportedEncodingException e)
+ {
+ // Should never happens, UTF-8 is always supported
+ // TODO : add better logging
+ }
}
return key;
}
@@ -286,13 +289,15 @@
* @param startCSN
* The CSN from which the cursor must start.If null, start from the
* oldest CSN
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @return The ReplServerDBCursor.
* @throws ChangelogException
* If a database problem happened
*/
- ReplServerDBCursor openReadCursor(CSN startCSN) throws ChangelogException
+ ReplServerDBCursor openReadCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
{
- return new ReplServerDBCursor(startCSN);
+ return new ReplServerDBCursor(startCSN, positionStrategy);
}
/**
@@ -446,7 +451,7 @@
* This Class implements a cursor that can be used to browse a
* replicationServer database.
*/
- class ReplServerDBCursor implements Closeable
+ class ReplServerDBCursor implements DBCursor<UpdateMsg>
{
/**
* The transaction that will protect the actions done with the cursor.
@@ -455,12 +460,14 @@
* <p>
* Will be set non null for a write cursor
*/
- private final Transaction txn;
private final Cursor cursor;
private final DatabaseEntry key;
private final DatabaseEntry data;
+ /** \@Null for read cursors, \@NotNull for deleting cursors. */
+ private final Transaction txn;
+ private UpdateMsg currentRecord;
- private boolean isClosed = false;
+ private boolean isClosed;
/**
* Creates a ReplServerDBCursor that can be used for browsing a
@@ -468,21 +475,15 @@
*
* @param startCSN
* The CSN from which the cursor must start.
+ * @param positionStrategy
+ * indicates at which exact position the cursor must start
* @throws ChangelogException
* When the startCSN does not exist.
*/
- private ReplServerDBCursor(CSN startCSN) throws ChangelogException
+ private ReplServerDBCursor(CSN startCSN, PositionStrategy positionStrategy) throws ChangelogException
{
- if (startCSN != null)
- {
- key = createReplicationKey(startCSN);
- }
- else
- {
- key = new DatabaseEntry();
- }
+ key = createReplicationKey(startCSN);
data = new DatabaseEntry();
-
txn = null;
// Take the lock. From now on, whatever error that happen in the life
@@ -516,18 +517,25 @@
return;
}
- // We can move close to the startCSN.
- // Let's create a cursor from that point.
- DatabaseEntry aKey = new DatabaseEntry();
- DatabaseEntry aData = new DatabaseEntry();
- if (localCursor.getPrev(aKey, aData, LockMode.DEFAULT) != SUCCESS)
+ if (positionStrategy == PositionStrategy.AFTER_MATCHING_KEY)
{
- localCursor.close();
- localCursor = db.openCursor(txn, null);
+ // We can move close to the startCSN.
+ // Let's create a cursor from that point.
+ key.setData(null);
+ if (localCursor.getPrev(key, data, LockMode.DEFAULT) != SUCCESS)
+ {
+ localCursor.close();
+ localCursor = db.openCursor(txn, null);
+ }
}
}
cursor = localCursor;
cursorHeld = cursor != null;
+
+ if (key.getData() != null)
+ {
+ computeCurrentRecord();
+ }
}
catch (DatabaseException e)
{
@@ -605,6 +613,7 @@
return;
}
isClosed = true;
+ currentRecord = null;
}
closeAndReleaseReadLock(cursor);
@@ -659,6 +668,7 @@
return null;
}
+ currentRecord = null;
try
{
if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
@@ -673,63 +683,75 @@
}
}
- /**
- * Get the next UpdateMsg from this cursor.
- *
- * @return the next UpdateMsg.
- */
- UpdateMsg next()
+ /** {@inheritDoc} */
+ @Override
+ public boolean next() throws ChangelogException
{
if (isClosed)
{
- return null;
+ return false;
}
- UpdateMsg currentChange = null;
- while (currentChange == null)
+ currentRecord = null;
+ while (currentRecord == null)
{
try
{
if (cursor.getNext(key, data, LockMode.DEFAULT) != SUCCESS)
{
- return null;
+ return false;
}
}
catch (DatabaseException e)
{
- return null;
+ throw new ChangelogException(e);
}
-
- CSN csn = null;
- try
- {
- csn = toCSN(key.getData());
- if (isACounterRecord(csn))
- {
- continue;
- }
- currentChange = (UpdateMsg) ReplicationMsg.generateMsg(
- data.getData(), ProtocolVersion.getCurrentVersion());
- }
- catch (Exception e)
- {
- /*
- * An error happening trying to convert the data from the
- * replicationServer database to an Update Message. This can only
- * happen if the database is corrupted. There is not much more that we
- * can do at this point except trying to continue with the next
- * record. In such case, it is therefore possible that we miss some
- * changes.
- * TODO : This should be handled by the repair functionality.
- */
- Message message = ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD
- .get(replicationServer.getServerId(),
- (csn != null ? csn.toString() : ""),
- e.getMessage());
- logError(message);
- }
+ computeCurrentRecord();
}
- return currentChange;
+ return currentRecord != null;
+ }
+
+ private void computeCurrentRecord()
+ {
+ CSN csn = null;
+ try
+ {
+ csn = toCSN(key.getData());
+ if (isACounterRecord(csn))
+ {
+ return;
+ }
+ currentRecord = toRecord(data.getData());
+ }
+ catch (Exception e)
+ {
+ /*
+ * An error happening trying to convert the data from the
+ * replicationServer database to an Update Message. This can only
+ * happen if the database is corrupted. There is not much more that we
+ * can do at this point except trying to continue with the next
+ * record. In such case, it is therefore possible that we miss some
+ * changes.
+ * TODO : This should be handled by the repair functionality.
+ */
+ logError(ERR_REPLICATIONDB_CANNOT_PROCESS_CHANGE_RECORD.get(
+ replicationServer.getServerId(),
+ (csn != null ? csn.toString() : ""),
+ e.getMessage()));
+ }
+ }
+
+ private UpdateMsg toRecord(final byte[] data) throws Exception
+ {
+ final short currentVersion = ProtocolVersion.getCurrentVersion();
+ return (UpdateMsg) ReplicationMsg.generateMsg(data, currentVersion);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public UpdateMsg getRecord()
+ {
+ return currentRecord;
}
/**
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index 9442e44..3033af5 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -28,7 +28,10 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.assertj.core.api.SoftAssertions;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
@@ -41,15 +44,18 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
+import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
@@ -98,11 +104,12 @@
void testTrim() throws Exception
{
ReplicationServer replicationServer = null;
+ JEReplicaDB replicaDB = null;
try
{
TestCaseUtils.startServer();
replicationServer = configureReplicationServer(100, 5000);
- final JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+ replicaDB = newReplicaDB(replicationServer);
CSN[] csns = newCSNs(1, 0, 5);
@@ -114,7 +121,7 @@
//--
// Iterator tests with changes persisted
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
- assertNotFound(replicaDB, csns[4]);
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
assertEquals(replicaDB.getOldestCSN(), csns[0]);
assertEquals(replicaDB.getNewestCSN(), csns[2]);
@@ -127,7 +134,7 @@
// Test cursor from existing CSN
assertFoundInOrder(replicaDB, csns[2], csns[3]);
assertFoundInOrder(replicaDB, csns[3]);
- assertNotFound(replicaDB, csns[4]);
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
@@ -150,6 +157,7 @@
}
finally
{
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -199,38 +207,34 @@
return;
}
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0]);
- try
- {
- assertNull(cursor.getRecord());
- for (int i = 1; i < csns.length; i++)
- {
- final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
- assertTrue(cursor.next(), msg);
- assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
- }
- assertFalse(cursor.next());
- assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
- + ", Expected null");
- }
- finally
- {
- StaticUtils.close(cursor);
- }
+ assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
+ assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
}
- private void assertNotFound(JEReplicaDB replicaDB, CSN csn) throws Exception
+ private void assertFoundInOrder(JEReplicaDB replicaDB,
+ final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
{
- DBCursor<UpdateMsg> cursor = null;
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
try
{
- cursor = replicaDB.generateCursorFrom(csn);
- assertFalse(cursor.next());
- assertNull(cursor.getRecord());
+ assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
+
+ for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
+ {
+ final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).as(msg).isTrue();
+ softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
+ softly.assertAll();
+ }
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isFalse();
+ softly.assertThat(cursor.getRecord()).isNull();
+ softly.assertAll();
}
finally
{
- StaticUtils.close(cursor);
+ close(cursor);
}
}
@@ -243,11 +247,12 @@
void testClear() throws Exception
{
ReplicationServer replicationServer = null;
+ JEReplicaDB replicaDB = null;
try
{
TestCaseUtils.startServer();
replicationServer = configureReplicationServer(100, 5000);
- JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+ replicaDB = newReplicaDB(replicationServer);
CSN[] csns = newCSNs(1, 0, 3);
@@ -267,6 +272,7 @@
}
finally
{
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -275,7 +281,6 @@
public void testGenerateCursorFrom() throws Exception
{
ReplicationServer replicationServer = null;
- DBCursor<UpdateMsg> cursor = null;
JEReplicaDB replicaDB = null;
try
{
@@ -283,38 +288,69 @@
replicationServer = configureReplicationServer(100000, 10);
replicaDB = newReplicaDB(replicationServer);
- CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
- for (int i = 0; i < 5; i++)
+ final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
+ final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
+ csns2.remove(csns[3]);
+
+ for (CSN csn : csns2)
{
- if (i != 3)
- {
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
- }
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
}
- cursor = replicaDB.generateCursorFrom(csns[0]);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[1]);
- StaticUtils.close(cursor);
+ for (CSN csn : csns2)
+ {
+ assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
+ }
+ assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
- cursor = replicaDB.generateCursorFrom(csns[3]);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[4]);
- StaticUtils.close(cursor);
-
- cursor = replicaDB.generateCursorFrom(csns[4]);
- assertFalse(cursor.next());
- assertNull(cursor.getRecord());
+ for (int i = 0; i < csns2.size() - 1; i++)
+ {
+ assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
+ }
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
}
finally
{
- StaticUtils.close(cursor);
- if (replicaDB != null)
- replicaDB.shutdown();
+ shutdown(replicaDB);
remove(replicationServer);
}
}
+ private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
+ final PositionStrategy positionStrategy, final CSN expectedCSN)
+ throws ChangelogException
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ try
+ {
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isTrue();
+ softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
+ softly.assertAll();
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
+ private void assertNotFound(JEReplicaDB replicaDB, final CSN startCSN,
+ final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ try
+ {
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isFalse();
+ softly.assertThat(cursor.getRecord()).isNull();
+ softly.assertAll();
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
/**
* Test the logic that manages counter records in the JEReplicaDB in order to
* optimize the oldest and newest records in the replication changelog db.
@@ -412,13 +448,22 @@
}
finally
{
- if (replicaDB != null)
- replicaDB.shutdown();
+ shutdown(replicaDB);
if (dbEnv != null)
+ {
dbEnv.shutdown();
+ }
remove(replicationServer);
TestCaseUtils.deleteDirectory(testRoot);
}
}
+ private void shutdown(JEReplicaDB replicaDB)
+ {
+ if (replicaDB != null)
+ {
+ replicaDB.shutdown();
+ }
+ }
+
}
--
Gitblit v1.10.0