From c64a67b3d0b51743d9f2a2bf110cb365b8b104af Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 26 Aug 2013 08:41:18 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/ReplicationServer.java |  168 ++++++++++++++++++++++++++-----------------------------
 1 files changed, 80 insertions(+), 88 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 01afb43..7b16799 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -52,6 +52,7 @@
 import org.opends.server.replication.common.*;
 import org.opends.server.replication.plugin.MultimasterReplication;
 import org.opends.server.replication.protocol.*;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
 import org.opends.server.replication.server.changelog.api.ChangelogException;
 import org.opends.server.replication.server.changelog.je.DbHandler;
 import org.opends.server.replication.server.changelog.je.DraftCNDbHandler;
@@ -65,6 +66,7 @@
 import static org.opends.messages.ReplicationMessages.*;
 import static org.opends.server.loggers.ErrorLogger.*;
 import static org.opends.server.loggers.debug.DebugLogger.*;
+import static org.opends.server.types.ResultCode.*;
 import static org.opends.server.util.ServerConstants.*;
 import static org.opends.server.util.StaticUtils.*;
 
@@ -143,23 +145,22 @@
   private long monitoringPublisherPeriod = 3000;
 
   /**
-   * The handler of the draft change numbers database, the database used to
-   * store the relation between a draft change number ('seqnum') and the
-   * associated cookie.
+   * The handler of the changelog database, the database stores the relation
+   * between a draft change number ('seqnum') and the associated cookie.
    * <p>
-   * Guarded by draftCNLock
+   * Guarded by changelogDBLock
    */
-  private DraftCNDbHandler draftCNDbHandler;
+  private ChangelogDB changelogDB;
 
   /**
    * The last value generated of the draft change number.
    * <p>
-   * Guarded by draftCNLock
+   * Guarded by changelogDBLock
    **/
   private int lastGeneratedDraftCN = 0;
 
-  /** Used for protecting draft CN related state. */
-  private final Object draftCNLock = new Object();
+  /** Used for protecting changelogDB related state. */
+  private final Object changelogDBLock = new Object();
 
   /**
    * The tracer object for the debug logger.
@@ -183,7 +184,7 @@
   private long domainTicket = 0L;
 
   /** BaseDNs excluded for ECL. */
-  private Collection<String> excludedBaseDNs = new ArrayList<String>();
+  private Set<String> excludedBaseDNs = new HashSet<String>();
 
   /**
    * The weight affected to the replication server.
@@ -470,7 +471,7 @@
 
   private Set<String> getConnectedRSUrls(ReplicationServerDomain domain)
   {
-    Set<String> results = new LinkedHashSet<String>();
+    Set<String> results = new HashSet<String>();
     for (ReplicationServerHandler rsHandler : domain.getConnectedRSs().values())
     {
       results.add(normalizeServerURL(rsHandler.getServerAddressURL()));
@@ -714,11 +715,11 @@
       eclwe.finalizeWorkflowElement();
     }
 
-    synchronized (draftCNLock)
+    synchronized (changelogDBLock)
     {
-      if (draftCNDbHandler != null)
+      if (changelogDB != null)
       {
-        draftCNDbHandler.shutdown();
+        changelogDB.shutdown();
       }
     }
   }
@@ -900,42 +901,39 @@
     {
       dbEnv.clearGenerationId(baseDn);
     }
-    catch (Exception e)
+    catch (Exception ignored)
     {
-      // Ignore.
       if (debugEnabled())
       {
-        TRACER.debugCaught(DebugLogLevel.WARNING, e);
+        TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
       }
     }
 
-    synchronized (draftCNLock)
+    synchronized (changelogDBLock)
     {
-      if (draftCNDbHandler != null)
+      if (changelogDB != null)
       {
         try
         {
-          draftCNDbHandler.clear(baseDn);
+          changelogDB.clear(baseDn);
         }
-        catch (Exception e)
+        catch (Exception ignored)
         {
-          // Ignore.
           if (debugEnabled())
           {
-            TRACER.debugCaught(DebugLogLevel.WARNING, e);
+            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
           }
         }
 
         try
         {
-          lastGeneratedDraftCN = draftCNDbHandler.getLastKey();
+          lastGeneratedDraftCN = changelogDB.getLastKey();
         }
-        catch (Exception e)
+        catch (Exception ignored)
         {
-          // Ignore.
           if (debugEnabled())
           {
-            TRACER.debugCaught(DebugLogLevel.WARNING, e);
+            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
           }
         }
       }
@@ -1352,12 +1350,10 @@
   public void processExportBegin(Backend backend, LDIFExportConfig config)
   {
     if (debugEnabled())
-      TRACER.debugInfo("RS " +getMonitorInstanceName()+
-          " Export starts");
+      TRACER.debugInfo("RS " + getMonitorInstanceName() + " Export starts");
     if (backend.getBackendID().equals(backendId))
     {
       // Retrieves the backend related to this replicationServerDomain
-      // backend =
       ReplicationBackend b =
       (ReplicationBackend)DirectoryServer.getBackend(backendId);
       b.setServer(this);
@@ -1394,38 +1390,36 @@
       rsd.clearDbs();
     }
 
-    synchronized (draftCNLock)
+    synchronized (changelogDBLock)
     {
-      if (draftCNDbHandler != null)
+      if (changelogDB != null)
       {
         try
         {
-          draftCNDbHandler.clear();
+          changelogDB.clear();
         }
-        catch (Exception e)
+        catch (Exception ignored)
         {
-          // Ignore.
           if (debugEnabled())
           {
-            TRACER.debugCaught(DebugLogLevel.WARNING, e);
+            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
           }
         }
 
         try
         {
-          draftCNDbHandler.shutdown();
+          changelogDB.shutdown();
         }
-        catch (Exception e)
+        catch (Exception ignored)
         {
-          // Ignore.
           if (debugEnabled())
           {
-            TRACER.debugCaught(DebugLogLevel.WARNING, e);
+            TRACER.debugCaught(DebugLogLevel.WARNING, ignored);
           }
         }
 
         lastGeneratedDraftCN = 0;
-        draftCNDbHandler = null;
+        changelogDB = null;
       }
     }
   }
@@ -1614,67 +1608,70 @@
     ChangeNumber eligibleCN = null;
     for (ReplicationServerDomain domain : getReplicationServerDomains())
     {
-      if ((excludedBaseDNs != null) &&
-          excludedBaseDNs.contains(domain.getBaseDn()))
+      if (contains(excludedBaseDNs, domain.getBaseDn()))
         continue;
 
-      ChangeNumber domainEligibleCN = domain.getEligibleCN();
-      String dates = "";
-      if (domainEligibleCN != null)
+      final ChangeNumber domainEligibleCN = domain.getEligibleCN();
+      if (eligibleCN == null
+          || (domainEligibleCN != null && domainEligibleCN.older(eligibleCN)))
       {
-        if ((eligibleCN == null) || (domainEligibleCN.older(eligibleCN)))
-        {
-          eligibleCN = domainEligibleCN;
-        }
-        dates = new Date(domainEligibleCN.getTime()).toString();
+        eligibleCN = domainEligibleCN;
       }
-      debugLog += "[dn=" + domain.getBaseDn()
-           + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]";
+
+      if (debugEnabled())
+      {
+        final String dates = domainEligibleCN == null ?
+            "" : new Date(domainEligibleCN.getTime()).toString();
+        debugLog += "[baseDN=" + domain.getBaseDn()
+            + "] [eligibleCN=" + domainEligibleCN + ", " + dates + "]";
+      }
     }
 
-    if (eligibleCN==null)
+    if (eligibleCN==null )
     {
       eligibleCN = new ChangeNumber(TimeThread.getTime(), 0, 0);
     }
 
-    if (debugEnabled())
+    if (debugEnabled()) {
       TRACER.debugInfo("In " + this + " getEligibleCN() ends with " +
         " the following domainEligibleCN for each domain :" + debugLog +
         " thus CrossDomainEligibleCN=" + eligibleCN +
         "  ts=" + new Date(eligibleCN.getTime()).toString());
-
+    }
     return eligibleCN;
   }
 
-
+  private boolean contains(Set<String> col, String elem)
+  {
+    return col != null && col.contains(elem);
+  }
 
   /**
-   * Get or create a handler on a Db on DraftCN for external changelog.
+   * Get (or create) a handler on the ChangelogDB for external changelog.
    *
    * @return the handler.
    * @throws DirectoryException
    *           when needed.
    */
-  public DraftCNDbHandler getDraftCNDbHandler() throws DirectoryException
+  public ChangelogDB getChangelogDB() throws DirectoryException
   {
-    synchronized (draftCNLock)
+    synchronized (changelogDBLock)
     {
       try
       {
-        if (draftCNDbHandler == null)
+        if (changelogDB == null)
         {
-          draftCNDbHandler = new DraftCNDbHandler(this, this.dbEnv);
+          changelogDB = new DraftCNDbHandler(this, this.dbEnv);
           lastGeneratedDraftCN = getLastDraftChangeNumber();
         }
-        return draftCNDbHandler;
+        return changelogDB;
       }
       catch (Exception e)
       {
         TRACER.debugCaught(DebugLogLevel.ERROR, e);
         MessageBuilder mb = new MessageBuilder();
         mb.append(ERR_DRAFT_CHANGENUMBER_DATABASE.get(""));
-        throw new DirectoryException(ResultCode.OPERATIONS_ERROR,
-            mb.toMessage(), e);
+        throw new DirectoryException(OPERATIONS_ERROR, mb.toMessage(), e);
       }
     }
   }
@@ -1685,11 +1682,11 @@
    */
   public int getFirstDraftChangeNumber()
   {
-    synchronized (draftCNLock)
+    synchronized (changelogDBLock)
     {
-      if (draftCNDbHandler != null)
+      if (changelogDB != null)
       {
-        return draftCNDbHandler.getFirstKey();
+        return changelogDB.getFirstKey();
       }
       return 0;
     }
@@ -1701,11 +1698,11 @@
    */
   public int getLastDraftChangeNumber()
   {
-    synchronized (draftCNLock)
+    synchronized (changelogDBLock)
     {
-      if (draftCNDbHandler != null)
+      if (changelogDB != null)
       {
-        return draftCNDbHandler.getLastKey();
+        return changelogDB.getLastKey();
       }
       return 0;
     }
@@ -1717,7 +1714,7 @@
    */
   public int getNewDraftCN()
   {
-    synchronized (draftCNLock)
+    synchronized (changelogDBLock)
     {
       return ++lastGeneratedDraftCN;
     }
@@ -1756,12 +1753,11 @@
      */
 
     int lastDraftCN;
-    Boolean dbEmpty = false;
-    Long newestDate = 0L;
-    DraftCNDbHandler draftCNDbH = getDraftCNDbHandler();
+    boolean dbEmpty = false;
+    long newestDate = 0L;
+    ChangelogDB changelogDB = getChangelogDB();
 
-    // Get the first DraftCN from the DraftCNdb
-    int firstDraftCN = draftCNDbH.getFirstKey();
+    int firstDraftCN = changelogDB.getFirstKey();
     Map<String,ServerState> domainsServerStateForLastSeqnum = null;
     ChangeNumber changeNumberForLastSeqnum = null;
     String domainForLastSeqnum = null;
@@ -1773,12 +1769,11 @@
     }
     else
     {
-      // Get the last DraftCN from the DraftCNdb
-      lastDraftCN = draftCNDbH.getLastKey();
+      lastDraftCN = changelogDB.getLastKey();
 
       // Get the generalized state associated with the current last DraftCN
       // and initializes from it the startStates table
-      String lastSeqnumGenState = draftCNDbH.getValue(lastDraftCN);
+      String lastSeqnumGenState = changelogDB.getPreviousCookie(lastDraftCN);
       if ((lastSeqnumGenState != null) && (lastSeqnumGenState.length()>0))
       {
         domainsServerStateForLastSeqnum = MultiDomainServerState.
@@ -1786,16 +1781,16 @@
       }
 
       // Get the changeNumber associated with the current last DraftCN
-      changeNumberForLastSeqnum = draftCNDbH.getChangeNumber(lastDraftCN);
+      changeNumberForLastSeqnum = changelogDB.getChangeNumber(lastDraftCN);
 
       // Get the domain associated with the current last DraftCN
-      domainForLastSeqnum = draftCNDbH.getBaseDN(lastDraftCN);
+      domainForLastSeqnum = changelogDB.getBaseDN(lastDraftCN);
     }
 
     // Domain by domain
     for (ReplicationServerDomain rsd : getReplicationServerDomains())
     {
-      if (excludedBaseDNs.contains(rsd.getBaseDn()))
+      if (contains(excludedBaseDNs, rsd.getBaseDn()))
         continue;
 
       // for this domain, have the state in the replchangelog
@@ -1860,15 +1855,12 @@
   {
     disableEligibility(excludedBaseDNs);
 
+    // Initialize start state for all running domains with empty state
     MultiDomainServerState result = new MultiDomainServerState();
-    // Initialize start state for  all running domains with empty state
     for (ReplicationServerDomain rsd : getReplicationServerDomains())
     {
-      if ((excludedBaseDNs != null)
-          && (excludedBaseDNs.contains(rsd.getBaseDn())))
-        continue;
-
-      if (rsd.getDbServerState().isEmpty())
+      if (contains(excludedBaseDNs, rsd.getBaseDn())
+          || rsd.getDbServerState().isEmpty())
         continue;
 
       result.update(rsd.getBaseDn(), rsd.getEligibleState(getEligibleCN()));

--
Gitblit v1.10.0