From 1c126204d9add5e14c4ce3d60c9c89b1a5260133 Mon Sep 17 00:00:00 2001
From: pgamba <pgamba@localhost>
Date: Thu, 10 Dec 2009 16:57:24 +0000
Subject: [PATCH] Fix #4395 ECL cookie older than server changelog db trim is not detected

---
 opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java |  213 ++++++++++++++++++++++++++++++++--
 opends/src/server/org/opends/server/replication/server/ECLServerHandler.java                       |   87 ++++++++++++--
 opends/src/messages/messages/replication.properties                                                |   10 +
 opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java                |    5 
 4 files changed, 280 insertions(+), 35 deletions(-)

diff --git a/opends/src/messages/messages/replication.properties b/opends/src/messages/messages/replication.properties
index 535ecfe..7a9d24e 100644
--- a/opends/src/messages/messages/replication.properties
+++ b/opends/src/messages/messages/replication.properties
@@ -369,8 +369,8 @@
  the replication server domain: %s 
 SEVERE_ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE_157=Replication \
  protocol error. Bad message type. %s received, %s required  
-SEVERE_ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED_158=The provided cookie is \
- invalid in the current context due to %s. Full resync is required
+SEVERE_ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE_158=Full resync \
+ required. Reason: The provided cookie is missing the replicated domain(s) %s
 SEVERE_ERR_BYTE_COUNT_159=The Server Handler byte count is not correct (Fixed)
 NOTICE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS_160=Wrong fractional \
  replication configuration: could not find object class definition for %s in \
@@ -442,3 +442,9 @@
 NOTICE_ERR_ENTRY_UID_DSEE_MAPPING_184=Error for entry %s when mapping entry UID\
   attribute to DSEE NsUniqueID attribute. Value to be mapped: %s \
  Error : %s
+SEVERE_ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE_185=Full resync \
+ required. Reason: The provided cookie contains unknown replicated domain %s
+SEVERE_ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE_186=Full resync \
+ required. Reason: The provided cookie is older than the start of historical \
+ in the server for the replicated domain : %s
+SEVERE_ERR_INVALID_COOKIE_SYNTAX_187=Invalid syntax of the provided cookie
diff --git a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
index 08d80ed..d6635ea 100644
--- a/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
+++ b/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -686,24 +686,24 @@
       boolean allowUnknownDomains)
   throws DirectoryException
   {
-    HashMap<String,ServerState> startStates = new HashMap<String,ServerState>();
+    HashMap<String,ServerState> startStatesFromProvidedCookie =
+      new HashMap<String,ServerState>();
 
     ReplicationServer rs = this.replicationServer;
 
     // Parse the provided cookie and overwrite startState from it.
     if ((providedCookie != null) && (providedCookie.length()!=0))
-      startStates =
+      startStatesFromProvidedCookie =
         MultiDomainServerState.splitGenStateToServerStates(providedCookie);
 
     try
     {
-      // Now traverse all domains and build all the initial contexts :
-      // - the global one : dumpState()
-      // - the domain by domain ones : domainCtxts
       Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
 
-      // Creates the table that will contain the real-time info by domain.
+      // Creates the table that will contain the real-time info for each
+      // and every domain.
       HashSet<DomainContext> tmpSet = new HashSet<DomainContext>();
+      String missingDomains = "";
       int i =0;
       if (rsdi != null)
       {
@@ -738,18 +738,45 @@
           }
           else
           {
-            newDomainCtxt.startState = startStates.remove(rsd.getBaseDn());
+            // let's take the start state for this domain from the provided
+            // cookie
+            newDomainCtxt.startState =
+              startStatesFromProvidedCookie.remove(rsd.getBaseDn());
+
             if ((providedCookie==null)||(providedCookie.length()==0)
                 ||allowUnknownDomains)
             {
+              // when there is no cookie provided in the request,
+              // let's start traversing this domain from the beginning of
+              // what we have in the replication changelog
               if (newDomainCtxt.startState == null)
                 newDomainCtxt.startState = new ServerState();
             }
             else
+            {
+              // when there is a cookie provided in the request,
               if (newDomainCtxt.startState == null)
-                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-                    ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
-                        "missing " + rsd.getBaseDn()));
+              {
+                missingDomains += (rsd.getBaseDn() + ":;");
+                continue;
+              }
+              else if (!newDomainCtxt.startState.isEmpty())
+              {
+                // when the provided startState is older than the replication
+                // changelogdb start state, it means that the replication
+                // changelog db has been trimed and the cookie is not valid
+                // anymore.
+                if (newDomainCtxt.startState.cover(rsd.getStartState())==false)
+                {
+                  // the provided start
+                  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+                      ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
+                          newDomainCtxt.rsd.getBaseDn()));
+                }
+              }
+            }
+
+            // Set the stop state for the domain from the eligibleCN
             newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
           }
           newDomainCtxt.currentState = new ServerState();
@@ -774,18 +801,34 @@
           i++;
         }
       }
-      if (!startStates.isEmpty())
+
+      if (missingDomains.length()>0)
       {
+        // If there are domain missing in the provided cookie,
+        // the request is rejected and a full resync is required.
         throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
-            ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
-                "unknown " + startStates.toString()));
+          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
+              missingDomains + " .Possible cookie:" +
+              (providedCookie + missingDomains)));
+      }
+
+      if (!startStatesFromProvidedCookie.isEmpty())
+      {
+        // After reading all the knows domains from the provided cookie, there
+        // is one (or several) domain that are not currently configured.
+        // This domain has probably been removed or replication disabled on it.
+        // The request is rejected and full resync is required.
+        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
+            ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
+                startStatesFromProvidedCookie.toString()));
       }
       domainCtxts = tmpSet.toArray(new DomainContext[0]);
 
       // the next record from the DraftCNdb should be the one
       startCookie = providedCookie;
 
-      // Initializes all domain with the next(first) elligible message
+      // Initializes each and every domain with the next(first) eligible message
+      // from the domain.
       for (int j=0; j<domainCtxts.length; j++)
       {
         domainCtxts[j].getNextEligibleMessageForDomain(operationId);
@@ -794,6 +837,10 @@
           domainCtxts[j].active = false;
       }
     }
+    catch(DirectoryException de)
+    {
+      throw de;
+    }
     catch(Exception e)
     {
       TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -939,8 +986,18 @@
     isPersistent  = startECLSessionMsg.isPersistent();
     lastDraftCN   = startECLSessionMsg.getLastDraftChangeNumber();
     searchPhase   = INIT_PHASE;
-    previousCookie = new MultiDomainServerState(
+    try
+    {
+      previousCookie = new MultiDomainServerState(
         startECLSessionMsg.getCrossDomainServerState());
+    }
+    catch(Exception e)
+    {
+      TRACER.debugCaught(DebugLogLevel.ERROR, e);
+      throw new DirectoryException(
+          ResultCode.UNWILLING_TO_PERFORM,
+          ERR_INVALID_COOKIE_SYNTAX.get());
+    }
     excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
     replicationServer.disableEligibility(excludedServiceIDs);
     eligibleCN = replicationServer.getEligibleCN();
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
index 48a5c35..118d73d 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -3196,7 +3196,10 @@
   /**
    * Returns the start state of the domain, made of the first (oldest)
    * change stored for each serverId.
-   * @return t start state of the domain.
+   * Note: Because the replication changelogdb triming always keep one change
+   * whatever its date, the change contained in the returned state can be very
+   * old.
+   * @return the start state of the domain.
    */
   public ServerState getStartState()
   {
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
index 30cdaca..748101b 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -246,6 +246,9 @@
     // Test with a mix of domains, a mix of DSes
     ECLTwoDomains(); replicationServer.clearDb();
 
+    // Test ECL after changelog triming
+    ECLAfterChangelogTrim();replicationServer.clearDb();
+
     // Write changes and read ECL from start
     int ts = ECLCompatWriteReadAllOps(1);
 
@@ -285,8 +288,7 @@
     ECLRemoteNonEmpty();replicationServer.clearDb();
 
     // Test with a mix of domains, a mix of DSes
-    ECLTwoDomains();
-    // changelogDb required NOT empty for the next test
+    ECLTwoDomains();replicationServer.clearDb();
 
     // Test ECL after changelog triming
     ECLAfterChangelogTrim();replicationServer.clearDb();
@@ -1096,6 +1098,90 @@
           " Expected last cookie attribute value:" + expectedLastCookie +
           " Read from server: " + lastCookie + " are equal :");
 
+      // Test invalid cookie
+      cookie += ";o=test6:";
+
+      control =
+        new ExternalChangelogRequestControl(true,
+            new MultiDomainServerState(cookie));
+      controls = new ArrayList<Control>(0);
+      controls.add(control);
+      debugInfo(tn, "Search with bad domain in cookie=" + cookie);
+      searchOp = connection.processSearch(
+      ByteString.valueOf("cn=changelog"),
+      SearchScope.WHOLE_SUBTREE,
+      DereferencePolicy.NEVER_DEREF_ALIASES,
+      0, // Size limit
+      0, // Time limit
+      false, // Types only
+      LDAPFilter.decode("(targetDN=*"+tn+"*,o=test)"),
+      attributes,
+      controls,
+      null);
+      
+      waitOpResult(searchOp, ResultCode.UNWILLING_TO_PERFORM);
+      assertEquals(searchOp.getSearchEntries().size(), 0);
+      assertTrue(searchOp.getErrorMessage().toString().startsWith(
+          "Invalid syntax of the provided cookie"),
+          searchOp.getErrorMessage().toString());
+      
+      // Test unknown domain in provided cookie
+      // This case seems to be very hard to obtain in the real life 
+      // (how to remove a domain from a RS topology ?)
+      // let's do a very quick test here.
+      String newCookie = lastCookie + "o=test6:";
+
+      control =
+        new ExternalChangelogRequestControl(true,
+            new MultiDomainServerState(newCookie));
+      controls = new ArrayList<Control>(0);
+      controls.add(control);
+      debugInfo(tn, "Search with bad domain in cookie=" + cookie);
+      searchOp = connection.processSearch(
+      ByteString.valueOf("cn=changelog"),
+      SearchScope.WHOLE_SUBTREE,
+      DereferencePolicy.NEVER_DEREF_ALIASES,
+      0, // Size limit
+      0, // Time limit
+      false, // Types only
+      LDAPFilter.decode("(targetDN=*"+tn+"*,o=test)"),
+      attributes,
+      controls,
+      null);
+      
+      waitOpResult(searchOp, ResultCode.UNWILLING_TO_PERFORM);
+      assertEquals(searchOp.getSearchEntries().size(), 0);
+      assertTrue(searchOp.getErrorMessage().toString().startsWith(
+          "Full resync required. Reason: The provided cookie contains unknown replicated domain {o=test6=}"),
+          searchOp.getErrorMessage().toString());
+
+      // Test missing domain in provided cookie
+      newCookie = lastCookie.substring(lastCookie.indexOf(';')+1);
+      control =
+        new ExternalChangelogRequestControl(true,
+            new MultiDomainServerState(newCookie));
+      controls = new ArrayList<Control>(0);
+      controls.add(control);
+      debugInfo(tn, "Search with bad domain in cookie=" + cookie);
+      searchOp = connection.processSearch(
+      ByteString.valueOf("cn=changelog"),
+      SearchScope.WHOLE_SUBTREE,
+      DereferencePolicy.NEVER_DEREF_ALIASES,
+      0, // Size limit
+      0, // Time limit
+      false, // Types only
+      LDAPFilter.decode("(targetDN=*"+tn+"*,o=test)"),
+      attributes,
+      controls,
+      null);
+      
+      waitOpResult(searchOp, ResultCode.UNWILLING_TO_PERFORM);
+      assertEquals(searchOp.getSearchEntries().size(), 0);
+      assertTrue(searchOp.getErrorMessage().toString().equalsIgnoreCase(
+          "Full resync required. Reason: The provided cookie is missing the replicated domain(s) o=test:; .Possible cookie:" 
+          + newCookie + "o=test:;"), "Server output:" +
+          searchOp.getErrorMessage().toString());
+
       s1test.stop();
       s1test2.stop();
       s2test.stop();
@@ -1113,7 +1199,7 @@
   }
 
   //=======================================================
-  // Test ECL content after changelog triming
+  // Test ECL content after replication changelogdb triming
   private void ECLAfterChangelogTrim()
   {
     String tn = "ECLAfterChangelogTrim";
@@ -1121,13 +1207,59 @@
 
     try
     {
-      replicationServer.getReplicationServerDomain("o=test", false).setPurgeDelay(1);
-      replicationServer.getReplicationServerDomain("o=test2", false).setPurgeDelay(1);
+      // ---
+      // 1. Populate the changelog and read the cookie
+
+      // Creates broker on o=test
+      ReplicationBroker server01 = openReplicationSession(
+          DN.decode(TEST_ROOT_DN_STRING),  1201,
+          100, replicationServerPort,
+          brokerSessionTimeout, true);
+      int ts = 1;
+
+
+      ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), ts++, 1201);
+      DeleteMsg delMsg =
+        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1, tn+"uuid1");
+      server01.publish(delMsg);
+      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+
+      Thread.sleep(1000);
+
+      // Test that last cookie has been updated
+      String cookieNotEmpty = readLastCookie(tn);
+      debugInfo(tn, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
+
+      cn1 = new ChangeNumber(TimeThread.getTime(), ts++, 1201);
+      delMsg =
+        new DeleteMsg("uid="+tn+"2," + TEST_ROOT_DN_STRING, cn1, tn+"uuid2");
+      server01.publish(delMsg);
+      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+
+      cn1 = new ChangeNumber(TimeThread.getTime(), ts++, 1201);
+      delMsg =
+        new DeleteMsg("uid="+tn+"3," + TEST_ROOT_DN_STRING, cn1, tn+"uuid3");
+      server01.publish(delMsg);
+      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+
+      // Sleep longer than this delay - the changelog will be trimmed
+      Thread.sleep(1000);
+      
+      // ---
+      // 2. Now set up a very short purge delay on the replication changelogs
+      // so that this test can play with a trimmed changelog.
+      ReplicationServerDomain d1 = replicationServer.getReplicationServerDomain("o=test", false);
+      ReplicationServerDomain d2 = replicationServer.getReplicationServerDomain("o=test2", false);
+      d1.setPurgeDelay(1);
+      d2.setPurgeDelay(1);
+
+      // Sleep longer than this delay - so that the changelog is trimmed
       Thread.sleep(1000);
       //
       LDIFWriter ldifWriter = getLDIFWriter();
 
-      // Test with empty cookie : from the beginning
+      // ---
+      // 3. Assert that a request with an empty cookie returns nothing
       String cookie= "";
 
       // search on 'cn=changelog'
@@ -1135,7 +1267,7 @@
       attributes.add("+");
       attributes.add("*");
 
-      debugInfo(tn, "Search with cookie=" + cookie + "\"");
+      debugInfo(tn, "1. Search with cookie=" + cookie + "\"");
       InternalSearchOperation searchOp =
         connection.processSearch(
             ByteString.valueOf("cn=changelog"),
@@ -1161,14 +1293,13 @@
           ldifWriter.writeEntry(entry);
         }
       }
+
+      // Assert ECL is empty since replication changelog has been trimmed
       assertEquals(searchOp.getSearchEntries().size(), 0);
 
-      // Read last cookie
+      // 4. Assert that a request with the current last cookie returns nothing
       cookie = readLastCookie(tn);
-
-      // Test from last cookie
-      // search on 'cn=changelog'
-      debugInfo(tn, "Search with cookie=" + cookie + "\"");
+      debugInfo(tn, "2. Search with last cookie=" + cookie + "\"");
       searchOp =
         connection.processSearch(
             ByteString.valueOf("cn=changelog"),
@@ -1192,7 +1323,54 @@
           ldifWriter.writeEntry(entry);
         }
       }
+
+      // Assert ECL is empty since replication changelog has been trimmed
       assertEquals(searchOp.getSearchEntries().size(), 0);
+      
+      // ---
+      // 5. Assert that a request with an "old" cookie - one that refers to 
+      //    changes that have been removed by the replication changelog trimming
+      //    returns the appropriate error.
+
+      cn1 = new ChangeNumber(TimeThread.getTime(), ts++, 1201);
+      delMsg =
+        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1, tn+"uuid1");
+      server01.publish(delMsg);
+      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
+
+      attributes = new LinkedHashSet<String>();
+      attributes.add("+");
+      attributes.add("*");
+
+      debugInfo(tn, "3. Search with cookie=\"" + cookieNotEmpty + "\"");
+      debugInfo(tn, "d1 trimdate" + d1.getStartState());
+      debugInfo(tn, "d2 trimdate" + d2.getStartState());
+      searchOp =
+        connection.processSearch(
+            ByteString.valueOf("cn=changelog"),
+            SearchScope.WHOLE_SUBTREE,
+            DereferencePolicy.NEVER_DEREF_ALIASES,
+            0, // Size limit
+            0, // Time limit
+            false, // Types only
+            LDAPFilter.decode("(targetDN=*)"),
+            attributes,
+            createControls(cookieNotEmpty),
+            null);
+
+      waitOpResult(searchOp, ResultCode.UNWILLING_TO_PERFORM);
+      assertEquals(searchOp.getSearchEntries().size(), 0);
+      assertTrue(searchOp.getErrorMessage().toString().startsWith(
+          "Full resync required. Reason: The provided cookie is older than the start of historical in the server for the replicated domain : o=test"),
+          searchOp.getErrorMessage().toString());
+
+      // Clean
+      server01.stop();
+
+      // And reset changelog purge delay for the other tests.
+      d1.setPurgeDelay(15 * 1000);
+      d2.setPurgeDelay(15 * 1000);
+      
     }
     catch(Exception e)
     {
@@ -2864,6 +3042,7 @@
       }
       server01.stop();
 
+      // Test with filter on draft changenumber
       filter = "(&(targetdn=*"+tn.toLowerCase()+"*,o=test)(&(changenumber>="+
       firstDraftChangeNumber+")(changenumber<="+(firstDraftChangeNumber+3)+")))";
       debugInfo(tn, " Search: " + filter);
@@ -3898,18 +4077,18 @@
     debugInfo(tn, "Ending test with success");
   }
   
-  private void waitOpResult(AbstractOperation searchOp,
+  private void waitOpResult(AbstractOperation operation,
       ResultCode expectedResult)
   {
     int ii=0;
-    while((searchOp.getResultCode()==ResultCode.UNDEFINED) ||
-        (searchOp.getResultCode()!=expectedResult))
+    while((operation.getResultCode()==ResultCode.UNDEFINED) ||
+        (operation.getResultCode()!=expectedResult))
     {
       sleep(50);
       ii++;
       if (ii>10)
-        assertEquals(searchOp.getResultCode(), expectedResult, 
-            searchOp.getErrorMessage().toString());                
+        assertEquals(operation.getResultCode(), expectedResult, 
+            operation.getErrorMessage().toString());                
     }
   }
 }
\ No newline at end of file

--
Gitblit v1.10.0