mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.30.2014 aacb8bbf0a764ce8eb205e0f6376c055b3e1baa8
OPENDJ-1177 (CR-3314) Re-implement changelog purging logic

JEChangelogDB.java:
In ChangelogDBPurger.run(), changed code to:
- support the absence of the change number index DB. To simplify matters, the code assumes that ds-cfg-compute-change-number does not change during the lifetime of an RS.
- purge by using a CSN built from the purge delay rather than using the previous cookie.
- sleep for 500 millis if there are no changes to purge, or sleep until the next change to purge.
- gracefully shut down without fuss in the logs.


JEChangeNumberIndexDB.java
In purgeUpTo(), return the oldest non-purged CSN rather than the previous cookie, and merged two branches of the code.

ExternalChangeLogTest.java:
Consequence of the change to JEChangeNumberIndexDB.
Extracted method assertECLLimits() from ECLCompatTestLimits() + added a loop inside it to let the code persist changes asynchronously from the test thread.

JEChangeNumberIndexDBTest.java:
Renamed testTrim() to testPurge().
In newReplicationServer(), enabled ds-cfg-compute-change-number.
4 files modified
160 ■■■■■ changed files
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java 35 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java 76 ●●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java 39 ●●●● patch | view | raw | blame | history
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java 10 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -37,7 +37,6 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.*;
@@ -218,23 +217,19 @@
   * Synchronously purges the change number index DB up to and excluding the
   * provided timestamp.
   *
   * @param purgeTimestamp
   * @param purgeCSN
   *          the timestamp up to which purging must happen
   * @return the {@link MultiDomainServerState} object that drives purging the
   *         replicaDBs.
   * @return the oldest non purged CSN.
   * @throws ChangelogException
   *           if a database problem occurs.
   */
  public MultiDomainServerState purgeUpTo(long purgeTimestamp)
      throws ChangelogException
  public CSN purgeUpTo(CSN purgeCSN) throws ChangelogException
  {
    if (isEmpty())
    if (isEmpty() || purgeCSN == null)
    {
      return null;
    }
    final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
    final DraftCNDBCursor cursor = db.openDeleteCursor();
    try
    {
@@ -245,21 +240,18 @@
        {
          oldestChangeNumber = record.getChangeNumber();
        }
        if (record.getChangeNumber() == newestChangeNumber)
        {
          // do not purge the newest record to avoid having the last generated
          // changenumber dropping back to 0 if the server restarts
          return getPurgeCookie(record);
        }
        if (record.getCSN().isOlderThan(purgeCSN))
        if (record.getChangeNumber() != newestChangeNumber
            && record.getCSN().isOlderThan(purgeCSN))
        {
          cursor.delete();
        }
        else
        {
          // Current record is not old enough to purge.
          return getPurgeCookie(record);
          // 1- Current record is not old enough to purge.
          // 2- Do not purge the newest record to avoid having the last
          // generated changenumber dropping back to 0 when the server restarts
          return record.getCSN();
        }
      }
@@ -281,13 +273,6 @@
    }
  }
  private MultiDomainServerState getPurgeCookie(
      final ChangeNumberIndexRecord record) throws DirectoryException
  {
    // Do not include the record's CSN to avoid having it purged
    return new MultiDomainServerState(record.getPreviousCookie());
  }
  /**
   * Clear the changes from this DB (from both memory cache and DB storage) for
   * the provided baseDN.
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,7 +27,6 @@
import java.io.File;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,7 +39,6 @@
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -64,7 +62,7 @@
public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
{
  /** The tracer object for the debug logger. */
  protected static final DebugTracer TRACER = getTracer();
  private static final DebugTracer TRACER = getTracer();
  /**
   * This map contains the List of updates received from each LDAP server.
@@ -846,6 +844,7 @@
   */
  private final class ChangelogDBPurger extends DirectoryThread
  {
    private static final int DEFAULT_SLEEP = 500;
    protected ChangelogDBPurger()
    {
@@ -862,42 +861,58 @@
      {
        try
        {
          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
          final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
          final CSN oldestNotPurgedCSN;
          // next code assumes that the compute-change-number config
          // never changes during the life time of an RS
          if (!config.isComputeChangeNumber())
          {
            oldestNotPurgedCSN = purgeCSN;
          }
          else
          {
          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
          if (localCNIndexDB == null)
          { // shutdown has been called
            { // shutdown has been initiated
            return;
          }
          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
          final MultiDomainServerState purgeUpToCookie =
              localCNIndexDB.purgeUpTo(purgeTimestamp);
          if (purgeUpToCookie == null)
          { // this can happen when the change number index DB is empty
            oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
            if (oldestNotPurgedCSN == null)
            { // shutdown may have been initiated...
              if (!isShutdownInitiated())
              {
                // ... or the change number index DB is empty,
                // wait for new changes to come in.
                // Note we cannot sleep for as long as the purge delay
                // (3 days default), because we might receive late updates
                // that will have to be purged before the purge delay elapses.
                // This can particularly happen in case of network partitions.
                sleep(DEFAULT_SLEEP);
              }
            continue;
          }
          }
          /*
           * Drive purge of the replica DBs by the oldest non purged cookie in
           * the change number index DB.
           */
          for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
              : domainToReplicaDBs.entrySet())
          for (final Map<Integer, JEReplicaDB> domainMap
              : domainToReplicaDBs.values())
          {
            final DN baseDN = entry1.getKey();
            final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
            for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
            for (final JEReplicaDB replicaDB : domainMap.values())
            {
              final Integer serverId = entry2.getKey();
              final JEReplicaDB replicaDB = entry2.getValue();
              replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
              replicaDB.purgeUpTo(oldestNotPurgedCSN);
            }
          }
          latestPurgeDate = purgeTimestamp;
          // purge delay is specified in seconds so it should not be a problem
          // to sleep for 500 millis
          sleep(500);
          sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
        }
        catch (InterruptedException e)
        {
          // shutdown initiated?
        }
        catch (Exception e)
        {
@@ -910,5 +925,18 @@
        }
      }
    }
    private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
    {
      final long nextPurgeTime = notPurgedCSN.getTime();
      final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
      if (currentPurgeTime <= nextPurgeTime)
      {
        // sleep till the next CSN to purge,
        return nextPurgeTime - currentPurgeTime;
      }
      // wait a bit before purging more
      return DEFAULT_SLEEP;
    }
  }
}
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -372,7 +372,7 @@
    // Test CNIndexDB is purged when replication change log is purged
    final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
    cnIndexDB.purgeUpTo(Long.MAX_VALUE);
    cnIndexDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
    assertTrue(cnIndexDB.isEmpty());
    ECLPurgeCNIndexDBAfterChangelogClear();
@@ -2514,22 +2514,37 @@
  {
    String tn = "ECLCompatTestLimits";
    debugInfo(tn, "Starting test\n\n");
    debugInfo(tn, " Search: rootDSE");
    LDIFWriter ldifWriter = getLDIFWriter();
    final List<SearchResultEntry> entries =
        assertECLLimits(eclEnabled, expectedFirst, expectedLast);
    // search on 'cn=changelog'
    Set<String> attributes = new LinkedHashSet<String>();
    debugAndWriteEntries(getLDIFWriter(), entries, tn);
    debugInfo(tn, "Ending test with success");
  }
  private List<SearchResultEntry> assertECLLimits(
      boolean eclEnabled, int expectedFirst, int expectedLast) throws Exception
  {
    AssertionError e = null;
    int count = 0;
    while (count < 30)
    {
      count++;
      try
      {
        final Set<String> attributes = new LinkedHashSet<String>();
    if (expectedFirst > 0)
      attributes.add("firstchangenumber");
    attributes.add("lastchangenumber");
    attributes.add("changelog");
    attributes.add("lastExternalChangelogCookie");
    debugInfo(tn, " Search: rootDSE");
    final InternalSearchOperation searchOp = searchOnRootDSE(attributes);
    final List<SearchResultEntry> entries = searchOp.getSearchEntries();
    assertThat(entries).hasSize(1);
    debugAndWriteEntries(ldifWriter, entries, tn);
    final SearchResultEntry resultEntry = entries.get(0);
    if (eclEnabled)
@@ -2548,8 +2563,18 @@
      assertNull(getAttributeValue(resultEntry, "changelog"));
      assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
    }
        return entries;
      }
      catch (AssertionError ae)
      {
        // try again to see if changes have been persisted
        e = ae;
      }
    debugInfo(tn, "Ending test with success");
      Thread.sleep(100);
    }
    assertNotNull(e);
    throw e;
  }
  private InternalSearchOperation searchOnRootDSE(Set<String> attributes)
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -76,8 +76,8 @@
   * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
   * </ol>
   */
  @Test()
  void testTrim() throws Exception
  @Test
  void testPurge() throws Exception
  {
    ReplicationServer replicationServer = null;
    try
@@ -254,8 +254,10 @@
  {
    TestCaseUtils.startServer();
    final int port = TestCaseUtils.findFreePort();
    return new ReplicationServer(
        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null));
    final ReplServerFakeConfiguration cfg =
        new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null);
    cfg.setComputeChangeNumber(true);
    return new ReplicationServer(cfg);
  }
  private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,