From 819f74758a1c464bbf578e70ca8592cc8d101d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Apr 2014 09:51:11 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 189 +++++++++++++++++++++++++++++++++-------------
1 files changed, 135 insertions(+), 54 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fbd19e8..f18bd1e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -21,12 +21,13 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,9 +36,11 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -46,6 +49,7 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
@@ -87,7 +91,7 @@
* The handler of the changelog database, the database stores the relation
* between a change number and the associated cookie.
* <p>
- * Guarded by cnIndexDBLock
+ * @GuardedBy("cnIndexDBLock")
*/
private JEChangeNumberIndexDB cnIndexDB;
private final AtomicReference<ChangeNumberIndexer> cnIndexer =
@@ -96,6 +100,15 @@
/** Used for protecting {@link ChangeNumberIndexDB} related state. */
private final Object cnIndexDBLock = new Object();
+ /**
+ * The purge delay (in milliseconds). Records in the changelog DB that are
+ * older than this delay might be removed.
+ */
+ private long purgeDelayInMillis;
+ private final AtomicReference<ChangelogDBPurger> cnPurger =
+ new AtomicReference<ChangelogDBPurger>();
+ private volatile long latestPurgeDate;
+
/** The local replication server. */
private final ReplicationServer replicationServer;
private AtomicBoolean shutdown = new AtomicBoolean();
@@ -312,13 +325,9 @@
initializeChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
- final ChangeNumberIndexer indexer =
- new ChangeNumberIndexer(this, changelogState);
- if (cnIndexer.compareAndSet(null, indexer))
- {
- indexer.start();
- }
+ startIndexer(changelogState);
}
+ setPurgeDelay(replicationServer.getPurgeDelay());
}
catch (ChangelogException e)
{
@@ -374,12 +383,17 @@
// - then throw the first encountered exception
ChangelogException firstException = null;
- final ChangeNumberIndexer indexer = cnIndexer.get();
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
if (indexer != null)
{
indexer.initiateShutdown();
- cnIndexer.compareAndSet(indexer, null);
}
+ final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
+ }
+
try
{
shutdownCNIndexDB();
@@ -581,7 +595,7 @@
{
try
{
- cnIndexDB.clear(baseDN);
+ cnIndexDB.removeDomain(baseDN);
}
catch (ChangelogException e)
{
@@ -607,7 +621,9 @@
firstException = e;
}
else if (debugEnabled())
+ {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
if (firstException != null)
@@ -618,18 +634,24 @@
/** {@inheritDoc} */
@Override
- public void setPurgeDelay(long delay)
+ public void setPurgeDelay(long purgeDelayInMillis)
{
- final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
- if (cnIndexDB != null)
+ this.purgeDelayInMillis = purgeDelayInMillis;
+ final ChangelogDBPurger purger;
+ if (purgeDelayInMillis > 0)
{
- cnIndexDB.setPurgeDelay(delay);
- }
- for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
- {
- for (JEReplicaDB replicaDB : domainMap.values())
+ purger = new ChangelogDBPurger();
+ if (cnPurger.compareAndSet(null, purger))
{
- replicaDB.setPurgeDelay(delay);
+ purger.start();
+ } // otherwise a purger was already running
+ }
+ else
+ {
+ purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
}
}
}
@@ -639,19 +661,13 @@
public void setComputeChangeNumber(boolean computeChangeNumber)
throws ChangelogException
{
- final ChangeNumberIndexer indexer;
if (computeChangeNumber)
{
- final ChangelogState changelogState = dbEnv.readChangelogState();
- indexer = new ChangeNumberIndexer(this, changelogState);
- if (cnIndexer.compareAndSet(null, indexer))
- {
- indexer.start();
- }
+ startIndexer(dbEnv.readChangelogState());
}
else
{
- indexer = cnIndexer.getAndSet(null);
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
if (indexer != null)
{
indexer.initiateShutdown();
@@ -659,48 +675,34 @@
}
}
+ private void startIndexer(final ChangelogState changelogState)
+ {
+ final ChangeNumberIndexer indexer =
+ new ChangeNumberIndexer(this, changelogState);
+ if (cnIndexer.compareAndSet(null, indexer))
+ {
+ indexer.start();
+ }
+ }
+
/** {@inheritDoc} */
@Override
public long getDomainLatestTrimDate(DN baseDN)
{
- long latest = 0;
- for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
- {
- if (latest == 0 || latest < replicaDB.getLatestTrimDate())
- {
- latest = replicaDB.getLatestTrimDate();
- }
- }
- return latest;
+ return latestPurgeDate;
}
/** {@inheritDoc} */
@Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
- return getChangeNumberIndexDB(true);
- }
-
- /**
- * Returns the {@link ChangeNumberIndexDB} object.
- *
- * @param startTrimmingThread
- * whether the trimming thread should be started
- * @return the {@link ChangeNumberIndexDB} object
- */
- ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread)
- {
synchronized (cnIndexDBLock)
{
if (cnIndexDB == null)
{
try
{
- cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
- if (startTrimmingThread)
- {
- cnIndexDB.startTrimmingThread();
- }
+ cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
}
catch (Exception e)
{
@@ -830,4 +832,83 @@
}
// TODO save this state in the changelogStateDB?
}
+
+ /**
+ * The thread purging the changelogDB on a regular interval. Records are
+ * purged from the changelogDB is they are older than a delay specified in
+ * seconds. The purge process works in two steps:
+ * <ol>
+ * <li>first purge the changeNumberIndexDB and retrieve information to drive
+ * replicaDBs purging</li>
+ * <li>proceed to purge each replicaDBs based on the information collected
+ * when purging the changeNumberIndexDB</li>
+ * </ol>
+ */
+ private final class ChangelogDBPurger extends DirectoryThread
+ {
+
+ protected ChangelogDBPurger()
+ {
+ super("changelog DB purger");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run()
+ {
+ // initialize CNIndexDB
+ getChangeNumberIndexDB();
+ while (!isShutdownInitiated())
+ {
+ try
+ {
+ final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+ if (localCNIndexDB == null)
+ { // shutdown has been called
+ return;
+ }
+
+ final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
+ final MultiDomainServerState purgeUpToCookie =
+ localCNIndexDB.purgeUpTo(purgeTimestamp);
+ if (purgeUpToCookie == null)
+ { // this can happen when the change number index DB is empty
+ continue;
+ }
+
+ /*
+ * Drive purge of the replica DBs by the oldest non purged cookie in
+ * the change number index DB.
+ */
+ for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
+ : domainToReplicaDBs.entrySet())
+ {
+ final DN baseDN = entry1.getKey();
+ final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
+ for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+ {
+ final Integer serverId = entry2.getKey();
+ final JEReplicaDB replicaDB = entry2.getValue();
+ replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+ }
+ }
+
+ latestPurgeDate = purgeTimestamp;
+
+ // purge delay is specified in seconds so it should not be a problem
+ // to sleep for 500 millis
+ sleep(500);
+ }
+ catch (Exception e)
+ {
+ logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+ .get(stackTraceToSingleLineString(e)));
+ if (replicationServer != null)
+ {
+ replicationServer.shutdown();
+ }
+ }
+ }
+ }
+ }
}
--
Gitblit v1.10.0