From aacb8bbf0a764ce8eb205e0f6376c055b3e1baa8 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 03 Apr 2014 13:30:34 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3314) Re-implement changelog purging logic
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java | 10 +-
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 86 ++++++++++++++-------
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 35 ++------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 91 ++++++++++++++--------
4 files changed, 131 insertions(+), 91 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index a106a98..6c22e5c 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -37,7 +37,6 @@
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.*;
@@ -218,23 +217,19 @@
* Synchronously purges the change number index DB up to and excluding the
* provided timestamp.
*
- * @param purgeTimestamp
+ * @param purgeCSN
* the timestamp up to which purging must happen
- * @return the {@link MultiDomainServerState} object that drives purging the
- * replicaDBs.
+ * @return the oldest non purged CSN.
* @throws ChangelogException
* if a database problem occurs.
*/
- public MultiDomainServerState purgeUpTo(long purgeTimestamp)
- throws ChangelogException
+ public CSN purgeUpTo(CSN purgeCSN) throws ChangelogException
{
- if (isEmpty())
+ if (isEmpty() || purgeCSN == null)
{
return null;
}
- final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
-
final DraftCNDBCursor cursor = db.openDeleteCursor();
try
{
@@ -245,21 +240,18 @@
{
oldestChangeNumber = record.getChangeNumber();
}
- if (record.getChangeNumber() == newestChangeNumber)
- {
- // do not purge the newest record to avoid having the last generated
- // changenumber dropping back to 0 if the server restarts
- return getPurgeCookie(record);
- }
- if (record.getCSN().isOlderThan(purgeCSN))
+ if (record.getChangeNumber() != newestChangeNumber
+ && record.getCSN().isOlderThan(purgeCSN))
{
cursor.delete();
}
else
{
- // Current record is not old enough to purge.
- return getPurgeCookie(record);
+ // 1- Current record is not old enough to purge.
+ // 2- Do not purge the newest record to avoid having the last
+ // generated changenumber dropping back to 0 when the server restarts
+ return record.getCSN();
}
}
@@ -281,13 +273,6 @@
}
}
- private MultiDomainServerState getPurgeCookie(
- final ChangeNumberIndexRecord record) throws DirectoryException
- {
- // Do not include the record's CSN to avoid having it purged
- return new MultiDomainServerState(record.getPreviousCookie());
- }
-
/**
* Clear the changes from this DB (from both memory cache and DB storage) for
* the provided baseDN.
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index f18bd1e..4241fce 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,7 +27,6 @@
import java.io.File;
import java.util.*;
-import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -40,7 +39,6 @@
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
-import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -64,7 +62,7 @@
public class JEChangelogDB implements ChangelogDB, ReplicationDomainDB
{
/** The tracer object for the debug logger. */
- protected static final DebugTracer TRACER = getTracer();
+ private static final DebugTracer TRACER = getTracer();
/**
* This map contains the List of updates received from each LDAP server.
@@ -846,6 +844,7 @@
*/
private final class ChangelogDBPurger extends DirectoryThread
{
+ private static final int DEFAULT_SLEEP = 500;
protected ChangelogDBPurger()
{
@@ -862,42 +861,58 @@
{
try
{
- final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
- if (localCNIndexDB == null)
- { // shutdown has been called
- return;
- }
-
final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
- final MultiDomainServerState purgeUpToCookie =
- localCNIndexDB.purgeUpTo(purgeTimestamp);
- if (purgeUpToCookie == null)
- { // this can happen when the change number index DB is empty
- continue;
+ final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+ final CSN oldestNotPurgedCSN;
+
+ // next code assumes that the compute-change-number config
+ // never changes during the life time of an RS
+ if (!config.isComputeChangeNumber())
+ {
+ oldestNotPurgedCSN = purgeCSN;
+ }
+ else
+ {
+ final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+ if (localCNIndexDB == null)
+ { // shutdown has been initiated
+ return;
+ }
+
+ oldestNotPurgedCSN = localCNIndexDB.purgeUpTo(purgeCSN);
+ if (oldestNotPurgedCSN == null)
+ { // shutdown may have been initiated...
+ if (!isShutdownInitiated())
+ {
+ // ... or the change number index DB is empty,
+ // wait for new changes to come in.
+
+ // Note we cannot sleep for as long as the purge delay
+ // (3 days default), because we might receive late updates
+ // that will have to be purged before the purge delay elapses.
+ // This can particularly happen in case of network partitions.
+ sleep(DEFAULT_SLEEP);
+ }
+ continue;
+ }
}
- /*
- * Drive purge of the replica DBs by the oldest non purged cookie in
- * the change number index DB.
- */
- for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
- : domainToReplicaDBs.entrySet())
+ for (final Map<Integer, JEReplicaDB> domainMap
+ : domainToReplicaDBs.values())
{
- final DN baseDN = entry1.getKey();
- final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
- for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+ for (final JEReplicaDB replicaDB : domainMap.values())
{
- final Integer serverId = entry2.getKey();
- final JEReplicaDB replicaDB = entry2.getValue();
- replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+ replicaDB.purgeUpTo(oldestNotPurgedCSN);
}
}
latestPurgeDate = purgeTimestamp;
- // purge delay is specified in seconds so it should not be a problem
- // to sleep for 500 millis
- sleep(500);
+ sleep(computeSleepTimeUntilNextPurge(oldestNotPurgedCSN));
+ }
+ catch (InterruptedException e)
+ {
+ // shutdown initiated?
}
catch (Exception e)
{
@@ -910,5 +925,18 @@
}
}
}
+
+ private long computeSleepTimeUntilNextPurge(CSN notPurgedCSN)
+ {
+ final long nextPurgeTime = notPurgedCSN.getTime();
+ final long currentPurgeTime = TimeThread.getTime() - purgeDelayInMillis;
+ if (currentPurgeTime <= nextPurgeTime)
+ {
+ // sleep till the next CSN to purge,
+ return nextPurgeTime - currentPurgeTime;
+ }
+ // wait a bit before purging more
+ return DEFAULT_SLEEP;
+ }
}
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 05e27d7..51e7a8e 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -372,7 +372,7 @@
// Test CNIndexDB is purged when replication change log is purged
final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
- cnIndexDB.purgeUpTo(Long.MAX_VALUE);
+ cnIndexDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
assertTrue(cnIndexDB.isEmpty());
ECLPurgeCNIndexDBAfterChangelogClear();
@@ -2514,44 +2514,69 @@
{
String tn = "ECLCompatTestLimits";
debugInfo(tn, "Starting test\n\n");
-
- LDIFWriter ldifWriter = getLDIFWriter();
-
- // search on 'cn=changelog'
- Set<String> attributes = new LinkedHashSet<String>();
- if (expectedFirst > 0)
- attributes.add("firstchangenumber");
- attributes.add("lastchangenumber");
- attributes.add("changelog");
- attributes.add("lastExternalChangelogCookie");
-
debugInfo(tn, " Search: rootDSE");
- final InternalSearchOperation searchOp = searchOnRootDSE(attributes);
- final List<SearchResultEntry> entries = searchOp.getSearchEntries();
- assertThat(entries).hasSize(1);
- debugAndWriteEntries(ldifWriter, entries, tn);
- final SearchResultEntry resultEntry = entries.get(0);
- if (eclEnabled)
- {
- if (expectedFirst > 0)
- checkValue(resultEntry, "firstchangenumber", String.valueOf(expectedFirst));
- checkValue(resultEntry, "lastchangenumber", String.valueOf(expectedLast));
- checkValue(resultEntry, "changelog", String.valueOf("cn=changelog"));
- assertNotNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
- }
- else
- {
- if (expectedFirst > 0)
- assertNull(getAttributeValue(resultEntry, "firstchangenumber"));
- assertNull(getAttributeValue(resultEntry, "lastchangenumber"));
- assertNull(getAttributeValue(resultEntry, "changelog"));
- assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
- }
+ final List<SearchResultEntry> entries =
+ assertECLLimits(eclEnabled, expectedFirst, expectedLast);
+ debugAndWriteEntries(getLDIFWriter(), entries, tn);
debugInfo(tn, "Ending test with success");
}
+ private List<SearchResultEntry> assertECLLimits(
+ boolean eclEnabled, int expectedFirst, int expectedLast) throws Exception
+ {
+ AssertionError e = null;
+
+ int count = 0;
+ while (count < 30)
+ {
+ count++;
+
+ try
+ {
+ final Set<String> attributes = new LinkedHashSet<String>();
+ if (expectedFirst > 0)
+ attributes.add("firstchangenumber");
+ attributes.add("lastchangenumber");
+ attributes.add("changelog");
+ attributes.add("lastExternalChangelogCookie");
+
+ final InternalSearchOperation searchOp = searchOnRootDSE(attributes);
+ final List<SearchResultEntry> entries = searchOp.getSearchEntries();
+ assertThat(entries).hasSize(1);
+
+ final SearchResultEntry resultEntry = entries.get(0);
+ if (eclEnabled)
+ {
+ if (expectedFirst > 0)
+ checkValue(resultEntry, "firstchangenumber", String.valueOf(expectedFirst));
+ checkValue(resultEntry, "lastchangenumber", String.valueOf(expectedLast));
+ checkValue(resultEntry, "changelog", String.valueOf("cn=changelog"));
+ assertNotNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
+ }
+ else
+ {
+ if (expectedFirst > 0)
+ assertNull(getAttributeValue(resultEntry, "firstchangenumber"));
+ assertNull(getAttributeValue(resultEntry, "lastchangenumber"));
+ assertNull(getAttributeValue(resultEntry, "changelog"));
+ assertNull(getAttributeValue(resultEntry, "lastExternalChangelogCookie"));
+ }
+ return entries;
+ }
+ catch (AssertionError ae)
+ {
+ // try again to see if changes have been persisted
+ e = ae;
+ }
+
+ Thread.sleep(100);
+ }
+ assertNotNull(e);
+ throw e;
+ }
+
private InternalSearchOperation searchOnRootDSE(Set<String> attributes)
throws Exception
{
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 5fb4500..10aad5f 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -76,8 +76,8 @@
* in the replication changelog, the ChangeNumberIndexDB will be cleared.</li>
* </ol>
*/
- @Test()
- void testTrim() throws Exception
+ @Test
+ void testPurge() throws Exception
{
ReplicationServer replicationServer = null;
try
@@ -254,8 +254,10 @@
{
TestCaseUtils.startServer();
final int port = TestCaseUtils.findFreePort();
- return new ReplicationServer(
- new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null));
+ final ReplServerFakeConfiguration cfg =
+ new ReplServerFakeConfiguration(port, null, 0, 2, 0, 100, null);
+ cfg.setComputeChangeNumber(true);
+ return new ReplicationServer(cfg);
}
private String getPreviousCookie(JEChangeNumberIndexDB cnIndexDB,
--
Gitblit v1.10.0