mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

pgamba
07.32.2009 f97c626a4abcd3f86332aeb896c051a2454e7dd0
opendj-sdk/opends/src/server/org/opends/server/replication/server/DbHandler.java
@@ -113,6 +113,8 @@
  // The maximum number of retries in case of DatabaseDeadlock Exception.
  private static final int DEADLOCK_RETRIES = 10;
  private long latestTrimDate = 0;
  /**
   *
   * The trim age in milliseconds. Changes record in the change DB that
@@ -393,6 +395,16 @@
  }
  /**
   * Retrieves the latest trim date.
   * @return the latest trim date.
   */
  public long getLatestTrimDate()
  {
    return latestTrimDate;
  }
  /**
   * Trim old changes from this replicationServer database.
   * @throws DatabaseException In case of database problem.
   */
@@ -403,7 +415,10 @@
    int size = 0;
    boolean finished = false;
    boolean done = false;
    ChangeNumber trimDate = new ChangeNumber(TimeThread.getTime() - trimage,
    latestTrimDate = TimeThread.getTime() - trimage;
    ChangeNumber trimDate = new ChangeNumber(latestTrimDate,
        (short) 0, (short)0);
    // In case of deadlock detection by the Database, this thread can
opendj-sdk/opends/src/server/org/opends/server/replication/server/ECLServerHandler.java
@@ -154,6 +154,7 @@
    ServerState startState;
    ServerState currentState;
    ServerState stopState;
    long domainLatestTrimDate;
    /**
     * {@inheritDoc}
@@ -230,7 +231,12 @@
        {
          // Here comes a new message !!!
          // non blocking
          UpdateMsg newMsg = mh.getnextMessage(false);
          UpdateMsg newMsg;
          do {
            newMsg = mh.getnextMessage(false);
            // older than latest domain trimdate ?
          } while ((newMsg!=null) &&
              (newMsg.getChangeNumber().getTime() < domainLatestTrimDate));
          if (debugEnabled())
            TRACER.debugInfo(" In ECLServerHandler, for " + mh.getServiceId() +
@@ -639,6 +645,7 @@
          DomainContext newDomainCtxt = new DomainContext();
          newDomainCtxt.active = true;
          newDomainCtxt.rsd = rsd;
          newDomainCtxt.domainLatestTrimDate = rsd.getLatestDomainTrimDate();
          // Assign the start state for the domain
          if (isPersistent ==
opendj-sdk/opends/src/server/org/opends/server/replication/server/ReplicationServerDomain.java
@@ -2615,7 +2615,7 @@
   *
   * @param delay The new purge delay to use.
   */
  void setPurgeDelay(long delay)
  public void setPurgeDelay(long delay)
  {
    for (DbHandler handler : sourceDbHandlers.values())
    {
@@ -3201,4 +3201,22 @@
    }
    return res;
  }
  /**
   * Get the latest (more recent) trim date of the changelog dbs associated
   * to this domain.
   * @return The latest trim date.
   */
  public long getLatestDomainTrimDate()
  {
    long latest = 0;
    for (DbHandler db : sourceDbHandlers.values())
    {
      if ((latest==0) || (latest<db.getLatestTrimDate()))
      {
        latest = db.getLatestTrimDate();
      }
    }
    return latest;
  }
}
opendj-sdk/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/ExternalChangeLogTest.java
@@ -229,7 +229,11 @@
    ECLRemoteNonEmpty();replicationServer.clearDb();
    // Test with a mix of domains, a mix of DSes
    ECLTwoDomains();replicationServer.clearDb();
    ECLTwoDomains();
    // changelogDb required NOT empty for the next test
    // Test ECL after changelog triming
    ECLAfterChangelogTrim();replicationServer.clearDb();
    
    // Persistent search with changesOnly request
    ECLPsearch(true, false);replicationServer.clearDb();
@@ -287,7 +291,7 @@
    ECLCompatTestLimitsAndAdd(1,8, ts);
    // Test DraftCNDb is purged when replication change log is purged
    ECLCompatPurge();
    ECLPurgeDraftCNDbAfterChangelogClear();
    
    // Test first and last are updated
    ECLCompatTestLimits(0,0);
@@ -691,7 +695,12 @@
      // (does only refer to non private backend)
      MultiDomainServerState expectedLastCookie =
        new MultiDomainServerState("o=test:"+cn1+";");
      assertLastCookieEquals(tn, expectedLastCookie);
      String lastCookie = readLastCookie(tn);
      assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(lastCookie)),
          " Expected last cookie attribute value:" + expectedLastCookie +
          " Read from server: " + lastCookie + " are equal :");
      
      // Cleaning
      if (domain2 != null)
@@ -710,9 +719,11 @@
    debugInfo(tn, "Ending test successfully");
  }
  //=======================================================
  // From embebbded ECL
  // Search ECL with 4 messages on 2 suffixes from 2 brokers
  /**
   * From embebbded ECL
   * Search ECL with 4 messages on 2 suffixes from 2 brokers
   *
   */
  private void ECLTwoDomains()
  {
    String tn = "ECLTwoDomains";
@@ -1014,14 +1025,19 @@
      //
      MultiDomainServerState expectedLastCookie =
        new MultiDomainServerState("o=test:"+cn5+" "+cn9+";o=test2:"+cn3+" "+cn8+";");
      assertLastCookieEquals(tn, expectedLastCookie);
      String lastCookie = readLastCookie(tn);
      assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(lastCookie)),
          " Expected last cookie attribute value:" + expectedLastCookie +
          " Read from server: " + lastCookie + " are equal :");
      
      s1test.stop();
      s1test2.stop();
      s2test.stop();
      s2test2.stop();
      removeTestBackend2(backend2);
      // removeTestBackend2(backend2);
    }
    catch(Exception e)
    {
@@ -1031,8 +1047,101 @@
    debugInfo(tn, "Ending test successfully");
  }
  private void assertLastCookieEquals(String tn,
      MultiDomainServerState expectedLastCookie)
  //=======================================================
  // Test ECL content after changelog triming
  private void ECLAfterChangelogTrim()
  {
    String tn = "ECLAfterChangelogTrim";
    debugInfo(tn, "Starting test");
    try
    {
      replicationServer.getReplicationServerDomain("o=test", false).setPurgeDelay(1);
      replicationServer.getReplicationServerDomain("o=test2", false).setPurgeDelay(1);
      Thread.sleep(1000);
      //
      LDIFWriter ldifWriter = getLDIFWriter();
      // Test with empty cookie : from the beginning
      String cookie= "";
      // search on 'cn=changelog'
      LinkedHashSet<String> attributes = new LinkedHashSet<String>();
      attributes.add("+");
      attributes.add("*");
      debugInfo(tn, "Search with cookie=" + cookie + "\"");
      InternalSearchOperation searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
            SearchScope.WHOLE_SUBTREE,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode("(targetDN=*)"),
            attributes,
            createControls(cookie),
            null);
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString());
      cookie="";
      LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
      if (entries != null)
      {
        for (SearchResultEntry entry : entries)
        {
          debugInfo(tn, " RESULT entry returned:" + entry.toSingleLineString());
          ldifWriter.writeEntry(entry);
        }
      }
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS);
      assertEquals(searchOp.getSearchEntries().size(), 0);
      // Read last cookie
      cookie = readLastCookie(tn);
      // Test from last cookie
      // search on 'cn=changelog'
      debugInfo(tn, "Search with cookie=" + cookie + "\"");
      searchOp =
        connection.processSearch(
            ByteString.valueOf("cn=changelog"),
            SearchScope.WHOLE_SUBTREE,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode("(targetDN=*)"),
            attributes,
            createControls(cookie),
            null);
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString());
      entries = searchOp.getSearchEntries();
      if (entries != null)
      {
        for (SearchResultEntry entry : entries)
        {
          debugInfo(tn, " RESULT entry returned:" + entry.toSingleLineString());
          ldifWriter.writeEntry(entry);
        }
      }
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS);
      assertEquals(searchOp.getSearchEntries().size(), 0);
    }
    catch(Exception e)
    {
      fail("Ending test " + tn + "with exception:\n"
          +  stackTraceToSingleLineString(e));
    }
    debugInfo(tn, "Ending test successfully");
  }
  private String readLastCookie(String tn)
  {
    String cookie = "";
    LDIFWriter ldifWriter = getLDIFWriter();
@@ -1043,47 +1152,45 @@
    try
    {
    InternalSearchOperation searchOp =
     connection.processSearch(
        ByteString.valueOf(""),
        SearchScope.BASE_OBJECT,
        DereferencePolicy.NEVER_DEREF_ALIASES,
        0, // Size limit
        0, // Time limit
        false, // Types only
        LDAPFilter.decode("(objectclass=*)"),
        lastcookieattribute,
        NO_CONTROL,
        null);
      InternalSearchOperation searchOp =
        connection.processSearch(
            ByteString.valueOf(""),
            SearchScope.BASE_OBJECT,
            DereferencePolicy.NEVER_DEREF_ALIASES,
            0, // Size limit
            0, // Time limit
            false, // Types only
            LDAPFilter.decode("(objectclass=*)"),
            lastcookieattribute,
            NO_CONTROL,
            null);
    assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
        searchOp.getErrorMessage().toString()
        + searchOp.getAdditionalLogMessage());
    LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
    if (entries != null)
    {
      for (SearchResultEntry resultEntry : entries)
      assertEquals(searchOp.getResultCode(), ResultCode.SUCCESS,
          searchOp.getErrorMessage().toString()
          + searchOp.getAdditionalLogMessage());
      LinkedList<SearchResultEntry> entries = searchOp.getSearchEntries();
      if (entries != null)
      {
        ldifWriter.writeEntry(resultEntry);
        try
        for (SearchResultEntry resultEntry : entries)
        {
          List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
          cookie = l.get(0).iterator().next().toString();
          ldifWriter.writeEntry(resultEntry);
          try
          {
            List<Attribute> l = resultEntry.getAttribute("lastexternalchangelogcookie");
            cookie = l.get(0).iterator().next().toString();
          }
          catch(NullPointerException e)
          {}
        }
        catch(NullPointerException e)
        {}
      }
    }
    }
    catch(Exception e)
    {
      fail("Ending test " + tn + " with exception:\n"
          +  stackTraceToSingleLineString(e));      
    }
    assertTrue(expectedLastCookie.equalsTo(new MultiDomainServerState(cookie)),
        " Expected last cookie attribute value:" + expectedLastCookie +
        " Read from server: " + cookie + " are equal :");
    return cookie;
  }
  
  // simple update to be received
@@ -3080,10 +3187,14 @@
    }
    debugInfo(tn, "Ending test with success");
  }
  private void ECLCompatPurge()
  /**
   * Put a short purge delay to the draftCNDB, clear the changelogDB,
   * expect the draftCNDb to be purged accordingly.
   */
  private void ECLPurgeDraftCNDbAfterChangelogClear()
  {
    String tn = "ECLCompatPurge";
    String tn = "ECLPurgeDraftCNDbAfterChangelogClear";
    debugInfo(tn, "Starting test\n\n");
    try
    {