mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
20.16.2014 8a26dd959e71ac3feb6e94698e0b92c8623bd3c5
Aligned (JE|File)ReplicaDB and their tests for easier comparison with each other.

Ported to FileReplicaDBTest additional checks for PositionStrategy.ON_MATCHING_KEY. These are the same that were previously added to JEReplicaDBTest.
3 files modified
329 ■■■■ changed files
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java 80 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java 4 ●●●● patch | view | raw | blame | history
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java 245 ●●●● patch | view | raw | blame | history
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -51,18 +51,17 @@
import static org.opends.messages.ReplicationMessages.*;
/**
 * This class is used for managing the replicationServer database for each
 * server in the topology.
 * Represents a replication server database for one server in the topology.
 * <p>
 * It is responsible for efficiently saving the updates that is received from
 * each master server into stable storage.
 * <p>
 * This class is also able to generate a {@link DBCursor} that can be used to
 * It is also able to generate a {@link DBCursor} that can be used to
 * read all changes from a given {@link CSN}.
 * <p>
 * This class publish some monitoring information below cn=monitor.
 * It publishes some monitoring information below cn=monitor.
 */
public class JEReplicaDB
class JEReplicaDB
{
  /**
@@ -81,41 +80,44 @@
      this.oldestCSN = oldestCSN;
      this.newestCSN = newestCSN;
    }
  }
  private final AtomicBoolean shutdown = new AtomicBoolean(false);
  private ReplicationDB db;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
   *
   * @NonNull
   */
  private volatile CSNLimits csnLimits;
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private ReplicationServer replicationServer;
  private final int serverId;
  private final DN baseDN;
  private final DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private final ReplicationServer replicationServer;
  private final ReplicationDB db;
  /**
   * Creates a new ReplicaDB associated to a given LDAP server.
   *
   * @param serverId The serverId for which changes will be stored in the DB.
   * @param baseDN the baseDN for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this ReplicaDB.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * server for this domain.
   * @throws ChangelogException If a database problem happened
   * @param serverId
   *          Id of this server.
   * @param baseDN
   *          the replication domain baseDN.
   * @param replicationServer
   *          The ReplicationServer that creates this ReplicaDB.
   * @param replicationEnv
   *          the Database Env to use to create the ReplicationServer DB. server
   *          for this domain.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public JEReplicaDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbenv)
      throws ChangelogException
  JEReplicaDB(final int serverId, final DN baseDN, final ReplicationServer replicationServer,
      final ReplicationDbEnv replicationEnv) throws ChangelogException
  {
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    this.replicationServer = replicationServer;
    this.db = new ReplicationDB(serverId, baseDN, replicationServer, replicationEnv);
    this.csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
@@ -143,8 +145,7 @@
    db.addEntry(updateMsg);
    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null
        || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateNew = limits.newestCSN == null || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
    if (updateOld || updateNew)
    {
@@ -159,7 +160,7 @@
   *
   * @return the oldest CSN that has not been purged yet.
   */
  public CSN getOldestCSN()
  CSN getOldestCSN()
  {
    return csnLimits.oldestCSN;
  }
@@ -169,7 +170,7 @@
   *
   * @return the newest CSN that has not been purged yet.
   */
  public CSN getNewestCSN()
  CSN getNewestCSN()
  {
    return csnLimits.newestCSN;
  }
@@ -188,7 +189,7 @@
   * @throws ChangelogException
   *           if a database problem happened
   */
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startCSN, PositionStrategy positionStrategy)
  DBCursor<UpdateMsg> generateCursorFrom(final CSN startCSN, final PositionStrategy positionStrategy)
      throws ChangelogException
  {
    return new JEReplicaDBCursor(db, startCSN, positionStrategy, this);
@@ -197,7 +198,7 @@
  /**
   * Shutdown this ReplicaDB.
   */
  public void shutdown()
  void shutdown()
  {
    if (shutdown.compareAndSet(false, true))
    {
@@ -211,7 +212,7 @@
   *
   * @param purgeCSN
   *          The CSN up to which changes can be purged. No purging happens when
   *          it is null.
   *          it is {@code null}.
   * @throws ChangelogException
   *           In case of database problem.
   */
@@ -279,8 +280,7 @@
  }
  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * ReplicaDB.
   * Implements monitoring capabilities of the ReplicaDB.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
@@ -288,7 +288,7 @@
    @Override
    public List<Attribute> getMonitorData()
    {
      List<Attribute> attributes = new ArrayList<Attribute>();
      final List<Attribute> attributes = new ArrayList<Attribute>();
      create(attributes, "replicationServer-database",String.valueOf(serverId));
      create(attributes, "domain-name", baseDN.toNormalizedString());
      final CSNLimits limits = csnLimits;
@@ -303,12 +303,12 @@
      return attributes;
    }
    private void create(List<Attribute> attributes, String name, String value)
    private void create(final List<Attribute> attributes, final String name, final String value)
    {
      attributes.add(Attributes.create(name, value));
    }
    private String encode(CSN csn)
    private String encode(final CSN csn)
    {
      return csn + " " + new Date(csn.getTime());
    }
@@ -317,16 +317,14 @@
    @Override
    public String getMonitorInstanceName()
    {
      ReplicationServerDomain domain = replicationServer
          .getReplicationServerDomain(baseDN);
      return "Changelog for DS(" + serverId + "),cn="
          + domain.getMonitorInstanceName();
      ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(baseDN);
      return "Changelog for DS(" + serverId + "),cn=" + domain.getMonitorInstanceName();
    }
    /** {@inheritDoc} */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
                            throws ConfigException,InitializationException
        throws ConfigException,InitializationException
    {
      // Nothing to do for now
    }
@@ -346,7 +344,7 @@
   * @throws ChangelogException When an exception occurs while removing the
   * changes from the DB.
   */
  public void clear() throws ChangelogException
  void clear() throws ChangelogException
  {
    db.clear();
    csnLimits = new CSNLimits(null, null);
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -90,7 +90,7 @@
      DN baseDN2 = DN.valueOf("o=baseDN2");
      DN baseDN3 = DN.valueOf("o=baseDN3");
      CSN[] csns = newCSNs(1, 0, 3);
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add records
      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
@@ -187,7 +187,7 @@
      DN baseDN2 = DN.valueOf("o=baseDN2");
      DN baseDN3 = DN.valueOf("o=baseDN3");
      CSN[] csns = newCSNs(1, 0, 3);
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add records
      final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
opendj3-server-dev/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -83,7 +83,46 @@
    TEST_ROOT_DN = DN.valueOf(TEST_ROOT_DN_STRING);
  }
  @Test(enabled=true)
  @Test
  public void testGenerateCursorFrom() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      final CSN[] csns = generateCSNs(1, System.currentTimeMillis(), 5);
      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
      csns2.remove(csns[3]);
      for (CSN csn : csns2)
      {
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      for (CSN csn : csns2)
      {
        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
      }
      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
      for (int i = 0; i < csns2.size() - 1; i++)
      {
        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
      }
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  @Test
  void testTrim() throws Exception
  {
    ReplicationServer replicationServer = null;
@@ -94,7 +133,7 @@
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 5);
      CSN[] csns = generateCSNs(1, 0, 5);
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[1], "uid"));
@@ -145,89 +184,13 @@
    }
  }
  static CSN[] newCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
    CSN[] csns = new CSN[number];
    for (int i = 0; i < csns.length; i++)
    {
      csns[i] = gen.newCSN();
    }
    return csns;
  }
  private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
      throws IOException, ConfigException
  {
    final int changelogPort = findFreePort();
    final ReplicationServerCfg conf =
        new ReplServerFakeConfiguration(changelogPort, null, 0, 2, queueSize, windowSize, null);
    return new ReplicationServer(conf);
  }
  private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
  {
    final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
  }
  private File createCleanDir() throws IOException
  {
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
            + File.separator + "build");
    path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB";
    final File testRoot = new File(path);
    TestCaseUtils.deleteDirectory(testRoot);
    testRoot.mkdirs();
    return testRoot;
  }
  private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception
  {
    if (csns.length == 0)
    {
      return;
    }
    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
  }
  private void assertFoundInOrder(JEReplicaDB replicaDB,
      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
    try
    {
      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
      {
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        final SoftAssertions softly = new SoftAssertions();
        softly.assertThat(cursor.next()).as(msg).isTrue();
        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
        softly.assertAll();
      }
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
  /**
   * Test the feature of clearing a JEReplicaDB used by a replication server.
   * The clear feature is used when a replication server receives a request to
   * reset the generationId of a given domain.
   */
  @Test(enabled=true)
  void testClear() throws Exception
  @Test
  public void testClear() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
@@ -237,7 +200,7 @@
      replicationServer = configureReplicationServer(100, 5000);
      replicaDB = newReplicaDB(replicationServer);
      CSN[] csns = newCSNs(1, 0, 3);
      CSN[] csns = generateCSNs(1, 0, 3);
      // Add the changes and check they are here
      replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[0], "uid"));
@@ -260,45 +223,6 @@
    }
  }
  @Test
  public void testGenerateCursorFrom() throws Exception
  {
    ReplicationServer replicationServer = null;
    JEReplicaDB replicaDB = null;
    try
    {
      TestCaseUtils.startServer();
      replicationServer = configureReplicationServer(100000, 10);
      replicaDB = newReplicaDB(replicationServer);
      final CSN[] csns = newCSNs(1, System.currentTimeMillis(), 5);
      final ArrayList<CSN> csns2 = new ArrayList<CSN>(Arrays.asList(csns));
      csns2.remove(csns[3]);
      for (CSN csn : csns2)
      {
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csn, "uid"));
      }
      for (CSN csn : csns2)
      {
        assertNextCSN(replicaDB, csn, ON_MATCHING_KEY, csn);
      }
      assertNextCSN(replicaDB, csns[3], ON_MATCHING_KEY, csns[4]);
      for (int i = 0; i < csns2.size() - 1; i++)
      {
        assertNextCSN(replicaDB, csns2.get(i), AFTER_MATCHING_KEY, csns2.get(i + 1));
      }
      assertNotFound(replicaDB, csns[4], AFTER_MATCHING_KEY);
    }
    finally
    {
      shutdown(replicaDB);
      remove(replicationServer);
    }
  }
  private void assertNextCSN(JEReplicaDB replicaDB, final CSN startCSN,
      final PositionStrategy positionStrategy, final CSN expectedCSN)
      throws ChangelogException
@@ -338,7 +262,7 @@
   * Test the logic that manages counter records in the JEReplicaDB in order to
   * optimize the oldest and newest records in the replication changelog db.
   */
  @Test(enabled=true, groups = { "opendj-256" })
  @Test(groups = { "opendj-256" })
  void testGetOldestNewestCSNs() throws Exception
  {
    // It's worth testing with 2 different setting for counterRecord
@@ -403,7 +327,7 @@
      assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
      // Populate the db with 'max' msg
      for (int i=max+1; i<=(2*max); i++)
      for (int i=max+1; i<=2 * max; i++)
      {
        csns[i] = new CSN(now + i, mySeqnum, 1);
        replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
@@ -413,7 +337,6 @@
      assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
      assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
      //
      replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
      String testcase = "AFTER PURGE (oldest, newest)=";
@@ -449,4 +372,80 @@
    }
  }
  static CSN[] generateCSNs(int serverId, long timestamp, int number)
  {
    CSNGenerator gen = new CSNGenerator(serverId, timestamp);
    CSN[] csns = new CSN[number];
    for (int i = 0; i < csns.length; i++)
    {
      csns[i] = gen.newCSN();
    }
    return csns;
  }
  private ReplicationServer configureReplicationServer(int windowSize, int queueSize)
      throws IOException, ConfigException
  {
    final int changelogPort = findFreePort();
    final ReplicationServerCfg conf = new ReplServerFakeConfiguration(
        changelogPort, null, 0, 2, queueSize, windowSize, null);
    return new ReplicationServer(conf);
  }
  private JEReplicaDB newReplicaDB(ReplicationServer rs) throws Exception
  {
    final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
    return changelogDB.getOrCreateReplicaDB(TEST_ROOT_DN, 1, rs).getFirst();
  }
  private File createCleanDir() throws IOException
  {
    String buildRoot = System.getProperty(TestCaseUtils.PROPERTY_BUILD_ROOT);
    String path = System.getProperty(TestCaseUtils.PROPERTY_BUILD_DIR, buildRoot
            + File.separator + "build");
    path = path + File.separator + "unit-tests" + File.separator + "JEReplicaDB";
    final File testRoot = new File(path);
    TestCaseUtils.deleteDirectory(testRoot);
    testRoot.mkdirs();
    return testRoot;
  }
  private void assertFoundInOrder(JEReplicaDB replicaDB, CSN... csns) throws Exception
  {
    if (csns.length == 0)
    {
      return;
    }
    assertFoundInOrder(replicaDB, AFTER_MATCHING_KEY, csns);
    assertFoundInOrder(replicaDB, ON_MATCHING_KEY, csns);
  }
  private void assertFoundInOrder(JEReplicaDB replicaDB,
      final PositionStrategy positionStrategy, CSN... csns) throws ChangelogException
  {
    DBCursor<UpdateMsg> cursor = replicaDB.generateCursorFrom(csns[0], positionStrategy);
    try
    {
      assertNull(cursor.getRecord(), "Cursor should point to a null record initially");
      for (int i = positionStrategy == ON_MATCHING_KEY ? 0 : 1; i < csns.length; i++)
      {
        final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
        final SoftAssertions softly = new SoftAssertions();
        softly.assertThat(cursor.next()).as(msg).isTrue();
        softly.assertThat(cursor.getRecord().getCSN()).as(msg).isEqualTo(csns[i]);
        softly.assertAll();
      }
      final SoftAssertions softly = new SoftAssertions();
      softly.assertThat(cursor.next()).isFalse();
      softly.assertThat(cursor.getRecord()).isNull();
      softly.assertAll();
    }
    finally
    {
      close(cursor);
    }
  }
}