From 84cf626ebcae1b535abe9efd3eed5cdf78bdd319 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 05 Sep 2013 07:51:54 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java |   97 ++++++++++++++++++++++++------------------------
 1 files changed, 49 insertions(+), 48 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index e0b585d..ee5f785 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -41,7 +41,10 @@
 import org.opends.server.replication.common.ServerState;
 import org.opends.server.replication.common.ServerStatus;
 import org.opends.server.replication.protocol.*;
-import org.opends.server.replication.server.changelog.api.*;
+import org.opends.server.replication.server.changelog.api.CNIndexData;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDB;
+import org.opends.server.replication.server.changelog.api.ChangeNumberIndexDBCursor;
+import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.types.*;
 import org.opends.server.util.ServerConstants;
 
@@ -71,7 +74,7 @@
   /**
    * Specifies the last changer number requested.
    */
-  private int lastChangeNumber = 0;
+  private long lastChangeNumber = 0;
   /**
    * Specifies whether the change number db has been read until its end.
    */
@@ -522,7 +525,7 @@
    * @throws DirectoryException
    *           When an error is raised.
    */
-  private void initializeCLSearchFromChangeNumber(int startChangeNumber)
+  private void initializeCLSearchFromChangeNumber(long startChangeNumber)
       throws DirectoryException
   {
     try
@@ -535,13 +538,13 @@
     catch(DirectoryException de)
     {
       TRACER.debugCaught(DebugLogLevel.ERROR, de);
-      releaseIterator();
+      releaseCursor();
       throw de;
     }
     catch(Exception e)
     {
       TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      releaseIterator();
+      releaseCursor();
       throw new DirectoryException(
           ResultCode.OPERATIONS_ERROR,
           Message.raw(Category.SYNC,
@@ -561,9 +564,8 @@
    * @throws DirectoryException
    *           if a database problem occurred
    */
-  private String findCookie(final int startChangeNumber)
-      throws ChangelogException,
-      DirectoryException
+  private String findCookie(final long startChangeNumber)
+      throws ChangelogException, DirectoryException
   {
     final ChangeNumberIndexDB cnIndexDB =
         replicationServer.getChangeNumberIndexDB();
@@ -581,9 +583,9 @@
         return null;
       }
 
-      final long firstChangeNumber = cnIndexDB.getFirstChangeNumber();
-      final String crossDomainStartState =
-          cnIndexDB.getPreviousCookie(firstChangeNumber);
+      final CNIndexData firstCNData = cnIndexDB.getFirstCNIndexData();
+      final long firstChangeNumber = firstCNData.getChangeNumber();
+      final String crossDomainStartState = firstCNData.getPreviousCookie();
       cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
       return crossDomainStartState;
     }
@@ -591,11 +593,11 @@
     // Request filter DOES contain a startChangeNumber
 
     // Read the draftCNDb to see whether it contains startChangeNumber
-    String crossDomainStartState =
-        cnIndexDB.getPreviousCookie(startChangeNumber);
-    if (crossDomainStartState != null)
+    CNIndexData startCNData = cnIndexDB.getCNIndexData(startChangeNumber);
+    if (startCNData != null)
     {
       // found the provided startChangeNumber, let's return it
+      final String crossDomainStartState = startCNData.getPreviousCookie();
       cnIndexDBCursor = cnIndexDB.getCursorFrom(startChangeNumber);
       return crossDomainStartState;
     }
@@ -615,9 +617,10 @@
     // the DB, let's use the lower limit.
     if (startChangeNumber < firstChangeNumber)
     {
-      crossDomainStartState = cnIndexDB.getPreviousCookie(firstChangeNumber);
-      if (crossDomainStartState != null)
+      CNIndexData firstCNData = cnIndexDB.getCNIndexData(firstChangeNumber);
+      if (firstCNData != null)
       {
+        final String crossDomainStartState = firstCNData.getPreviousCookie();
         cnIndexDBCursor = cnIndexDB.getCursorFrom(firstChangeNumber);
         return crossDomainStartState;
       }
@@ -636,8 +639,9 @@
         return null;
       }
 
-      final long lastKey = cnIndexDB.getLastChangeNumber();
-      crossDomainStartState = cnIndexDB.getPreviousCookie(lastKey);
+      final CNIndexData lastCNData = cnIndexDB.getLastCNIndexData();
+      final long lastKey = lastCNData.getChangeNumber();
+      final String crossDomainStartState = lastCNData.getPreviousCookie();
       cnIndexDBCursor = cnIndexDB.getCursorFrom(lastKey);
       return crossDomainStartState;
 
@@ -897,7 +901,7 @@
   {
     if (debugEnabled())
       TRACER.debugInfo(this + " shutdown()");
-    releaseIterator();
+    releaseCursor();
     for (DomainContext domainCtxt : domainCtxts) {
       if (!domainCtxt.unRegisterHandler()) {
         logError(Message.raw(Category.SYNC, Severity.NOTICE,
@@ -910,7 +914,7 @@
     domainCtxts = null;
   }
 
-  private void releaseIterator()
+  private void releaseCursor()
   {
     if (this.cnIndexDBCursor != null)
     {
@@ -1256,13 +1260,10 @@
         oldestContext.currentState.update(
             change.getUpdateMsg().getCSN());
 
-        if (oldestContext.currentState.cover(oldestContext.stopState))
-        {
-          oldestContext.active = false;
-        }
-        if (draftCompat
-            && lastChangeNumber > 0
-            && change.getChangeNumber() > lastChangeNumber)
+        if (oldestContext.currentState.cover(oldestContext.stopState)
+            || (draftCompat
+                && lastChangeNumber > 0
+                && change.getChangeNumber() > lastChangeNumber))
         {
           oldestContext.active = false;
         }
@@ -1278,8 +1279,9 @@
       if (searchPhase == PERSISTENT_PHASE)
       {
         if (debugEnabled())
-          clDomCtxtsToString("In getNextECLUpdate (persistent): " +
-          "looking for the generalized oldest change");
+          TRACER.debugInfo(clDomCtxtsToString(
+              "In getNextECLUpdate (persistent): "
+                  + "looking for the generalized oldest change"));
 
         for (DomainContext domainCtxt : domainCtxts) {
           domainCtxt.getNextEligibleMessageForDomain(operationId);
@@ -1300,7 +1302,7 @@
 
           if (draftCompat)
           {
-            assignNewDraftCNAndStore(change);
+            assignNewChangeNumberAndStore(change);
           }
           oldestChange = change;
         }
@@ -1317,21 +1319,19 @@
 
     if (oldestChange != null)
     {
+      final CSN csn = oldestChange.getUpdateMsg().getCSN();
       if (debugEnabled())
-        TRACER.debugInfo("getNextECLUpdate updates previousCookie:"
-          + oldestChange.getUpdateMsg().getCSN());
+        TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
 
       // Update the current state
-      previousCookie.update(
-          oldestChange.getBaseDN(),
-          oldestChange.getUpdateMsg().getCSN());
+      previousCookie.update(oldestChange.getBaseDN(), csn);
 
       // Set the current value of global state in the returned message
       oldestChange.setCookie(previousCookie);
 
       if (debugEnabled())
-        TRACER.debugInfo("getNextECLUpdate returns result oldest change =" +
-                oldestChange);
+        TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
+            + oldestChange);
 
     }
     return oldestChange;
@@ -1370,14 +1370,15 @@
       if (isEndOfCNIndexDBReached)
       {
         // we are at the end of the DraftCNdb in the append mode
-        assignNewDraftCNAndStore(oldestChange);
+        assignNewChangeNumberAndStore(oldestChange);
         return true;
       }
 
 
       // the next change from the CNIndexDB
-      CSN csnFromDraftCNDb = cnIndexDBCursor.getCSN();
-      String dnFromDraftCNDb = cnIndexDBCursor.getBaseDN();
+      final CNIndexData cnIndexData = cnIndexDBCursor.getCNIndexData();
+      final CSN csnFromDraftCNDb = cnIndexData.getCSN();
+      final String dnFromDraftCNDb = cnIndexData.getBaseDN();
 
       if (debugEnabled())
         TRACER.debugInfo("assignChangeNumber() generating change number "
@@ -1392,10 +1393,10 @@
       {
         if (debugEnabled())
           TRACER.debugInfo("assignChangeNumber() generating change number "
-              + " assigning changeNumber=" + cnIndexDBCursor.getChangeNumber()
+              + " assigning changeNumber=" + cnIndexData.getChangeNumber()
               + " to change=" + oldestChange);
 
-        oldestChange.setChangeNumber(cnIndexDBCursor.getChangeNumber());
+        oldestChange.setChangeNumber(cnIndexData.getChangeNumber());
         return true;
       }
 
@@ -1429,8 +1430,8 @@
 
         if (debugEnabled())
           TRACER.debugInfo("assignChangeNumber() generating change number has"
-              + "skipped to  changeNumber=" + cnIndexDBCursor.getChangeNumber()
-              + " csn=" + cnIndexDBCursor.getCSN() + " End of CNIndexDB ?"
+              + "skipped to  changeNumber=" + cnIndexData.getChangeNumber()
+              + " csn=" + cnIndexData.getCSN() + " End of CNIndexDB ?"
               + isEndOfCNIndexDBReached);
       }
       catch (ChangelogException e)
@@ -1452,7 +1453,7 @@
     return sameDN && sameCSN;
   }
 
-  private void assignNewDraftCNAndStore(ECLUpdateMsg change)
+  private void assignNewChangeNumberAndStore(ECLUpdateMsg change)
       throws DirectoryException, ChangelogException
   {
     // generate a new change number and assign to this change
@@ -1460,11 +1461,11 @@
 
     // store in CNIndexDB the pair
     // (change number of the current change, state before this change)
-    replicationServer.getChangeNumberIndexDB().add(
+    replicationServer.getChangeNumberIndexDB().add(new CNIndexData(
         change.getChangeNumber(),
         previousCookie.toString(),
         change.getBaseDN(),
-        change.getUpdateMsg().getCSN());
+        change.getUpdateMsg().getCSN()));
   }
 
   /**
@@ -1499,7 +1500,7 @@
     }
 
     // End of INIT_PHASE => always release the iterator
-    releaseIterator();
+    releaseCursor();
   }
 
   /**

--
Gitblit v1.10.0