mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.26.2014 aae049d2b08a07020cb12139b56dbd8f25c01500
Aligned (JE|File)ReplicaDB and their tests for easier comparison with each other.

Ported to FileReplicaDBTest additional checks for PositionStrategy.ON_MATCHING_KEY. These are the same that were previously added to JEReplicaDBTest.
6 files modified
546 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 78 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java 17 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java 185 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java 4 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 245 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDB.java
@@ -97,15 +97,10 @@
   * @NonNull
   */
  private volatile CSNLimits csnLimits;
  private final int serverId;
  private final DN baseDN;
  private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private final ReplicationServer replicationServer;
  private final ReplicationEnvironment replicationEnv;
  /**
@@ -171,7 +166,9 @@
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }
    log.append(Record.from(updateMsg.getCSN(), updateMsg));
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
@@ -205,8 +202,8 @@
  /**
   * Returns a cursor that allows to retrieve the update messages from this DB.
   * The starting position is defined by the provided CSN and cursor
   * positioning strategy.
   * The starting position is defined by the provided CSN and cursor positioning
   * strategy.
   *
   * @param startCSN
   *          The position where the cursor must start. If null, start from the
@@ -214,7 +211,7 @@
   * @param positionStrategy
   *          Cursor position strategy, which allow to choose if cursor must
   *          start from the provided CSN or just after the provided CSN.
   * @return a new {@link DBCursor} to retreive update messages.
   * @return a new {@link DBCursor} to retrieve update messages.
   * @throws ChangelogException
   *           if a database problem happened
   */
@@ -315,8 +312,8 @@
  public String toString()
  {
    final CSNLimits limits = csnLimits;
    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " " + limits.oldestCSN + " "
        + limits.newestCSN;
    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
        + limits.oldestCSN + " " + limits.newestCSN;
  }
  /**
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -51,18 +51,17 @@
import static org.opends.messages.ReplicationMessages.*;
/**
 * This class is used for managing the replicationServer database for each
 * server in the topology.
 * Represents a replication server database for one server in the topology.
 * <p>
 * It is responsible for efficiently saving the updates that is received from
 * each master server into stable storage.
 * <p>
 * This class is also able to generate a {@link DBCursor} that can be used to
 * It is also able to generate a {@link DBCursor} that can be used to
 * read all changes from a given {@link CSN}.
 * <p>
 * This class publish some monitoring information below cn=monitor.
 * It publishes some monitoring information below cn=monitor.
 */
public class JEReplicaDB
class JEReplicaDB
{
  /**
@@ -81,41 +80,44 @@
      this.oldestCSN = oldestCSN;
      this.newestCSN = newestCSN;
    }
  }
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private ReplicationDB db;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
   *
   * @NonNull
   */
  private volatile CSNLimits csnLimits;
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private ReplicationServer replicationServer;
  private final int serverId;
  private final DN baseDN;
  private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private final ReplicationServer replicationServer;
  private final ReplicationDB db;
  /**
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
   * @param serverId The serverId for which changes will be stored in the DB.
   * @param baseDN the baseDN for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this ReplicaDB.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @throws ChangelogException If a database problem happened
   * @param serverId
   *          Id of this server.
   * @param baseDN
   *          the replication domain baseDN.
   * @param replicationServer
   *          The ReplicationServer that creates this ReplicaDB.
   * @param replicationEnv
   *          the Database Env to use to create the ReplicationServer DB. server
   *          for this domain.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public JEReplicaDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbenv)
      throws ChangelogException
  JEReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer,
      final ReplicationDbEnv replicationEnv) throws ChangelogException
  {
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    this.replicationServer = replicationServer;
    this.db = new ReplicationDB(serverId, baseDN, replicationServer, replicationEnv);
    this.csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -144,8 +146,7 @@
    db.addEntry(updateMsg);
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null
        || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
    if (updateOld || updateNew)
    {
@@ -160,7 +161,7 @@
   *
   * @return the oldest CSN that has not been purged yet.
   */
  public CSN getOldestCSN()
  CSN getOldestCSN()
  {
    return csnLimits.oldestCSN;
  }
@@ -170,7 +171,7 @@
   *
   * @return the newest CSN that has not been purged yet.
   */
  public CSN getNewestCSN()
  CSN getNewestCSN()
  {
    return csnLimits.newestCSN;
  }
@@ -189,7 +190,7 @@
   * @throws ChangelogException
   *           if a database problem happened
   */
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy)
  DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy)
      throws ChangelogException
  {
    return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
@@ -198,7 +199,7 @@
  /**
   * Shutdown this ReplicaDB.
   */
  public void shutdown()
  void shutdown()
  {
    if (shutdown.compareAndSet(false, true))
    {
@@ -212,7 +213,7 @@
   *
   * @param purgeCSN
   *          The CSN up to which changes can be purged. No purging happens when
   *          it is null.
   *          it is {@code null}.
   * @throws ChangelogException
   *           In case of database problem.
   */
@@ -280,8 +281,7 @@
  }
  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * ReplicaDB.
   * Implements monitoring capabilities of the ReplicaDB.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
@@ -289,7 +289,7 @@
    @Override
    public List<Attribute> getMonitorData()
    {
      List<Attribute> attributes = new ArrayList<Attribute>();
      final List<Attribute> attributes = new ArrayList<Attribute>();
      create(attributes, "replicationServer-database",String.valueOf(serverId));
      create(attributes, "domain-name", baseDN.toNormalizedString());
      final CSNLimits limits = csnLimits;
@@ -304,12 +304,12 @@
      return attributes;
    }
    private void create(List<Attribute> attributes, String name, String value)
    private void create(final List<Attribute> attributes, final String name, final String value)
    {
      attributes.add(Attributes.create(name, value));
    }
    private String encode(CSN csn)
    private String encode(final CSN csn)
    {
      return csn + " " + new Date(csn.getTime());
    }
@@ -318,10 +318,8 @@
    @Override
    public String getMonitorInstanceName()
    {
      ReplicationServerDomain domain = replicationServer
          .getReplicationServerDomain(baseDN);
      return "Changelog for DS(" + serverId + "),cn="
          + domain.getMonitorInstanceName();
      ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN);
      return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName();
    }
    /** {@inheritDoc} */
@@ -347,7 +345,7 @@
   * @throws ChangelogException When an exception occurs while removing the
   * changes from the DB.
   */
  public void clear() throws ChangelogException
  void clear() throws ChangelogException
  {
    db.clear();
    csnLimits = new CSNLimits(null, null);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileChangeNumberIndexDBTest.java
@@ -32,7 +32,6 @@
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.CSNGenerator;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
@@ -48,6 +47,7 @@
import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.replication.server.changelog.file.FileReplicaDBTest.*;
import static org.testng.Assert.*;
@SuppressWarnings("javadoc")
@@ -233,17 +233,6 @@
    }
  }
  private CSN[] generateCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
    CSN[] csns = new CSN[number];
    for (int i = 0; i < csns.length; i++)
    {
      csns[i] = gen.newCSN();
    }
    return csns;
  }
  private long[] addThreeRecords(FileChangeNumberIndexDB cnIndexDB) throws Exception
  {
    // Prepare data to be stored in the db
@@ -334,10 +323,10 @@
  {
    try
    {
      for (int i = 0; i < cns.length; i++)
      for (long cn : cns)
      {
        assertTrue(cursor.next());
        assertEquals(cursor.getRecord().getChangeNumber(), cns[i]);
        assertEquals(cursor.getRecord().getChangeNumber(), cn);
      }
      assertFalse(cursor.next());
    }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/file/FileReplicaDBTest.java
@@ -27,7 +27,10 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import org.assertj.core.api.SoftAssertions;
import org.opends.server.TestCaseUtils;
import org.opends.server.admin.std.meta.ReplicationServerCfgDefn.ReplicationDBImplementation;
import org.opends.server.admin.std.server.ReplicationServerCfg;
@@ -40,20 +43,21 @@
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy;
import org.opends.server.types.ByteString;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import static org.assertj.core.api.Assertions.*;
import static org.opends.server.TestCaseUtils.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
import static org.opends.server.replication.server.changelog.api.DBCursor.PositionStrategy.*;
import static org.opends.server.util.StaticUtils.*;
import static org.testng.Assert.*;
/**
 * Test the FileReplicaDB class
@@ -124,9 +128,7 @@
    }
    finally
    {
      if (replicaDB != null) {
        replicaDB.shutdown();
      }
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
@@ -150,7 +152,7 @@
      waitChangesArePersisted(replicaDB, 3);
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
      assertNotFound(replicaDB, csns[4]);
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
      assertEquals(replicaDB.getOldestCSN(), csns[0]);
      assertEquals(replicaDB.getNewestCSN(), csns[2]);
@@ -162,13 +164,11 @@
      assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
      assertFoundInOrder(replicaDB, csns[2], csns[3]);
      assertFoundInOrder(replicaDB, csns[3]);
      assertNotFound(replicaDB, csns[4]);
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      if (replicaDB != null) {
        replicaDB.shutdown();
      }
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
@@ -177,7 +177,6 @@
  public void testGenerateCursorFrom() throws Exception
  {
    ReplicationServer replicationServer = null;
    DBCursor<UpdateMsg> cursor = null;
    FileReplicaDB replicaDB = null;
    try
    {
@@ -185,36 +184,31 @@
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
      for (int i = 0; i < 5; i++)
      final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
      csns2.remove(csns[3]);
      for (CSN csn : csns2)
      {
        if (i != 3)
        {
          replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
        }
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      waitChangesArePersisted(replicaDB, 4);
      cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[1]);
      StaticUtils.close(cursor);
      for (CSN csn : csns2)
      {
        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
      }
      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
      cursor = replicaDB.generateCursorFrom(csns[3], AFTER_MATCHING_KEY);
      assertTrue(cursor.next());
      assertEquals(cursor.getRecord().getCSN(), csns[4]);
      StaticUtils.close(cursor);
      cursor = replicaDB.generateCursorFrom(csns[4], AFTER_MATCHING_KEY);
      assertFalse(cursor.next());
      assertNull(cursor.getRecord());
      for (int i = 0; i < csns2.size() - 1; i++)
      {
        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
      }
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      StaticUtils.close(cursor);
      if (replicaDB != null) {
        replicaDB.shutdown();
      }
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
@@ -242,9 +236,9 @@
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 6);
      CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
      cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], PositionStrategy.AFTER_MATCHING_KEY);
      cursor = replicaDB.generateCursorFrom(csns[csnIndexForStartKey], AFTER_MATCHING_KEY);
      assertFalse(cursor.next());
      int[] indicesToAdd = new int[] { 0, 1, 2, 4 };
@@ -266,11 +260,8 @@
    }
    finally
    {
      StaticUtils.close(cursor);
      if (replicaDB != null)
      {
        replicaDB.shutdown();
      }
      close(cursor);
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
@@ -283,12 +274,12 @@
  public void testPurge() throws Exception
  {
    ReplicationServer replicationServer = null;
    FileReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      final FileReplicaDB replicaDB = newReplicaDB(replicationServer);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, 0, 5);
@@ -320,6 +311,7 @@
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
@@ -338,7 +330,6 @@
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = generateCSNs(1, 0, 3);
@@ -359,19 +350,51 @@
    }
    finally
    {
      if (replicaDB != null)
      {
        replicaDB.shutdown();
      }
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  private void assertNextCSN(FileReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy, final CSN expectedCSN)
      throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
    try
    {
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isTrue();
      softly.assertThat(cursor.getRecord().getCSN()).isEqualTo(expectedCSN);
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
  private void assertNotFound(FileReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(startCSN, positionStrategy);
    try
    {
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
  /**
   * Test the logic that manages counter records in the FileReplicaDB in order to
   * optimize the oldest and newest records in the replication changelog db.
   */
  @Test(enabled=true, groups = { "opendj-256" })
  @Test(groups = { "opendj-256" })
  public void testGetOldestNewestCSNs() throws Exception
  {
    // It's worth testing with 2 different setting for counterRecord
@@ -410,7 +433,6 @@
      testRoot = createCleanDir();
      dbEnv = new ReplicationEnvironment(testRoot.getPath(), replicationServer);
      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      //replicaDB.setCounterRecordWindowSize(counterWindow);
      // Populate the db with 'max' msg
      int mySeqnum = 1;
@@ -434,7 +456,6 @@
      replicaDB.shutdown();
      replicaDB = new FileReplicaDB(1, TEST_ROOT_DN, replicationServer, dbEnv);
      //replicaDB.setCounterRecordWindowSize(counterWindow);
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -468,10 +489,7 @@
    }
    finally
    {
      if (replicaDB != null)
      {
        replicaDB.shutdown();
      }
      shutdown(replicaDB);
      if (dbEnv != null)
      {
        dbEnv.shutdown();
@@ -481,7 +499,15 @@
    }
  }
  private CSN[] generateCSNs(int serverId, long timestamp, int number)
  private void shutdown(FileReplicaDB replicaDB)
  {
    if (replicaDB != null)
    {
      replicaDB.shutdown();
    }
  }
  static CSN[] generateCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
    CSN[] csns = new CSN[number];
@@ -518,9 +544,8 @@
      throws IOException, ConfigException
  {
    final int changelogPort = findFreePort();
    final ReplicationServerCfg conf =
        new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize,
            windowSize, null);
    final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
        changelogPort, null, ReplicationDBImplementation.LOG, 0, 2, queueSize, windowSize, null);
    return new ReplicationServer(conf);
  }
@@ -549,40 +574,34 @@
      return;
    }
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], AFTER_MATCHING_KEY);
    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
  }
  private void assertFoundInOrder(FileReplicaDB replicaDB,
      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
    try
    {
      // Cursor points to a null record initially
      assertNull(cursor.getRecord());
      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
      for (int i = 1; i < csns.length; i++)
      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
      {
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        assertTrue(cursor.next(), msg);
        assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
        final SoftAssertions softly = new SoftAssertions();
        softly.assertThat(cursor.next()).as(msg).isTrue();
        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
        softly.assertAll();
      }
      assertFalse(cursor.next());
      assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
          + ", Expected null");
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      StaticUtils.close(cursor);
    }
  }
  private void assertNotFound(FileReplicaDB replicaDB, CSN csn) throws Exception
  {
    DBCursor<UpdateMsg> cursor = null;
    try
    {
      cursor = replicaDB.generateCursorFrom(csn, AFTER_MATCHING_KEY);
      assertFalse(cursor.next());
      assertNull(cursor.getRecord());
    }
    finally
    {
      StaticUtils.close(cursor);
      close(cursor);
    }
  }
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -107,7 +107,7 @@
      DN baseDN2 = DN.decode("o=baseDN2");
      DN baseDN3 = DN.decode("o=baseDN3");
      CSN[] csns = newCSNs(1, 0, 3);
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add records
      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
@@ -205,7 +205,7 @@
      DN baseDN2 = DN.decode("o=baseDN2");
      DN baseDN3 = DN.decode("o=baseDN3");
      CSN[] csns = newCSNs(1, 0, 3);
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add records
      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -100,7 +100,46 @@
    TEST_ROOT_DN = DN.decode(TEST_ROOT_DN_STRING);
  }
  @Test(enabled=true)
  @Test
  public void testGenerateCursorFrom() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
      csns2.remove(csns[3]);
      for (CSN csn : csns2)
      {
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      for (CSN csn : csns2)
      {
        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
      }
      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
      for (int i = 0; i < csns2.size() - 1; i++)
      {
        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
      }
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  @Test
  void testTrim() throws Exception
  {
    ReplicationServer replicationServer = null;
@@ -111,7 +150,7 @@
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 5);
      CSN[] csns = generateCSNs(1, 0, 5);
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
@@ -162,89 +201,13 @@
    }
  }
  static CSN[] newCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
    CSN[] csns = new CSN[number];
    for (int i = 0; i < csns.length; i++)
    {
      csns[i] = gen.newCSN();
    }
    return csns;
  }
  private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
      throws IOException, ConfigException
  {
    final int changelogPort = findFreePort();
    final ReplicationServerCfg conf =
        new ReplServerFakeConfiguration(changelogPort, null, ReplicationDBImplementation.JE, 0, 2, queueSize, windowSize, null);
    return new ReplicationServer(conf);
  }
  private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
  {
    final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
  }
  private File createCleanDir() throws IOException
  {
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
            + File.separator + "build");
    path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB";
    final File testRoot = new File(path);
    TestCaseUtils.deleteDirectory(testRoot);
    testRoot.mkdirs();
    return testRoot;
  }
  private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception
  {
    if (csns.length == 0)
    {
      return;
    }
    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
  }
  private void assertFoundInOrder(JEReplicaDB replicaDB,
      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
    try
    {
      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
      {
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        final SoftAssertions softly = new SoftAssertions();
        softly.assertThat(cursor.next()).as(msg).isTrue();
        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
        softly.assertAll();
      }
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
  /**
   * Test the feature of clearing a JEReplicaDB used by a replication server.
   * The clear feature is used when a replication server receives a request to
   * reset the generationId of a given domain.
   */
  @Test(enabled=true)
  void testClear() throws Exception
  @Test
  public void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
@@ -254,7 +217,7 @@
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 3);
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add the changes and check they are here
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
@@ -277,45 +240,6 @@
    }
  }
  @Test
  public void testGenerateCursorFrom() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
      csns2.remove(csns[3]);
      for (CSN csn : csns2)
      {
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      for (CSN csn : csns2)
      {
        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
      }
      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
      for (int i = 0; i < csns2.size() - 1; i++)
      {
        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
      }
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy, final CSN expectedCSN)
      throws ChangelogException
@@ -355,7 +279,7 @@
   * Test the logic that manages counter records in the JEReplicaDB in order to
   * optimize the oldest and newest records in the replication changelog db.
   */
  @Test(enabled=true, groups = { "opendj-256" })
  @Test(groups = { "opendj-256" })
  void testGetOldestNewestCSNs() throws Exception
  {
    // It's worth testing with 2 different setting for counterRecord
@@ -420,7 +344,7 @@
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
      // Populate the db with 'max' msg
      for (int i=max+1; i<=(2*max); i++)
      for (int i=max+1; i<=2 * max; i++)
      {
        csns[i] = new CSN(now + i, mySeqnum, 1);
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
@@ -430,7 +354,6 @@
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
      //
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
      String testcase = "AFTER PURGE (oldest, newest)=";
@@ -466,4 +389,80 @@
    }
  }
  static CSN[] generateCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
    CSN[] csns = new CSN[number];
    for (int i = 0; i < csns.length; i++)
    {
      csns[i] = gen.newCSN();
    }
    return csns;
  }
  private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
      throws IOException, ConfigException
  {
    final int changelogPort = findFreePort();
    final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
        changelogPort, null, ReplicationDBImplementation.JE, 0, 2, queueSize, windowSize, null);
    return new ReplicationServer(conf);
  }
  private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
  {
    final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
  }
  private File createCleanDir() throws IOException
  {
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
            + File.separator + "build");
    path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB";
    final File testRoot = new File(path);
    TestCaseUtils.deleteDirectory(testRoot);
    testRoot.mkdirs();
    return testRoot;
  }
  private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception
  {
    if (csns.length == 0)
    {
      return;
    }
    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
  }
  private void assertFoundInOrder(JEReplicaDB replicaDB,
      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
    try
    {
      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
      {
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        final SoftAssertions softly = new SoftAssertions();
        softly.assertThat(cursor.next()).as(msg).isTrue();
        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
        softly.assertAll();
      }
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
}