mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
07.32.2009 40e614c194cf2adb66c2deb6fca196ba4d7eab58
Fix for #4209 ECL: trailing changelog entries after purge can make ECL search result incoherent

Description

Example:
Using 1 suffix, 2 DSes on which write operations are done when replication is enabled.
After a given time (24 h in the default configuration), the replication db is trimmed.
Searching the ECL then returns the latest change made for each (DS, suffix).
This result is not correct:
- because the db is supposed to be trimmed
- because it does not respect time consistency, since it contains some old changes (for example from DS1) while it does not contain some more recent changes (for example
from DS2).

Roughly, this happens because the ECL relies on the "logical" replication changelog db, and because of the trimming policy of the changelog db.
In more detail, the replication db is made of one db instance per (DS, suffix) tuple. Trimming the "changelog db" consists in trimming each db instance, and for each
instance always leaving at least the latest change (for purely replication-oriented reasons).

The fix of the ECL consists in:
- at the beginning of the ECL search processing, getting the latest trim date from each replication server domain (stored when the trim is done)
- when each change is processed, skipping it when it is older than the latest trim date of the domain.

From the above example:
Now the ECL is empty after the changelog db trimming.

Diff
Code and new unit test

4 files modified
243 ■■■■ changed files
opends/src/server/org/opends/server/replication/server/DbHandler.java 17 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java 9 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java 20 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java 197 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -113,6 +113,8 @@
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  private long latestTrimDate = 0;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
@@ -393,6 +395,16 @@
  }
  /**
   * Retrieves the latest trim date.
   * @return the latest trim date.
   */
  public long getLatestTrimDate()
  {
    return latestTrimDate;
  }
  /**
   * Trim old changes from this replicationServer database.
   * @throws DatabaseException In case of database problem.
   */
@@ -403,7 +415,10 @@
    int size = 0;
    boolean finished = false;
    boolean done = false;
    ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
    latestTrimDate = TimeThread.getTime() - trimage;
    ChangeNumber trimDate = new ChangeNumber(latestTrimDate,
        (short) 0, (short)0);
    // In case of deadlock detection by the Database, this thread can
opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -154,6 +154,7 @@
    ServerState startState;
    ServerState currentState;
    ServerState stopState;
    long domainLatestTrimDate;
    /**
     * {@inheritDoc}
@@ -230,7 +231,12 @@
        {
          // Here comes a new message !!!
          // non blocking
          UpdateMsg newMsg = mh.getnextMessage(false);
          UpdateMsg newMsg;
          do {
            newMsg = mh.getnextMessage(false);
            // older than latest domain trimdate ?
          } while ((newMsg!=null) &&
              (newMsg.getChangeNumber().getTime() < domainLatestTrimDate));
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
@@ -639,6 +645,7 @@
          DomainContext newDomainCtxt = new DomainContext();
          newDomainCtxt.active = true;
          newDomainCtxt.rsd = rsd;
          newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
          // Assign the start state for the domain
          if (isPersistent ==
opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2615,7 +2615,7 @@
   *
   * @param delay The new purge delay to use.
   */
  void setPurgeDelay(long delay)
  public void setPurgeDelay(long delay)
  {
    for (DbHandler handler : sourceDbHandlers.values())
    {
@@ -3201,4 +3201,22 @@
    }
    return res;
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
   * @return The latest trim date.
   */
  public long getLatestDomainTrimDate()
  {
    long latest = 0;
    for (DbHandler db : sourceDbHandlers.values())
    {
      if ((latest==0) || (latest<db.getLatestTrimDate()))
      {
        latest = db.getLatestTrimDate();
      }
    }
    return latest;
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -229,7 +229,11 @@
    ECLRemoteNonEmpty();replicationServer.clearDb();
    // Test with a mix of domains, a mix of DSes
    ECLTwoDomains();replicationServer.clearDb();
    ECLTwoDomains();
    // changelogDb required NOT empty for the next test
    // Test ECL after changelog trimming
    ECLAfterChangelogTrim();replicationServer.clearDb();
    
    // Persistent search with changesOnly request
    ECLPsearch(true, false);replicationServer.clearDb();
@@ -287,7 +291,7 @@
    ECLCompatTestLimitsAndAdd(1,8, ts);
    // Test DraftCNDb is purged when replication change log is purged
    ECLCompatPurge();
    ECLPurgeDraftCNDbAfterChangelogClear();
    
    // Test first and last are updated
    ECLCompatTestLimits(0,0);
@@ -691,7 +695,12 @@
      // (does only refer to non private backend)
      MultiDomainServerState expectedLastCookie =
        new MultiDomainServerState("o=test:"+cn1+";");
      assertLastCookieEquals(tn, expectedLastCookie);
      String lastCookie = readLastCookie(tn);
      assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(lastCookie)),
          " Expected last cookie attribute value:" + expectedLastCookie +
          " Read from server: " + lastCookie + " are equal :");
      
      // Cleaning
      if (domain2 != null)
@@ -710,9 +719,11 @@
    debugInfo(tn, "Ending test successfully");
  }
  //=======================================================
  // From embebbded ECL
  // Search ECL with 4 messages on 2 suffixes from 2 brokers
  /**
   * From embedded ECL
   * Search ECL with 4 messages on 2 suffixes from 2 brokers
   *
   */
  private void ECLTwoDomains()
  {
    String tn = "ECLTwoDomains";
@@ -1014,14 +1025,19 @@
      //
      MultiDomainServerState expectedLastCookie =
        new MultiDomainServerState("o=test:"+cn5+" "+cn9+";o=test2:"+cn3+" "+cn8+";");
      assertLastCookieEquals(tn, expectedLastCookie);
      String lastCookie = readLastCookie(tn);
      assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(lastCookie)),
          " Expected last cookie attribute value:" + expectedLastCookie +
          " Read from server: " + lastCookie + " are equal :");
      
      s1test.stop();
      s1test2.stop();
      s2test.stop();
      s2test2.stop();
      removeTestBackend2(backend2);
      // removeTestBackend2(backend2);
    }
    catch(Exception e)
    {
@@ -1031,8 +1047,101 @@
    debugInfo(tn, "Ending test successfully");
  }
  private void assertLastCookieEquals(String tn,
      MultiDomainServerState expectedLastCookie)
  //=======================================================
  // Test ECL content after changelog triming
  private void ECLAfterChangelogTrim()
  {
    String tn = "ECLAfterChangelogTrim";
    debugInfo(tn, "Starting test");
    try
    {
      replicationServer.getReplicationServerDomain("o=test", false).setPurgeDelay(1);
      replicationServer.getReplicationServerDomain("o=test2", false).setPurgeDelay(1);
      Thread.sleep(1000);
      //
      LDIFWriter ldifWriter = getLDIFWriter();
      // Test with empty cookie : from the beginning
      String cookie= "";
      // search on 'cn=changelog'
      LinkedHashSet<String> attributes = new LinkedHashSet<String>();
      attributes.add("+");
      attributes.add("*");
      debugInfo(tn, "Search with cookie=" + cookie + "\"");
      InternalSearchOperation searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
            SearchScope.WHOLE_SUBTREE,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode("(targetDN=*)"),
            attributes,
            createControls(cookie),
            null);
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString());
      cookie="";
      LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
      if (entries != null)
      {
        for (SearchResultEntry entry : entries)
        {
          debugInfo(tn, " RESULT entry returned:" + entry.toSingleLineString());
          ldifWriter.writeEntry(entry);
        }
      }
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS);
      assertEquals(searchOp.getSearchEntries().size(), 0);
      // Read last cookie
      cookie = readLastCookie(tn);
      // Test from last cookie
      // search on 'cn=changelog'
      debugInfo(tn, "Search with cookie=" + cookie + "\"");
      searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
            SearchScope.WHOLE_SUBTREE,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode("(targetDN=*)"),
            attributes,
            createControls(cookie),
            null);
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString());
      entries = searchOp.getSearchEntries();
      if (entries != null)
      {
        for (SearchResultEntry entry : entries)
        {
          debugInfo(tn, " RESULT entry returned:" + entry.toSingleLineString());
          ldifWriter.writeEntry(entry);
        }
      }
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS);
      assertEquals(searchOp.getSearchEntries().size(), 0);
    }
    catch(Exception e)
    {
      fail("Ending test " + tn + "with exception:\n"
          +  stackTraceToSingleLineString(e));
    }
    debugInfo(tn, "Ending test successfully");
  }
  private String readLastCookie(String tn)
  {
    String cookie = "";
    LDIFWriter ldifWriter = getLDIFWriter();
@@ -1043,47 +1152,45 @@
    try
    {
    InternalSearchOperation searchOp =
     connection.processSearch(
        ByteString.valueOf(""),
        SearchScope.BASE_OBJECT,
        DereferencePolicy.NEVER_DEREF_ALIASES,
        0, // Size limit
        0, // Time limit
        false, // Types only
        LDAPFilter.decode("(objectclass=*)"),
        lastcookieattribute,
        NO_CONTROL,
        null);
      InternalSearchOperation searchOp =
        connection.processSearch(
            ByteString.valueOf(""),
            SearchScope.BASE_OBJECT,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode("(objectclass=*)"),
            lastcookieattribute,
            NO_CONTROL,
            null);
    assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
        searchOp.getErrorMessage().toString()
        + searchOp.getAdditionalLogMessage());
    LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
    if (entries != null)
    {
      for (SearchResultEntry resultEntry : entries)
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString()
          + searchOp.getAdditionalLogMessage());
      LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
      if (entries != null)
      {
        ldifWriter.writeEntry(resultEntry);
        try
        for (SearchResultEntry resultEntry : entries)
        {
          List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
          cookie = l.get(0).iterator().next().toString();
          ldifWriter.writeEntry(resultEntry);
          try
          {
            List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
            cookie = l.get(0).iterator().next().toString();
          }
          catch(NullPointerException e)
          {}
        }
        catch(NullPointerException e)
        {}
      }
    }
    }
    catch(Exception e)
    {
      fail("Ending test " + tn + " with exception:\n"
          +  stackTraceToSingleLineString(e));      
    }
    assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)),
        " Expected last cookie attribute value:" + expectedLastCookie +
        " Read from server: " + cookie + " are equal :");
    return cookie;
  }
  
  // simple update to be received
@@ -3080,10 +3187,14 @@
    }
    debugInfo(tn, "Ending test with success");
  }
  private void ECLCompatPurge()
  /**
   * Put a short purge delay to the draftCNDB, clear the changelogDB,
   * expect the draftCNDb to be purged accordingly.
   */
  private void ECLPurgeDraftCNDbAfterChangelogClear()
  {
    String tn = "ECLCompatPurge";
    String tn = "ECLPurgeDraftCNDbAfterChangelogClear";
    debugInfo(tn, "Starting test\n\n");
    try
    {