From 35105a63e174a9d9e4d837e3f45528ada1577a87 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 04 Apr 2014 13:48:10 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic
---
opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 189 +++++++++++++++++++++++++++++++++-------------
1 files changed, 135 insertions(+), 54 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index a6e1a5f..a88cf62 100644
--- a/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opendj3-server-dev/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -27,6 +27,7 @@
import java.io.File;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -36,7 +37,9 @@
import org.opends.server.admin.std.server.ReplicationServerCfg;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.opends.server.api.DirectoryThread;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -44,6 +47,7 @@
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
@@ -83,7 +87,7 @@
* The handler of the changelog database, the database stores the relation
* between a change number and the associated cookie.
* <p>
- * Guarded by cnIndexDBLock
+ * @GuardedBy("cnIndexDBLock")
*/
private JEChangeNumberIndexDB cnIndexDB;
private final AtomicReference<ChangeNumberIndexer> cnIndexer =
@@ -92,6 +96,15 @@
/** Used for protecting {@link ChangeNumberIndexDB} related state. */
private final Object cnIndexDBLock = new Object();
+  /**
+   * The purge delay (in milliseconds). Records in the changelog DB that are
+   * older than this delay might be removed.
+   * <p>
+   * Volatile because it is written by {@link #setPurgeDelay(long)} and read,
+   * without holding any lock, by the purger thread on each purge pass.
+   */
+  private volatile long purgeDelayInMillis;
+  /**
+   * The purger thread currently registered, or null when there is none. It is
+   * set by {@link #setPurgeDelay(long)} when the delay is strictly positive and
+   * reset to null when the delay is not positive or on shutdown.
+   */
+  private final AtomicReference<ChangelogDBPurger> cnPurger =
+      new AtomicReference<ChangelogDBPurger>();
+  /**
+   * The purge timestamp of the latest purge pass that went through the replica
+   * DBs. Written by the purger thread, returned by
+   * {@link #getDomainLatestTrimDate(DN)}.
+   */
+  private volatile long latestPurgeDate;
+
/** The local replication server. */
private final ReplicationServer replicationServer;
private AtomicBoolean shutdown = new AtomicBoolean();
@@ -306,13 +319,9 @@
initializeChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
- final ChangeNumberIndexer indexer =
- new ChangeNumberIndexer(this, changelogState);
- if (cnIndexer.compareAndSet(null, indexer))
- {
- indexer.start();
- }
+ startIndexer(changelogState);
}
+ setPurgeDelay(replicationServer.getPurgeDelay());
}
catch (ChangelogException e)
{
@@ -366,12 +375,17 @@
// - then throw the first encountered exception
ChangelogException firstException = null;
- final ChangeNumberIndexer indexer = cnIndexer.get();
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
if (indexer != null)
{
indexer.initiateShutdown();
- cnIndexer.compareAndSet(indexer, null);
}
+ final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
+ }
+
try
{
shutdownCNIndexDB();
@@ -572,7 +586,7 @@
{
try
{
- cnIndexDB.clear(baseDN);
+ cnIndexDB.removeDomain(baseDN);
}
catch (ChangelogException e)
{
@@ -596,7 +610,10 @@
{
firstException = e;
}
- else logger.traceException(e);
+ else
+ {
+ logger.traceException(e);
+ }
}
if (firstException != null)
@@ -607,18 +624,24 @@
/** {@inheritDoc} */
@Override
- public void setPurgeDelay(long delay)
+ public void setPurgeDelay(long purgeDelayInMillis)
{
- final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
- if (cnIndexDB != null)
+ this.purgeDelayInMillis = purgeDelayInMillis;
+ final ChangelogDBPurger purger;
+ if (purgeDelayInMillis > 0)
{
- cnIndexDB.setPurgeDelay(delay);
- }
- for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
- {
- for (JEReplicaDB replicaDB : domainMap.values())
+ purger = new ChangelogDBPurger();
+ if (cnPurger.compareAndSet(null, purger))
{
- replicaDB.setPurgeDelay(delay);
+ purger.start();
+ } // otherwise a purger was already running: the new instance is never started and the running one reads the new delay on its next pass
+ }
+ else
+ {
+ purger = cnPurger.getAndSet(null); // delay is zero or negative: unregister the purger, if any, and stop it
+ if (purger != null)
+ {
+ purger.initiateShutdown();
}
}
}
@@ -628,19 +651,13 @@
public void setComputeChangeNumber(boolean computeChangeNumber)
throws ChangelogException
{
- final ChangeNumberIndexer indexer;
if (computeChangeNumber)
{
- final ChangelogState changelogState = dbEnv.readChangelogState();
- indexer = new ChangeNumberIndexer(this, changelogState);
- if (cnIndexer.compareAndSet(null, indexer))
- {
- indexer.start();
- }
+ startIndexer(dbEnv.readChangelogState());
}
else
{
- indexer = cnIndexer.getAndSet(null);
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
if (indexer != null)
{
indexer.initiateShutdown();
@@ -648,48 +665,34 @@
}
}
+  /**
+   * Builds a {@link ChangeNumberIndexer} from the provided changelog state and
+   * starts it, unless an indexer is already registered in {@code cnIndexer},
+   * in which case the newly built instance is never started.
+   *
+   * @param changelogState
+   *          the changelog state handed to the new indexer
+   */
+  private void startIndexer(final ChangelogState changelogState)
+  {
+    final ChangeNumberIndexer candidate =
+        new ChangeNumberIndexer(this, changelogState);
+    final boolean registered = cnIndexer.compareAndSet(null, candidate);
+    if (!registered)
+    { // an indexer is already registered, nothing to start
+      return;
+    }
+    candidate.start();
+  }
+
/** {@inheritDoc} */
@Override
public long getDomainLatestTrimDate(DN baseDN)
{
- long latest = 0;
- for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
- {
- if (latest == 0 || latest < replicaDB.getLatestTrimDate())
- {
- latest = replicaDB.getLatestTrimDate();
- }
- }
- return latest;
+ return latestPurgeDate; // baseDN is not consulted: the same purge date is returned for every domain
}
/** {@inheritDoc} */
@Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
- return getChangeNumberIndexDB(true);
- }
-
- /**
- * Returns the {@link ChangeNumberIndexDB} object.
- *
- * @param startTrimmingThread
- * whether the trimming thread should be started
- * @return the {@link ChangeNumberIndexDB} object
- */
- ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread)
- {
synchronized (cnIndexDBLock)
{
if (cnIndexDB == null)
{
try
{
- cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
- if (startTrimmingThread)
- {
- cnIndexDB.startTrimmingThread();
- }
+ cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
}
catch (Exception e)
{
@@ -818,4 +821,82 @@
}
// TODO save this state in the changelogStateDB?
}
+
+  /**
+   * The thread purging the changelogDB on a regular interval. Records are
+   * purged from the changelogDB if they are older than the purge delay, which
+   * is read from {@code purgeDelayInMillis} at the start of each pass. The
+   * purge process works in two steps:
+   * <ol>
+   * <li>first purge the changeNumberIndexDB and retrieve information to drive
+   * replicaDBs purging</li>
+   * <li>proceed to purge each replicaDBs based on the information collected
+   * when purging the changeNumberIndexDB</li>
+   * </ol>
+   * The thread pauses between two passes, including the passes where there was
+   * nothing to purge. It exits when shutdown is initiated, when the change
+   * number index DB is no longer available, or when it is interrupted.
+   */
+  private final class ChangelogDBPurger extends DirectoryThread
+  {
+
+    /** Pause (in milliseconds) between two purge passes. */
+    private static final long PURGE_INTERVAL_IN_MILLIS = 500;
+
+    protected ChangelogDBPurger()
+    {
+      super("changelog DB purger");
+    }
+
+    /**
+     * Runs purge passes until shutdown is initiated. An interruption ends the
+     * thread quietly. Any other exception is logged and triggers the shutdown
+     * of the replication server.
+     */
+    @Override
+    public void run()
+    {
+      // initialize CNIndexDB
+      getChangeNumberIndexDB();
+      while (!isShutdownInitiated())
+      {
+        try
+        {
+          final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+          if (localCNIndexDB == null)
+          { // shutdown has been called
+            return;
+          }
+
+          final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
+          final MultiDomainServerState purgeUpToCookie =
+              localCNIndexDB.purgeUpTo(purgeTimestamp);
+          // a null cookie can happen when the change number index DB is empty:
+          // there is nothing to drive the replica DBs purge with, but the
+          // thread must still pause below instead of spinning on an empty DB
+          if (purgeUpToCookie != null)
+          {
+            /*
+             * Drive purge of the replica DBs by the oldest non purged cookie in
+             * the change number index DB.
+             */
+            for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
+                : domainToReplicaDBs.entrySet())
+            {
+              final DN baseDN = entry1.getKey();
+              final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
+              for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+              {
+                final Integer serverId = entry2.getKey();
+                final JEReplicaDB replicaDB = entry2.getValue();
+                replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+              }
+            }
+
+            latestPurgeDate = purgeTimestamp;
+          }
+
+          // purge delay is specified in seconds so it should not be a problem
+          // to sleep for 500 millis
+          sleep(PURGE_INTERVAL_IN_MILLIS);
+        }
+        catch (InterruptedException e)
+        {
+          // Being interrupted is a request to stop, not a purge failure:
+          // restore the interrupt status and end the thread without shutting
+          // down the replication server.
+          Thread.currentThread().interrupt();
+          return;
+        }
+        catch (Exception e)
+        {
+          logger.error(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH, stackTraceToSingleLineString(e));
+          if (replicationServer != null)
+          {
+            replicationServer.shutdown();
+          }
+        }
+      }
+    }
+  }
}
--
Gitblit v1.10.0