mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
24.23.2013 22632fe3bf532c2b76f99b41b40c188d88464847
OPENDJ-1116 Introduce abstraction for the changelog DB

Changes after review from Matt.

*.java:
Changed terminology for easier reading:
- First => Oldest
- Last => Newest
5 files modified
145 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java 14 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java 56 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 8 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java 12 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java 55 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -114,24 +114,26 @@
  long getDomainChangesCount(DN baseDN);
  /**
   * Returns the FIRST {@link CSN}s of each serverId for the specified
   * Returns the oldest {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a {serverId => FIRST CSN} Map
   * @return a {serverId => oldest CSN} Map. If a replica DB is empty or closed,
   *         the oldest CSN will be null for that replica.
   */
  Map<Integer, CSN> getDomainFirstCSNs(DN baseDN);
  Map<Integer, CSN> getDomainOldestCSNs(DN baseDN);
  /**
   * Returns the LAST {@link CSN}s of each serverId for the specified
   * Returns the newest {@link CSN}s of each serverId for the specified
   * replication domain.
   *
   * @param baseDN
   *          the replication domain baseDN
   * @return a {serverId => LAST CSN} Map
   * @return a {serverId => newest CSN} Map. If a replica DB is empty or closed,
   *         the newest CSN will be null for that replica.
   */
  Map<Integer, CSN> getDomainLastCSNs(DN baseDN);
  Map<Integer, CSN> getDomainNewestCSNs(DN baseDN);
  /**
   * Retrieves the latest trim date for the specified replication domain.
opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -111,8 +111,8 @@
  private int queueByteSize = 0;
  private ReplicationDB db;
  private CSN firstChange;
  private CSN lastChange;
  private CSN oldestCSN;
  private CSN newestCSN;
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
@@ -153,8 +153,8 @@
    queueLowmarkBytes = 200 * queueLowmark;
    queueHimarkBytes = 200 * queueLowmark;
    db = new ReplicationDB(id, baseDN, replicationServer, dbenv);
    firstChange = db.readFirstChange();
    lastChange = db.readLastChange();
    oldestCSN = db.readOldestCSN();
    newestCSN = db.readNewestCSN();
    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + id
@@ -198,13 +198,13 @@
      queueByteSize += update.size();
      msgQueue.add(update);
      if (lastChange == null || lastChange.older(update.getCSN()))
      if (newestCSN == null || newestCSN.older(update.getCSN()))
      {
        lastChange = update.getCSN();
        newestCSN = update.getCSN();
      }
      if (firstChange == null)
      if (oldestCSN == null)
      {
        firstChange = update.getCSN();
        oldestCSN = update.getCSN();
      }
    }
  }
@@ -225,21 +225,23 @@
  }
  /**
   * Get the firstChange.
   * @return Returns the firstChange.
   * Get the oldest CSN that has not been purged yet.
   *
   * @return the oldest CSN that has not been purged yet.
   */
  public CSN getFirstChange()
  public CSN getOldestCSN()
  {
    return firstChange;
    return oldestCSN;
  }
  /**
   * Get the lastChange.
   * @return Returns the lastChange.
   * Get the newest CSN that has not been purged yet.
   *
   * @return the newest CSN that has not been purged yet.
   */
  public CSN getLastChange()
  public CSN getNewestCSN()
  {
    return lastChange;
    return newestCSN;
  }
  /**
@@ -249,9 +251,9 @@
   */
  public long getChangesCount()
  {
    if (lastChange != null && firstChange != null)
    if (newestCSN != null && oldestCSN != null)
    {
      return lastChange.getSeqnum() - firstChange.getSeqnum() + 1;
      return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1;
    }
    return 0;
  }
@@ -453,13 +455,13 @@
              return;
            }
            if (!csn.equals(lastChange) && csn.older(trimDate))
            if (!csn.equals(newestCSN) && csn.older(trimDate))
            {
              cursor.delete();
            }
            else
            {
              firstChange = csn;
              oldestCSN = csn;
              return;
            }
          }
@@ -532,13 +534,13 @@
          String.valueOf(serverId)));
      attributes.add(Attributes.create("domain-name",
          baseDN.toNormalizedString()));
      if (firstChange != null)
      if (oldestCSN != null)
      {
        attributes.add(Attributes.create("first-change", encode(firstChange)));
        attributes.add(Attributes.create("first-change", encode(oldestCSN)));
      }
      if (lastChange != null)
      if (newestCSN != null)
      {
        attributes.add(Attributes.create("last-change", encode(lastChange)));
        attributes.add(Attributes.create("last-change", encode(newestCSN)));
      }
      attributes.add(
          Attributes.create("queue-size", String.valueOf(msgQueue.size())));
@@ -581,7 +583,7 @@
  @Override
  public String toString()
  {
    return baseDN + " " + serverId + " " + firstChange + " " + lastChange;
    return baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN;
  }
  /**
@@ -606,8 +608,8 @@
      queueByteSize = 0;
      db.clear();
      firstChange = db.readFirstChange();
      lastChange = db.readLastChange();
      oldestCSN = db.readOldestCSN();
      newestCSN = db.readNewestCSN();
    }
  }
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -270,28 +270,28 @@
  /** {@inheritDoc} */
  @Override
  public Map<Integer, CSN> getDomainFirstCSNs(DN baseDN)
  public Map<Integer, CSN> getDomainOldestCSNs(DN baseDN)
  {
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
    final Map<Integer, CSN> results =
        new HashMap<Integer, CSN>(domainMap.size());
    for (DbHandler dbHandler : domainMap.values())
    {
      results.put(dbHandler.getServerId(), dbHandler.getFirstChange());
      results.put(dbHandler.getServerId(), dbHandler.getOldestCSN());
    }
    return results;
  }
  /** {@inheritDoc} */
  @Override
  public Map<Integer, CSN> getDomainLastCSNs(DN baseDN)
  public Map<Integer, CSN> getDomainNewestCSNs(DN baseDN)
  {
    final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
    final Map<Integer, CSN> results =
        new HashMap<Integer, CSN>(domainMap.size());
    for (DbHandler dbHandler : domainMap.values())
    {
      results.put(dbHandler.getServerId(), dbHandler.getLastChange());
      results.put(dbHandler.getServerId(), dbHandler.getNewestCSN());
    }
    return results;
  }
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -326,11 +326,11 @@
  }
  /**
   * Read the first Change from the database.
   * Read the oldest CSN present in the database.
   *
   * @return the first CSN.
   * @return the oldest CSN in the DB, null if the DB is empty or closed
   */
  public CSN readFirstChange()
  public CSN readOldestCSN()
  {
    dbCloseLock.readLock().lock();
@@ -383,11 +383,11 @@
  /**
   * Read the last Change from the database.
   * Read the newest CSN present in the database.
   *
   * @return the last CSN.
   * @return the newest CSN in the DB, null if the DB is empty or closed
   */
  public CSN readLastChange()
  public CSN readNewestCSN()
  {
    dbCloseLock.readLock().lock();
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
@@ -121,8 +121,8 @@
      assertNotFound(handler, csn5);
      // Test first and last
      assertEquals(csn1, handler.getFirstChange());
      assertEquals(csn3, handler.getLastChange());
      assertEquals(csn1, handler.getOldestCSN());
      assertEquals(csn3, handler.getNewestCSN());
      //--
            // Cursor tests with db and memory queue populated
@@ -144,9 +144,9 @@
      int count = 300;  // wait at most 60 seconds
      while (!purged && (count > 0))
      {
        CSN firstChange = handler.getFirstChange();
        CSN lastChange = handler.getLastChange();
        if (!firstChange.equals(csn4) || !lastChange.equals(csn4))
        CSN oldestCSN = handler.getOldestCSN();
        CSN newestCSN = handler.getNewestCSN();
        if (!oldestCSN.equals(csn4) || !newestCSN.equals(csn4))
        {
          TestCaseUtils.sleep(100);
        } else
@@ -266,15 +266,15 @@
      handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, csn3, "uid"));
      // Check they are here
      assertEquals(csn1, handler.getFirstChange());
      assertEquals(csn3, handler.getLastChange());
      assertEquals(csn1, handler.getOldestCSN());
      assertEquals(csn3, handler.getNewestCSN());
      // Clear ...
      handler.clear();
      // Check the db is cleared.
      assertEquals(null, handler.getFirstChange());
      assertEquals(null, handler.getLastChange());
      assertEquals(null, handler.getOldestCSN());
      assertEquals(null, handler.getNewestCSN());
    } finally
    {
@@ -365,10 +365,10 @@
      handler.flush();
      // Test first and last
      CSN csn1 = handler.getFirstChange();
      assertEquals(csn1, csnArray[1], "First change");
      CSN csnLast = handler.getLastChange();
      assertEquals(csnLast, csnArray[max], "Last change");
      CSN csn1 = handler.getOldestCSN();
      assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
      CSN csnLast = handler.getNewestCSN();
      assertEquals(csnLast, csnArray[max], "Wrong newest CSN");
      // Test count in different subcases trying to handle all special cases
      // regarding the 'counter' record and 'count' algorithm
@@ -448,10 +448,10 @@
      handler.setCounterWindowSize(counterWindow);
      // Test first and last
      csn1 = handler.getFirstChange();
      assertEquals(csn1, csnArray[1], "First change");
      csnLast = handler.getLastChange();
      assertEquals(csnLast, csnArray[max], "Last change");
      csn1 = handler.getOldestCSN();
      assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
      csnLast = handler.getNewestCSN();
      assertEquals(csnLast, csnArray[max], "Wrong newest CSN");
      testcase="FROM our first generated change TO now (> newest change in the db)";
      actualCnt = handler.getCount(csnArray[1], newerThanLast);
@@ -469,10 +469,10 @@
      handler.flush();
      // Test first and last
      csn1 = handler.getFirstChange();
      assertEquals(csn1, csnArray[1], "First change");
      csnLast = handler.getLastChange();
      assertEquals(csnLast, csnArray[2 * max], "Last change");
      csn1 = handler.getOldestCSN();
      assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
      csnLast = handler.getNewestCSN();
      assertEquals(csnLast, csnArray[2 * max], "Wrong newest CSN");
      testcase="FROM our first generated change TO now (> newest change in the db)";
      actualCnt = handler.getCount(csnArray[1], newerThanLast);
@@ -487,16 +487,17 @@
      debugInfo(tn,testcase + " After purge, total count=" + totalCount);
      testcase="AFTER PURGE (first, last)=";
      debugInfo(tn,testcase + handler.getFirstChange() + handler.getLastChange());
      assertEquals(handler.getLastChange(), csnArray[2*max], "Last=");
      debugInfo(tn, testcase + handler.getOldestCSN() + handler.getNewestCSN());
      assertEquals(handler.getNewestCSN(), csnArray[2*max], "Newest=");
      testcase="AFTER PURGE ";
      actualCnt = handler.getCount(csnArray[1], newerThanLast);
      int expectedCnt;
      if (totalCount>1)
      {
        expectedCnt = ((handler.getLastChange().getSeqnum()
                    - handler.getFirstChange().getSeqnum() + 1)/2)+1;
        final int newestSeqnum = handler.getNewestCSN().getSeqnum();
        final int oldestSeqnum = handler.getOldestCSN().getSeqnum();
        expectedCnt = ((newestSeqnum - oldestSeqnum + 1)/2) + 1;
      }
      else
      {
@@ -510,8 +511,8 @@
      handler.clear();
      // Check the db is cleared.
      assertEquals(null, handler.getFirstChange());
      assertEquals(null, handler.getLastChange());
      assertEquals(null, handler.getOldestCSN());
      assertEquals(null, handler.getNewestCSN());
      debugInfo(tn,"Success");
    }
    finally