From caca80367f4fa2843338da673c7def60bb012961 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 04 Sep 2013 12:34:42 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java                 |  231 ++++++++++++++++---------------------
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java |   40 ++++-
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java     |    3 
 opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java     |   34 -----
 opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                  |    6 
 5 files changed, 141 insertions(+), 173 deletions(-)

diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 168a69d..e0b585d 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -1347,9 +1347,11 @@
    *         provided oldestChange, <code>false</code> otherwise
    * @throws DirectoryException
    *           if any problem occur
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
   private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
-      throws DirectoryException
+      throws DirectoryException, ChangelogException
   {
     // We also need to check if the draftCNdb is consistent with
     // the changelogdb.
@@ -1451,7 +1453,7 @@
   }
 
   private void assignNewDraftCNAndStore(ECLUpdateMsg change)
-      throws DirectoryException
+      throws DirectoryException, ChangelogException
   {
     // generate a new change number and assign to this change
     change.setChangeNumber(replicationServer.getNewChangeNumber());
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 59ce757..90bb23f 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -586,8 +586,7 @@
    * Enable the ECL access by creating a dedicated workflow element.
    * @throws DirectoryException when an error occurs.
    */
-  public void enableECL()
-  throws DirectoryException
+  public void enableECL() throws DirectoryException
   {
     if (externalChangeLogWorkflowImpl!=null)
     {
@@ -661,8 +660,7 @@
     {
       Message message =
         NOTE_ERR_UNABLE_TO_ENABLE_ECL_VIRTUAL_ATTR.get(attrName, e.toString());
-      throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
-          message, e);
+      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, message, e);
     }
   }
 
@@ -679,19 +677,14 @@
       // internalNetworkGroup?
       NetworkGroup internalNetworkGroup = NetworkGroup
           .getInternalNetworkGroup();
-      internalNetworkGroup
-          .deregisterWorkflow(externalChangeLogWorkflowID);
+      internalNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
 
       // FIXME:ECL should the ECL Workflow be registered in adminNetworkGroup?
-      NetworkGroup adminNetworkGroup = NetworkGroup
-          .getAdminNetworkGroup();
-      adminNetworkGroup
-          .deregisterWorkflow(externalChangeLogWorkflowID);
+      NetworkGroup adminNetworkGroup = NetworkGroup.getAdminNetworkGroup();
+      adminNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
 
-      NetworkGroup defaultNetworkGroup = NetworkGroup
-          .getDefaultNetworkGroup();
-      defaultNetworkGroup
-          .deregisterWorkflow(externalChangeLogWorkflowID);
+      NetworkGroup defaultNetworkGroup = NetworkGroup.getDefaultNetworkGroup();
+      defaultNetworkGroup.deregisterWorkflow(externalChangeLogWorkflowID);
 
       eclwf.deregister();
       eclwf.finalizeWorkflow();
@@ -705,11 +698,26 @@
       eclwe.finalizeWorkflowElement();
     }
 
+    shutdownCNIndexDB();
+  }
+
+  private void shutdownCNIndexDB()
+  {
     synchronized (cnIndexDBLock)
     {
       if (cnIndexDB != null)
       {
-        cnIndexDB.shutdown();
+        try
+        {
+          cnIndexDB.shutdown();
+        }
+        catch (ChangelogException ignored)
+        {
+          if (debugEnabled())
+          {
+            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
+          }
+        }
       }
     }
   }
@@ -1385,17 +1393,7 @@
           }
         }
 
-        try
-        {
-          cnIndexDB.shutdown();
-        }
-        catch (Exception ignored)
-        {
-          if (debugEnabled())
-          {
-            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
-          }
-        }
+        shutdownCNIndexDB();
 
         lastGeneratedChangeNumber = 0;
         cnIndexDB = null;
@@ -1639,7 +1637,7 @@
         if (cnIndexDB == null)
         {
           cnIndexDB = new DraftCNDbHandler(this, this.dbEnv);
-          lastGeneratedChangeNumber = getLastChangeNumber();
+          lastGeneratedChangeNumber = cnIndexDB.getLastChangeNumber();
         }
         return cnIndexDB;
       }
@@ -1654,40 +1652,6 @@
   }
 
   /**
-   * Get the value of the first change number, 0 when db is empty.
-   *
-   * @return the first value.
-   */
-  public long getFirstChangeNumber()
-  {
-    synchronized (cnIndexDBLock)
-    {
-      if (cnIndexDB != null)
-      {
-        return cnIndexDB.getFirstChangeNumber();
-      }
-      return 0;
-    }
-  }
-
-  /**
-   * Get the value of the last change number, 0 when db is empty.
-   *
-   * @return the last value.
-   */
-  public long getLastChangeNumber()
-  {
-    synchronized (cnIndexDBLock)
-    {
-      if (cnIndexDB != null)
-      {
-        return cnIndexDB.getLastChangeNumber();
-      }
-      return 0;
-    }
-  }
-
-  /**
    * Generate a new change number.
    *
    * @return The generated change number
@@ -1739,89 +1703,96 @@
     boolean dbEmpty = false;
     final ChangeNumberIndexDB cnIndexDB = getChangeNumberIndexDB();
 
-    long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
-    Map<String, ServerState> domainsServerStateForLastCN = null;
-    CSN csnForLastCN = null;
-    String domainForLastCN = null;
-    if (firstChangeNumber < 1)
+    try
     {
-      dbEmpty = true;
-      firstChangeNumber = 0;
-      lastChangeNumber = 0;
-    }
-    else
-    {
-      lastChangeNumber = cnIndexDB.getLastChangeNumber();
-
-      // Get the generalized state associated with the current last change
-      // number and initializes from it the startStates table
-      String lastCNGenState = cnIndexDB.getPreviousCookie(lastChangeNumber);
-      if (lastCNGenState != null && lastCNGenState.length() > 0)
+      long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
+      Map<String, ServerState> domainsServerStateForLastCN = null;
+      CSN csnForLastCN = null;
+      String domainForLastCN = null;
+      if (firstChangeNumber < 1)
       {
-        domainsServerStateForLastCN =
-            MultiDomainServerState.splitGenStateToServerStates(lastCNGenState);
-      }
-
-      csnForLastCN = cnIndexDB.getCSN(lastChangeNumber);
-      domainForLastCN = cnIndexDB.getBaseDN(lastChangeNumber);
-    }
-
-    long newestDate = 0;
-    for (ReplicationServerDomain rsd : getReplicationServerDomains())
-    {
-      if (contains(excludedBaseDNs, rsd.getBaseDn()))
-        continue;
-
-      // for this domain, have the state in the replchangelog
-      // where the last change number update is
-      long ec;
-      if (domainsServerStateForLastCN == null)
-      {
-        // Count changes of this domain from the beginning of the changelog
-        CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0);
-        ec = rsd.getEligibleCount(
-                  rsd.getStartState().duplicateOnlyOlderThan(trimCSN),
-                  crossDomainEligibleCSN);
+        dbEmpty = true;
+        firstChangeNumber = 0;
+        lastChangeNumber = 0;
       }
       else
       {
-        // There are records in the draftDB (so already returned to clients)
-        // BUT
-        //  There is nothing related to this domain in the last draft record
-        //  (may be this domain was disabled when this record was returned).
-        // In that case, are counted the changes from
-        //  the date of the most recent change from this last draft record
-        if (newestDate == 0)
+        lastChangeNumber = cnIndexDB.getLastChangeNumber();
+
+        // Get the generalized state associated with the current last change
+        // number and initializes from it the startStates table
+        String lastCNGenState = cnIndexDB.getPreviousCookie(lastChangeNumber);
+        if (lastCNGenState != null && lastCNGenState.length() > 0)
         {
-          newestDate = csnForLastCN.getTime();
+          domainsServerStateForLastCN = MultiDomainServerState
+              .splitGenStateToServerStates(lastCNGenState);
         }
 
-        // And count changes of this domain from the date of the
-        // lastseqnum record (that does not refer to this domain)
-        CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0);
-        ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN);
-
-        if (domainForLastCN.equalsIgnoreCase(rsd.getBaseDn()))
-          ec--;
+        csnForLastCN = cnIndexDB.getCSN(lastChangeNumber);
+        domainForLastCN = cnIndexDB.getBaseDN(lastChangeNumber);
       }
 
-      // cumulates on domains
-      lastChangeNumber += ec;
+      long newestDate = 0;
+      for (ReplicationServerDomain rsd : getReplicationServerDomains())
+      {
+        if (contains(excludedBaseDNs, rsd.getBaseDn()))
+          continue;
 
-      // CNIndexDB is empty and there are eligible updates in the replication
-      // changelog then init first change number
-      if (ec > 0 && firstChangeNumber == 0)
-        firstChangeNumber = 1;
+        // for this domain, have the state in the replchangelog
+        // where the last change number update is
+        long ec;
+        if (domainsServerStateForLastCN == null)
+        {
+          // Count changes of this domain from the beginning of the changelog
+          CSN trimCSN = new CSN(rsd.getLatestDomainTrimDate(), 0, 0);
+          ec = rsd.getEligibleCount(
+              rsd.getStartState().duplicateOnlyOlderThan(trimCSN),
+              crossDomainEligibleCSN);
+        }
+        else
+        {
+          // There are records in the draftDB (so already returned to clients)
+          // BUT
+          // There is nothing related to this domain in the last draft record
+          // (may be this domain was disabled when this record was returned).
+          // In that case, are counted the changes from
+          // the date of the most recent change from this last draft record
+          if (newestDate == 0)
+          {
+            newestDate = csnForLastCN.getTime();
+          }
+
+          // And count changes of this domain from the date of the
+          // lastseqnum record (that does not refer to this domain)
+          CSN csnx = new CSN(newestDate, csnForLastCN.getSeqnum(), 0);
+          ec = rsd.getEligibleCount(csnx, crossDomainEligibleCSN);
+
+          if (domainForLastCN.equalsIgnoreCase(rsd.getBaseDn()))
+            ec--;
+        }
+
+        // cumulates on domains
+        lastChangeNumber += ec;
+
+        // CNIndexDB is empty and there are eligible updates in the replication
+        // changelog then init first change number
+        if (ec > 0 && firstChangeNumber == 0)
+          firstChangeNumber = 1;
+      }
+
+      if (dbEmpty)
+      {
+        // The database was empty, just keep increasing numbers since last time
+        // we generated one change number.
+        firstChangeNumber += lastGeneratedChangeNumber;
+        lastChangeNumber += lastGeneratedChangeNumber;
+      }
+      return new long[] { firstChangeNumber, lastChangeNumber };
     }
-
-    if (dbEmpty)
+    catch (ChangelogException e)
     {
-      // The database was empty, just keep increasing numbers since last time
-      // we generated one change number.
-      firstChangeNumber += lastGeneratedChangeNumber;
-      lastChangeNumber += lastGeneratedChangeNumber;
+      throw new DirectoryException(ResultCode.OPERATIONS_ERROR, e);
     }
-    return new long[] { firstChangeNumber, lastChangeNumber };
   }
 
   /**
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
index 57faad5..ccd70e2 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ChangeNumberIndexDB.java
@@ -49,8 +49,10 @@
    * @param changeNumber
    *          the provided change number.
    * @return the associated CSN, null when none.
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  public CSN getCSN(long changeNumber);
+  public CSN getCSN(long changeNumber) throws ChangelogException;
 
   /**
    * Get the baseDN associated to a provided change number.
@@ -58,8 +60,10 @@
    * @param changeNumber
    *          the provided change number.
    * @return the baseDN, null when none.
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  public String getBaseDN(long changeNumber);
+  public String getBaseDN(long changeNumber) throws ChangelogException;
 
   /**
    * Get the previous cookie associated to a provided change number.
@@ -67,22 +71,28 @@
    * @param changeNumber
    *          the provided change number.
    * @return the previous cookie, null when none.
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  String getPreviousCookie(long changeNumber);
+  String getPreviousCookie(long changeNumber) throws ChangelogException;
 
   /**
    * Get the first change number stored in this DB.
    *
    * @return Returns the first change number in this DB.
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  long getFirstChangeNumber();
+  long getFirstChangeNumber() throws ChangelogException;
 
   /**
    * Get the last change number stored in this DB.
    *
    * @return Returns the last change number in this DB
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  long getLastChangeNumber();
+  long getLastChangeNumber() throws ChangelogException;
 
   /**
    * Add an update to the list of messages that must be saved to this DB managed
@@ -99,8 +109,11 @@
    *          The associated baseDN.
    * @param csn
    *          The associated replication CSN.
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  void add(long changeNumber, String previousCookie, String baseDN, CSN csn);
+  void add(long changeNumber, String previousCookie, String baseDN, CSN csn)
+      throws ChangelogException;
 
   /**
    * Generate a new {@link ChangeNumberIndexDBCursor} that allows to browse the
@@ -113,7 +126,7 @@
    *         this DBHandler and starting at the position defined by a given
    *         changeNumber.
    * @throws ChangelogException
-   *           if a database problem happened.
+   *           if a database problem occurs.
    */
   ChangeNumberIndexDBCursor getCursorFrom(long startChangeNumber)
       throws ChangelogException;
@@ -123,14 +136,16 @@
    *
    * @return <code>true</code> if this database is empty, <code>false</code>
    *         otherwise
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  boolean isEmpty();
+  boolean isEmpty() throws ChangelogException;
 
   /**
    * Clear the changes from this DB (from both memory cache and DB storage).
    *
    * @throws ChangelogException
-   *           When an exception occurs while removing the changes from this DB.
+   *           if a database problem occurs.
    */
   void clear() throws ChangelogException;
 
@@ -142,13 +157,16 @@
    *          The baseDN for which we want to remove all records from this DB,
    *          null means all.
    * @throws ChangelogException
-   *           When an exception occurs while removing the changes from this DB.
+   *           if a database problem occurs.
    */
   void clear(String baseDNToClear) throws ChangelogException;
 
   /**
    * Shutdown this DB.
+   *
+   * @throws ChangelogException
+   *           if a database problem occurs.
    */
-  void shutdown();
+  void shutdown() throws ChangelogException;
 
 }
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
index c3a0c31..655de4a 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/api/ReplicaDBCursor.java
@@ -33,6 +33,9 @@
 /**
  * This cursor allows to iterate through the changes received from a given
  * replica (Directory Server) in the topology.
+ * <p>
+ * Instances of this class are sorted in the order defined by the CSN of the
+ * current {@link UpdateMsg}, i.e. the cursor with the oldest CSN comes first.
  */
 public interface ReplicaDBCursor extends Closeable, Comparable<ReplicaDBCursor>
 {
diff --git a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
index 3d57e71..7d1ce8b 100644
--- a/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
+++ b/opendj-sdk/opends/src/server/org/opends/server/replication/server/changelog/je/DraftCNDbHandler.java
@@ -516,7 +516,7 @@
 
   /** {@inheritDoc} */
   @Override
-  public String getPreviousCookie(long changeNumber)
+  public String getPreviousCookie(long changeNumber) throws ChangelogException
   {
     DraftCNDBCursor cursor = null;
     try
@@ -524,11 +524,6 @@
       cursor = db.openReadCursor(changeNumber);
       return cursor.currentValue();
     }
-    catch(Exception e)
-    {
-      debugException("getValue", changeNumber, e);
-      return null;
-    }
     finally
     {
       close(cursor);
@@ -537,7 +532,7 @@
 
   /** {@inheritDoc} */
   @Override
-  public CSN getCSN(long changeNumber)
+  public CSN getCSN(long changeNumber) throws ChangelogException
   {
     DraftCNDBCursor cursor = null;
     try
@@ -545,20 +540,15 @@
       cursor = db.openReadCursor(changeNumber);
       return cursor.currentCSN();
     }
-    catch(Exception e)
-    {
-      debugException("getCSN", changeNumber, e);
-      return null;
-    }
     finally
     {
       close(cursor);
     }
   }
 
-  /**{@inheritDoc}*/
+  /** {@inheritDoc} */
   @Override
-  public String getBaseDN(long changeNumber)
+  public String getBaseDN(long changeNumber) throws ChangelogException
   {
     DraftCNDBCursor cursor = null;
     try
@@ -566,25 +556,9 @@
       cursor = db.openReadCursor(changeNumber);
       return cursor.currentBaseDN();
     }
-    catch(Exception e)
-    {
-      debugException("getBaseDN", changeNumber, e);
-      return null;
-    }
     finally
     {
       close(cursor);
     }
   }
-
-  private void debugException(String methodName, long changeNumber, Exception e)
-  {
-    if (debugEnabled())
-      TRACER.debugInfo("In DraftCNDbHandler." + methodName + "(), read: "
-          + " key=" + changeNumber + " value returned is null"
-          + " first="+ db.readFirstChangeNumber()
-          + " last=" + db.readLastChangeNumber()
-          + " count=" + db.count()
-          + " exception " + e + " " + e.getMessage());
-  }
 }

--
Gitblit v1.10.0