mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
10.57.2009 1c126204d9add5e14c4ce3d60c9c89b1a5260133
Fix #4395 ECL cookie older than server changelog db trim is not detected
4 files modified
315 ■■■■ changed files
opends/src/messages/messages/replication.properties 10 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 87 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 5 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 213 ●●●●● patch | view | raw | blame | history
opends/src/messages/messages/replication.properties
@@ -369,8 +369,8 @@
 the replication server domain: %s 
SEVERE_ERR_REPLICATION_PROTOCOL_MESSAGE_TYPE_157=Replication \
 protocol error. Bad message type. %s received, %s required  
SEVERE_ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED_158=The provided cookie is \
 invalid in the current context due to %s. Full resync is required
SEVERE_ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE_158=Full resync \
 required. Reason: The provided cookie is missing the replicated domain(s) %s
SEVERE_ERR_BYTE_COUNT_159=The Server Handler byte count is not correct (Fixed)
NOTICE_ERR_FRACTIONAL_CONFIG_UNKNOWN_OBJECT_CLASS_160=Wrong fractional \
 replication configuration: could not find object class definition for %s in \
@@ -442,3 +442,9 @@
NOTICE_ERR_ENTRY_UID_DSEE_MAPPING_184=Error for entry %s when mapping entry UID\
  attribute to DSEE NsUniqueID attribute. Value to be mapped: %s \
 Error : %s
SEVERE_ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE_185=Full resync \
 required. Reason: The provided cookie contains unknown replicated domain %s
SEVERE_ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE_186=Full resync \
 required. Reason: The provided cookie is older than the start of historical \
 in the server for the replicated domain : %s
SEVERE_ERR_INVALID_COOKIE_SYNTAX_187=Invalid syntax of the provided cookie
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -686,24 +686,24 @@
      boolean allowUnknownDomains)
  throws DirectoryException
  {
    HashMap<String,ServerState> startStates = new HashMap<String,ServerState>();
    HashMap<String,ServerState> startStatesFromProvidedCookie =
      new HashMap<String,ServerState>();
    ReplicationServer rs = this.replicationServer;
    // Parse the provided cookie and overwrite startState from it.
    if ((providedCookie != null) && (providedCookie.length()!=0))
      startStates =
      startStatesFromProvidedCookie =
        MultiDomainServerState.splitGenStateToServerStates(providedCookie);
    try
    {
      // Now traverse all domains and build all the initial contexts :
      // - the global one : dumpState()
      // - the domain by domain ones : domainCtxts
      Iterator<ReplicationServerDomain> rsdi = rs.getDomainIterator();
      // Creates the table that will contain the real-time info by domain.
      // Creates the table that will contain the real-time info for each
      // and every domain.
      HashSet<DomainContext> tmpSet = new HashSet<DomainContext>();
      String missingDomains = "";
      int i =0;
      if (rsdi != null)
      {
@@ -738,18 +738,45 @@
          }
          else
          {
            newDomainCtxt.startState = startStates.remove(rsd.getBaseDn());
            // let's take the start state for this domain from the provided
            // cookie
            newDomainCtxt.startState =
              startStatesFromProvidedCookie.remove(rsd.getBaseDn());
            if ((providedCookie==null)||(providedCookie.length()==0)
                ||allowUnknownDomains)
            {
              // when there is no cookie provided in the request,
              // let's start traversing this domain from the beginning of
              // what we have in the replication changelog
              if (newDomainCtxt.startState == null)
                newDomainCtxt.startState = new ServerState();
            }
            else
            {
              // when there is a cookie provided in the request,
              if (newDomainCtxt.startState == null)
                throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                    ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
                        "missing " + rsd.getBaseDn()));
              {
                missingDomains += (rsd.getBaseDn() + ":;");
                continue;
              }
              else if (!newDomainCtxt.startState.isEmpty())
              {
                // when the provided startState is older than the replication
                // changelogdb start state, it means that the replication
                // changelog db has been trimed and the cookie is not valid
                // anymore.
                if (newDomainCtxt.startState.cover(rsd.getStartState())==false)
                {
                  // the provided start
                  throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
                      ERR_RESYNC_REQUIRED_TOO_OLD_DOMAIN_IN_PROVIDED_COOKIE.get(
                          newDomainCtxt.rsd.getBaseDn()));
                }
              }
            }
            // Set the stop state for the domain from the eligibleCN
            newDomainCtxt.stopState = rsd.getEligibleState(eligibleCN);
          }
          newDomainCtxt.currentState = new ServerState();
@@ -774,18 +801,34 @@
          i++;
        }
      }
      if (!startStates.isEmpty())
      if (missingDomains.length()>0)
      {
        // If there are domain missing in the provided cookie,
        // the request is rejected and a full resync is required.
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
            ERR_INVALID_COOKIE_FULL_RESYNC_REQUIRED.get(
                "unknown " + startStates.toString()));
          ERR_RESYNC_REQUIRED_MISSING_DOMAIN_IN_PROVIDED_COOKIE.get(
              missingDomains + " .Possible cookie:" +
              (providedCookie + missingDomains)));
      }
      if (!startStatesFromProvidedCookie.isEmpty())
      {
        // After reading all the knows domains from the provided cookie, there
        // is one (or several) domain that are not currently configured.
        // This domain has probably been removed or replication disabled on it.
        // The request is rejected and full resync is required.
        throw new DirectoryException(ResultCode.UNWILLING_TO_PERFORM,
            ERR_RESYNC_REQUIRED_UNKNOWN_DOMAIN_IN_PROVIDED_COOKIE.get(
                startStatesFromProvidedCookie.toString()));
      }
      domainCtxts = tmpSet.toArray(new DomainContext[0]);
      // the next record from the DraftCNdb should be the one
      startCookie = providedCookie;
      // Initializes all domain with the next(first) elligible message
      // Initializes each and every domain with the next(first) eligible message
      // from the domain.
      for (int j=0; j<domainCtxts.length; j++)
      {
        domainCtxts[j].getNextEligibleMessageForDomain(operationId);
@@ -794,6 +837,10 @@
          domainCtxts[j].active = false;
      }
    }
    catch(DirectoryException de)
    {
      throw de;
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
@@ -939,8 +986,18 @@
    isPersistent  = startECLSessionMsg.isPersistent();
    lastDraftCN   = startECLSessionMsg.getLastDraftChangeNumber();
    searchPhase   = INIT_PHASE;
    previousCookie = new MultiDomainServerState(
    try
    {
      previousCookie = new MultiDomainServerState(
        startECLSessionMsg.getCrossDomainServerState());
    }
    catch(Exception e)
    {
      TRACER.debugCaught(DebugLogLevel.ERROR, e);
      throw new DirectoryException(
          ResultCode.UNWILLING_TO_PERFORM,
          ERR_INVALID_COOKIE_SYNTAX.get());
    }
    excludedServiceIDs = startECLSessionMsg.getExcludedServiceIDs();
    replicationServer.disableEligibility(excludedServiceIDs);
    eligibleCN = replicationServer.getEligibleCN();
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -3196,7 +3196,10 @@
  /**
   * Returns the start state of the domain, made of the first (oldest)
   * change stored for each serverId.
   * @return t start state of the domain.
   * Note: Because the replication changelogdb triming always keep one change
   * whatever its date, the change contained in the returned state can be very
   * old.
   * @return the start state of the domain.
   */
  public ServerState getStartState()
  {
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -246,6 +246,9 @@
    // Test with a mix of domains, a mix of DSes
    ECLTwoDomains(); replicationServer.clearDb();
    // Test ECL after changelog triming
    ECLAfterChangelogTrim();replicationServer.clearDb();
    // Write changes and read ECL from start
    int ts = ECLCompatWriteReadAllOps(1);
@@ -285,8 +288,7 @@
    ECLRemoteNonEmpty();replicationServer.clearDb();
    // Test with a mix of domains, a mix of DSes
    ECLTwoDomains();
    // changelogDb required NOT empty for the next test
    ECLTwoDomains();replicationServer.clearDb();
    // Test ECL after changelog triming
    ECLAfterChangelogTrim();replicationServer.clearDb();
@@ -1096,6 +1098,90 @@
          " Expected last cookie attribute value:" + expectedLastCookie +
          " Read from server: " + lastCookie + " are equal :");
      // Test invalid cookie
      cookie += ";o=test6:";
      control =
        new ExternalChangelogRequestControl(true,
            new MultiDomainServerState(cookie));
      controls = new ArrayList<Control>(0);
      controls.add(control);
      debugInfo(tn, "Search with bad domain in cookie=" + cookie);
      searchOp = connection.processSearch(
      ByteString.valueOf("cn=changelog"),
      SearchScope.WHOLE_SUBTREE,
      DereferencePolicy.NEVER_DEREF_ALIASES,
      0, // Size limit
      0, // Time limit
      false, // Types only
      LDAPFilter.decode("(targetDN=*"+tn+"*,o=test)"),
      attributes,
      controls,
      null);
      waitOpResult(searchOp, ResultCode.UNWILLING_TO_PERFORM);
      assertEquals(searchOp.getSearchEntries().size(), 0);
      assertTrue(searchOp.getErrorMessage().toString().startsWith(
          "Invalid syntax of the provided cookie"),
          searchOp.getErrorMessage().toString());
      // Test unknown domain in provided cookie
      // This case seems to be very hard to obtain in the real life
      // (how to remove a domain from a RS topology ?)
      // let's do a very quick test here.
      String newCookie = lastCookie + "o=test6:";
      control =
        new ExternalChangelogRequestControl(true,
            new MultiDomainServerState(newCookie));
      controls = new ArrayList<Control>(0);
      controls.add(control);
      debugInfo(tn, "Search with bad domain in cookie=" + cookie);
      searchOp = connection.processSearch(
      ByteString.valueOf("cn=changelog"),
      SearchScope.WHOLE_SUBTREE,
      DereferencePolicy.NEVER_DEREF_ALIASES,
      0, // Size limit
      0, // Time limit
      false, // Types only
      LDAPFilter.decode("(targetDN=*"+tn+"*,o=test)"),
      attributes,
      controls,
      null);
      waitOpResult(searchOp, ResultCode.UNWILLING_TO_PERFORM);
      assertEquals(searchOp.getSearchEntries().size(), 0);
      assertTrue(searchOp.getErrorMessage().toString().startsWith(
          "Full resync required. Reason: The provided cookie contains unknown replicated domain {o=test6=}"),
          searchOp.getErrorMessage().toString());
      // Test missing domain in provided cookie
      newCookie = lastCookie.substring(lastCookie.indexOf(';')+1);
      control =
        new ExternalChangelogRequestControl(true,
            new MultiDomainServerState(newCookie));
      controls = new ArrayList<Control>(0);
      controls.add(control);
      debugInfo(tn, "Search with bad domain in cookie=" + cookie);
      searchOp = connection.processSearch(
      ByteString.valueOf("cn=changelog"),
      SearchScope.WHOLE_SUBTREE,
      DereferencePolicy.NEVER_DEREF_ALIASES,
      0, // Size limit
      0, // Time limit
      false, // Types only
      LDAPFilter.decode("(targetDN=*"+tn+"*,o=test)"),
      attributes,
      controls,
      null);
      waitOpResult(searchOp, ResultCode.UNWILLING_TO_PERFORM);
      assertEquals(searchOp.getSearchEntries().size(), 0);
      assertTrue(searchOp.getErrorMessage().toString().equalsIgnoreCase(
          "Full resync required. Reason: The provided cookie is missing the replicated domain(s) o=test:; .Possible cookie:"
          + newCookie + "o=test:;"), "Server output:" +
          searchOp.getErrorMessage().toString());
      s1test.stop();
      s1test2.stop();
      s2test.stop();
@@ -1113,7 +1199,7 @@
  }
  //=======================================================
  // Test ECL content after changelog triming
  // Test ECL content after replication changelogdb triming
  private void ECLAfterChangelogTrim()
  {
    String tn = "ECLAfterChangelogTrim";
@@ -1121,13 +1207,59 @@
    try
    {
      replicationServer.getReplicationServerDomain("o=test", false).setPurgeDelay(1);
      replicationServer.getReplicationServerDomain("o=test2", false).setPurgeDelay(1);
      // ---
      // 1. Populate the changelog and read the cookie
      // Creates broker on o=test
      ReplicationBroker server01 = openReplicationSession(
          DN.decode(TEST_ROOT_DN_STRING),  1201,
          100, replicationServerPort,
          brokerSessionTimeout, true);
      int ts = 1;
      ChangeNumber cn1 = new ChangeNumber(TimeThread.getTime(), ts++, 1201);
      DeleteMsg delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1, tn+"uuid1");
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      Thread.sleep(1000);
      // Test that last cookie has been updated
      String cookieNotEmpty = readLastCookie(tn);
      debugInfo(tn, "Store cookie not empty=\"" + cookieNotEmpty + "\"");
      cn1 = new ChangeNumber(TimeThread.getTime(), ts++, 1201);
      delMsg =
        new DeleteMsg("uid="+tn+"2," + TEST_ROOT_DN_STRING, cn1, tn+"uuid2");
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      cn1 = new ChangeNumber(TimeThread.getTime(), ts++, 1201);
      delMsg =
        new DeleteMsg("uid="+tn+"3," + TEST_ROOT_DN_STRING, cn1, tn+"uuid3");
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      // Sleep longer than this delay - the changelog will be trimmed
      Thread.sleep(1000);
      // ---
      // 2. Now set up a very short purge delay on the replication changelogs
      // so that this test can play with a trimmed changelog.
      ReplicationServerDomain d1 = replicationServer.getReplicationServerDomain("o=test", false);
      ReplicationServerDomain d2 = replicationServer.getReplicationServerDomain("o=test2", false);
      d1.setPurgeDelay(1);
      d2.setPurgeDelay(1);
      // Sleep longer than this delay - so that the changelog is trimmed
      Thread.sleep(1000);
      //
      LDIFWriter ldifWriter = getLDIFWriter();
      // Test with empty cookie : from the beginning
      // ---
      // 3. Assert that a request with an empty cookie returns nothing
      String cookie= "";
      // search on 'cn=changelog'
@@ -1135,7 +1267,7 @@
      attributes.add("+");
      attributes.add("*");
      debugInfo(tn, "Search with cookie=" + cookie + "\"");
      debugInfo(tn, "1. Search with cookie=" + cookie + "\"");
      InternalSearchOperation searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
@@ -1161,14 +1293,13 @@
          ldifWriter.writeEntry(entry);
        }
      }
      // Assert ECL is empty since replication changelog has been trimmed
      assertEquals(searchOp.getSearchEntries().size(), 0);
      // Read last cookie
      // 4. Assert that a request with the current last cookie returns nothing
      cookie = readLastCookie(tn);
      // Test from last cookie
      // search on 'cn=changelog'
      debugInfo(tn, "Search with cookie=" + cookie + "\"");
      debugInfo(tn, "2. Search with last cookie=" + cookie + "\"");
      searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
@@ -1192,7 +1323,54 @@
          ldifWriter.writeEntry(entry);
        }
      }
      // Assert ECL is empty since replication changelog has been trimmed
      assertEquals(searchOp.getSearchEntries().size(), 0);
      // ---
      // 5. Assert that a request with an "old" cookie - one that refers to
      //    changes that have been removed by the replication changelog trimming
      //    returns the appropriate error.
      cn1 = new ChangeNumber(TimeThread.getTime(), ts++, 1201);
      delMsg =
        new DeleteMsg("uid="+tn+"1," + TEST_ROOT_DN_STRING, cn1, tn+"uuid1");
      server01.publish(delMsg);
      debugInfo(tn, " publishes " + delMsg.getChangeNumber());
      attributes = new LinkedHashSet<String>();
      attributes.add("+");
      attributes.add("*");
      debugInfo(tn, "3. Search with cookie=\"" + cookieNotEmpty + "\"");
      debugInfo(tn, "d1 trimdate" + d1.getStartState());
      debugInfo(tn, "d2 trimdate" + d2.getStartState());
      searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
            SearchScope.WHOLE_SUBTREE,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode("(targetDN=*)"),
            attributes,
            createControls(cookieNotEmpty),
            null);
      waitOpResult(searchOp, ResultCode.UNWILLING_TO_PERFORM);
      assertEquals(searchOp.getSearchEntries().size(), 0);
      assertTrue(searchOp.getErrorMessage().toString().startsWith(
          "Full resync required. Reason: The provided cookie is older than the start of historical in the server for the replicated domain : o=test"),
          searchOp.getErrorMessage().toString());
      // Clean
      server01.stop();
      // And reset changelog purge delay for the other tests.
      d1.setPurgeDelay(15 * 1000);
      d2.setPurgeDelay(15 * 1000);
    }
    catch(Exception e)
    {
@@ -2864,6 +3042,7 @@
      }
      server01.stop();
      // Test with filter on draft changenumber
      filter = "(&(targetdn=*"+tn.toLowerCase()+"*,o=test)(&(changenumber>="+
      firstDraftChangeNumber+")(changenumber<="+(firstDraftChangeNumber+3)+")))";
      debugInfo(tn, " Search: " + filter);
@@ -3898,18 +4077,18 @@
    debugInfo(tn, "Ending test with success");
  }
  
  private void waitOpResult(AbstractOperation searchOp,
  private void waitOpResult(AbstractOperation operation,
      ResultCode expectedResult)
  {
    int ii=0;
    while((searchOp.getResultCode()==ResultCode.UNDEFINED) ||
        (searchOp.getResultCode()!=expectedResult))
    while((operation.getResultCode()==ResultCode.UNDEFINED) ||
        (operation.getResultCode()!=expectedResult))
    {
      sleep(50);
      ii++;
      if (ii>10)
        assertEquals(searchOp.getResultCode(), expectedResult,
            searchOp.getErrorMessage().toString());
        assertEquals(operation.getResultCode(), expectedResult,
            operation.getErrorMessage().toString());
    }
  }
}