From c56828ecb3b656bb531d313b722bd572eeb10905 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 08 Oct 2013 05:45:15 +0000
Subject: [PATCH] OPENDJ-1116 Introduce abstraction for the changelog DB

---
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java |  380 +++++++++++++++++++++++++-----------------------------
 1 files changed, 177 insertions(+), 203 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 1f44467..19f3dc7 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -31,7 +31,6 @@
 import java.util.*;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
-import java.util.zip.DataFormatException;
 
 import org.opends.messages.Category;
 import org.opends.messages.Message;
@@ -317,7 +316,7 @@
     }
   }
 
-  private String clDomCtxtsToString(String msg)
+  private String domaimCtxtsToString(String msg)
   {
     StringBuilder buffer = new StringBuilder();
     buffer.append(msg).append("\n");
@@ -483,17 +482,12 @@
 
   /**
    * Wait receiving the StartSessionMsg from the remote DS and process it.
+   *
    * @return the startSessionMsg received
-   * @throws DirectoryException
-   * @throws IOException
-   * @throws ClassNotFoundException
-   * @throws DataFormatException
-   * @throws NotSupportedOldVersionPDUException
+   * @throws Exception
    */
   private StartECLSessionMsg waitAndProcessStartSessionECLFromRemoteServer()
-  throws DirectoryException, IOException, ClassNotFoundException,
-  DataFormatException,
-  NotSupportedOldVersionPDUException
+      throws Exception
   {
     ReplicationMsg msg = session.receive();
 
@@ -505,9 +499,8 @@
     }
     else if (!(msg instanceof StartECLSessionMsg))
     {
-      Message message = Message
-          .raw("Protocol error: StartECLSessionMsg required." + msg
-              + " received.");
+      Message message = Message.raw(
+          "Protocol error: StartECLSessionMsg required." + msg + " received.");
       abortStart(message);
       return null;
     }
@@ -576,7 +569,7 @@
    * @param startChangeNumber
    *          the start change number coming from the request filter.
    * @return the cookie corresponding to the passed in startChangeNumber.
-   * @throws Exception
+   * @throws ChangelogException
    *           if a database problem occurred
    * @throws DirectoryException
    *           if a database problem occurred
@@ -677,170 +670,9 @@
   private void initializeChangelogDomainCtxts(String providedCookie,
       boolean allowUnknownDomains) throws DirectoryException
   {
-    /*
-    This map is initialized from the providedCookie.
-    Below, it will be traversed and each domain configured with ECL will be
-    checked and removed from the map.
-    At the end, normally the map should be empty.
-    Depending on allowUnknownDomains provided flag, a non empty map will
-    be considered as an error when allowUnknownDomains is false.
-    */
-    Map<DN, ServerState> startStatesFromProvidedCookie =
-        new HashMap<DN, ServerState>();
-
-    ReplicationServer rs = this.replicationServer;
-
-    // Parse the provided cookie and overwrite startState from it.
-    if ((providedCookie != null) && (providedCookie.length()!=0))
-      startStatesFromProvidedCookie =
-        MultiDomainServerState.splitGenStateToServerStates(providedCookie);
-
     try
     {
-      // Creates the table that will contain the real-time info for each
-      // and every domain.
-      final Set<DomainContext> tmpSet = new HashSet<DomainContext>();
-      final StringBuilder missingDomains = new StringBuilder();
-      for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator()))
-      {
-        // skip the 'unreal' changelog domain
-        if (domain == this.replicationServerDomain)
-          continue;
-
-        // skip the excluded domains
-        if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString()))
-        {
-          // this is an excluded domain
-          if (allowUnknownDomains)
-            startStatesFromProvidedCookie.remove(domain.getBaseDN());
-          continue;
-        }
-
-        // skip unused domains
-        final ServerState latestServerState = domain.getLatestServerState();
-        if (latestServerState.isEmpty())
-          continue;
-
-        // Creates the new domain context
-        final DomainContext newDomainCtxt = new DomainContext();
-        newDomainCtxt.active = true;
-        newDomainCtxt.rsDomain = domain;
-        newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
-
-        // Assign the start state for the domain
-        if (isPersistent == PERSISTENT_CHANGES_ONLY)
-        {
-          newDomainCtxt.startState = latestServerState;
-          startStatesFromProvidedCookie.remove(domain.getBaseDN());
-        }
-        else
-        {
-          // let's take the start state for this domain from the provided
-          // cookie
-          newDomainCtxt.startState =
-              startStatesFromProvidedCookie.remove(domain.getBaseDN());
-
-          if (providedCookie == null
-              || providedCookie.length() == 0
-              || allowUnknownDomains)
-          {
-            // when there is no cookie provided in the request,
-            // let's start traversing this domain from the beginning of
-            // what we have in the replication changelog
-            if (newDomainCtxt.startState == null)
-            {
-              CSN latestTrimCSN =
-                  new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
-              newDomainCtxt.startState =
-                  domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
-            }
-          }
-          else
-          {
-            // when there is a cookie provided in the request,
-            if (newDomainCtxt.startState == null)
-            {
-              missingDomains.append(domain.getBaseDN()).append(":;");
-              continue;
-            }
-            else if (!newDomainCtxt.startState.isEmpty())
-            {
-              if (hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
-              {
-                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-                    ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
-                      newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
-              }
-            }
-          }
-
-          newDomainCtxt.stopState = latestServerState;
-        }
-        newDomainCtxt.currentState = new ServerState();
-
-        // Creates an unconnected SH for the domain
-        MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
-        mh.setInitialServerState(newDomainCtxt.startState);
-        mh.setBaseDNAndDomain(domain.getBaseDN(), false);
-        // register the unconnected into the domain
-        domain.registerHandler(mh);
-        newDomainCtxt.mh = mh;
-
-        previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(),
-                              newDomainCtxt.startState);
-
-        // store the new context
-        tmpSet.add(newDomainCtxt);
-      }
-
-      if (missingDomains.length()>0)
-      {
-        // If there are domain missing in the provided cookie,
-        // the request is rejected and a full resync is required.
-        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
-              missingDomains,
-              "<" + providedCookie + missingDomains + ">"));
-      }
-
-      domainCtxts = tmpSet;
-
-      /*
-      When it is valid to have the provided cookie containing unknown domains
-      (allowUnknownDomains is true), 2 cases must be considered :
-      - if the cookie contains a domain that is replicated but where
-      ECL is disabled, then this case is considered above
-      - if the cookie contains a domain that is even not replicated
-      then this case need to be considered here in another loop.
-      */
-      if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
-      {
-        for (DN providedDomain : startStatesFromProvidedCookie.keySet())
-          if (rs.getReplicationServerDomain(providedDomain) == null)
-            // the domain provided in the cookie is not replicated
-            startStatesFromProvidedCookie.remove(providedDomain);
-      }
-
-      // Now do the final checking
-      if (!startStatesFromProvidedCookie.isEmpty())
-      {
-        /*
-        After reading all the known domains from the provided cookie, there
-        is one (or several) domain that are not currently configured.
-        This domain has probably been removed or replication disabled on it.
-        The request is rejected and full resync is required.
-        */
-        StringBuilder sb = new StringBuilder();
-        for (DomainContext domainCtxt : domainCtxts) {
-          sb.append(domainCtxt.rsDomain.getBaseDN()).append(":")
-            .append(domainCtxt.startState).append(";");
-        }
-        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-            ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
-                startStatesFromProvidedCookie.toString() ,sb.toString()));
-      }
-
-      // the next record from the CNIndexDB should be the one
+      domainCtxts = buildDomainContexts(providedCookie, allowUnknownDomains);
       startCookie = providedCookie;
 
       // Initializes each and every domain with the next(first) eligible message
@@ -867,8 +699,163 @@
               e);
     }
     if (debugEnabled())
-      TRACER.debugInfo(
-        " initializeCLDomCtxts ends with " + " " + dumpState());
+      TRACER.debugInfo("initializeChangelogDomainCtxts() ends with "
+          + dumpState());
+  }
+
+  private Set<DomainContext> buildDomainContexts(String providedCookie,
+      boolean allowUnknownDomains) throws DirectoryException
+  {
+    final Set<DomainContext> results = new HashSet<DomainContext>();
+    final ReplicationServer rs = this.replicationServer;
+
+    /*
+    This map is initialized from the providedCookie.
+    Below, it will be traversed and each domain configured with ECL will be
+    checked and removed from the map.
+    At the end, normally the map should be empty.
+    Depending on allowUnknownDomains provided flag, a non empty map will
+    be considered as an error when allowUnknownDomains is false.
+    */
+    final Map<DN, ServerState> startStatesFromProvidedCookie =
+        MultiDomainServerState.splitGenStateToServerStates(providedCookie);
+
+    final StringBuilder missingDomains = new StringBuilder();
+    for (ReplicationServerDomain domain : toIterable(rs.getDomainIterator()))
+    {
+      // skip the 'unreal' changelog domain
+      if (domain == this.replicationServerDomain)
+        continue;
+
+      // skip the excluded domains
+      if (excludedBaseDNs.contains(domain.getBaseDN().toNormalizedString()))
+      {
+        // this is an excluded domain
+        if (allowUnknownDomains)
+          startStatesFromProvidedCookie.remove(domain.getBaseDN());
+        continue;
+      }
+
+      // skip unused domains
+      final ServerState latestServerState = domain.getLatestServerState();
+      if (latestServerState.isEmpty())
+        continue;
+
+
+      // Creates the new domain context
+      final DomainContext newDomainCtxt = new DomainContext();
+      newDomainCtxt.active = true;
+      newDomainCtxt.rsDomain = domain;
+      newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
+
+      // Assign the start state for the domain
+      if (isPersistent == PERSISTENT_CHANGES_ONLY)
+      {
+        newDomainCtxt.startState = latestServerState;
+        startStatesFromProvidedCookie.remove(domain.getBaseDN());
+      }
+      else
+      {
+        // let's take the start state for this domain from the provided
+        // cookie
+        newDomainCtxt.startState =
+            startStatesFromProvidedCookie.remove(domain.getBaseDN());
+
+        if (providedCookie == null || providedCookie.length() == 0
+            || allowUnknownDomains)
+        {
+          // when there is no cookie provided in the request,
+          // let's start traversing this domain from the beginning of
+          // what we have in the replication changelog
+          if (newDomainCtxt.startState == null)
+          {
+            CSN latestTrimCSN =
+                new CSN(newDomainCtxt.domainLatestTrimDate, 0, 0);
+            newDomainCtxt.startState =
+                domain.getStartState().duplicateOnlyOlderThan(latestTrimCSN);
+          }
+        }
+        else
+        {
+          // when there is a cookie provided in the request,
+          if (newDomainCtxt.startState == null)
+          {
+            missingDomains.append(domain.getBaseDN()).append(":;");
+            continue;
+          }
+          else if (!newDomainCtxt.startState.isEmpty()
+              && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
+          {
+            throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+                ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
+                    newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
+          }
+        }
+
+        newDomainCtxt.stopState = latestServerState;
+      }
+      newDomainCtxt.currentState = new ServerState();
+
+      // Creates an unconnected SH for the domain
+      MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
+      mh.setInitialServerState(newDomainCtxt.startState);
+      mh.setBaseDNAndDomain(domain.getBaseDN(), false);
+      // register the unconnected into the domain
+      domain.registerHandler(mh);
+      newDomainCtxt.mh = mh;
+
+      previousCookie.update(newDomainCtxt.rsDomain.getBaseDN(),
+                            newDomainCtxt.startState);
+
+      results.add(newDomainCtxt);
+    }
+
+    if (missingDomains.length()>0)
+    {
+      // If there are domain missing in the provided cookie,
+      // the request is rejected and a full resync is required.
+      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+        ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
+            missingDomains,
+            "<" + providedCookie + missingDomains + ">"));
+    }
+
+    /*
+    When it is valid to have the provided cookie containing unknown domains
+    (allowUnknownDomains is true), 2 cases must be considered :
+    - if the cookie contains a domain that is replicated but where
+    ECL is disabled, then this case is considered above
+    - if the cookie contains a domain that is even not replicated
+    then this case need to be considered here in another loop.
+    */
+    if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
+    {
+      for (DN providedDomain : startStatesFromProvidedCookie.keySet())
+        if (rs.getReplicationServerDomain(providedDomain) == null)
+          // the domain provided in the cookie is not replicated
+          startStatesFromProvidedCookie.remove(providedDomain);
+    }
+
+    // Now do the final checking
+    if (!startStatesFromProvidedCookie.isEmpty())
+    {
+      /*
+      After reading all the known domains from the provided cookie, there
+      is one (or several) domain that are not currently configured.
+      This domain has probably been removed or replication disabled on it.
+      The request is rejected and full resync is required.
+      */
+      StringBuilder sb = new StringBuilder();
+      for (DomainContext domainCtxt : domainCtxts) {
+        sb.append(domainCtxt.rsDomain.getBaseDN()).append(":")
+          .append(domainCtxt.startState).append(";");
+      }
+      throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+          ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
+              startStatesFromProvidedCookie.toString() ,sb.toString()));
+    }
+
+    return results;
   }
 
   private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
@@ -1039,6 +1026,7 @@
       try
       {
         // Disable timeout for next communications
+        // FIXME: why? and where is it reset?
         session.setSoTimeout(0);
       }
       catch(Exception e) { /* do nothing */ }
@@ -1046,18 +1034,15 @@
       // sendWindow MUST be created before starting the writer
       sendWindow = new Semaphore(sendWindowSize);
 
-      // create reader
       reader = new ServerReader(session, this);
       reader.start();
 
       if (writer == null)
       {
-        // create writer
         writer = new ECLServerWriter(session,this,replicationServerDomain);
         writer.start();
       }
 
-      // Resume the writer
       ((ECLServerWriter)writer).resumeWriter();
 
       // TODO:ECL Potential race condition if writer not yet resumed here
@@ -1073,7 +1058,7 @@
     if (debugEnabled())
       TRACER.debugInfo(getClass().getCanonicalName() + " " + operationId
           + " initialized: " + " " + dumpState() + " " + " "
-          + clDomCtxtsToString(""));
+          + domaimCtxtsToString(""));
   }
 
   private void initializeChangelogSearch(StartECLSessionMsg msg)
@@ -1136,18 +1121,17 @@
   @Override
   protected UpdateMsg getNextMessage(boolean synchronous)
   {
-    UpdateMsg msg = null;
     try
     {
       ECLUpdateMsg eclMsg = getNextECLUpdate();
-      if (eclMsg!=null)
-        msg = eclMsg.getUpdateMsg();
+      if (eclMsg != null)
+        return eclMsg.getUpdateMsg();
     }
     catch(DirectoryException de)
     {
       TRACER.debugCaught(DebugLogLevel.ERROR, de);
     }
-    return msg;
+    return null;
   }
 
   /**
@@ -1187,7 +1171,7 @@
       while (continueLooping && searchPhase == INIT_PHASE)
       {
         // Step 1 & 2
-        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
+        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
         if (oldestContext == null)
         { // there is no oldest change to process
           closeInitPhase();
@@ -1196,7 +1180,6 @@
           return null;
         }
 
-        // Build the ECLUpdateMsg to be returned
         final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
 
         // Default is not to loop, with one exception
@@ -1222,8 +1205,6 @@
         }
         if (oldestContext.active)
         {
-          // populates the table with the next eligible msg from iDom
-          // in non blocking mode, return null when no more eligible msg
           oldestContext.computeNextEligibleMessageForDomain(operationId);
         }
         oldestChange = change;
@@ -1232,7 +1213,7 @@
       if (searchPhase == PERSISTENT_PHASE)
       {
         if (debugEnabled())
-          TRACER.debugInfo(clDomCtxtsToString(
+          TRACER.debugInfo(domaimCtxtsToString(
               "In getNextECLUpdate (persistent): "
                   + "looking for the generalized oldest change"));
 
@@ -1240,12 +1221,11 @@
           domainCtxt.computeNextEligibleMessageForDomain(operationId);
         }
 
-        DomainContext oldestContext = findOldestChangeFromDomainCtxts();
+        final DomainContext oldestContext = findDomainCtxtWithOldestChange();
         if (oldestContext != null)
         {
           final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
           oldestContext.currentState.update(change.getUpdateMsg().getCSN());
-
           if (draftCompat)
           {
             assignNewChangeNumberAndStore(change);
@@ -1269,16 +1249,12 @@
       if (debugEnabled())
         TRACER.debugInfo("getNextECLUpdate updates previousCookie:" + csn);
 
-      // Update the current state
       previousCookie.update(oldestChange.getBaseDN(), csn);
-
-      // Set the current value of global state in the returned message
       oldestChange.setCookie(previousCookie);
 
       if (debugEnabled())
         TRACER.debugInfo("getNextECLUpdate returns result oldestChange="
             + oldestChange);
-
     }
     return oldestChange;
   }
@@ -1327,8 +1303,6 @@
         return true;
       }
 
-
-      // the next change from the CNIndexDB
       final CNIndexRecord currentRecord = cnIndexDBCursor.getRecord();
       final CSN csnFromCNIndexDB = currentRecord.getCSN();
       final DN dnFromCNIndexDB = currentRecord.getBaseDN();
@@ -1454,7 +1428,7 @@
       searchPhase = UNDEFINED_PHASE;
     }
 
-    // End of INIT_PHASE => always release the iterator
+    // End of INIT_PHASE => always release the cursor
     releaseCursor();
   }
 
@@ -1464,13 +1438,13 @@
    * @return the domainCtxt of the domain with the oldest change, null when
    *         none.
    */
-  private DomainContext findOldestChangeFromDomainCtxts()
+  private DomainContext findDomainCtxtWithOldestChange()
   {
     DomainContext oldestCtxt = null;
     for (DomainContext domainCtxt : domainCtxts)
     {
       if (domainCtxt.active
-          // .msg is null when the previous (non blocking) nextMessage did
+          // .nextMsg is null when the previous (non blocking) nextMessage did
           // not have any eligible msg to return
           && domainCtxt.nextMsg != null
           && (oldestCtxt == null
@@ -1482,7 +1456,7 @@
 
     if (debugEnabled())
       TRACER.debugInfo("In cn=changelog," + this
-          + " getOldestChangeFromDomainCtxts() returns "
+          + " findDomainCtxtWithOldestChange() returns "
           + ((oldestCtxt != null) ? oldestCtxt.nextMsg : "-1"));
 
     return oldestCtxt;

--
Gitblit v1.10.0