From 82f5228d84de25cd2ea7d99e9880a8c11971e743 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 07 Oct 2013 14:40:12 +0000
Subject: [PATCH] Code cleanups.

---
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java |  323 +++++++++++++++++++++++------------------------------
 1 files changed, 142 insertions(+), 181 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 1adfaaf..cd9d4a1 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -50,6 +50,7 @@
 import static org.opends.server.loggers.debug.DebugLogger.*;
 import static org.opends.server.replication.protocol.ProtocolVersion.*;
 import static org.opends.server.replication.protocol.StartECLSessionMsg.*;
+import static org.opends.server.util.StaticUtils.*;
 
 /**
  * This class defines a server handler, which handles all interaction with a
@@ -58,6 +59,11 @@
 public final class ECLServerHandler extends ServerHandler
 {
 
+  private static int UNDEFINED_PHASE = 0;
+  /** TODO JNR. */
+  public static int INIT_PHASE = 1;
+  private static int PERSISTENT_PHASE = 2;
+
   /**
    * This is a string identifying the operation, provided by the client part of
    * the ECL, used to help interpretation of messages logged.
@@ -108,6 +114,11 @@
   private CSN eligibleCSN;
 
   /**
+   * The global list of contexts by domain for the search currently processed.
+   */
+  private DomainContext[] domainCtxts = new DomainContext[0];
+
+  /**
    * Provides a string representation of this object.
    * @return the string representation.
    */
@@ -172,8 +183,7 @@
       buffer.append("[ [active=").append(active)
           .append("] [rsd=").append(rsd)
           .append("] [nextMsg=").append(nextMsg).append("(")
-          .append(nextMsg != null ?
-          new Date(nextMsg.getCSN().getTime()).toString():"")
+          .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "")
           .append(")")
           .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
           .append("] [startState=").append(startState)
@@ -183,16 +193,15 @@
     }
 
     /**
-     * Get the next message eligible regarding
-     * the crossDomain eligible CSN. Put it in the context table.
-     * @param opid The operation id.
+     * Computes the next message eligible regarding the crossDomain eligible
+     * CSN.
+     *
+     * @param opId The operation id.
      */
-    private void getNextEligibleMessageForDomain(String opid)
+    private void computeNextEligibleMessageForDomain(String opId)
     {
       if (debugEnabled())
-        TRACER.debugInfo(" In ECLServerHandler, for " + mh.getBaseDNString() +
-          " getNextEligibleMessageForDomain(" + opid+ ") "
-          + "ctxt=" + toString());
+        debugInfo(opId, "ctxt=" + this);
 
       assert(nextMsg == null);
       try
@@ -202,22 +211,15 @@
         // not eligible
         if (nextNonEligibleMsg != null)
         {
-          boolean hasBecomeEligible =
-            (nextNonEligibleMsg.getCSN().getTime()
-                <= eligibleCSN.getTime());
+          final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg);
 
           if (debugEnabled())
-            TRACER.debugInfo(" In ECLServerHandler, for "
-              + mh.getBaseDNString()
-              + " getNextEligibleMessageForDomain(" + opid + ") "
-              + " stored nonEligibleMsg " + nextNonEligibleMsg
-              + " has now become eligible regarding "
-              + " the eligibleCSN ("+ eligibleCSN
-              + " ):" + hasBecomeEligible);
+            debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg
+                + " has now become eligible regarding the eligibleCSN ("
+                + eligibleCSN + " ): " + hasBecomeEligible);
 
           if (hasBecomeEligible)
           {
-            // it is now eligible
             nextMsg = nextNonEligibleMsg;
             nextNonEligibleMsg = null;
           }
@@ -226,50 +228,30 @@
         else
         {
           // Here comes a new message !!!
-          // non blocking
-          UpdateMsg newMsg;
-          do {
-            newMsg = mh.getNextMessage(false);
-            // when the replication changelog is trimmed, the last (latest) chg
-            // is left in the db (whatever its age), and we don't want this chg
-            // to be returned in the external changelog.
-            // So let's check if the chg time is older than the trim date
-          } while ((newMsg!=null) &&
-              (newMsg.getCSN().getTime() < domainLatestTrimDate));
-
-          if (debugEnabled())
-            TRACER.debugInfo(" In ECLServerHandler, for "
-                + mh.getBaseDNString()
-                + " getNextEligibleMessageForDomain(" + opid + ") "
-                + " got new message : "
-                +  " baseDN=[" + mh.getBaseDNString()
-                + "] [newMsg=" + newMsg + "]" + dumpState());
-
-          // in non blocking mode, return null when no more msg
-          if (newMsg != null)
+          final UpdateMsg newMsg = getNextMessage();
+          if (newMsg == null)
           {
-            boolean isEligible = (newMsg.getCSN().getTime()
-                <= eligibleCSN.getTime());
+            return;
+          }
 
           if (debugEnabled())
-              TRACER.debugInfo(" In ECLServerHandler, for "
-                + mh.getBaseDNString()
-                + " getNextEligibleMessageForDomain(" + opid+ ") "
-                + "newMsg isEligible=" + isEligible + " since "
-                + "newMsg=[" + newMsg.getCSN()
-                + " " + new Date(newMsg.getCSN().getTime())
-                + "] eligibleCSN=[" + eligibleCSN
-                + " " + new Date(eligibleCSN.getTime())+"]"
+            debugInfo(opId, "got new message : [newMsg=" + newMsg + "] "
                 + dumpState());
 
-            if (isEligible)
-            {
-              nextMsg = newMsg;
-            }
-            else
-            {
-              nextNonEligibleMsg = newMsg;
-            }
+          final boolean isEligible = isEligible(newMsg);
+
+          if (debugEnabled())
+            debugInfo(opId, "newMsg isEligible=" + isEligible + " since "
+                + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=["
+                + toString(eligibleCSN) + "] " + dumpState());
+
+          if (isEligible)
+          {
+            nextMsg = newMsg;
+          }
+          else
+          {
+            nextNonEligibleMsg = newMsg;
           }
         }
       }
@@ -279,6 +261,44 @@
       }
     }
 
+    private boolean isEligible(UpdateMsg msg)
+    {
+      return msg.getCSN().getTime() <= eligibleCSN.getTime();
+    }
+
+    private UpdateMsg getNextMessage()
+    {
+      while (true)
+      {
+        final UpdateMsg newMsg = mh.getNextMessage(false /* non blocking */);
+
+        if (newMsg == null)
+        { // in non blocking mode, null means no more messages
+          return null;
+        }
+        else if (newMsg.getCSN().getTime() < domainLatestTrimDate)
+        {
+          // when the replication changelog is trimmed, the last (latest) chg
+          // is left in the db (whatever its age), and we don't want this chg
+          // to be returned in the external changelog.
+          // So let's check if the chg time is older than the trim date
+          return newMsg;
+        }
+      }
+    }
+
+    private String toString(CSN csn)
+    {
+      return csn + " " + asDate(csn);
+    }
+
+    private void debugInfo(String opId, String message)
+    {
+      TRACER.debugInfo("In ECLServerHandler, for baseDN="
+          + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId
+          + ") " + message);
+    }
+
     /**
      * Unregister the handler from the DomainContext ReplicationDomain.
      * @return Whether the handler has been unregistered with success.
@@ -297,11 +317,6 @@
     }
   }
 
-  /**
-   * The global list of contexts by domain for the search currently processed.
-   */
-  private DomainContext[] domainCtxts = new DomainContext[0];
-
   private String clDomCtxtsToString(String msg)
   {
     StringBuilder buffer = new StringBuilder();
@@ -313,10 +328,6 @@
     return buffer.toString();
   }
 
-  private static int UNDEFINED_PHASE = 0;
-  private static int INIT_PHASE = 1;
-  private static int PERSISTENT_PHASE = 2;
-
   /**
    * Starts this handler based on a start message received from remote server.
    * @param inECLStartMsg The start msg provided by the remote server.
@@ -509,13 +520,18 @@
 
   /**
    * Initialize the handler from a provided cookie value.
-   * @param crossDomainStartState The provided cookie value.
-   * @throws DirectoryException When an error is raised.
+   *
+   * @param providedCookie
+   *          The provided cookie value.
+   * @throws DirectoryException
+   *           When an error is raised.
    */
-  private void initializeCLSearchFromGenState(String crossDomainStartState)
+  private void initializeCLSearchFromCookie(String providedCookie)
       throws DirectoryException
   {
-    initializeChangelogDomainCtxts(crossDomainStartState, false);
+    this.draftCompat = false;
+
+    initializeChangelogDomainCtxts(providedCookie, false);
   }
 
   /**
@@ -684,12 +700,9 @@
       // Creates the table that will contain the real-time info for each
       // and every domain.
       Set<DomainContext> tmpSet = new HashSet<DomainContext>();
-      String missingDomains = "";
-      for (Iterator<ReplicationServerDomain> iter = rs.getDomainIterator();
-           iter.hasNext();)
+      final StringBuilder missingDomains = new StringBuilder();
+      for (ReplicationServerDomain rsd : toIterable(rs.getDomainIterator()))
       {
-        ReplicationServerDomain rsd = iter.next();
-
         // skip the 'unreal' changelog domain
         if (rsd == this.replicationServerDomain)
           continue;
@@ -704,11 +717,12 @@
         }
 
         // skip unused domains
-        if (rsd.getLatestServerState().isEmpty())
+        final ServerState latestServerState = rsd.getLatestServerState();
+        if (latestServerState.isEmpty())
           continue;
 
         // Creates the new domain context
-        DomainContext newDomainCtxt = new DomainContext();
+        final DomainContext newDomainCtxt = new DomainContext();
         newDomainCtxt.active = true;
         newDomainCtxt.rsd = rsd;
         newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
@@ -716,7 +730,7 @@
         // Assign the start state for the domain
         if (isPersistent == PERSISTENT_CHANGES_ONLY)
         {
-          newDomainCtxt.startState = rsd.getLatestServerState().duplicate();
+          newDomainCtxt.startState = latestServerState;
           startStatesFromProvidedCookie.remove(rsd.getBaseDN());
         }
         else
@@ -746,7 +760,7 @@
             // when there is a cookie provided in the request,
             if (newDomainCtxt.startState == null)
             {
-              missingDomains += (rsd.getBaseDN() + ":;");
+              missingDomains.append(rsd.getBaseDN()).append(":;");
               continue;
             }
             else if (!newDomainCtxt.startState.isEmpty())
@@ -760,7 +774,7 @@
             }
           }
 
-          newDomainCtxt.stopState = rsd.getLatestServerState().duplicate();
+          newDomainCtxt.stopState = latestServerState;
         }
         newDomainCtxt.currentState = new ServerState();
 
@@ -799,13 +813,12 @@
       - if the cookie contains a domain that is even not replicated
       then this case need to be considered here in another loop.
       */
-      if (!startStatesFromProvidedCookie.isEmpty())
+      if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
       {
-        if (allowUnknownDomains)
-          for (DN providedDomain : startStatesFromProvidedCookie.keySet())
-            if (rs.getReplicationServerDomain(providedDomain) == null)
-              // the domain provided in the cookie is not replicated
-              startStatesFromProvidedCookie.remove(providedDomain);
+        for (DN providedDomain : startStatesFromProvidedCookie.keySet())
+          if (rs.getReplicationServerDomain(providedDomain) == null)
+            // the domain provided in the cookie is not replicated
+            startStatesFromProvidedCookie.remove(providedDomain);
       }
 
       // Now do the final checking
@@ -833,7 +846,7 @@
       // Initializes each and every domain with the next(first) eligible message
       // from the domain.
       for (DomainContext domainCtxt : domainCtxts) {
-        domainCtxt.getNextEligibleMessageForDomain(operationId);
+        domainCtxt.computeNextEligibleMessageForDomain(operationId);
 
         if (domainCtxt.nextMsg == null)
           domainCtxt.active = false;
@@ -1055,53 +1068,6 @@
       closeInitPhase();
     }
 
-    /* TODO: From replication CSN
-    //--
-    if (startCLMsg.getStartMode()==2)
-    {
-      if (CLSearchFromProvidedExactCSN(startCLMsg.getCSN()))
-        return;
-    }
-
-    //--
-    if (startCLMsg.getStartMode()==4)
-    {
-      // to get the CL first and last
-      initializeCLDomCtxts(null); // from start
-      CSN crossDomainEligibleCSN = computeCrossDomainEligibleCSN();
-
-      try
-      {
-        // to get the CL first and last
-        // last rely on the crossDomainEligibleCSN thus must have been
-        // computed before
-        int[] limits = computeCLLimits(crossDomainEligibleCSN);
-        // Send the response
-        CLLimitsMsg msg = new CLLimitsMsg(limits[0], limits[1]);
-        session.publish(msg);
-      }
-      catch(Exception e)
-      {
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-        try
-        {
-          session.publish(
-            new ErrorMsg(
-             replicationServer.getServerId(),
-             serverId,
-             Message.raw(Category.SYNC, Severity.INFORMATION,
-                 "Exception raised: " + e.getMessage())));
-        }
-        catch(IOException ioe)
-        {
-          // FIXME: close conn ?
-        }
-      }
-      return;
-    }
-     */
-
-    // Store into domain
     registerIntoDomain();
 
     if (debugEnabled())
@@ -1116,7 +1082,7 @@
     short requestType = msg.getECLRequestType();
     if (requestType == REQUEST_TYPE_FROM_COOKIE)
     {
-      initializeCLSearchFromGenState(msg.getCrossDomainServerState());
+      initializeCLSearchFromCookie(msg.getCrossDomainServerState());
     }
     else if (requestType == REQUEST_TYPE_FROM_CHANGE_NUMBER)
     {
@@ -1231,12 +1197,7 @@
         }
 
         // Build the ECLUpdateMsg to be returned
-        final ECLUpdateMsg change = new ECLUpdateMsg(
-            (LDAPUpdateMsg) oldestContext.nextMsg,
-            null, // cookie will be set later
-            oldestContext.rsd.getBaseDN(),
-            0); // changeNumber may be set later
-        oldestContext.nextMsg = null;
+        final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
 
         // Default is not to loop, with one exception
         continueLooping = false;
@@ -1250,8 +1211,7 @@
 
         // Set and test the domain of the oldestChange see if we reached
         // the end of the phase for this domain
-        oldestContext.currentState.update(
-            change.getUpdateMsg().getCSN());
+        oldestContext.currentState.update(change.getUpdateMsg().getCSN());
 
         if (oldestContext.currentState.cover(oldestContext.stopState)
             || (draftCompat
@@ -1264,7 +1224,7 @@
         {
           // populates the table with the next eligible msg from iDom
           // in non blocking mode, return null when no more eligible msg
-          oldestContext.getNextEligibleMessageForDomain(operationId);
+          oldestContext.computeNextEligibleMessageForDomain(operationId);
         }
         oldestChange = change;
       }
@@ -1277,21 +1237,14 @@
                   + "looking for the generalized oldest change"));
 
         for (DomainContext domainCtxt : domainCtxts) {
-          domainCtxt.getNextEligibleMessageForDomain(operationId);
+          domainCtxt.computeNextEligibleMessageForDomain(operationId);
         }
 
         DomainContext oldestContext = findOldestChangeFromDomainCtxts();
         if (oldestContext != null)
         {
-          final ECLUpdateMsg change = new ECLUpdateMsg(
-              (LDAPUpdateMsg) oldestContext.nextMsg,
-              null, // set later
-              oldestContext.rsd.getBaseDN(),
-              0);
-          oldestContext.nextMsg = null; // clean
-
-          oldestContext.currentState.update(
-              change.getUpdateMsg().getCSN());
+          final ECLUpdateMsg change = newECLUpdateMsg(oldestContext);
+          oldestContext.currentState.update(change.getUpdateMsg().getCSN());
 
           if (draftCompat)
           {
@@ -1330,6 +1283,15 @@
     return oldestChange;
   }
 
+  private ECLUpdateMsg newECLUpdateMsg(DomainContext ctx)
+  {
+    // cookie will be set later AND changeNumber may be set later
+    final ECLUpdateMsg change = new ECLUpdateMsg(
+        (LDAPUpdateMsg) ctx.nextMsg, null, ctx.rsd.getBaseDN(), 0);
+    ctx.nextMsg = null; // clean after use
+    return change;
+  }
+
   /**
    * Either retrieves a change number from the DB, or assign a new change number
    * and store in the DB.
@@ -1346,13 +1308,11 @@
   private boolean assignChangeNumber(final ECLUpdateMsg oldestChange)
       throws ChangelogException
   {
-    // We also need to check if the CNIndexDB is consistent with
-    // the changelogdb.
-    // if not, 2 potential reasons
-    // a/ : changelog has been purged (trim)let's traverse the CNIndexDB
-    // b/ : changelog is late .. let's traverse the changelogDb
-    // The following loop allows to loop until being on the same cn
-    // in the 2 dbs
+    // We also need to check if the CNIndexDB is consistent with the
+    // changelogDB. If not, 2 potential reasons:
+    // a/ changelog has been purged (trim) let's traverse the CNIndexDB
+    // b/ changelog is late ... let's traverse the changelogDb
+    // The following loop allows to loop until being on the same cn in the 2 dbs
 
     // replogCSN : the oldest change from the changelog db
     CSN csnFromChangelogDb = oldestChange.getUpdateMsg().getCSN();
@@ -1374,20 +1334,17 @@
       final DN dnFromCNIndexDB = currentRecord.getBaseDN();
 
       if (debugEnabled())
-        TRACER.debugInfo("assignChangeNumber() generating change number "
-            + " comparing the 2 db DNs :" + dnFromChangelogDb + "?="
-            + csnFromChangelogDb + " timestamps:"
-            + new Date(csnFromChangelogDb.getTime()) + " ?older"
-            + new Date(csnFromCNIndexDB.getTime()));
-
+        TRACER.debugInfo("assignChangeNumber() comparing the 2 db DNs :"
+            + dnFromChangelogDb + "?=" + dnFromCNIndexDB + " timestamps:"
+            + asDate(csnFromChangelogDb) + " ?older"
+            + asDate(csnFromCNIndexDB));
 
       if (areSameChange(csnFromChangelogDb, dnFromChangelogDb,
           csnFromCNIndexDB, dnFromCNIndexDB))
       {
         if (debugEnabled())
-          TRACER.debugInfo("assignChangeNumber() generating change number "
-              + " assigning changeNumber=" + currentRecord.getChangeNumber()
-              + " to change=" + oldestChange);
+          TRACER.debugInfo("assignChangeNumber() assigning changeNumber="
+              + currentRecord.getChangeNumber() + " to change=" + oldestChange);
 
         oldestChange.setChangeNumber(currentRecord.getChangeNumber());
         return true;
@@ -1398,11 +1355,11 @@
       {
         // the change from the changelogDb is older
         // it should have been stored lately
-        // let's continue to traverse the changelogdb
+        // let's continue to traverse the changelogDB
         if (debugEnabled())
-          TRACER.debugInfo("assignChangeNumber(): will skip "
+          TRACER.debugInfo("assignChangeNumber() will skip "
               + csnFromChangelogDb
-              + " and read next from the regular changelog.");
+              + " and read next change from the regular changelog.");
         return false; // TO BE CHECKED
       }
 
@@ -1413,18 +1370,17 @@
       try
       {
         // let's traverse the CNIndexDB searching for the change
-        // found in the changelogDb.
+        // found in the changelogDB
         if (debugEnabled())
-          TRACER.debugInfo("assignChangeNumber() generating change number "
-              + " will skip " + csnFromCNIndexDB
+          TRACER.debugInfo("assignChangeNumber() will skip " + csnFromCNIndexDB
               + " and read next change from the CNIndexDB.");
 
         isEndOfCNIndexDBReached = !cnIndexDBCursor.next();
 
         if (debugEnabled())
-          TRACER.debugInfo("assignChangeNumber() generating change number has"
-              + "skipped to  changeNumber=" + currentRecord.getChangeNumber()
-              + " csn=" + currentRecord.getCSN() + " End of CNIndexDB ?"
+          TRACER.debugInfo("assignChangeNumber() has skipped to changeNumber="
+              + currentRecord.getChangeNumber() + " csn="
+              + currentRecord.getCSN() + " End of CNIndexDB ?"
               + isEndOfCNIndexDBReached);
       }
       catch (ChangelogException e)
@@ -1439,6 +1395,11 @@
     }
   }
 
+  private Date asDate(CSN csn)
+  {
+    return new Date(csn.getTime());
+  }
+
   private boolean areSameChange(CSN csn1, DN dn1, CSN csn2, DN dn2)
   {
     boolean sameDN = dn1.compareTo(dn2) == 0;

--
Gitblit v1.10.0