From aae049d2b08a07020cb12139b56dbd8f25c01500 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 20 Aug 2014 13:26:09 +0000
Subject: [PATCH] Aligned (JE|File)ReplicaDB and their tests for easier comparison with each other.
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java | 4
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java | 187 ++++++++++--------
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 80 +++----
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java | 17 -
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 245 ++++++++++++------------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java | 17 -
6 files changed, 276 insertions(+), 274 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
index 995eca0..db6d2fb 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -97,15 +97,10 @@
* @NonNull
*/
private volatile CSNLimits csnLimits;
-
private final int serverId;
-
private final DN baseDN;
-
private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
-
private final ReplicationServer replicationServer;
-
private final ReplicationEnvironment replicationEnv;
/**
@@ -171,7 +166,9 @@
ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
.toString(), String.valueOf(baseDN), String.valueOf(serverId)));
}
+
log.append(Record.from(updateMsg.getCSN(), updateMsg));
+
final CSNLimits limits = csnLimits;
final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
final boolean updateOld = limits.oldestCSN == null;
@@ -205,8 +202,8 @@
/**
* Returns a cursor that allows to retrieve the update messages from this DB.
- * The starting position is defined by the provided CSN and cursor
- * positioning strategy.
+ * The starting position is defined by the provided CSN and cursor positioning
+ * strategy.
*
* @param startCSN
* The position where the cursor must start. If null, start from the
@@ -214,7 +211,7 @@
* @param positionStrategy
* Cursor position strategy, which allow to choose if cursor must
* start from the provided CSN or just after the provided CSN.
- * @return a new {@link DBCursor} to retreive update messages.
+ * @return a new {@link DBCursor} to retrieve update messages.
* @throws ChangelogException
* if a database problem happened
*/
@@ -315,8 +312,8 @@
public String toString()
{
final CSNLimits limits = csnLimits;
- return getClass().getSimpleName() + " " + baseDN + " " + serverId + " " + limits.oldestCSN + " "
- + limits.newestCSN;
+ return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
+ + limits.oldestCSN + " " + limits.newestCSN;
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index bb9a22f..24c5049 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -51,18 +51,17 @@
import static org.opends.messages.ReplicationMessages.*;
/**
- * This class is used for managing the replicationServer database for each
- * server in the topology.
+ * Represents a replication server database for one server in the topology.
* <p>
* It is responsible for efficiently saving the updates that is received from
* each master server into stable storage.
* <p>
- * This class is also able to generate a {@link DBCursor} that can be used to
+ * It is also able to generate a {@link DBCursor} that can be used to
* read all changes from a given {@link CSN}.
* <p>
- * This class publish some monitoring information below cn=monitor.
+ * It publishes some monitoring information below cn=monitor.
*/
-public class JEReplicaDB
+class JEReplicaDB
{
/**
@@ -81,41 +80,44 @@
this.oldestCSN = oldestCSN;
this.newestCSN = newestCSN;
}
-
}
private final AtomicBoolean shutdown = new AtomicBoolean(false);
- private ReplicationDB db;
/**
* Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
*
* @NonNull
*/
private volatile CSNLimits csnLimits;
- private int serverId;
- private DN baseDN;
- private DbMonitorProvider dbMonitor = new DbMonitorProvider();
- private ReplicationServer replicationServer;
+ private final int serverId;
+ private final DN baseDN;
+ private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
+ private final ReplicationServer replicationServer;
+ private final ReplicationDB db;
/**
* Creates a new ReplicaDB associated to a given LDAP server.
*
- * @param serverId The serverId for which changes will be stored in the DB.
- * @param baseDN the baseDN for which this DB was created.
- * @param replicationServer The ReplicationServer that creates this ReplicaDB.
- * @param dbenv the Database Env to use to create the ReplicationServer DB.
- * server for this domain.
- * @throws ChangelogException If a database problem happened
+ * @param serverId
+ * Id of this server.
+ * @param baseDN
+ * the replication domain baseDN.
+ * @param replicationServer
+ * The ReplicationServer that creates this ReplicaDB.
+ * @param replicationEnv
+ * the Database Env to use to create the ReplicationServer DB. server
+ * for this domain.
+ * @throws ChangelogException
+ * If a database problem happened
*/
- public JEReplicaDB(int serverId, DN baseDN,
- ReplicationServer replicationServer, ReplicationDbEnv dbenv)
- throws ChangelogException
+ JEReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer,
+ final ReplicationDbEnv replicationEnv) throws ChangelogException
{
- this.replicationServer = replicationServer;
this.serverId = serverId;
this.baseDN = baseDN;
- db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
- csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
+ this.replicationServer = replicationServer;
+ this.db = new ReplicationDB(serverId, baseDN, replicationServer, replicationEnv);
+ this.csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
DirectoryServer.deregisterMonitorProvider(dbMonitor);
DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -144,8 +146,7 @@
db.addEntry(updateMsg);
final CSNLimits limits = csnLimits;
- final boolean updateNew = limits.newestCSN == null
- || limits.newestCSN.isOlderThan(updateMsg.getCSN());
+ final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
final boolean updateOld = limits.oldestCSN == null;
if (updateOld || updateNew)
{
@@ -160,7 +161,7 @@
*
* @return the oldest CSN that has not been purged yet.
*/
- public CSN getOldestCSN()
+ CSN getOldestCSN()
{
return csnLimits.oldestCSN;
}
@@ -170,7 +171,7 @@
*
* @return the newest CSN that has not been purged yet.
*/
- public CSN getNewestCSN()
+ CSN getNewestCSN()
{
return csnLimits.newestCSN;
}
@@ -189,7 +190,7 @@
* @throws ChangelogException
* if a database problem happened
*/
- public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy)
+ DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy)
throws ChangelogException
{
return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
@@ -198,7 +199,7 @@
/**
* Shutdown this ReplicaDB.
*/
- public void shutdown()
+ void shutdown()
{
if (shutdown.compareAndSet(false, true))
{
@@ -212,7 +213,7 @@
*
* @param purgeCSN
* The CSN up to which changes can be purged. No purging happens when
- * it is null.
+ * it is {@code null}.
* @throws ChangelogException
* In case of database problem.
*/
@@ -280,8 +281,7 @@
}
/**
- * This internal class is used to implement the Monitoring capabilities of the
- * ReplicaDB.
+ * Implements monitoring capabilities of the ReplicaDB.
*/
private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
{
@@ -289,7 +289,7 @@
@Override
public List<Attribute> getMonitorData()
{
- List<Attribute> attributes = new ArrayList<Attribute>();
+ final List<Attribute> attributes = new ArrayList<Attribute>();
create(attributes, "replicationServer-database",String.valueOf(serverId));
create(attributes, "domain-name", baseDN.toNormalizedString());
final CSNLimits limits = csnLimits;
@@ -304,12 +304,12 @@
return attributes;
}
- private void create(List<Attribute> attributes, String name, String value)
+ private void create(final List<Attribute> attributes, final String name, final String value)
{
attributes.add(Attributes.create(name, value));
}
- private String encode(CSN csn)
+ private String encode(final CSN csn)
{
return csn + " " + new Date(csn.getTime());
}
@@ -318,16 +318,14 @@
@Override
public String getMonitorInstanceName()
{
- ReplicationServerDomain domain = replicationServer
- .getReplicationServerDomain(baseDN);
- return "Changelog for DS(" + serverId + "),cn="
- + domain.getMonitorInstanceName();
+ ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN);
+ return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName();
}
/** {@inheritDoc} */
@Override
public void initializeMonitorProvider(MonitorProviderCfg configuration)
- throws ConfigException,InitializationException
+ throws ConfigException,InitializationException
{
// Nothing to do for now
}
@@ -347,7 +345,7 @@
* @throws ChangelogException When an exception occurs while removing the
* changes from the DB.
*/
- public void clear() throws ChangelogException
+ void clear() throws ChangelogException
{
db.clear();
csnLimits = new CSNLimits(null, null);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
index cf896e6..e761851 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -32,7 +32,6 @@
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
@@ -48,6 +47,7 @@
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
+import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*;
import static org.testng.Assert.*;
@SuppressWarnings("javadoc")
@@ -233,17 +233,6 @@
}
}
- private CSN[] generateCSNs(int serverId, long timestamp, int number)
- {
- CSNGenerator gen = new CSNGenerator(serverId, timestamp);
- CSN[] csns = new CSN[number];
- for (int i = 0; i < csns.length; i++)
- {
- csns[i] = gen.newCSN();
- }
- return csns;
- }
-
private long[] addThreeRecords(FileChangeNumberIndexDB cnIndexDB) throws Exception
{
// Prepare data to be stored in the db
@@ -334,10 +323,10 @@
{
try
{
- for (int i = 0; i < cns.length; i++)
+ for (long cn : cns)
{
assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getChangeNumber(), cns[i]);
+ assertEquals(cursor.getRecord().getChangeNumber(), cn);
}
assertFalse(cursor.next());
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
index b2b5f27..a9d707c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -27,7 +27,10 @@
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import org.assertj.core.api.SoftAssertions;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
@@ -40,20 +43,21 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
-import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
-import static org.testng.Assert.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
+import static org.opends.server.util.StaticUtils.*;
+import static org.testng.Assert.*;
/**
* Test the FileReplicaDB class
@@ -124,9 +128,7 @@
}
finally
{
- if (replicaDB != null) {
- replicaDB.shutdown();
- }
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -150,7 +152,7 @@
waitChangesArePersisted(replicaDB, 3);
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
- assertNotFound(replicaDB, csns[4]);
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
assertEquals(replicaDB.getOldestCSN(), csns[0]);
assertEquals(replicaDB.getNewestCSN(), csns[2]);
@@ -162,13 +164,11 @@
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
assertFoundInOrder(replicaDB, csns[2], csns[3]);
assertFoundInOrder(replicaDB, csns[3]);
- assertNotFound(replicaDB, csns[4]);
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
}
finally
{
- if (replicaDB != null) {
- replicaDB.shutdown();
- }
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -177,7 +177,6 @@
public void testGenerateCursorFrom() throws Exception
{
ReplicationServer replicationServer = null;
- DBCursor<UpdateMsg> cursor = null;
FileReplicaDB replicaDB = null;
try
{
@@ -185,36 +184,31 @@
replicationServer = configureReplicationServer(100000, 10);
replicaDB = newReplicaDB(replicationServer);
- CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
- for (int i = 0; i < 5; i++)
+ final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
+ final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
+ csns2.remove(csns[3]);
+
+ for (CSN csn : csns2)
{
- if (i != 3)
- {
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
- }
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
}
waitChangesArePersisted(replicaDB, 4);
- cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[1]);
- StaticUtils.close(cursor);
+ for (CSN csn : csns2)
+ {
+ assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
+ }
+ assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
- cursor = replicaDB.generateCursorFrom(csns[3], AFTER_MATCHING_KEY);
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[4]);
- StaticUtils.close(cursor);
-
- cursor = replicaDB.generateCursorFrom(csns[4], AFTER_MATCHING_KEY);
- assertFalse(cursor.next());
- assertNull(cursor.getRecord());
+ for (int i = 0; i < csns2.size() - 1; i++)
+ {
+ assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
+ }
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
}
finally
{
- StaticUtils.close(cursor);
- if (replicaDB != null) {
- replicaDB.shutdown();
- }
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -242,9 +236,9 @@
replicationServer = configureReplicationServer(100000, 10);
replicaDB = newReplicaDB(replicationServer);
- CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
+ CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
- cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], PositionStrategy.AFTER_MATCHING_KEY);
+ cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], AFTER_MATCHING_KEY);
assertFalse(cursor.next());
int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
@@ -266,11 +260,8 @@
}
finally
{
- StaticUtils.close(cursor);
- if (replicaDB != null)
- {
- replicaDB.shutdown();
- }
+ close(cursor);
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -283,12 +274,12 @@
public void testPurge() throws Exception
{
ReplicationServer replicationServer = null;
+ FileReplicaDB replicaDB = null;
try
{
TestCaseUtils.startServer();
replicationServer = configureReplicationServer(100, 5000);
-
- final FileReplicaDB replicaDB = newReplicaDB(replicationServer);
+ replicaDB = newReplicaDB(replicationServer);
CSN[] csns = generateCSNs(1, 0, 5);
@@ -320,6 +311,7 @@
}
finally
{
+ shutdown(replicaDB);
remove(replicationServer);
}
}
@@ -338,7 +330,6 @@
{
TestCaseUtils.startServer();
replicationServer = configureReplicationServer(100, 5000);
-
replicaDB = newReplicaDB(replicationServer);
CSN[] csns = generateCSNs(1, 0, 3);
@@ -359,19 +350,51 @@
}
finally
{
- if (replicaDB != null)
- {
- replicaDB.shutdown();
- }
+ shutdown(replicaDB);
remove(replicationServer);
}
}
+ private void assertNextCSN(FileReplicaDB replicaDB, final CSN startCSN,
+ final PositionStrategy positionStrategy, final CSN expectedCSN)
+ throws ChangelogException
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ try
+ {
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isTrue();
+ softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
+ softly.assertAll();
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
+ private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN,
+ final PositionStrategy positionStrategy) throws ChangelogException
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
+ try
+ {
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isFalse();
+ softly.assertThat(cursor.getRecord()).isNull();
+ softly.assertAll();
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
/**
* Test the logic that manages counter records in the FileReplicaDB in order to
* optimize the oldest and newest records in the replication changelog db.
*/
- @Test(enabled=true, groups = { "opendj-256" })
+ @Test(groups = { "opendj-256" })
public void testGetOldestNewestCSNs() throws Exception
{
// It's worth testing with 2 different setting for counterRecord
@@ -410,7 +433,6 @@
testRoot = createCleanDir();
dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
- //replicaDB.setCounterRecordWindowSize(counterWindow);
// Populate the db with 'max' msg
int mySeqnum = 1;
@@ -434,7 +456,6 @@
replicaDB.shutdown();
replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
- //replicaDB.setCounterRecordWindowSize(counterWindow);
assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -468,10 +489,7 @@
}
finally
{
- if (replicaDB != null)
- {
- replicaDB.shutdown();
- }
+ shutdown(replicaDB);
if (dbEnv != null)
{
dbEnv.shutdown();
@@ -481,7 +499,15 @@
}
}
- private CSN[] generateCSNs(int serverId, long timestamp, int number)
+ private void shutdown(FileReplicaDB replicaDB)
+ {
+ if (replicaDB != null)
+ {
+ replicaDB.shutdown();
+ }
+ }
+
+ static CSN[] generateCSNs(int serverId, long timestamp, int number)
{
CSNGenerator gen = new CSNGenerator(serverId, timestamp);
CSN[] csns = new CSN[number];
@@ -518,9 +544,8 @@
throws IOException, ConfigException
{
final int changelogPort = findFreePort();
- final ReplicationServerCfg conf =
- new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize,
- windowSize, null);
+ final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
+ changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize, windowSize, null);
return new ReplicationServer(conf);
}
@@ -549,40 +574,34 @@
return;
}
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY);
- try
- {
- // Cursor points to a null record initially
- assertNull(cursor.getRecord());
-
- for (int i = 1; i < csns.length; i++)
- {
- final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
- assertTrue(cursor.next(), msg);
- assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
- }
- assertFalse(cursor.next());
- assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
- + ", Expected null");
- }
- finally
- {
- StaticUtils.close(cursor);
- }
+ assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
+ assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
}
- private void assertNotFound(FileReplicaDB replicaDB, CSN csn) throws Exception
+ private void assertFoundInOrder(FileReplicaDB replicaDB,
+ final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
{
- DBCursor<UpdateMsg> cursor = null;
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
try
{
- cursor = replicaDB.generateCursorFrom(csn, AFTER_MATCHING_KEY);
- assertFalse(cursor.next());
- assertNull(cursor.getRecord());
+ assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
+
+ for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
+ {
+ final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).as(msg).isTrue();
+ softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
+ softly.assertAll();
+ }
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isFalse();
+ softly.assertThat(cursor.getRecord()).isNull();
+ softly.assertAll();
}
finally
{
- StaticUtils.close(cursor);
+ close(cursor);
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 4f6fd24..7fe033e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -107,7 +107,7 @@
DN baseDN2 = DN.decode("o=baseDN2");
DN baseDN3 = DN.decode("o=baseDN3");
- CSN[] csns = newCSNs(1, 0, 3);
+ CSN[] csns = generateCSNs(1, 0, 3);
// Add records
final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
@@ -205,7 +205,7 @@
DN baseDN2 = DN.decode("o=baseDN2");
DN baseDN3 = DN.decode("o=baseDN3");
- CSN[] csns = newCSNs(1, 0, 3);
+ CSN[] csns = generateCSNs(1, 0, 3);
// Add records
final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index 3033af5..725e77c 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -100,7 +100,46 @@
TEST_ROOT_DN = DN.decode(TEST_ROOT_DN_STRING);
}
- @Test(enabled=true)
+ @Test
+ public void testGenerateCursorFrom() throws Exception
+ {
+ ReplicationServer replicationServer = null;
+ JEReplicaDB replicaDB = null;
+ try
+ {
+ TestCaseUtils.startServer();
+ replicationServer = configureReplicationServer(100000, 10);
+ replicaDB = newReplicaDB(replicationServer);
+
+ final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
+ final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
+ csns2.remove(csns[3]);
+
+ for (CSN csn : csns2)
+ {
+ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
+ }
+
+ for (CSN csn : csns2)
+ {
+ assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
+ }
+ assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
+
+ for (int i = 0; i < csns2.size() - 1; i++)
+ {
+ assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
+ }
+ assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
+ }
+ finally
+ {
+ shutdown(replicaDB);
+ remove(replicationServer);
+ }
+ }
+
+ @Test
void testTrim() throws Exception
{
ReplicationServer replicationServer = null;
@@ -111,7 +150,7 @@
replicationServer = configureReplicationServer(100, 5000);
replicaDB = newReplicaDB(replicationServer);
- CSN[] csns = newCSNs(1, 0, 5);
+ CSN[] csns = generateCSNs(1, 0, 5);
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
@@ -162,89 +201,13 @@
}
}
- static CSN[] newCSNs(int serverId, long timestamp, int number)
- {
- CSNGenerator gen = new CSNGenerator(serverId, timestamp);
- CSN[] csns = new CSN[number];
- for (int i = 0; i < csns.length; i++)
- {
- csns[i] = gen.newCSN();
- }
- return csns;
- }
-
- private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
- throws IOException, ConfigException
- {
- final int changelogPort = findFreePort();
- final ReplicationServerCfg conf =
- new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.JE, 0, 2, queueSize, windowSize, null);
- return new ReplicationServer(conf);
- }
-
- private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
- {
- final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
- return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
- }
-
- private File createCleanDir() throws IOException
- {
- String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
- String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
- + File.separator + "build");
- path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB";
- final File testRoot = new File(path);
- TestCaseUtils.deleteDirectory(testRoot);
- testRoot.mkdirs();
- return testRoot;
- }
-
- private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception
- {
- if (csns.length == 0)
- {
- return;
- }
-
- assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
- assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
- }
-
- private void assertFoundInOrder(JEReplicaDB replicaDB,
- final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
- {
- DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
- try
- {
- assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
-
- for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
- {
- final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
- final SoftAssertions softly = new SoftAssertions();
- softly.assertThat(cursor.next()).as(msg).isTrue();
- softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
- softly.assertAll();
- }
- final SoftAssertions softly = new SoftAssertions();
- softly.assertThat(cursor.next()).isFalse();
- softly.assertThat(cursor.getRecord()).isNull();
- softly.assertAll();
- }
- finally
- {
- close(cursor);
- }
- }
-
/**
* Test the feature of clearing a JEReplicaDB used by a replication server.
* The clear feature is used when a replication server receives a request to
* reset the generationId of a given domain.
*/
- @Test(enabled=true)
- void testClear() throws Exception
+ @Test
+ public void testClear() throws Exception
{
ReplicationServer replicationServer = null;
JEReplicaDB replicaDB = null;
@@ -254,7 +217,7 @@
replicationServer = configureReplicationServer(100, 5000);
replicaDB = newReplicaDB(replicationServer);
- CSN[] csns = newCSNs(1, 0, 3);
+ CSN[] csns = generateCSNs(1, 0, 3);
// Add the changes and check they are here
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
@@ -277,45 +240,6 @@
}
}
- @Test
- public void testGenerateCursorFrom() throws Exception
- {
- ReplicationServer replicationServer = null;
- JEReplicaDB replicaDB = null;
- try
- {
- TestCaseUtils.startServer();
- replicationServer = configureReplicationServer(100000, 10);
- replicaDB = newReplicaDB(replicationServer);
-
- final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
- final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
- csns2.remove(csns[3]);
-
- for (CSN csn : csns2)
- {
- replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
- }
-
- for (CSN csn : csns2)
- {
- assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
- }
- assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
-
- for (int i = 0; i < csns2.size() - 1; i++)
- {
- assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
- }
- assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
- }
- finally
- {
- shutdown(replicaDB);
- remove(replicationServer);
- }
- }
-
private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
final PositionStrategy positionStrategy, final CSN expectedCSN)
throws ChangelogException
@@ -355,7 +279,7 @@
* Test the logic that manages counter records in the JEReplicaDB in order to
* optimize the oldest and newest records in the replication changelog db.
*/
- @Test(enabled=true, groups = { "opendj-256" })
+ @Test(groups = { "opendj-256" })
void testGetOldestNewestCSNs() throws Exception
{
// It's worth testing with 2 different setting for counterRecord
@@ -420,7 +344,7 @@
assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
// Populate the db with 'max' msg
- for (int i=max+1; i<=(2*max); i++)
+ for (int i=max+1; i<=2 * max; i++)
{
csns[i] = new CSN(now + i, mySeqnum, 1);
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
@@ -430,7 +354,6 @@
assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
- //
replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
String testcase = "AFTER PURGE (oldest, newest)=";
@@ -466,4 +389,80 @@
}
}
+ static CSN[] generateCSNs(int serverId, long timestamp, int number)
+ {
+ CSNGenerator gen = new CSNGenerator(serverId, timestamp);
+ CSN[] csns = new CSN[number];
+ for (int i = 0; i < csns.length; i++)
+ {
+ csns[i] = gen.newCSN();
+ }
+ return csns;
+ }
+
+ private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
+ throws IOException, ConfigException
+ {
+ final int changelogPort = findFreePort();
+ final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
+ changelogPort, null, ReplicationDBImplementation.JE, 0, 2, queueSize, windowSize, null);
+ return new ReplicationServer(conf);
+ }
+
+ private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
+ {
+ final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
+ return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
+ }
+
+ private File createCleanDir() throws IOException
+ {
+ String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
+ String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
+ + File.separator + "build");
+ path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB";
+ final File testRoot = new File(path);
+ TestCaseUtils.deleteDirectory(testRoot);
+ testRoot.mkdirs();
+ return testRoot;
+ }
+
+ private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception
+ {
+ if (csns.length == 0)
+ {
+ return;
+ }
+
+ assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
+ assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
+ }
+
+ private void assertFoundInOrder(JEReplicaDB replicaDB,
+ final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
+ {
+ DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
+ try
+ {
+ assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
+
+ for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
+ {
+ final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).as(msg).isTrue();
+ softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
+ softly.assertAll();
+ }
+ final SoftAssertions softly = new SoftAssertions();
+ softly.assertThat(cursor.next()).isFalse();
+ softly.assertThat(cursor.getRecord()).isNull();
+ softly.assertAll();
+ }
+ finally
+ {
+ close(cursor);
+ }
+ }
+
}
--
Gitblit v1.10.0