From daff0f96ee3c6bfc59aeb3e37e52e3c9116d0978 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 24 Sep 2013 15:23:11 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java                         |    8 +-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java                             |   56 +++++++++---------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java                          |   14 ++--
 opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java |   55 +++++++++---------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java                         |   12 ++--
 5 files changed, 75 insertions(+), 70 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
index f8bd02d..1bc8fd1 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangelogDB.java
@@ -114,24 +114,26 @@
   long getDomainChangesCount(DN baseDN);
 
   /**
-   * Returns the FIRST {@link CSN}s of each serverId for the specified
+   * Returns the oldest {@link CSN}s of each serverId for the specified
    * replication domain.
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @return a {serverId => FIRST CSN} Map
+   * @return a {serverId => oldest CSN} Map. If a replica DB is empty or closed,
+   *         the oldest CSN will be null for that replica.
    */
-  Map<Integer, CSN> getDomainFirstCSNs(DN baseDN);
+  Map<Integer, CSN> getDomainOldestCSNs(DN baseDN);
 
   /**
-   * Returns the LAST {@link CSN}s of each serverId for the specified
+   * Returns the newest {@link CSN}s of each serverId for the specified
    * replication domain.
    *
    * @param baseDN
    *          the replication domain baseDN
-   * @return a {serverId => LAST CSN} Map
+   * @return a {serverId => newest CSN} Map. If a replica DB is empty or closed,
+   *         the newest CSN will be null for that replica.
    */
-  Map<Integer, CSN> getDomainLastCSNs(DN baseDN);
+  Map<Integer, CSN> getDomainNewestCSNs(DN baseDN);
 
   /**
    * Retrieves the latest trim date for the specified replication domain.
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
index 11c5a81..9f57083 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DbHandler.java
@@ -111,8 +111,8 @@
   private int queueByteSize = 0;
 
   private ReplicationDB db;
-  private CSN firstChange;
-  private CSN lastChange;
+  private CSN oldestCSN;
+  private CSN newestCSN;
   private int serverId;
   private DN baseDN;
   private DbMonitorProvider dbMonitor = new DbMonitorProvider();
@@ -153,8 +153,8 @@
     queueLowmarkBytes = 200 * queueLowmark;
     queueHimarkBytes = 200 * queueLowmark;
     db = new ReplicationDB(id, baseDN, replicationServer, dbenv);
-    firstChange = db.readFirstChange();
-    lastChange = db.readLastChange();
+    oldestCSN = db.readOldestCSN();
+    newestCSN = db.readNewestCSN();
     thread = new DirectoryThread(this, "Replication server RS("
         + replicationServer.getServerId()
         + ") changelog checkpointer for Replica DS(" + id
@@ -198,13 +198,13 @@
 
       queueByteSize += update.size();
       msgQueue.add(update);
-      if (lastChange == null || lastChange.older(update.getCSN()))
+      if (newestCSN == null || newestCSN.older(update.getCSN()))
       {
-        lastChange = update.getCSN();
+        newestCSN = update.getCSN();
       }
-      if (firstChange == null)
+      if (oldestCSN == null)
       {
-        firstChange = update.getCSN();
+        oldestCSN = update.getCSN();
       }
     }
   }
@@ -225,21 +225,23 @@
   }
 
   /**
-   * Get the firstChange.
-   * @return Returns the firstChange.
+   * Get the oldest CSN that has not been purged yet.
+   *
+   * @return the oldest CSN that has not been purged yet.
    */
-  public CSN getFirstChange()
+  public CSN getOldestCSN()
   {
-    return firstChange;
+    return oldestCSN;
   }
 
   /**
-   * Get the lastChange.
-   * @return Returns the lastChange.
+   * Get the newest CSN that has not been purged yet.
+   *
+   * @return the newest CSN that has not been purged yet.
    */
-  public CSN getLastChange()
+  public CSN getNewestCSN()
   {
-    return lastChange;
+    return newestCSN;
   }
 
   /**
@@ -249,9 +251,9 @@
    */
   public long getChangesCount()
   {
-    if (lastChange != null && firstChange != null)
+    if (newestCSN != null && oldestCSN != null)
     {
-      return lastChange.getSeqnum() - firstChange.getSeqnum() + 1;
+      return newestCSN.getSeqnum() - oldestCSN.getSeqnum() + 1;
     }
     return 0;
   }
@@ -453,13 +455,13 @@
               return;
             }
 
-            if (!csn.equals(lastChange) && csn.older(trimDate))
+            if (!csn.equals(newestCSN) && csn.older(trimDate))
             {
               cursor.delete();
             }
             else
             {
-              firstChange = csn;
+              oldestCSN = csn;
               return;
             }
           }
@@ -532,13 +534,13 @@
           String.valueOf(serverId)));
       attributes.add(Attributes.create("domain-name",
           baseDN.toNormalizedString()));
-      if (firstChange != null)
+      if (oldestCSN != null)
       {
-        attributes.add(Attributes.create("first-change", encode(firstChange)));
+        attributes.add(Attributes.create("first-change", encode(oldestCSN)));
       }
-      if (lastChange != null)
+      if (newestCSN != null)
       {
-        attributes.add(Attributes.create("last-change", encode(lastChange)));
+        attributes.add(Attributes.create("last-change", encode(newestCSN)));
       }
       attributes.add(
           Attributes.create("queue-size", String.valueOf(msgQueue.size())));
@@ -581,7 +583,7 @@
   @Override
   public String toString()
   {
-    return baseDN + " " + serverId + " " + firstChange + " " + lastChange;
+    return baseDN + " " + serverId + " " + oldestCSN + " " + newestCSN;
   }
 
   /**
@@ -606,8 +608,8 @@
       queueByteSize = 0;
 
       db.clear();
-      firstChange = db.readFirstChange();
-      lastChange = db.readLastChange();
+      oldestCSN = db.readOldestCSN();
+      newestCSN = db.readNewestCSN();
     }
   }
 
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index e7981f3..ab85ba2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -270,28 +270,28 @@
 
   /** {@inheritDoc} */
   @Override
-  public Map<Integer, CSN> getDomainFirstCSNs(DN baseDN)
+  public Map<Integer, CSN> getDomainOldestCSNs(DN baseDN)
   {
     final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
     final Map<Integer, CSN> results =
         new HashMap<Integer, CSN>(domainMap.size());
     for (DbHandler dbHandler : domainMap.values())
     {
-      results.put(dbHandler.getServerId(), dbHandler.getFirstChange());
+      results.put(dbHandler.getServerId(), dbHandler.getOldestCSN());
     }
     return results;
   }
 
   /** {@inheritDoc} */
   @Override
-  public Map<Integer, CSN> getDomainLastCSNs(DN baseDN)
+  public Map<Integer, CSN> getDomainNewestCSNs(DN baseDN)
   {
     final Map<Integer, DbHandler> domainMap = getDomainMap(baseDN);
     final Map<Integer, CSN> results =
         new HashMap<Integer, CSN>(domainMap.size());
     for (DbHandler dbHandler : domainMap.values())
     {
-      results.put(dbHandler.getServerId(), dbHandler.getLastChange());
+      results.put(dbHandler.getServerId(), dbHandler.getNewestCSN());
     }
     return results;
   }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index c50b535..78a2167 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -326,11 +326,11 @@
   }
 
   /**
-   * Read the first Change from the database.
+   * Read the oldest CSN present in the database.
    *
-   * @return the first CSN.
+   * @return the oldest CSN in the DB, null if the DB is empty or closed
    */
-  public CSN readFirstChange()
+  public CSN readOldestCSN()
   {
     dbCloseLock.readLock().lock();
 
@@ -383,11 +383,11 @@
 
 
   /**
-   * Read the last Change from the database.
+   * Read the newest CSN present in the database.
    *
-   * @return the last CSN.
+   * @return the newest CSN in the DB, null if the DB is empty or closed
    */
-  public CSN readLastChange()
+  public CSN readNewestCSN()
   {
     dbCloseLock.readLock().lock();
 
diff --git a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
index b68f4c9..5e7e01c 100644
--- a/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
+++ b/opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/DbHandlerTest.java
@@ -121,8 +121,8 @@
       assertNotFound(handler, csn5);
 
       // Test first and last
-      assertEquals(csn1, handler.getFirstChange());
-      assertEquals(csn3, handler.getLastChange());
+      assertEquals(csn1, handler.getOldestCSN());
+      assertEquals(csn3, handler.getNewestCSN());
 
       //--
 			// Cursor tests with db and memory queue populated
@@ -144,9 +144,9 @@
       int count = 300;  // wait at most 60 seconds
       while (!purged && (count > 0))
       {
-        CSN firstChange = handler.getFirstChange();
-        CSN lastChange = handler.getLastChange();
-        if (!firstChange.equals(csn4) || !lastChange.equals(csn4))
+        CSN oldestCSN = handler.getOldestCSN();
+        CSN newestCSN = handler.getNewestCSN();
+        if (!oldestCSN.equals(csn4) || !newestCSN.equals(csn4))
         {
           TestCaseUtils.sleep(100);
         } else
@@ -266,15 +266,15 @@
       handler.add(new DeleteMsg(TEST_ROOT_DN_STRING, csn3, "uid"));
 
       // Check they are here
-      assertEquals(csn1, handler.getFirstChange());
-      assertEquals(csn3, handler.getLastChange());
+      assertEquals(csn1, handler.getOldestCSN());
+      assertEquals(csn3, handler.getNewestCSN());
 
       // Clear ...
       handler.clear();
 
       // Check the db is cleared.
-      assertEquals(null, handler.getFirstChange());
-      assertEquals(null, handler.getLastChange());
+      assertEquals(null, handler.getOldestCSN());
+      assertEquals(null, handler.getNewestCSN());
 
     } finally
     {
@@ -365,10 +365,10 @@
       handler.flush();
 
       // Test first and last
-      CSN csn1 = handler.getFirstChange();
-      assertEquals(csn1, csnArray[1], "First change");
-      CSN csnLast = handler.getLastChange();
-      assertEquals(csnLast, csnArray[max], "Last change");
+      CSN csn1 = handler.getOldestCSN();
+      assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
+      CSN csnLast = handler.getNewestCSN();
+      assertEquals(csnLast, csnArray[max], "Wrong newest CSN");
 
       // Test count in different subcases trying to handle all special cases
       // regarding the 'counter' record and 'count' algorithm
@@ -448,10 +448,10 @@
       handler.setCounterWindowSize(counterWindow);
 
       // Test first and last
-      csn1 = handler.getFirstChange();
-      assertEquals(csn1, csnArray[1], "First change");
-      csnLast = handler.getLastChange();
-      assertEquals(csnLast, csnArray[max], "Last change");
+      csn1 = handler.getOldestCSN();
+      assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
+      csnLast = handler.getNewestCSN();
+      assertEquals(csnLast, csnArray[max], "Wrong newest CSN");
 
       testcase="FROM our first generated change TO now (> newest change in the db)";
       actualCnt = handler.getCount(csnArray[1], newerThanLast);
@@ -469,10 +469,10 @@
       handler.flush();
 
       // Test first and last
-      csn1 = handler.getFirstChange();
-      assertEquals(csn1, csnArray[1], "First change");
-      csnLast = handler.getLastChange();
-      assertEquals(csnLast, csnArray[2 * max], "Last change");
+      csn1 = handler.getOldestCSN();
+      assertEquals(csn1, csnArray[1], "Wrong oldest CSN");
+      csnLast = handler.getNewestCSN();
+      assertEquals(csnLast, csnArray[2 * max], "Wrong newest CSN");
 
       testcase="FROM our first generated change TO now (> newest change in the db)";
       actualCnt = handler.getCount(csnArray[1], newerThanLast);
@@ -487,16 +487,17 @@
       debugInfo(tn,testcase + " After purge, total count=" + totalCount);
 
       testcase="AFTER PURGE (first, last)=";
-      debugInfo(tn,testcase + handler.getFirstChange() + handler.getLastChange());
-      assertEquals(handler.getLastChange(), csnArray[2*max], "Last=");
+      debugInfo(tn, testcase + handler.getOldestCSN() + handler.getNewestCSN());
+      assertEquals(handler.getNewestCSN(), csnArray[2*max], "Newest=");
 
       testcase="AFTER PURGE ";
       actualCnt = handler.getCount(csnArray[1], newerThanLast);
       int expectedCnt;
       if (totalCount>1)
       {
-        expectedCnt = ((handler.getLastChange().getSeqnum()
-                    - handler.getFirstChange().getSeqnum() + 1)/2)+1;
+        final int newestSeqnum = handler.getNewestCSN().getSeqnum();
+        final int oldestSeqnum = handler.getOldestCSN().getSeqnum();
+        expectedCnt = ((newestSeqnum - oldestSeqnum + 1)/2) + 1;
       }
       else
       {
@@ -510,8 +511,8 @@
       handler.clear();
 
       // Check the db is cleared.
-      assertEquals(null, handler.getFirstChange());
-      assertEquals(null, handler.getLastChange());
+      assertEquals(null, handler.getOldestCSN());
+      assertEquals(null, handler.getNewestCSN());
       debugInfo(tn,"Success");
     }
     finally

--
Gitblit v1.10.0