From 8339e4504244448349666eda0af74e2a080913b4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 14 Apr 2014 13:57:14 +0000
Subject: [PATCH] OPENDJ-1430 - Some changes are missing from the external changelog

---
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java |  276 +++++++++++++++++++-----------------------------------
 1 files changed, 99 insertions(+), 177 deletions(-)

diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index c51a1ee..ea6af09 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -116,12 +116,6 @@
   private MultiDomainServerState previousCookie = new MultiDomainServerState();
 
   /**
-   * Eligible CSN - only changes older or equal to eligibleCSN are published in
-   * the ECL.
-   */
-  private CSN eligibleCSN;
-
-  /**
    * The global list of contexts by domain for the search currently processed.
    */
   private Set<DomainContext> domainCtxts = Collections.emptySet();
@@ -132,16 +126,15 @@
    */
   private String dumpState()
   {
-    return getClass().getCanonicalName() +
-           "[" +
-           "[draftCompat=" + draftCompat +
-           "] [persistent=" + startECLSessionMsg.getPersistent() +
-           "] [startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() +
-           "] [isEndOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
-           "] [searchPhase=" + searchPhase +
-           "] [startCookie=" + startCookie +
-           "] [previousCookie=" + previousCookie +
-           "]]";
+    return getClass().getSimpleName() +
+           " [draftCompat=" + draftCompat +
+           ", persistent=" + startECLSessionMsg.getPersistent() +
+           ", startChangeNumber=" + startECLSessionMsg.getLastChangeNumber() +
+           ", endOfCNIndexDBReached=" + isEndOfCNIndexDBReached +
+           ", searchPhase=" + searchPhase +
+           ", startCookie=" + startCookie +
+           ", previousCookie=" + previousCookie +
+           "]";
   }
 
   /**
@@ -154,27 +147,35 @@
    */
   private class DomainContext
   {
-    private ReplicationServerDomain rsDomain;
+    private final ReplicationServerDomain rsDomain;
 
     /**
-     * active when there are still changes supposed eligible for the ECL.
+     * Active when there are still changes supposed eligible for the ECL. It is
+     * active by default.
      */
-    private boolean active;
+    private boolean active = true;
+    private UpdateMsg nextMsg;
 
     /**
      * the message handler from which are reading the changes for this domain.
      */
-    private MessageHandler mh;
-    private UpdateMsg nextMsg;
-    private UpdateMsg nextNonEligibleMsg;
-    private ServerState startState;
-    private ServerState currentState;
-    private ServerState stopState;
-    private long domainLatestTrimDate;
+    private final MessageHandler mh;
+    private final ServerState startState;
+    private final ServerState currentState = new ServerState();
+    private final ServerState stopState;
+    private final long domainLatestTrimDate;
 
-    /**
-     * {@inheritDoc}
-     */
+    public DomainContext(ReplicationServerDomain domain,
+        ServerState startState, ServerState stopState, MessageHandler mh)
+    {
+      this.rsDomain = domain;
+      this.startState = startState;
+      this.stopState = stopState;
+      this.mh = mh;
+      this.domainLatestTrimDate = domain.getLatestDomainTrimDate();
+    }
+
+    /** {@inheritDoc} */
     @Override
     public String toString()
     {
@@ -182,96 +183,38 @@
       toString(buffer);
       return buffer.toString();
     }
-    /**
-     * Provide a string representation of this object for debug purpose..
-     * @param buffer Append to this buffer.
-     */
-    public void toString(StringBuilder buffer)
+
+    private StringBuilder toString(StringBuilder buffer)
     {
-      buffer.append("[ [active=").append(active)
-          .append("] [rsDomain=").append(rsDomain)
-          .append("] [nextMsg=").append(nextMsg).append("(")
-          .append(nextMsg != null ? asDate(nextMsg.getCSN()).toString() : "")
-          .append(")")
-          .append("] [nextNonEligibleMsg=").append(nextNonEligibleMsg)
-          .append("] [startState=").append(startState)
-          .append("] [currentState=").append(currentState)
-          .append("] [stopState=").append(stopState)
-          .append("]]");
+      buffer.append(getClass().getSimpleName());
+      buffer.append(" [");
+      buffer.append(active ? "active" : "inactive");
+      buffer.append(", baseDN=\"").append(rsDomain.getBaseDN()).append("\"");
+      if (nextMsg != null)
+      {
+        buffer.append(", csn=").append(nextMsg.getCSN().toStringUI());
+      }
+      buffer.append(", nextMsg=[").append(nextMsg);
+      buffer.append("]")
+          .append(", startState=").append(startState)
+          .append(", currentState=").append(currentState)
+          .append(", stopState=").append(stopState)
+          .append("]");
+      return buffer;
     }
 
     /**
-     * Computes the next message eligible regarding the crossDomain eligible
-     * CSN.
-     *
-     * @param opId The operation id.
+     * Computes the next available message for this domain context.
      */
-    private void computeNextEligibleMessageForDomain(String opId)
+    private void computeNextAvailableMessage()
     {
+      nextMsg = getNextMessage();
       if (debugEnabled())
-        debugInfo(opId, "ctxt=" + this);
-
-      assert(nextMsg == null);
-      try
       {
-        // Before get a new message from the domain, evaluate in priority
-        // a message that has not been published to the ECL because it was
-        // not eligible
-        if (nextNonEligibleMsg != null)
-        {
-          final boolean hasBecomeEligible = isEligible(nextNonEligibleMsg);
-
-          if (debugEnabled())
-            debugInfo(opId, "stored nonEligibleMsg " + nextNonEligibleMsg
-                + " has now become eligible regarding the eligibleCSN ("
-                + eligibleCSN + " ): " + hasBecomeEligible);
-
-          if (hasBecomeEligible)
-          {
-            nextMsg = nextNonEligibleMsg;
-            nextNonEligibleMsg = null;
-          }
-          // else the oldest is still not eligible - let's wait next
-        }
-        else
-        {
-          // Here comes a new message !!!
-          final UpdateMsg newMsg = getNextMessage();
-          if (newMsg == null)
-          {
-            return;
-          }
-
-          if (debugEnabled())
-            debugInfo(opId, "got new message : [newMsg=" + newMsg + "] "
-                + dumpState());
-
-          final boolean isEligible = isEligible(newMsg);
-
-          if (debugEnabled())
-            debugInfo(opId, "newMsg isEligible=" + isEligible + " since "
-                + "newMsg=[" + toString(newMsg.getCSN()) + "] eligibleCSN=["
-                + toString(eligibleCSN) + "] " + dumpState());
-
-          if (isEligible)
-          {
-            nextMsg = newMsg;
-          }
-          else
-          {
-            nextNonEligibleMsg = newMsg;
-          }
-        }
+        TRACER.debugInfo("In ECLServerHandler, for baseDN="
+            + mh.getBaseDNString() + " computeNextAvailableMessage("
+            + getOperationId() + ") : newMsg=[" + nextMsg + "] " + dumpState());
       }
-      catch(Exception e)
-      {
-        TRACER.debugCaught(DebugLogLevel.ERROR, e);
-      }
-    }
-
-    private boolean isEligible(UpdateMsg msg)
-    {
-      return msg.getCSN().getTime() <= eligibleCSN.getTime();
     }
 
     private UpdateMsg getNextMessage()
@@ -295,18 +238,6 @@
       }
     }
 
-    private String toString(CSN csn)
-    {
-      return csn + " " + asDate(csn);
-    }
-
-    private void debugInfo(String opId, String message)
-    {
-      TRACER.debugInfo("In ECLServerHandler, for baseDN="
-          + mh.getBaseDNString() + " getNextEligibleMessageForDomain(" + opId
-          + ") " + message);
-    }
-
     /**
      * Unregister the handler from the DomainContext ReplicationDomain.
      * @return Whether the handler has been unregistered with success.
@@ -327,11 +258,10 @@
 
   private String domaimCtxtsToString(String msg)
   {
-    StringBuilder buffer = new StringBuilder();
+    final StringBuilder buffer = new StringBuilder();
     buffer.append(msg).append("\n");
     for (DomainContext domainCtxt : domainCtxts) {
-      domainCtxt.toString(buffer);
-      buffer.append("\n");
+      domainCtxt.toString(buffer).append("\n");
     }
     return buffer.toString();
   }
@@ -702,10 +632,12 @@
       // Initializes each and every domain with the next(first) eligible message
       // from the domain.
       for (DomainContext domainCtxt : domainCtxts) {
-        domainCtxt.computeNextEligibleMessageForDomain(getOperationId());
+        domainCtxt.computeNextAvailableMessage();
 
         if (domainCtxt.nextMsg == null)
+        {
           domainCtxt.active = false;
+        }
       }
     }
     catch(DirectoryException de)
@@ -767,64 +699,48 @@
         continue;
 
       // Creates the new domain context
-      final DomainContext newDomainCtxt = new DomainContext();
-      newDomainCtxt.active = true;
-      newDomainCtxt.rsDomain = domain;
-      newDomainCtxt.domainLatestTrimDate = domain.getLatestDomainTrimDate();
-
-      // Assign the start state for the domain
+      final DomainContext newDomainCtxt;
+      final ServerState domainStartState =
+          startStatesFromProvidedCookie.remove(domain.getBaseDN());
       if (startECLSessionMsg.getPersistent() == PERSISTENT_CHANGES_ONLY)
       {
-        newDomainCtxt.startState = latestState;
-        startStatesFromProvidedCookie.remove(domain.getBaseDN());
+        newDomainCtxt = newDomainContext(domain, null, latestState);
       }
       else
       {
         // let's take the start state for this domain from the provided cookie
-        newDomainCtxt.startState =
-            startStatesFromProvidedCookie.remove(domain.getBaseDN());
-
+        ServerState startState = domainStartState;
         if (providedCookie == null || providedCookie.length() == 0
             || allowUnknownDomains)
         {
           // when there is no cookie provided in the request,
           // let's start traversing this domain from the beginning of
           // what we have in the replication changelog
-          if (newDomainCtxt.startState == null)
+          if (startState == null)
           {
-            newDomainCtxt.startState =
+            startState =
                 domain.getOldestState().duplicateOnlyOlderThan(
-                    newDomainCtxt.domainLatestTrimDate);
+                    domain.getLatestDomainTrimDate());
           }
         }
         else
         {
           // when there is a cookie provided in the request,
-          if (newDomainCtxt.startState == null)
+          if (startState == null)
           {
             missingDomains.append(domain.getBaseDN()).append(":;");
             continue;
           }
-          else if (!newDomainCtxt.startState.isEmpty()
-              && hasCookieBeenTrimmedFromDB(domain, newDomainCtxt.startState))
+          else if (!startState.isEmpty()
+              && hasCookieBeenTrimmedFromDB(domain, startState))
           {
             throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                 ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
-                    newDomainCtxt.rsDomain.getBaseDN().toNormalizedString()));
+                    domain.getBaseDN().toNormalizedString()));
           }
         }
-
-        newDomainCtxt.stopState = latestState;
+        newDomainCtxt = newDomainContext(domain, startState, latestState);
       }
-      newDomainCtxt.currentState = new ServerState();
-
-      // Creates an unconnected SH for the domain
-      MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
-      mh.setInitialServerState(newDomainCtxt.startState);
-      mh.setBaseDNAndDomain(domain.getBaseDN(), false);
-      // register the unconnected into the domain
-      domain.registerHandler(mh);
-      newDomainCtxt.mh = mh;
 
       previousCookie.replace(newDomainCtxt.rsDomain.getBaseDN(),
                              newDomainCtxt.startState.duplicate());
@@ -832,7 +748,7 @@
       results.add(newDomainCtxt);
     }
 
-    if (missingDomains.length()>0)
+    if (missingDomains.length() > 0)
     {
       // If there are domain missing in the provided cookie,
       // the request is rejected and a full resync is required.
@@ -851,11 +767,16 @@
     */
     if (!startStatesFromProvidedCookie.isEmpty() && allowUnknownDomains)
     {
-      // JNR: Will the following code trigger a ConcurrentModificationException?
-      for (DN providedDomain : startStatesFromProvidedCookie.keySet())
+      final Set<DN> providedDomains = startStatesFromProvidedCookie.keySet();
+      for (Iterator<DN> iter = providedDomains.iterator(); iter.hasNext();)
+      {
+        DN providedDomain = iter.next();
         if (rs.getReplicationServerDomain(providedDomain) == null)
+        {
           // the domain provided in the cookie is not replicated
-          startStatesFromProvidedCookie.remove(providedDomain);
+          iter.remove();
+        }
+      }
     }
 
     // Now do the final checking
@@ -880,6 +801,19 @@
     return results;
   }
 
+  private DomainContext newDomainContext(ReplicationServerDomain domain,
+      ServerState startState, ServerState stopState) throws DirectoryException
+  {
+    // Create an unconnected MessageHandler for the domain
+    MessageHandler mh = new MessageHandler(maxQueueSize, replicationServer);
+    mh.setInitialServerState(startState);
+    mh.setBaseDNAndDomain(domain.getBaseDN(), false);
+    // register the unconnected into the domain
+    domain.registerHandler(mh);
+
+    return new DomainContext(domain, startState, stopState, mh);
+  }
+
   private boolean hasCookieBeenTrimmedFromDB(ReplicationServerDomain rsDomain,
       ServerState cookie)
   {
@@ -1072,15 +1006,12 @@
 
     if (debugEnabled())
       TRACER.debugInfo(getClass().getCanonicalName() + " " + getOperationId()
-          + " initialized: " + " " + dumpState() + " " + " "
-          + domaimCtxtsToString(""));
+          + " initialized: " + " " + dumpState() + domaimCtxtsToString(""));
   }
 
   private void initializeChangelogSearch(StartECLSessionMsg msg)
       throws DirectoryException
   {
-    refreshEligibleCSN();
-
     if (msg.getECLRequestType() == REQUEST_TYPE_FROM_COOKIE)
     {
       initializeCLSearchFromCookie(msg.getCrossDomainServerState());
@@ -1101,7 +1032,6 @@
    */
   public ECLUpdateMsg takeECLUpdate() throws DirectoryException
   {
-    refreshEligibleCSN();
     ECLUpdateMsg msg = getNextECLUpdate();
 
     // TODO:ECL We should refactor so that a SH always have a session
@@ -1159,7 +1089,7 @@
   public ECLUpdateMsg getNextECLUpdate() throws DirectoryException
   {
     if (debugEnabled())
-      TRACER.debugInfo("In cn=changelog" + this +
+      TRACER.debugInfo("In cn=changelog " + this +
           " getNextECLUpdate starts: " + dumpState());
 
     ECLUpdateMsg oldestChange = null;
@@ -1223,7 +1153,7 @@
         }
         if (oldestContext.active)
         {
-          oldestContext.computeNextEligibleMessageForDomain(getOperationId());
+          oldestContext.computeNextAvailableMessage();
         }
         oldestChange = change;
       }
@@ -1235,8 +1165,9 @@
               "In getNextECLUpdate (persistent): "
                   + "looking for the generalized oldest change"));
 
-        for (DomainContext domainCtxt : domainCtxts) {
-          domainCtxt.computeNextEligibleMessageForDomain(getOperationId());
+        for (DomainContext domainCtxt : domainCtxts)
+        {
+          domainCtxt.computeNextAvailableMessage();
         }
 
         final DomainContext oldestContext = findDomainCtxtWithOldestChange();
@@ -1499,13 +1430,4 @@
     return this.searchPhase != INIT_PHASE;
   }
 
-  /**
-   * Refresh the eligibleCSN by requesting the replication server.
-   */
-  private void refreshEligibleCSN()
-  {
-    Set<String> excludedBaseDNs = startECLSessionMsg.getExcludedBaseDNs();
-    eligibleCSN = replicationServer.getEligibleCSN(excludedBaseDNs);
-  }
-
 }

--
Gitblit v1.10.0