Aligned (JE|File)ReplicaDB and their tests for easier comparison with each other.
Ported additional checks for PositionStrategy.ON_MATCHING_KEY to FileReplicaDBTest. These are the same checks that were previously added to JEReplicaDBTest.
| | |
| | | * @NonNull |
| | | */ |
| | | private volatile CSNLimits csnLimits; |
| | | |
| | | private final int serverId; |
| | | |
| | | private final DN baseDN; |
| | | |
| | | private final DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | |
| | | private final ReplicationServer replicationServer; |
| | | |
| | | private final ReplicationEnvironment replicationEnv; |
| | | |
| | | /** |
| | |
| | | ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg |
| | | .toString(), String.valueOf(baseDN), String.valueOf(serverId))); |
| | | } |
| | | |
| | | log.append(Record.from(updateMsg.getCSN(), updateMsg)); |
| | | |
| | | final CSNLimits limits = csnLimits; |
| | | final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN()); |
| | | final boolean updateOld = limits.oldestCSN == null; |
| | |
| | | |
| | | /** |
| | | * Returns a cursor that allows retrieving the update messages from this DB. |
| | | * The starting position is defined by the provided CSN and cursor |
| | | * positioning strategy. |
| | | * The starting position is defined by the provided CSN and cursor positioning |
| | | * strategy. |
| | | * |
| | | * @param startCSN |
| | | * The position where the cursor must start. If null, start from the |
| | |
| | | * @param positionStrategy |
| | | * Cursor position strategy, which allows choosing whether the cursor must |
| | | * start from the provided CSN or just after the provided CSN. |
| | | * @return a new {@link DBCursor} to retreive update messages. |
| | | * @return a new {@link DBCursor} to retrieve update messages. |
| | | * @throws ChangelogException |
| | | * if a database problem happened |
| | | */ |
| | |
| | | public String toString() |
| | | { |
| | | final CSNLimits limits = csnLimits; |
| | | return getClass().getSimpleName() + " " + baseDN + " " + serverId + " " + limits.oldestCSN + " " |
| | | + limits.newestCSN; |
| | | return getClass().getSimpleName() + " " + baseDN + " " + serverId + " " |
| | | + limits.oldestCSN + " " + limits.newestCSN; |
| | | } |
| | | |
| | | /** |
| | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | |
| | | /** |
| | | * This class is used for managing the replicationServer database for each |
| | | * server in the topology. |
| | | * Represents a replication server database for one server in the topology. |
| | | * <p> |
| | | * It is responsible for efficiently saving the updates that are received from |
| | | * each master server into stable storage. |
| | | * <p> |
| | | * This class is also able to generate a {@link DBCursor} that can be used to |
| | | * It is also able to generate a {@link DBCursor} that can be used to |
| | | * read all changes from a given {@link CSN}. |
| | | * <p> |
| | | * This class publish some monitoring information below cn=monitor. |
| | | * It publishes some monitoring information below cn=monitor. |
| | | */ |
| | | public class JEReplicaDB |
| | | class JEReplicaDB |
| | | { |
| | | |
| | | /** |
| | |
| | | this.oldestCSN = oldestCSN; |
| | | this.newestCSN = newestCSN; |
| | | } |
| | | |
| | | } |
| | | |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | private ReplicationDB db; |
| | | /** |
| | | * Holds the oldest and newest CSNs for this replicaDB for fast retrieval. |
| | | * |
| | | * @NonNull |
| | | */ |
| | | private volatile CSNLimits csnLimits; |
| | | private int serverId; |
| | | private DN baseDN; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private ReplicationServer replicationServer; |
| | | private final int serverId; |
| | | private final DN baseDN; |
| | | private final DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private final ReplicationServer replicationServer; |
| | | private final ReplicationDB db; |
| | | |
| | | /** |
| | | * Creates a new ReplicaDB associated to a given LDAP server. |
| | | * |
| | | * @param serverId The serverId for which changes will be stored in the DB. |
| | | * @param baseDN the baseDN for which this DB was created. |
| | | * @param replicationServer The ReplicationServer that creates this ReplicaDB. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * server for this domain. |
| | | * @throws ChangelogException If a database problem happened |
| | | * @param serverId |
| | | * Id of this server. |
| | | * @param baseDN |
| | | * the replication domain baseDN. |
| | | * @param replicationServer |
| | | * The ReplicationServer that creates this ReplicaDB. |
| | | * @param replicationEnv |
| | | * the Database Env to use to create the ReplicationServer DB. server |
| | | * for this domain. |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public JEReplicaDB(int serverId, DN baseDN, |
| | | ReplicationServer replicationServer, ReplicationDbEnv dbenv) |
| | | throws ChangelogException |
| | | JEReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer, |
| | | final ReplicationDbEnv replicationEnv) throws ChangelogException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | | this.serverId = serverId; |
| | | this.baseDN = baseDN; |
| | | db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv); |
| | | csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); |
| | | this.replicationServer = replicationServer; |
| | | this.db = new ReplicationDB(serverId, baseDN, replicationServer, replicationEnv); |
| | | this.csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); |
| | | |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | |
| | | db.addEntry(updateMsg); |
| | | |
| | | final CSNLimits limits = csnLimits; |
| | | final boolean updateNew = limits.newestCSN == null |
| | | || limits.newestCSN.isOlderThan(updateMsg.getCSN()); |
| | | final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN()); |
| | | final boolean updateOld = limits.oldestCSN == null; |
| | | if (updateOld || updateNew) |
| | | { |
| | |
| | | * |
| | | * @return the oldest CSN that has not been purged yet. |
| | | */ |
| | | public CSN getOldestCSN() |
| | | CSN getOldestCSN() |
| | | { |
| | | return csnLimits.oldestCSN; |
| | | } |
| | |
| | | * |
| | | * @return the newest CSN that has not been purged yet. |
| | | */ |
| | | public CSN getNewestCSN() |
| | | CSN getNewestCSN() |
| | | { |
| | | return csnLimits.newestCSN; |
| | | } |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem happened |
| | | */ |
| | | public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy) |
| | | DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy) |
| | | throws ChangelogException |
| | | { |
| | | return new JEReplicaDBCursor(db, startCSN, positionStrategy, this); |
| | |
| | | /** |
| | | * Shutdown this ReplicaDB. |
| | | */ |
| | | public void shutdown() |
| | | void shutdown() |
| | | { |
| | | if (shutdown.compareAndSet(false, true)) |
| | | { |
| | |
| | | * |
| | | * @param purgeCSN |
| | | * The CSN up to which changes can be purged. No purging happens when |
| | | * it is null. |
| | | * it is {@code null}. |
| | | * @throws ChangelogException |
| | | * In case of database problem. |
| | | */ |
| | |
| | | } |
| | | |
| | | /** |
| | | * This internal class is used to implement the Monitoring capabilities of the |
| | | * ReplicaDB. |
| | | * Implements monitoring capabilities of the ReplicaDB. |
| | | */ |
| | | private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | |
| | | @Override |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | | List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | final List<Attribute> attributes = new ArrayList<Attribute>(); |
| | | create(attributes, "replicationServer-database",String.valueOf(serverId)); |
| | | create(attributes, "domain-name", baseDN.toNormalizedString()); |
| | | final CSNLimits limits = csnLimits; |
| | |
| | | return attributes; |
| | | } |
| | | |
| | | private void create(List<Attribute> attributes, String name, String value) |
| | | private void create(final List<Attribute> attributes, final String name, final String value) |
| | | { |
| | | attributes.add(Attributes.create(name, value)); |
| | | } |
| | | |
| | | private String encode(CSN csn) |
| | | private String encode(final CSN csn) |
| | | { |
| | | return csn + " " + new Date(csn.getTime()); |
| | | } |
| | |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | ReplicationServerDomain domain = replicationServer |
| | | .getReplicationServerDomain(baseDN); |
| | | return "Changelog for DS(" + serverId + "),cn=" |
| | | + domain.getMonitorInstanceName(); |
| | | ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN); |
| | | return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | * @throws ChangelogException When an exception occurs while removing the |
| | | * changes from the DB. |
| | | */ |
| | | public void clear() throws ChangelogException |
| | | void clear() throws ChangelogException |
| | | { |
| | | db.clear(); |
| | | csnLimits = new CSNLimits(null, null); |
| | |
| | | import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.CSNGenerator; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | @SuppressWarnings("javadoc") |
| | |
| | | } |
| | | } |
| | | |
| | | private CSN[] generateCSNs(int serverId, long timestamp, int number) |
| | | { |
| | | CSNGenerator gen = new CSNGenerator(serverId, timestamp); |
| | | CSN[] csns = new CSN[number]; |
| | | for (int i = 0; i < csns.length; i++) |
| | | { |
| | | csns[i] = gen.newCSN(); |
| | | } |
| | | return csns; |
| | | } |
| | | |
| | | private long[] addThreeRecords(FileChangeNumberIndexDB cnIndexDB) throws Exception |
| | | { |
| | | // Prepare data to be stored in the db |
| | |
| | | { |
| | | try |
| | | { |
| | | for (int i = 0; i < cns.length; i++) |
| | | for (long cn : cns) |
| | | { |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getChangeNumber(), cns[i]); |
| | | assertEquals(cursor.getRecord().getChangeNumber(), cn); |
| | | } |
| | | assertFalse(cursor.next()); |
| | | } |
| | |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | |
| | | import org.assertj.core.api.SoftAssertions; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy; |
| | | import org.opends.server.types.ByteString; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.DataProvider; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.testng.Assert.*; |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.server.TestCaseUtils.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.testng.Assert.*; |
| | | |
| | | /** |
| | | * Test the FileReplicaDB class |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (replicaDB != null) { |
| | | replicaDB.shutdown(); |
| | | } |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | |
| | | waitChangesArePersisted(replicaDB, 3); |
| | | |
| | | assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]); |
| | | assertNotFound(replicaDB, csns[4]); |
| | | assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); |
| | | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[0]); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2]); |
| | |
| | | assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]); |
| | | assertFoundInOrder(replicaDB, csns[2], csns[3]); |
| | | assertFoundInOrder(replicaDB, csns[3]); |
| | | assertNotFound(replicaDB, csns[4]); |
| | | assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); |
| | | } |
| | | finally |
| | | { |
| | | if (replicaDB != null) { |
| | | replicaDB.shutdown(); |
| | | } |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | |
| | | public void testGenerateCursorFrom() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | FileReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6); |
| | | for (int i = 0; i < 5; i++) |
| | | final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5); |
| | | final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns)); |
| | | csns2.remove(csns[3]); |
| | | |
| | | for (CSN csn : csns2) |
| | | { |
| | | if (i != 3) |
| | | { |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | } |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | } |
| | | waitChangesArePersisted(replicaDB, 4); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY); |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[1]); |
| | | StaticUtils.close(cursor); |
| | | for (CSN csn : csns2) |
| | | { |
| | | assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn); |
| | | } |
| | | assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[3], AFTER_MATCHING_KEY); |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[4]); |
| | | StaticUtils.close(cursor); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[4], AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | assertNull(cursor.getRecord()); |
| | | for (int i = 0; i < csns2.size() - 1; i++) |
| | | { |
| | | assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1)); |
| | | } |
| | | assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | if (replicaDB != null) { |
| | | replicaDB.shutdown(); |
| | | } |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6); |
| | | CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], PositionStrategy.AFTER_MATCHING_KEY); |
| | | cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | |
| | | int[] indicesToAdd = new int[] { 0, 1, 2, 4 }; |
| | |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | if (replicaDB != null) |
| | | { |
| | | replicaDB.shutdown(); |
| | | } |
| | | close(cursor); |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | |
| | | public void testPurge() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | FileReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100, 5000); |
| | | |
| | | final FileReplicaDB replicaDB = newReplicaDB(replicationServer); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = generateCSNs(1, 0, 5); |
| | | |
| | |
| | | } |
| | | finally |
| | | { |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100, 5000); |
| | | |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = generateCSNs(1, 0, 3); |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (replicaDB != null) |
| | | { |
| | | replicaDB.shutdown(); |
| | | } |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | |
| | | private void assertNextCSN(FileReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy, final CSN expectedCSN) |
| | | throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | try |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isTrue(); |
| | | softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy); |
| | | try |
| | | { |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test the logic that manages counter records in the FileReplicaDB in order to |
| | | * optimize the oldest and newest records in the replication changelog db. |
| | | */ |
| | | @Test(enabled=true, groups = { "opendj-256" }) |
| | | @Test(groups = { "opendj-256" }) |
| | | public void testGetOldestNewestCSNs() throws Exception |
| | | { |
| | | // It's worth testing with 2 different settings for counterRecord |
| | |
| | | testRoot = createCleanDir(); |
| | | dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer); |
| | | replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv); |
| | | //replicaDB.setCounterRecordWindowSize(counterWindow); |
| | | |
| | | // Populate the db with 'max' msg |
| | | int mySeqnum = 1; |
| | |
| | | replicaDB.shutdown(); |
| | | |
| | | replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv); |
| | | //replicaDB.setCounterRecordWindowSize(counterWindow); |
| | | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | |
| | | } |
| | | finally |
| | | { |
| | | if (replicaDB != null) |
| | | { |
| | | replicaDB.shutdown(); |
| | | } |
| | | shutdown(replicaDB); |
| | | if (dbEnv != null) |
| | | { |
| | | dbEnv.shutdown(); |
| | |
| | | } |
| | | } |
| | | |
| | | private CSN[] generateCSNs(int serverId, long timestamp, int number) |
| | | private void shutdown(FileReplicaDB replicaDB) |
| | | { |
| | | if (replicaDB != null) |
| | | { |
| | | replicaDB.shutdown(); |
| | | } |
| | | } |
| | | |
| | | static CSN[] generateCSNs(int serverId, long timestamp, int number) |
| | | { |
| | | CSNGenerator gen = new CSNGenerator(serverId, timestamp); |
| | | CSN[] csns = new CSN[number]; |
| | |
| | | throws IOException, ConfigException |
| | | { |
| | | final int changelogPort = findFreePort(); |
| | | final ReplicationServerCfg conf = |
| | | new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize, |
| | | windowSize, null); |
| | | final ReplicationServerCfg conf = new ReplServerFakeConfiguration( |
| | | changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize, windowSize, null); |
| | | return new ReplicationServer(conf); |
| | | } |
| | | |
| | |
| | | return; |
| | | } |
| | | |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY); |
| | | assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns); |
| | | assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns); |
| | | } |
| | | |
| | | private void assertFoundInOrder(FileReplicaDB replicaDB, |
| | | final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy); |
| | | try |
| | | { |
| | | // Cursor points to a null record initially |
| | | assertNull(cursor.getRecord()); |
| | | assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); |
| | | |
| | | for (int i = 1; i < csns.length; i++) |
| | | for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++) |
| | | { |
| | | final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI(); |
| | | assertTrue(cursor.next(), msg); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[i], msg); |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).as(msg).isTrue(); |
| | | softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]); |
| | | softly.assertAll(); |
| | | } |
| | | assertFalse(cursor.next()); |
| | | assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord() |
| | | + ", Expected null"); |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | } |
| | | } |
| | | |
| | | private void assertNotFound(FileReplicaDB replicaDB, CSN csn) throws Exception |
| | | { |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | try |
| | | { |
| | | cursor = replicaDB.generateCursorFrom(csn, AFTER_MATCHING_KEY); |
| | | assertFalse(cursor.next()); |
| | | assertNull(cursor.getRecord()); |
| | | } |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | |
| | | DN baseDN2 = DN.decode("o=baseDN2"); |
| | | DN baseDN3 = DN.decode("o=baseDN3"); |
| | | |
| | | CSN[] csns = newCSNs(1, 0, 3); |
| | | CSN[] csns = generateCSNs(1, 0, 3); |
| | | |
| | | // Add records |
| | | final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer); |
| | |
| | | DN baseDN2 = DN.decode("o=baseDN2"); |
| | | DN baseDN3 = DN.decode("o=baseDN3"); |
| | | |
| | | CSN[] csns = newCSNs(1, 0, 3); |
| | | CSN[] csns = generateCSNs(1, 0, 3); |
| | | |
| | | // Add records |
| | | final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer); |
| | |
| | | TEST_ROOT_DN = DN.decode(TEST_ROOT_DN_STRING); |
| | | } |
| | | |
| | | @Test(enabled=true) |
| | | @Test |
| | | public void testGenerateCursorFrom() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | JEReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5); |
| | | final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns)); |
| | | csns2.remove(csns[3]); |
| | | |
| | | for (CSN csn : csns2) |
| | | { |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | } |
| | | |
| | | for (CSN csn : csns2) |
| | | { |
| | | assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn); |
| | | } |
| | | assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]); |
| | | |
| | | for (int i = 0; i < csns2.size() - 1; i++) |
| | | { |
| | | assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1)); |
| | | } |
| | | assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); |
| | | } |
| | | finally |
| | | { |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | void testTrim() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | |
| | | replicationServer = configureReplicationServer(100, 5000); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, 0, 5); |
| | | CSN[] csns = generateCSNs(1, 0, 5); |
| | | |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid")); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid")); |
| | |
| | | } |
| | | } |
| | | |
| | | static CSN[] newCSNs(int serverId, long timestamp, int number) |
| | | { |
| | | CSNGenerator gen = new CSNGenerator(serverId, timestamp); |
| | | CSN[] csns = new CSN[number]; |
| | | for (int i = 0; i < csns.length; i++) |
| | | { |
| | | csns[i] = gen.newCSN(); |
| | | } |
| | | return csns; |
| | | } |
| | | |
| | | private ReplicationServer configureReplicationServer(int windowSize, int queueSize) |
| | | throws IOException, ConfigException |
| | | { |
| | | final int changelogPort = findFreePort(); |
| | | final ReplicationServerCfg conf = |
| | | new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.JE, 0, 2, queueSize, windowSize, null); |
| | | return new ReplicationServer(conf); |
| | | } |
| | | |
| | | private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception |
| | | { |
| | | final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB(); |
| | | return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst(); |
| | | } |
| | | |
| | | private File createCleanDir() throws IOException |
| | | { |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | | String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot |
| | | + File.separator + "build"); |
| | | path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB"; |
| | | final File testRoot = new File(path); |
| | | TestCaseUtils.deleteDirectory(testRoot); |
| | | testRoot.mkdirs(); |
| | | return testRoot; |
| | | } |
| | | |
| | | private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception |
| | | { |
| | | if (csns.length == 0) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns); |
| | | assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns); |
| | | } |
| | | |
| | | private void assertFoundInOrder(JEReplicaDB replicaDB, |
| | | final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy); |
| | | try |
| | | { |
| | | assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); |
| | | |
| | | for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++) |
| | | { |
| | | final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI(); |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).as(msg).isTrue(); |
| | | softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]); |
| | | softly.assertAll(); |
| | | } |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Test the feature of clearing a JEReplicaDB used by a replication server. |
| | | * The clear feature is used when a replication server receives a request to |
| | | * reset the generationId of a given domain. |
| | | */ |
| | | @Test(enabled=true) |
| | | void testClear() throws Exception |
| | | @Test |
| | | public void testClear() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | JEReplicaDB replicaDB = null; |
| | |
| | | replicationServer = configureReplicationServer(100, 5000); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, 0, 3); |
| | | CSN[] csns = generateCSNs(1, 0, 3); |
| | | |
| | | // Add the changes and check they are here |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid")); |
| | |
| | | } |
| | | } |
| | | |
| | | @Test |
| | | public void testGenerateCursorFrom() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | JEReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5); |
| | | final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns)); |
| | | csns2.remove(csns[3]); |
| | | |
| | | for (CSN csn : csns2) |
| | | { |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid")); |
| | | } |
| | | |
| | | for (CSN csn : csns2) |
| | | { |
| | | assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn); |
| | | } |
| | | assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]); |
| | | |
| | | for (int i = 0; i < csns2.size() - 1; i++) |
| | | { |
| | | assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1)); |
| | | } |
| | | assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY); |
| | | } |
| | | finally |
| | | { |
| | | shutdown(replicaDB); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | | |
| | | private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN, |
| | | final PositionStrategy positionStrategy, final CSN expectedCSN) |
| | | throws ChangelogException |
| | |
| | | * Test the logic that manages counter records in the JEReplicaDB in order to |
| | | * optimize the oldest and newest records in the replication changelog db. |
| | | */ |
| | | @Test(enabled=true, groups = { "opendj-256" }) |
| | | @Test(groups = { "opendj-256" }) |
| | | void testGetOldestNewestCSNs() throws Exception |
| | | { |
| | | // It's worth testing with 2 different settings for counterRecord |
| | |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | | |
| | | // Populate the db with 'max' msg |
| | | for (int i=max+1; i<=(2*max); i++) |
| | | for (int i=max+1; i<=2 * max; i++) |
| | | { |
| | | csns[i] = new CSN(now + i, mySeqnum, 1); |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN"); |
| | | |
| | | // |
| | | replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); |
| | | |
| | | String testcase = "AFTER PURGE (oldest, newest)="; |
| | |
| | | } |
| | | } |
| | | |
| | | static CSN[] generateCSNs(int serverId, long timestamp, int number) |
| | | { |
| | | CSNGenerator gen = new CSNGenerator(serverId, timestamp); |
| | | CSN[] csns = new CSN[number]; |
| | | for (int i = 0; i < csns.length; i++) |
| | | { |
| | | csns[i] = gen.newCSN(); |
| | | } |
| | | return csns; |
| | | } |
| | | |
| | | private ReplicationServer configureReplicationServer(int windowSize, int queueSize) |
| | | throws IOException, ConfigException |
| | | { |
| | | final int changelogPort = findFreePort(); |
| | | final ReplicationServerCfg conf = new ReplServerFakeConfiguration( |
| | | changelogPort, null, ReplicationDBImplementation.JE, 0, 2, queueSize, windowSize, null); |
| | | return new ReplicationServer(conf); |
| | | } |
| | | |
| | | private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception |
| | | { |
| | | final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB(); |
| | | return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst(); |
| | | } |
| | | |
| | | private File createCleanDir() throws IOException |
| | | { |
| | | String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT); |
| | | String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot |
| | | + File.separator + "build"); |
| | | path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB"; |
| | | final File testRoot = new File(path); |
| | | TestCaseUtils.deleteDirectory(testRoot); |
| | | testRoot.mkdirs(); |
| | | return testRoot; |
| | | } |
| | | |
| | | private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception |
| | | { |
| | | if (csns.length == 0) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns); |
| | | assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns); |
| | | } |
| | | |
| | | private void assertFoundInOrder(JEReplicaDB replicaDB, |
| | | final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException |
| | | { |
| | | DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy); |
| | | try |
| | | { |
| | | assertNull(cursor.getRecord(), "Cursor should point to a null record initially"); |
| | | |
| | | for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++) |
| | | { |
| | | final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI(); |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).as(msg).isTrue(); |
| | | softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]); |
| | | softly.assertAll(); |
| | | } |
| | | final SoftAssertions softly = new SoftAssertions(); |
| | | softly.assertThat(cursor.next()).isFalse(); |
| | | softly.assertThat(cursor.getRecord()).isNull(); |
| | | softly.assertAll(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | } |
| | | |
| | | } |