From 819f74758a1c464bbf578e70ca8592cc8d101d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Apr 2014 09:51:11 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic
---
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 327 ++++++++++++++++--------------------------------------
1 files changed, 98 insertions(+), 229 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index 5f1b63e..a106a98 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -28,29 +28,21 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.*;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
/**
* This class is used for managing the replicationServer database for each
@@ -62,7 +54,7 @@
* This class publishes some monitoring information below <code>
* cn=monitor</code>.
*/
-public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable
+public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
{
/**
* The tracer object for the debug logger.
@@ -74,7 +66,7 @@
/** FIXME What is this field used for? */
private volatile long oldestChangeNumber = NO_KEY;
/**
- * The newest changenumber stored in the DB. It is used to avoid trimming the
+ * The newest changenumber stored in the DB. It is used to avoid purging the
* record with the newest changenumber. The newest record in the changenumber
* index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
* then retrieved on server startup.
@@ -86,48 +78,25 @@
* condition between:
* <ol>
* <li>this atomic long being incremented for a new record ('recordB')</li>
- * <li>the current newest record ('recordA') being trimmed from the DB</li>
+ * <li>the current newest record ('recordA') being purged from the DB</li>
* <li>'recordB' failing to be inserted in the DB</li>
* </ol>
*/
private final AtomicLong lastGeneratedChangeNumber;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
- /**
- * A dedicated thread loops trim().
- * <p>
- * trim() : deletes from the DB a number of changes that are older than a
- * certain date.
- */
- private DirectoryThread trimmingThread;
- /**
- * The trim age in milliseconds. Changes record in the change DB that are
- * older than this age are removed.
- * <p>
- * FIXME it never gets updated even when the replication server purge delay is
- * updated
- */
- private volatile long trimAge;
-
- private ReplicationServer replicationServer;
/**
* Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
*
- * @param replicationServer The ReplicationServer that creates this instance.
- * @param dbenv the Database Env to use to create the ReplicationServer DB.
+ * @param dbEnv the Database Env to use to create the ReplicationServer DB.
* server for this domain.
* @throws ChangelogException If a database problem happened
*/
- public JEChangeNumberIndexDB(ReplicationServer replicationServer,
- ReplicationDbEnv dbenv) throws ChangelogException
+ public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
{
- this.replicationServer = replicationServer;
- this.trimAge = replicationServer.getTrimAge();
-
- // DB initialization
- db = new DraftCNDB(dbenv);
+ db = new DraftCNDB(dbEnv);
final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
oldestChangeNumber = getChangeNumber(oldestRecord);
@@ -142,16 +111,6 @@
DirectoryServer.registerMonitorProvider(dbMonitor);
}
- /**
- * Creates and starts the thread trimming the CNIndexDB.
- */
- public void startTrimmingThread()
- {
- trimmingThread =
- new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
- trimmingThread.start();
- }
-
private long getChangeNumber(ChangeNumberIndexRecord record)
throws ChangelogException
{
@@ -251,77 +210,82 @@
notifyAll();
}
- if (trimmingThread != null)
- {
- try
- {
- trimmingThread.join();
- }
- catch (InterruptedException ignored)
- {
- // Nothing can be done about it, just proceed
- }
- }
-
db.shutdown();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
/**
- * Run method for this class.
- * Periodically Flushes the ReplicationServerDomain cache from memory to the
- * stable storage and trims the old updates.
+ * Synchronously purges the change number index DB up to and excluding the
+ * provided timestamp.
+ *
+ * @param purgeTimestamp
+ * the timestamp up to which purging must happen
+ * @return the {@link MultiDomainServerState} object that drives purging the
+ * replicaDBs.
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- @Override
- public void run()
+ public MultiDomainServerState purgeUpTo(long purgeTimestamp)
+ throws ChangelogException
{
- while (!shutdown.get())
+ if (isEmpty())
{
- try {
- trim(shutdown);
+ return null;
+ }
- synchronized (this)
- {
- if (!shutdown.get())
- {
- try
- {
- wait(1000);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- catch (Exception end)
+ final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+
+ final DraftCNDBCursor cursor = db.openDeleteCursor();
+ try
+ {
+ while (!mustShutdown(shutdown) && cursor.next())
{
- logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(end)));
- if (replicationServer != null)
+ final ChangeNumberIndexRecord record = cursor.currentRecord();
+ if (record.getChangeNumber() != oldestChangeNumber)
{
- replicationServer.shutdown();
+ oldestChangeNumber = record.getChangeNumber();
}
- break;
+ if (record.getChangeNumber() == newestChangeNumber)
+ {
+ // do not purge the newest record to avoid having the last generated
+ // changenumber dropping back to 0 if the server restarts
+ return getPurgeCookie(record);
+ }
+
+ if (record.getCSN().isOlderThan(purgeCSN))
+ {
+ cursor.delete();
+ }
+ else
+ {
+ // Current record is not old enough to purge.
+ return getPurgeCookie(record);
+ }
}
+
+ return null;
+ }
+ catch (ChangelogException e)
+ {
+ cursor.abort();
+ throw e;
+ }
+ catch (Exception e)
+ {
+ cursor.abort();
+ throw new ChangelogException(e);
+ }
+ finally
+ {
+ cursor.close();
}
}
- /**
- * Trim old changes from this database.
- *
- * @param shutdown
- * AtomicBoolean telling whether the current run must be stopped
- * @throws ChangelogException
- * In case of database problem.
- */
- public void trim(AtomicBoolean shutdown) throws ChangelogException
+ private MultiDomainServerState getPurgeCookie(
+ final ChangeNumberIndexRecord record) throws DirectoryException
{
- if (trimAge == 0)
- return;
-
- clear(null, shutdown);
+ // Do not include the record's CSN to avoid having it purged
+ return new MultiDomainServerState(record.getPreviousCookie());
}
/**
@@ -334,127 +298,49 @@
* @throws ChangelogException
* if a database problem occurs.
*/
- public void clear(DN baseDNToClear) throws ChangelogException
- {
- clear(baseDNToClear, null);
- }
-
- private void clear(DN baseDNToClear, AtomicBoolean shutdown)
- throws ChangelogException
+ public void removeDomain(DN baseDNToClear) throws ChangelogException
{
if (isEmpty())
{
return;
}
- for (int i = 0; i < 100; i++)
+ final DraftCNDBCursor cursor = db.openDeleteCursor();
+ try
{
- if (mustShutdown(shutdown))
+ boolean isOldestRecord = true;
+ while (!mustShutdown(shutdown) && cursor.next())
{
- return;
- }
- final DraftCNDBCursor cursor = db.openDeleteCursor();
- try
- {
- for (int j = 0; j < 50; j++)
+ final ChangeNumberIndexRecord record = cursor.currentRecord();
+ if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
{
- // let's traverse the CNIndexDB
- if (mustShutdown(shutdown) || !cursor.next())
- {
- cursor.close();
- return;
- }
-
- final ChangeNumberIndexRecord record = cursor.currentRecord();
- if (record.getChangeNumber() != oldestChangeNumber)
- {
- oldestChangeNumber = record.getChangeNumber();
- }
- if (record.getChangeNumber() == newestChangeNumber)
- {
- // do not trim the newest record to avoid having the last generated
- // changenumber dropping back to 0 if the server restarts
- cursor.close();
- return;
- }
-
- if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN()))
- {
- cursor.delete();
- continue;
- }
-
- final ReplicationServerDomain domain =
- replicationServer.getReplicationServerDomain(record.getBaseDN());
- if (domain == null)
- {
- // the domain has been removed since the record was written in the
- // CNIndexDB, thus it makes no sense to keep this record in the DB.
- cursor.delete();
- continue;
- }
-
- // FIXME there is an opportunity for a phantom record in the CNIndexDB
- // if the replicaDB gets purged after call to domain.getOldestState().
- final CSN csn = record.getCSN();
- final ServerState oldestState = domain.getOldestState();
- final CSN fcsn = oldestState.getCSN(csn.getServerId());
- if (csn.isOlderThan(fcsn))
- {
- // This change which has already been purged from the corresponding
- // replicaDB => purge it from CNIndexDB
- cursor.delete();
- continue;
- }
-
- ServerState csnVector;
- try
- {
- Map<DN, ServerState> csnStartStates =
- MultiDomainServerState.splitGenStateToServerStates(
- record.getPreviousCookie());
- csnVector = csnStartStates.get(record.getBaseDN());
-
- if (debugEnabled())
- TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
- + csnVector + " -- StartState:" + oldestState);
- }
- catch(Exception e)
- {
- // We could not parse the MultiDomainServerState from the record
- // FIXME this is quite an aggressive delete()
- cursor.delete();
- continue;
- }
-
- if (csnVector == null
- || (csnVector.getCSN(csn.getServerId()) != null
- && !csnVector.cover(oldestState)))
- {
- cursor.delete();
- if (debugEnabled())
- TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn
- + "Not covering startState");
- continue;
- }
-
oldestChangeNumber = record.getChangeNumber();
- cursor.close();
+ }
+ if (record.getChangeNumber() == newestChangeNumber)
+ {
+ // do not purge the newest record to avoid having the last generated
+ // changenumber dropping back to 0 if the server restarts
return;
}
- cursor.close();
+ if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
+ {
+ cursor.delete();
+ }
+ else
+ {
+ isOldestRecord = false;
+ }
}
- catch (ChangelogException e)
- {
- cursor.abort();
- throw e;
- }
- catch (Exception e)
- {
- cursor.abort();
- throw new ChangelogException(e);
- }
+ }
+ catch (ChangelogException e)
+ {
+ cursor.abort();
+ throw e;
+ }
+ finally
+ {
+ cursor.close();
}
}
@@ -469,9 +355,7 @@
*/
private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
{
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public List<Attribute> getMonitorData()
{
@@ -509,18 +393,14 @@
return 0;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String getMonitorInstanceName()
{
return "ChangeNumber Index Database";
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void initializeMonitorProvider(MonitorProviderCfg configuration)
throws ConfigException,InitializationException
@@ -529,9 +409,7 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
@@ -540,15 +418,6 @@
}
/**
- * Set the Purge delay for this db Handler.
- * @param delay The purge delay in Milliseconds.
- */
- public void setPurgeDelay(long delay)
- {
- trimAge = delay;
- }
-
- /**
* Clear the changes from this DB (from both memory cache and DB storage).
*
* @throws ChangelogException
--
Gitblit v1.10.0