opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -37,7 +37,6 @@ import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.replication.server.changelog.je.DraftCNDB.*; import org.opends.server.types.*; @@ -218,23 +217,19 @@ * Synchronously purges the change number index DB up to and excluding the * provided timestamp. * * @param purgeTimestamp * @param purgeCSN * the timestamp up to which purging must happen * @return the {@link MultiDomainServerState} object that drives purging the * replicaDBs. * @return the oldest non purged CSN. * @throws ChangelogException * if a database problem occurs. */ public MultiDomainServerState purgeUpTo(long purgeTimestamp) throws ChangelogException public CSN purgeUpTo(CSN purgeCSN) throws ChangelogException { if (isEmpty()) if (isEmpty() || purgeCSN == null) { return null; } final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); final DraftCNDBCursor cursor = db.openDeleteCursor(); try { @@ -245,21 +240,18 @@ { oldestChangeNumber = record.getChangeNumber(); } if (record.getChangeNumber() == newestChangeNumber) { // do not purge the newest record to avoid having the last generated // changenumber dropping back to 0 if the server restarts return getPurgeCookie(record); } if (record.getCSN().isOlderThan(purgeCSN)) if (record.getChangeNumber() != newestChangeNumber && record.getCSN().isOlderThan(purgeCSN)) { cursor.delete(); } else { // Current record is not old enough to purge. return getPurgeCookie(record); // 1- Current record is not old enough to purge. 
// 2- Do not purge the newest record to avoid having the last // generated changenumber dropping back to 0 when the server restarts return record.getCSN(); } } @@ -281,13 +273,6 @@ } } private MultiDomainServerState getPurgeCookie( final ChangeNumberIndexRecord record) throws DirectoryException { // Do not include the record's CSN to avoid having it purged return new MultiDomainServerState(record.getPreviousCookie()); } /** * Clear the changes from this DB (from both memory cache and DB storage) for * the provided baseDN. opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,7 +27,6 @@ import java.io.File; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,7 +39,6 @@ import org.opends.server.config.ConfigException; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; @@ -64,7 +62,7 @@ public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB { /** The tracer object for the debug logger. */ protected static final DebugTracer TRACER = getTracer(); private static final DebugTracer TRACER = getTracer(); /** * This map contains the List of updates received from each LDAP server. @@ -846,6 +844,7 @@ */ private final class ChangelogDBPurger extends DirectoryThread { private static final int DEFAULT_SLEEP = 500; protected ChangelogDBPurger() { @@ -862,42 +861,58 @@ { try { final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB; if (localCNIndexDB == null) { // shutdown has been called return; } final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis; final MultiDomainServerState purgeUpToCookie = localCNIndexDB.purgeUpTo(purgeTimestamp); if (purgeUpToCookie == null) { // this can happen when the change number index DB is empty continue; final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); final CSN oldestNotPurgedCSN; // next code assumes that the compute-change-number config // never changes during the life time of an RS if (!config.isComputeChangeNumber()) { oldestNotPurgedCSN = purgeCSN; } else { final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB; if (localCNIndexDB == null) { // shutdown has been initiated return; } oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN); if 
(oldestNotPurgedCSN == null) { // shutdown may have been initiated... if (!isShutdownInitiated()) { // ... or the change number index DB is empty, // wait for new changes to come in. // Note we cannot sleep for as long as the purge delay // (3 days default), because we might receive late updates // that will have to be purged before the purge delay elapses. // This can particularly happen in case of network partitions. sleep(DEFAULT_SLEEP); } continue; } } /* * Drive purge of the replica DBs by the oldest non purged cookie in * the change number index DB. */ for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1 : domainToReplicaDBs.entrySet()) for (final Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) { final DN baseDN = entry1.getKey(); final Map<Integer, JEReplicaDB> domainMap = entry1.getValue(); for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet()) for (final JEReplicaDB replicaDB : domainMap.values()) { final Integer serverId = entry2.getKey(); final JEReplicaDB replicaDB = entry2.getValue(); replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId)); replicaDB.purgeUpTo(oldestNotPurgedCSN); } } latestPurgeDate = purgeTimestamp; // purge delay is specified in seconds so it should not be a problem // to sleep for 500 millis sleep(500); sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN)); } catch (InterruptedException e) { // shutdown initiated? } catch (Exception e) { @@ -910,5 +925,18 @@ } } } /** Computes how long the purger sleeps before its next pass. When the time of the supplied CSN is strictly later than the current purge threshold (TimeThread time minus purgeDelayInMillis), the difference between the two is returned; otherwise {@code DEFAULT_SLEEP} is returned. The comparison is strict so that a CSN sitting exactly on the threshold yields {@code DEFAULT_SLEEP} rather than 0, which would make the purge loop call sleep(0) and iterate again immediately. @param notPurgedCSN the oldest CSN left in place by the last purge pass @return the value the purge loop hands to sleep() */ private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN) { final long nextPurgeTime = notPurgedCSN.getTime(); final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis; if (currentPurgeTime < nextPurgeTime) { // sleep till the next CSN to purge, return nextPurgeTime - currentPurgeTime; } // wait a bit before purging more return DEFAULT_SLEEP; } } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -372,7 +372,7 @@ // Test CNIndexDB is purged when replication change log is purged final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); cnIndexDB.purgeUpTo(Long.MAX_VALUE); cnIndexDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); assertTrue(cnIndexDB.isEmpty()); ECLPurgeCNIndexDBAfterChangelogClear(); @@ -2514,44 +2514,69 @@ { String tn = "ECLCompatTestLimits"; debugInfo(tn, "Starting test\n\n"); LDIFWriter ldifWriter = getLDIFWriter(); // search on 'cn=changelog' Set<String> attributes = new LinkedHashSet<String>(); if (expectedFirst > 0) attributes.add("firstchangenumber"); attributes.add("lastchangenumber"); attributes.add("changelog"); attributes.add("lastExternalChangelogCookie"); debugInfo(tn, " Search: rootDSE"); final InternalSearchOperation searchOp = searchOnRootDSE(attributes); final List<SearchResultEntry> entries = searchOp.getSearchEntries(); assertThat(entries).hasSize(1); debugAndWriteEntries(ldifWriter, entries, tn); final SearchResultEntry resultEntry = entries.get(0); if (eclEnabled) { if (expectedFirst > 0) checkValue(resultEntry, "firstchangenumber", String.valueOf(expectedFirst)); checkValue(resultEntry, "lastchangenumber", String.valueOf(expectedLast)); checkValue(resultEntry, "changelog", String.valueOf("cn=changelog")); assertNotNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie")); } else { if (expectedFirst > 0) assertNull(getAttributeValue(resultEntry, "firstchangenumber")); assertNull(getAttributeValue(resultEntry, "lastchangenumber")); assertNull(getAttributeValue(resultEntry, "changelog")); assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie")); } final List<SearchResultEntry> entries = assertECLLimits(eclEnabled, expectedFirst, expectedLast); debugAndWriteEntries(getLDIFWriter(), entries, tn); debugInfo(tn, "Ending test with success"); } /** Runs searchOnRootDSE and the ECL limit checks, retrying when a check throws AssertionError: up to 30 attempts are made, with a 100 ms sleep after each failed one. Returns the search entries of the first attempt whose checks all pass; when no attempt passes, the last AssertionError is rethrown. */ private List<SearchResultEntry> assertECLLimits( boolean eclEnabled, int expectedFirst, int expectedLast) throws Exception { AssertionError e = null; int count = 0; 
while (count < 30) { count++; try { final Set<String> attributes = new LinkedHashSet<String>(); if (expectedFirst > 0) attributes.add("firstchangenumber"); attributes.add("lastchangenumber"); attributes.add("changelog"); attributes.add("lastExternalChangelogCookie"); final InternalSearchOperation searchOp = searchOnRootDSE(attributes); final List<SearchResultEntry> entries = searchOp.getSearchEntries(); assertThat(entries).hasSize(1); final SearchResultEntry resultEntry = entries.get(0); if (eclEnabled) { if (expectedFirst > 0) checkValue(resultEntry, "firstchangenumber", String.valueOf(expectedFirst)); checkValue(resultEntry, "lastchangenumber", String.valueOf(expectedLast)); checkValue(resultEntry, "changelog", String.valueOf("cn=changelog")); assertNotNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie")); } else { if (expectedFirst > 0) assertNull(getAttributeValue(resultEntry, "firstchangenumber")); assertNull(getAttributeValue(resultEntry, "lastchangenumber")); assertNull(getAttributeValue(resultEntry, "changelog")); assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie")); } return entries; } catch (AssertionError ae) { // try again to see if changes have been persisted e = ae; } Thread.sleep(100); } assertNotNull(e); throw e; } private InternalSearchOperation searchOnRootDSE(Set<String> attributes) throws Exception { opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -76,8 +76,8 @@ * in the replication changelog, the ChangeNumberIndexDB will be cleared.</li> * </ol> */ @Test() void testTrim() throws Exception @Test void testPurge() throws Exception { ReplicationServer replicationServer = null; try @@ -254,8 +254,10 @@ { TestCaseUtils.startServer(); final int port = TestCaseUtils.findFreePort(); return new ReplicationServer( new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null)); final ReplServerFakeConfiguration cfg = new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null); cfg.setComputeChangeNumber(true); return new ReplicationServer(cfg); } private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,