From 819f74758a1c464bbf578e70ca8592cc8d101d75 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 02 Apr 2014 09:51:11 +0000
Subject: [PATCH] OPENDJ-1177 (CR-3304) Re-implement changelog purging logic
---
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java | 84 +++--
opends/src/server/org/opends/server/replication/server/ReplicationServer.java | 6
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java | 189 ++++++++---
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java | 45 -
opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java | 22
opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java | 129 ++------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java | 5
opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java | 327 ++++++--------------
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java | 47 +-
opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java | 9
opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java | 44 +-
11 files changed, 395 insertions(+), 512 deletions(-)
diff --git a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
index 97cb8e0..bf00104 100644
--- a/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
+++ b/opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2010 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server;
@@ -722,7 +722,7 @@
* @return The time after which changes must be deleted from the
* persistent storage (in milliseconds).
*/
- public long getTrimAge()
+ public long getPurgeDelay()
{
return this.config.getReplicationPurgeDelay() * 1000;
}
@@ -780,7 +780,7 @@
final long newPurgeDelay = config.getReplicationPurgeDelay();
if (newPurgeDelay != oldConfig.getReplicationPurgeDelay())
{
- this.changelogDB.setPurgeDelay(getTrimAge());
+ this.changelogDB.setPurgeDelay(getPurgeDelay());
}
final boolean computeCN = config.isComputeChangeNumber();
if (computeCN != oldConfig.isComputeChangeNumber())
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
index 33eaa30..0489065 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -487,13 +487,6 @@
// OK, the oldest change is older than the medium consistency point
// let's publish it to the CNIndexDB.
-
- // Next if statement is ugly but ensures the first change will not be
- // immediately trimmed from the CNIndexDB. Yuck!
- if (mediumConsistencyRUV.isEmpty())
- {
- mediumConsistencyRUV.replace(baseDN, new ServerState());
- }
final String previousCookie = mediumConsistencyRUV.toString();
final ChangeNumberIndexRecord record =
new ChangeNumberIndexRecord(previousCookie, baseDN, csn);
@@ -620,20 +613,21 @@
}
/**
- * Asks the current thread to clear its state.
+ * Asks the current thread to clear its state and blocks until state is
+ * cleared.
* <p>
* This method is only useful for unit tests.
*/
public void clear()
{
doClear.set(true);
- synchronized (this)
+ while (doClear.get() && !State.TERMINATED.equals(getState()))
{
- notify();
- }
- while (doClear.get())
- {
- // wait until clear() has been done by thread
+ // wait until clear() has been done by thread, always waking it up
+ synchronized (this)
+ {
+ notify();
+ }
// ensures unit tests wait that this thread's state is cleaned up
Thread.yield();
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
index 5f1b63e..a106a98 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -28,29 +28,21 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.opends.server.admin.std.server.MonitorProviderCfg;
-import org.opends.server.api.DirectoryThread;
import org.opends.server.api.MonitorProvider;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
import org.opends.server.replication.common.MultiDomainServerState;
-import org.opends.server.replication.common.ServerState;
-import org.opends.server.replication.server.ReplicationServer;
-import org.opends.server.replication.server.ReplicationServerDomain;
import org.opends.server.replication.server.changelog.api.*;
import org.opends.server.replication.server.changelog.je.DraftCNDB.*;
import org.opends.server.types.*;
-import static org.opends.messages.ReplicationMessages.*;
-import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.*;
-import static org.opends.server.util.StaticUtils.*;
/**
* This class is used for managing the replicationServer database for each
@@ -62,7 +54,7 @@
* This class publishes some monitoring information below <code>
* cn=monitor</code>.
*/
-public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable
+public class JEChangeNumberIndexDB implements ChangeNumberIndexDB
{
/**
* The tracer object for the debug logger.
@@ -74,7 +66,7 @@
/** FIXME What is this field used for? */
private volatile long oldestChangeNumber = NO_KEY;
/**
- * The newest changenumber stored in the DB. It is used to avoid trimming the
+ * The newest changenumber stored in the DB. It is used to avoid purging the
* record with the newest changenumber. The newest record in the changenumber
* index DB is used to persist the {@link #lastGeneratedChangeNumber} which is
* then retrieved on server startup.
@@ -86,48 +78,25 @@
* condition between:
* <ol>
* <li>this atomic long being incremented for a new record ('recordB')</li>
- * <li>the current newest record ('recordA') being trimmed from the DB</li>
+ * <li>the current newest record ('recordA') being purged from the DB</li>
* <li>'recordB' failing to be inserted in the DB</li>
* </ol>
*/
private final AtomicLong lastGeneratedChangeNumber;
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private final AtomicBoolean shutdown = new AtomicBoolean(false);
- /**
- * A dedicated thread loops trim().
- * <p>
- * trim() : deletes from the DB a number of changes that are older than a
- * certain date.
- */
- private DirectoryThread trimmingThread;
- /**
- * The trim age in milliseconds. Changes record in the change DB that are
- * older than this age are removed.
- * <p>
- * FIXME it never gets updated even when the replication server purge delay is
- * updated
- */
- private volatile long trimAge;
-
- private ReplicationServer replicationServer;
/**
* Creates a new JEChangeNumberIndexDB associated to a given LDAP server.
*
- * @param replicationServer The ReplicationServer that creates this instance.
- * @param dbenv the Database Env to use to create the ReplicationServer DB.
+ * @param dbEnv the Database Env to use to create the ReplicationServer DB.
* server for this domain.
* @throws ChangelogException If a database problem happened
*/
- public JEChangeNumberIndexDB(ReplicationServer replicationServer,
- ReplicationDbEnv dbenv) throws ChangelogException
+ public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException
{
- this.replicationServer = replicationServer;
- this.trimAge = replicationServer.getTrimAge();
-
- // DB initialization
- db = new DraftCNDB(dbenv);
+ db = new DraftCNDB(dbEnv);
final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord();
final ChangeNumberIndexRecord newestRecord = db.readLastRecord();
oldestChangeNumber = getChangeNumber(oldestRecord);
@@ -142,16 +111,6 @@
DirectoryServer.registerMonitorProvider(dbMonitor);
}
- /**
- * Creates and starts the thread trimming the CNIndexDB.
- */
- public void startTrimmingThread()
- {
- trimmingThread =
- new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer");
- trimmingThread.start();
- }
-
private long getChangeNumber(ChangeNumberIndexRecord record)
throws ChangelogException
{
@@ -251,77 +210,82 @@
notifyAll();
}
- if (trimmingThread != null)
- {
- try
- {
- trimmingThread.join();
- }
- catch (InterruptedException ignored)
- {
- // Nothing can be done about it, just proceed
- }
- }
-
db.shutdown();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
}
/**
- * Run method for this class.
- * Periodically Flushes the ReplicationServerDomain cache from memory to the
- * stable storage and trims the old updates.
+ * Synchronously purges the change number index DB up to and excluding the
+ * provided timestamp.
+ *
+ * @param purgeTimestamp
+ * the timestamp up to which purging must happen
+ * @return the {@link MultiDomainServerState} object that drives purging the
+ * replicaDBs.
+ * @throws ChangelogException
+ * if a database problem occurs.
*/
- @Override
- public void run()
+ public MultiDomainServerState purgeUpTo(long purgeTimestamp)
+ throws ChangelogException
{
- while (!shutdown.get())
+ if (isEmpty())
{
- try {
- trim(shutdown);
+ return null;
+ }
- synchronized (this)
- {
- if (!shutdown.get())
- {
- try
- {
- wait(1000);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- catch (Exception end)
+ final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0);
+
+ final DraftCNDBCursor cursor = db.openDeleteCursor();
+ try
+ {
+ while (!mustShutdown(shutdown) && cursor.next())
{
- logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
- .get(stackTraceToSingleLineString(end)));
- if (replicationServer != null)
+ final ChangeNumberIndexRecord record = cursor.currentRecord();
+ if (record.getChangeNumber() != oldestChangeNumber)
{
- replicationServer.shutdown();
+ oldestChangeNumber = record.getChangeNumber();
}
- break;
+ if (record.getChangeNumber() == newestChangeNumber)
+ {
+ // do not purge the newest record to avoid having the last generated
+ // changenumber dropping back to 0 if the server restarts
+ return getPurgeCookie(record);
+ }
+
+ if (record.getCSN().isOlderThan(purgeCSN))
+ {
+ cursor.delete();
+ }
+ else
+ {
+ // Current record is not old enough to purge.
+ return getPurgeCookie(record);
+ }
}
+
+ return null;
+ }
+ catch (ChangelogException e)
+ {
+ cursor.abort();
+ throw e;
+ }
+ catch (Exception e)
+ {
+ cursor.abort();
+ throw new ChangelogException(e);
+ }
+ finally
+ {
+ cursor.close();
}
}
- /**
- * Trim old changes from this database.
- *
- * @param shutdown
- * AtomicBoolean telling whether the current run must be stopped
- * @throws ChangelogException
- * In case of database problem.
- */
- public void trim(AtomicBoolean shutdown) throws ChangelogException
+ private MultiDomainServerState getPurgeCookie(
+ final ChangeNumberIndexRecord record) throws DirectoryException
{
- if (trimAge == 0)
- return;
-
- clear(null, shutdown);
+ // Do not include the record's CSN to avoid having it purged
+ return new MultiDomainServerState(record.getPreviousCookie());
}
/**
@@ -334,127 +298,49 @@
* @throws ChangelogException
* if a database problem occurs.
*/
- public void clear(DN baseDNToClear) throws ChangelogException
- {
- clear(baseDNToClear, null);
- }
-
- private void clear(DN baseDNToClear, AtomicBoolean shutdown)
- throws ChangelogException
+ public void removeDomain(DN baseDNToClear) throws ChangelogException
{
if (isEmpty())
{
return;
}
- for (int i = 0; i < 100; i++)
+ final DraftCNDBCursor cursor = db.openDeleteCursor();
+ try
{
- if (mustShutdown(shutdown))
+ boolean isOldestRecord = true;
+ while (!mustShutdown(shutdown) && cursor.next())
{
- return;
- }
- final DraftCNDBCursor cursor = db.openDeleteCursor();
- try
- {
- for (int j = 0; j < 50; j++)
+ final ChangeNumberIndexRecord record = cursor.currentRecord();
+ if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber)
{
- // let's traverse the CNIndexDB
- if (mustShutdown(shutdown) || !cursor.next())
- {
- cursor.close();
- return;
- }
-
- final ChangeNumberIndexRecord record = cursor.currentRecord();
- if (record.getChangeNumber() != oldestChangeNumber)
- {
- oldestChangeNumber = record.getChangeNumber();
- }
- if (record.getChangeNumber() == newestChangeNumber)
- {
- // do not trim the newest record to avoid having the last generated
- // changenumber dropping back to 0 if the server restarts
- cursor.close();
- return;
- }
-
- if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN()))
- {
- cursor.delete();
- continue;
- }
-
- final ReplicationServerDomain domain =
- replicationServer.getReplicationServerDomain(record.getBaseDN());
- if (domain == null)
- {
- // the domain has been removed since the record was written in the
- // CNIndexDB, thus it makes no sense to keep this record in the DB.
- cursor.delete();
- continue;
- }
-
- // FIXME there is an opportunity for a phantom record in the CNIndexDB
- // if the replicaDB gets purged after call to domain.getOldestState().
- final CSN csn = record.getCSN();
- final ServerState oldestState = domain.getOldestState();
- final CSN fcsn = oldestState.getCSN(csn.getServerId());
- if (csn.isOlderThan(fcsn))
- {
- // This change which has already been purged from the corresponding
- // replicaDB => purge it from CNIndexDB
- cursor.delete();
- continue;
- }
-
- ServerState csnVector;
- try
- {
- Map<DN, ServerState> csnStartStates =
- MultiDomainServerState.splitGenStateToServerStates(
- record.getPreviousCookie());
- csnVector = csnStartStates.get(record.getBaseDN());
-
- if (debugEnabled())
- TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:"
- + csnVector + " -- StartState:" + oldestState);
- }
- catch(Exception e)
- {
- // We could not parse the MultiDomainServerState from the record
- // FIXME this is quite an aggressive delete()
- cursor.delete();
- continue;
- }
-
- if (csnVector == null
- || (csnVector.getCSN(csn.getServerId()) != null
- && !csnVector.cover(oldestState)))
- {
- cursor.delete();
- if (debugEnabled())
- TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn
- + "Not covering startState");
- continue;
- }
-
oldestChangeNumber = record.getChangeNumber();
- cursor.close();
+ }
+ if (record.getChangeNumber() == newestChangeNumber)
+ {
+ // do not purge the newest record to avoid having the last generated
+ // changenumber dropping back to 0 if the server restarts
return;
}
- cursor.close();
+ if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear))
+ {
+ cursor.delete();
+ }
+ else
+ {
+ isOldestRecord = false;
+ }
}
- catch (ChangelogException e)
- {
- cursor.abort();
- throw e;
- }
- catch (Exception e)
- {
- cursor.abort();
- throw new ChangelogException(e);
- }
+ }
+ catch (ChangelogException e)
+ {
+ cursor.abort();
+ throw e;
+ }
+ finally
+ {
+ cursor.close();
}
}
@@ -469,9 +355,7 @@
*/
private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
{
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public List<Attribute> getMonitorData()
{
@@ -509,18 +393,14 @@
return 0;
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String getMonitorInstanceName()
{
return "ChangeNumber Index Database";
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public void initializeMonitorProvider(MonitorProviderCfg configuration)
throws ConfigException,InitializationException
@@ -529,9 +409,7 @@
}
}
- /**
- * {@inheritDoc}
- */
+ /** {@inheritDoc} */
@Override
public String toString()
{
@@ -540,15 +418,6 @@
}
/**
- * Set the Purge delay for this db Handler.
- * @param delay The purge delay in Milliseconds.
- */
- public void setPurgeDelay(long delay)
- {
- trimAge = delay;
- }
-
- /**
* Clear the changes from this DB (from both memory cache and DB storage).
*
* @throws ChangelogException
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
index fbd19e8..f18bd1e 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -21,12 +21,13 @@
* CDDL HEADER END
*
*
- * Copyright 2013 ForgeRock AS
+ * Copyright 2013-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
import java.io.File;
import java.util.*;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -35,9 +36,11 @@
import org.opends.messages.Message;
import org.opends.messages.MessageBuilder;
import org.opends.server.admin.std.server.ReplicationServerCfg;
+import org.opends.server.api.DirectoryThread;
import org.opends.server.config.ConfigException;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.common.ServerState;
import org.opends.server.replication.protocol.UpdateMsg;
import org.opends.server.replication.server.ChangelogState;
@@ -46,6 +49,7 @@
import org.opends.server.types.DN;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.util.StaticUtils;
+import org.opends.server.util.TimeThread;
import com.forgerock.opendj.util.Pair;
@@ -87,7 +91,7 @@
* The handler of the changelog database, the database stores the relation
* between a change number and the associated cookie.
* <p>
- * Guarded by cnIndexDBLock
+ * @GuardedBy("cnIndexDBLock")
*/
private JEChangeNumberIndexDB cnIndexDB;
private final AtomicReference<ChangeNumberIndexer> cnIndexer =
@@ -96,6 +100,15 @@
/** Used for protecting {@link ChangeNumberIndexDB} related state. */
private final Object cnIndexDBLock = new Object();
+ /**
+ * The purge delay (in milliseconds). Records in the changelog DB that are
+ * older than this delay might be removed.
+ */
+ private long purgeDelayInMillis;
+ private final AtomicReference<ChangelogDBPurger> cnPurger =
+ new AtomicReference<ChangelogDBPurger>();
+ private volatile long latestPurgeDate;
+
/** The local replication server. */
private final ReplicationServer replicationServer;
private AtomicBoolean shutdown = new AtomicBoolean();
@@ -312,13 +325,9 @@
initializeChangelogState(changelogState);
if (config.isComputeChangeNumber())
{
- final ChangeNumberIndexer indexer =
- new ChangeNumberIndexer(this, changelogState);
- if (cnIndexer.compareAndSet(null, indexer))
- {
- indexer.start();
- }
+ startIndexer(changelogState);
}
+ setPurgeDelay(replicationServer.getPurgeDelay());
}
catch (ChangelogException e)
{
@@ -374,12 +383,17 @@
// - then throw the first encountered exception
ChangelogException firstException = null;
- final ChangeNumberIndexer indexer = cnIndexer.get();
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
if (indexer != null)
{
indexer.initiateShutdown();
- cnIndexer.compareAndSet(indexer, null);
}
+ final ChangelogDBPurger purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
+ }
+
try
{
shutdownCNIndexDB();
@@ -581,7 +595,7 @@
{
try
{
- cnIndexDB.clear(baseDN);
+ cnIndexDB.removeDomain(baseDN);
}
catch (ChangelogException e)
{
@@ -607,7 +621,9 @@
firstException = e;
}
else if (debugEnabled())
+ {
TRACER.debugCaught(DebugLogLevel.ERROR, e);
+ }
}
if (firstException != null)
@@ -618,18 +634,24 @@
/** {@inheritDoc} */
@Override
- public void setPurgeDelay(long delay)
+ public void setPurgeDelay(long purgeDelayInMillis)
{
- final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB;
- if (cnIndexDB != null)
+ this.purgeDelayInMillis = purgeDelayInMillis;
+ final ChangelogDBPurger purger;
+ if (purgeDelayInMillis > 0)
{
- cnIndexDB.setPurgeDelay(delay);
- }
- for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values())
- {
- for (JEReplicaDB replicaDB : domainMap.values())
+ purger = new ChangelogDBPurger();
+ if (cnPurger.compareAndSet(null, purger))
{
- replicaDB.setPurgeDelay(delay);
+ purger.start();
+ } // otherwise a purger was already running
+ }
+ else
+ {
+ purger = cnPurger.getAndSet(null);
+ if (purger != null)
+ {
+ purger.initiateShutdown();
}
}
}
@@ -639,19 +661,13 @@
public void setComputeChangeNumber(boolean computeChangeNumber)
throws ChangelogException
{
- final ChangeNumberIndexer indexer;
if (computeChangeNumber)
{
- final ChangelogState changelogState = dbEnv.readChangelogState();
- indexer = new ChangeNumberIndexer(this, changelogState);
- if (cnIndexer.compareAndSet(null, indexer))
- {
- indexer.start();
- }
+ startIndexer(dbEnv.readChangelogState());
}
else
{
- indexer = cnIndexer.getAndSet(null);
+ final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null);
if (indexer != null)
{
indexer.initiateShutdown();
@@ -659,48 +675,34 @@
}
}
+ private void startIndexer(final ChangelogState changelogState)
+ {
+ final ChangeNumberIndexer indexer =
+ new ChangeNumberIndexer(this, changelogState);
+ if (cnIndexer.compareAndSet(null, indexer))
+ {
+ indexer.start();
+ }
+ }
+
/** {@inheritDoc} */
@Override
public long getDomainLatestTrimDate(DN baseDN)
{
- long latest = 0;
- for (JEReplicaDB replicaDB : getDomainMap(baseDN).values())
- {
- if (latest == 0 || latest < replicaDB.getLatestTrimDate())
- {
- latest = replicaDB.getLatestTrimDate();
- }
- }
- return latest;
+ return latestPurgeDate;
}
/** {@inheritDoc} */
@Override
public ChangeNumberIndexDB getChangeNumberIndexDB()
{
- return getChangeNumberIndexDB(true);
- }
-
- /**
- * Returns the {@link ChangeNumberIndexDB} object.
- *
- * @param startTrimmingThread
- * whether the trimming thread should be started
- * @return the {@link ChangeNumberIndexDB} object
- */
- ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread)
- {
synchronized (cnIndexDBLock)
{
if (cnIndexDB == null)
{
try
{
- cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv);
- if (startTrimmingThread)
- {
- cnIndexDB.startTrimmingThread();
- }
+ cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv);
}
catch (Exception e)
{
@@ -830,4 +832,83 @@
}
// TODO save this state in the changelogStateDB?
}
+
+ /**
+ * The thread purging the changelogDB on a regular interval. Records are
+ * purged from the changelogDB is they are older than a delay specified in
+ * seconds. The purge process works in two steps:
+ * <ol>
+ * <li>first purge the changeNumberIndexDB and retrieve information to drive
+ * replicaDBs purging</li>
+ * <li>proceed to purge each replicaDBs based on the information collected
+ * when purging the changeNumberIndexDB</li>
+ * </ol>
+ */
+ private final class ChangelogDBPurger extends DirectoryThread
+ {
+
+ protected ChangelogDBPurger()
+ {
+ super("changelog DB purger");
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void run()
+ {
+ // initialize CNIndexDB
+ getChangeNumberIndexDB();
+ while (!isShutdownInitiated())
+ {
+ try
+ {
+ final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB;
+ if (localCNIndexDB == null)
+ { // shutdown has been called
+ return;
+ }
+
+ final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis;
+ final MultiDomainServerState purgeUpToCookie =
+ localCNIndexDB.purgeUpTo(purgeTimestamp);
+ if (purgeUpToCookie == null)
+ { // this can happen when the change number index DB is empty
+ continue;
+ }
+
+ /*
+ * Drive purge of the replica DBs by the oldest non purged cookie in
+ * the change number index DB.
+ */
+ for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1
+ : domainToReplicaDBs.entrySet())
+ {
+ final DN baseDN = entry1.getKey();
+ final Map<Integer, JEReplicaDB> domainMap = entry1.getValue();
+ for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet())
+ {
+ final Integer serverId = entry2.getKey();
+ final JEReplicaDB replicaDB = entry2.getValue();
+ replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId));
+ }
+ }
+
+ latestPurgeDate = purgeTimestamp;
+
+ // purge delay is specified in seconds so it should not be a problem
+ // to sleep for 500 millis
+ sleep(500);
+ }
+ catch (Exception e)
+ {
+ logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
+ .get(stackTraceToSingleLineString(e)));
+ if (replicationServer != null)
+ {
+ replicationServer.shutdown();
+ }
+ }
+ }
+ }
+ }
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
index e7309b0..bbde1d1 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -50,7 +50,6 @@
import org.opends.server.types.Attributes;
import org.opends.server.types.DN;
import org.opends.server.types.InitializationException;
-import org.opends.server.util.TimeThread;
import static org.opends.messages.ReplicationMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
@@ -95,16 +94,8 @@
* <p>
* This blocking queue is only used as a temporary placeholder so that the
* write in the stable storage can be grouped for efficiency reason. Adding an
- * update synchronously add the update to this list. A dedicated thread loops
- * on {@link #flush()} and {@link #trim()}.
- * <dl>
- * <dt>flush()</dt>
- * <dd>get a number of changes from the in memory list by block and write them
- * to the db.</dd>
- * <dt>trim()</dt>
- * <dd>deletes from the DB a number of changes that are older than a certain
- * date.</dd>
- * </dl>
+ * update synchronously add the update to this list. A dedicated thread
+ * flushes this blocking queue.
* <p>
* Changes are not read back by replicationServer threads that are responsible
* for pushing the changes to other replication server or to LDAP server
@@ -133,22 +124,12 @@
private DbMonitorProvider dbMonitor = new DbMonitorProvider();
private DirectoryThread thread;
/**
- * Used to prevent race conditions between threads calling {@link #clear()}
- * {@link #flush()} or {@link #trim()}. This can happen with the thread
- * flushing the queue, on shutdown or on cursor opening, a thread calling
- * clear(), etc.
+ * Used to prevent race conditions between threads calling {@link #flush()}.
+ * This can happen with the thread flushing the queue, or else on shutdown.
*/
private final Object flushLock = new Object();
private ReplicationServer replicationServer;
- private long latestTrimDate = 0;
-
- /**
- * The trim age in milliseconds. Changes record in the change DB that
- * are older than this age are removed.
- */
- private long trimAge;
-
/**
* Creates a new ReplicaDB associated to a given LDAP server.
*
@@ -166,15 +147,14 @@
this.replicationServer = replicationServer;
this.serverId = serverId;
this.baseDN = baseDN;
- trimAge = replicationServer.getTrimAge();
queueMaxBytes = replicationServer.getQueueSize() * 200;
queueSizeBytes = new Semaphore(queueMaxBytes);
db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());
thread = new DirectoryThread(this, "Replication server RS("
- + replicationServer.getServerId()
- + ") changelog checkpointer for Replica DS(" + serverId
- + ") for domain \"" + baseDN + "\"");
+ + replicationServer.getServerId()
+ + ") flusher thread for Replica DS(" + serverId
+ + ") for domain \"" + baseDN + "\"");
thread.start();
DirectoryServer.deregisterMonitorProvider(dbMonitor);
@@ -334,9 +314,7 @@
}
/**
- * Run method for this class.
- * Periodically Flushes the ReplicationServerDomain cache from memory to the
- * stable storage and trims the old updates.
+ * Flushes the replicaDB queue from memory to stable storage.
*/
@Override
public void run()
@@ -350,7 +328,6 @@
try
{
flush();
- trim();
}
catch (ChangelogException end)
{
@@ -390,55 +367,26 @@
}
/**
- * Retrieves the latest trim date.
- * @return the latest trim date.
+ * Synchronously purge changes older than purgeCSN from this replicaDB.
+ *
+ * @param purgeCSN
+ * The CSN up to which changes can be purged. No purging happens when
+ * it is null.
+ * @throws ChangelogException
+ * In case of database problem.
*/
- public long getLatestTrimDate()
+ void purgeUpTo(final CSN purgeCSN) throws ChangelogException
{
- return latestTrimDate;
- }
-
-
- /**
- * Trim old changes from this replicationServer database.
- * @throws ChangelogException In case of database problem.
- */
- private void trim() throws ChangelogException
- {
- if (trimAge == 0)
+ if (purgeCSN == null)
{
return;
}
- latestTrimDate = TimeThread.getTime() - trimAge;
-
- CSN trimDate = new CSN(latestTrimDate, 0, 0);
-
- // Find the last CSN before the trimDate, in the Database.
- CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate);
- if (lastBeforeTrimDate != null)
- {
- // If we found it, we want to stop trimming when reaching it.
- trimDate = lastBeforeTrimDate;
- }
-
for (int i = 0; i < 100; i++)
{
/*
- * Perform at least some trimming regardless of the flush backlog. Then
- * continue trim iterations while the flush backlog is low (below the
- * lowmark). Once the flush backlog increases, stop trimming and start
- * flushing more eagerly.
- */
- if (i > 20 && isQueueAboveLowMark())
- {
- break;
- }
-
- /*
- * the trim is done by group in order to save some CPU, IO bandwidth and
- * DB caches: start the transaction then do a bunch of remove then
- * commit.
+ * the purge is done by group in order to save some CPU, IO bandwidth and
+ * DB caches: start the transaction then do a bunch of remove then commit.
*/
/*
* Matt wrote: The record removal is done as a DB transaction and the
@@ -464,7 +412,7 @@
return;
}
- if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
+ if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN))
{
cursor.delete();
}
@@ -490,37 +438,31 @@
}
}
- private boolean isQueueAboveLowMark()
- {
- final int lowMarkBytes = queueMaxBytes / 5;
- final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits();
- return bytesUsed > lowMarkBytes;
- }
-
/**
* Flush a number of updates from the memory list to the stable storage.
* <p>
* Flush is done by chunk sized to 500 messages, starting from the beginning
* of the list.
- *
+ * <p>
+ * @GuardedBy("flushLock")
* @throws ChangelogException
* If a database problem happened
*/
- public void flush() throws ChangelogException
+ private void flush() throws ChangelogException
{
try
{
synchronized (flushLock)
{
- final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
- final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
+ final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS);
if (change == null)
{
- // nothing to persist, move on to the trim phase
+ // nothing to persist, check if shutdown was invoked
return;
}
// Try to see if there are more changes and persist them all.
+ final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
changes.add(change);
msgQueue.drainTo(changes);
@@ -604,15 +546,6 @@
}
/**
- * Set the Purge delay for this db Handler.
- * @param delay The purge delay in Milliseconds.
- */
- public void setPurgeDelay(long delay)
- {
- trimAge = delay;
- }
-
- /**
* Clear the changes from this DB (from both memory cache and DB storage).
* @throws ChangelogException When an exception occurs while removing the
* changes from the DB.
@@ -636,13 +569,15 @@
}
/**
- * Return the size of the msgQueue (the memory cache of the ReplicaDB).
+ * Return the number of records of this replicaDB.
+ * <p>
* For test purpose.
- * @return The memory queue size.
+ *
+ * @return The number of records of this replicaDB.
*/
- int getQueueSize()
+ long getNumberRecords()
{
- return this.msgQueue.size();
+ return db.getNumberRecords();
}
/**
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
index 919ad34..577fc39 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -22,7 +22,7 @@
*
*
* Copyright 2006-2009 Sun Microsystems, Inc.
- * Portions Copyright 2011-2013 ForgeRock AS
+ * Portions Copyright 2011-2014 ForgeRock AS
*/
package org.opends.server.replication.server.changelog.je;
@@ -73,15 +73,6 @@
// we didn't find it in the db
cursor = null;
}
-
- if (cursor == null)
- {
- // flush the queue into the db
- replicaDB.flush();
-
- // look again in the db
- cursor = db.openReadCursor(startAfterCSN);
- }
}
/** {@inheritDoc} */
@@ -96,15 +87,7 @@
public boolean next() throws ChangelogException
{
final ReplServerDBCursor localCursor = cursor;
- if (localCursor != null)
- {
- currentChange = localCursor.next();
- }
- else
- {
- currentChange = null;
- }
-
+ currentChange = localCursor != null ? localCursor.next() : null;
if (currentChange != null)
{
@@ -114,12 +97,8 @@
{
synchronized (this)
{
- if (cursor != null)
- {
- cursor.close();
- cursor = null;
- }
- replicaDB.flush();
+ closeCursor();
+ // previously exhausted cursor must be able to reinitialize themselves
cursor = db.openReadCursor(lastNonNullCurrentCSN);
currentChange = cursor.next();
if (currentChange != null)
@@ -137,13 +116,17 @@
{
synchronized (this)
{
- if (cursor != null)
- {
- cursor.close();
- cursor = null;
- }
+ closeCursor();
this.replicaDB = null;
- this.db = null;
+ }
+ }
+
+ private void closeCursor()
+ {
+ if (cursor != null)
+ {
+ cursor.close();
+ cursor = null;
}
}
diff --git a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
index f755ec8..818fba5 100644
--- a/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
+++ b/opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -943,4 +943,13 @@
return db == null || !db.getEnvironment().isValid();
}
+ /**
+ * Returns the number of records in this DB.
+ *
+ * @return the number of records in this DB.
+ */
+ long getNumberRecords()
+ {
+ return db.count();
+ }
}
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
index 17e37c4..05e27d7 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -67,7 +67,10 @@
import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation;
import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement;
import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation;
-import org.testng.annotations.*;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
import static org.assertj.core.api.Assertions.*;
import static org.opends.messages.ReplicationMessages.*;
@@ -168,7 +171,7 @@
@Test(enabled=true, dependsOnMethods = { "ECLReplicationServerPreTest"})
public void ECLReplicationServerTest() throws Exception
{
- getCNIndexDB().setPurgeDelay(0);
+ replicationServer.getChangelogDB().setPurgeDelay(0);
// let's enable ECl manually now that we tested that ECl is not available
ECLWorkflowElement wfe =
(ECLWorkflowElement) DirectoryServer
@@ -189,7 +192,7 @@
@Test(enabled=false, dependsOnMethods = { "ECLReplicationServerTest"})
public void ECLReplicationServerTest1() throws Exception
{
- getCNIndexDB().setPurgeDelay(0);
+ replicationServer.getChangelogDB().setPurgeDelay(0);
// Test with a mix of domains, a mix of DSes
ECLTwoDomains();
}
@@ -204,7 +207,7 @@
@Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"})
public void ECLReplicationServerTest3() throws Exception
{
- getCNIndexDB().setPurgeDelay(0);
+ replicationServer.getChangelogDB().setPurgeDelay(0);
// Write changes and read ECL from start
ECLCompatWriteReadAllOps(1);
@@ -263,7 +266,7 @@
@Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
public void ECLReplicationServerFullTest3() throws Exception
{
- getCNIndexDB().setPurgeDelay(0);
+ replicationServer.getChangelogDB().setPurgeDelay(0);
// Test all types of ops.
ECLAllOps(); // Do not clean the db for the next test
@@ -347,8 +350,7 @@
@Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"})
public void ECLReplicationServerFullTest15() throws Exception
{
- final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
- cnIndexDB.setPurgeDelay(0);
+ replicationServer.getChangelogDB().setPurgeDelay(0);
// Write 4 changes and read ECL from start
ECLCompatWriteReadAllOps(1);
@@ -369,8 +371,9 @@
ECLCompatTestLimitsAndAdd(1, 8, 4);
// Test CNIndexDB is purged when replication change log is purged
- cnIndexDB.setPurgeDelay(1);
- cnIndexDB.trim(null);
+ final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
+ cnIndexDB.purgeUpTo(Long.MAX_VALUE);
+ assertTrue(cnIndexDB.isEmpty());
ECLPurgeCNIndexDBAfterChangelogClear();
// Test first and last are updated
@@ -896,7 +899,7 @@
null);
cnt++;
}
- while (cnt < 100 // wait at most 1s
+ while (cnt < 300 // wait at most 3s
&& op.getSearchEntries().size() != expectedNbEntries);
final List<SearchResultEntry> entries = op.getSearchEntries();
assertThat(entries).hasSize(expectedNbEntries);
@@ -1951,16 +1954,6 @@
clearChangelogDB(replicationServer);
}
- @AfterTest
- public void setPurgeDelayToInitialValue() throws Exception
- {
- JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
- if (cnIndexDB != null)
- {
- cnIndexDB.setPurgeDelay(1);
- }
- }
-
/**
* After the tests stop the replicationServer.
*/
@@ -2461,10 +2454,9 @@
String tn = "ECLPurgeCNIndexDBAfterChangelogClear";
debugInfo(tn, "Starting test\n\n");
- JEChangeNumberIndexDB cnIndexDB =
- (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
+ final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB();
assertEquals(cnIndexDB.count(), 8);
- cnIndexDB.setPurgeDelay(1000);
+ replicationServer.getChangelogDB().setPurgeDelay(1000);
clearChangelogDB(replicationServer);
@@ -2620,11 +2612,7 @@
private JEChangeNumberIndexDB getCNIndexDB()
{
- if (replicationServer != null)
- {
- return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
- }
- return null;
+ return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB();
}
/**
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
index 83c1180..8cb6ed4 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -447,11 +447,6 @@
{
final ReplicatedUpdateMsg msg = msgs[i];
final ChangeNumberIndexRecord record = allValues.get(i);
- if (previousCookie.isEmpty())
- {
- // ugly hack to go round strange legacy code @see OPENDJ-67
- previousCookie.replace(record.getBaseDN(), new ServerState());
- }
// check content in order
String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">";
assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN());
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
index 8f7717c..5fb4500 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -26,16 +26,22 @@
*/
package org.opends.server.replication.server.changelog.je;
+import java.util.ArrayList;
+import java.util.List;
+
import org.opends.server.TestCaseUtils;
import org.opends.server.replication.ReplicationTestCase;
import org.opends.server.replication.common.CSN;
+import org.opends.server.replication.common.MultiDomainServerState;
import org.opends.server.replication.server.ReplServerFakeConfiguration;
import org.opends.server.replication.server.ReplicationServer;
import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord;
+import org.opends.server.replication.server.changelog.api.ChangelogDB;
import org.opends.server.replication.server.changelog.api.ChangelogException;
import org.opends.server.replication.server.changelog.api.DBCursor;
import org.opends.server.types.DN;
import org.opends.server.util.StaticUtils;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*;
@@ -48,9 +54,16 @@
@SuppressWarnings("javadoc")
public class JEChangeNumberIndexDBTest extends ReplicationTestCase
{
- private static final String value1 = "value1";
- private static final String value2 = "value2";
- private static final String value3 = "value3";
+ private final MultiDomainServerState previousCookie =
+ new MultiDomainServerState();
+ private final List<String> cookies = new ArrayList<String>();
+
+ @BeforeMethod
+ public void clearCookie()
+ {
+ previousCookie.clear();
+ cookies.clear();
+ }
/**
* This test makes basic operations of a JEChangeNumberIndexDB:
@@ -67,12 +80,11 @@
void testTrim() throws Exception
{
ReplicationServer replicationServer = null;
- JEChangeNumberIndexDB cnIndexDB = null;
try
{
replicationServer = newReplicationServer();
- cnIndexDB = getCNIndexDBNoTrimming(replicationServer);
- cnIndexDB.setPurgeDelay(0);
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ changelogDB.setPurgeDelay(0); // disable purging
// Prepare data to be stored in the db
DN baseDN1 = DN.decode("o=baseDN1");
@@ -82,9 +94,10 @@
CSN[] csns = newCSNs(1, 0, 3);
// Add records
- long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]);
- addRecord(cnIndexDB, value2, baseDN2, csns[1]);
- long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]);
+ final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+ long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+ addRecord(cnIndexDB, baseDN2, csns[1]);
+ long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
// The ChangeNumber should not get purged
final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber();
@@ -94,11 +107,11 @@
DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN);
try
{
- assertEqualTo(cursor.getRecord(), csns[0], baseDN1, value1);
+ assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0));
assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[1], baseDN2, value2);
+ assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1));
assertTrue(cursor.next());
- assertEqualTo(cursor.getRecord(), csns[2], baseDN3, value3);
+ assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2));
assertFalse(cursor.next());
}
finally
@@ -106,14 +119,13 @@
StaticUtils.close(cursor);
}
- // Now test that the trimming thread does its job => start it
- cnIndexDB.setPurgeDelay(100);
- cnIndexDB.startTrimmingThread();
-
- // Check the db is cleared.
- while (cnIndexDB.count() > 1)
+ // Now test that purging removes all changes bar the last one
+ changelogDB.setPurgeDelay(1);
+ int count = 0;
+ while (cnIndexDB.count() > 1 && count < 100)
{
- Thread.yield();
+ Thread.sleep(10);
+ count++;
}
assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
}
@@ -123,10 +135,14 @@
}
}
- private long addRecord(JEChangeNumberIndexDB cnIndexDB, String cookie, DN baseDN, CSN csn)
- throws ChangelogException
+ private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException
{
- return cnIndexDB.addRecord(new ChangeNumberIndexRecord(cookie, baseDN, csn));
+ final String cookie = previousCookie.toString();
+ cookies.add(cookie);
+ final long changeNumber = cnIndexDB.addRecord(
+ new ChangeNumberIndexRecord(cookie, baseDN, csn));
+ previousCookie.update(baseDN, csn);
+ return changeNumber;
}
private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie)
@@ -136,11 +152,11 @@
assertEquals(record.getPreviousCookie(), cookie);
}
- private JEChangeNumberIndexDB getCNIndexDBNoTrimming(ReplicationServer rs) throws ChangelogException
+ private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException
{
final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB();
final JEChangeNumberIndexDB cnIndexDB =
- (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB(false);
+ (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB();
assertTrue(cnIndexDB.isEmpty());
return cnIndexDB;
}
@@ -160,12 +176,11 @@
void testClear() throws Exception
{
ReplicationServer replicationServer = null;
- JEChangeNumberIndexDB cnIndexDB = null;
try
{
replicationServer = newReplicationServer();
- cnIndexDB = getCNIndexDBNoTrimming(replicationServer);
- cnIndexDB.setPurgeDelay(0);
+ final ChangelogDB changelogDB = replicationServer.getChangelogDB();
+ changelogDB.setPurgeDelay(0);
// Prepare data to be stored in the db
@@ -176,9 +191,10 @@
CSN[] csns = newCSNs(1, 0, 3);
// Add records
- long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]);
- long cn2 = addRecord(cnIndexDB, value2, baseDN2, csns[1]);
- long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]);
+ final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer);
+ long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]);
+ long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]);
+ long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]);
// Checks
assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1);
@@ -187,9 +203,9 @@
assertEquals(cnIndexDB.count(), 3, "Db count");
assertFalse(cnIndexDB.isEmpty());
- assertEquals(getPreviousCookie(cnIndexDB, cn1), value1);
- assertEquals(getPreviousCookie(cnIndexDB, cn2), value2);
- assertEquals(getPreviousCookie(cnIndexDB, cn3), value3);
+ assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0));
+ assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1));
+ assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2));
DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1);
assertCursorReadsInOrder(cursor, cn1, cn2, cn3);
@@ -200,7 +216,7 @@
cursor = cnIndexDB.getCursorFrom(cn3);
assertCursorReadsInOrder(cursor, cn3);
- cnIndexDB.clear(null);
+ cnIndexDB.removeDomain(null);
assertOnlyNewestRecordIsLeft(cnIndexDB, 3);
// Check the db is cleared.
diff --git a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
index e17878a..851afa2 100644
--- a/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
+++ b/opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,7 +97,7 @@
//--
// Iterator tests with changes persisted
- waitChangesArePersisted(replicaDB);
+ waitChangesArePersisted(replicaDB, 3);
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]);
assertNotFound(replicaDB, csns[4]);
@@ -108,7 +108,7 @@
//--
// Cursor tests with changes persisted
replicaDB.add(update4);
- waitChangesArePersisted(replicaDB);
+ waitChangesArePersisted(replicaDB, 4);
assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]);
// Test cursor from existing CSN
@@ -116,7 +116,7 @@
assertFoundInOrder(replicaDB, csns[3]);
assertNotFound(replicaDB, csns[4]);
- replicaDB.setPurgeDelay(1);
+ replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
int count = 0;
boolean purgeSucceeded = false;
@@ -141,16 +141,27 @@
}
}
- private void waitChangesArePersisted(JEReplicaDB replicaDB) throws Exception
+ private void waitChangesArePersisted(JEReplicaDB replicaDB,
+ int nbRecordsInserted) throws Exception
{
- final int expected = 0;
+ waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000);
+ }
+
+ private void waitChangesArePersisted(JEReplicaDB replicaDB,
+ int nbRecordsInserted, int counterWindow) throws Exception
+ {
+ // one counter record is inserted every time "counterWindow"
+ // records have been inserted
+ int expectedNbRecords =
+ nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow;
+
int count = 0;
- while (replicaDB.getQueueSize() != expected && count < 100)
+ while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100)
{
Thread.sleep(10);
count++;
}
- assertEquals(replicaDB.getQueueSize(), expected);
+ assertEquals(replicaDB.getNumberRecords(), expectedNbRecords);
}
static CSN[] newCSNs(int serverId, long timestamp, int number)
@@ -204,8 +215,9 @@
assertNull(cursor.getRecord());
for (int i = 1; i < csns.length; i++)
{
- assertTrue(cursor.next());
- assertEquals(cursor.getRecord().getCSN(), csns[i]);
+ final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI();
+ assertTrue(cursor.next(), msg);
+ assertEquals(cursor.getRecord().getCSN(), csns[i], msg);
}
assertFalse(cursor.next());
assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord()
@@ -274,11 +286,12 @@
{
ReplicationServer replicationServer = null;
DBCursor<UpdateMsg> cursor = null;
+ JEReplicaDB replicaDB = null;
try
{
TestCaseUtils.startServer();
replicationServer = configureReplicationServer(100000, 10);
- JEReplicaDB replicaDB = newReplicaDB(replicationServer);
+ replicaDB = newReplicaDB(replicationServer);
CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6);
for (int i = 0; i < 5; i++)
@@ -288,7 +301,7 @@
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
}
}
- replicaDB.flush();
+ waitChangesArePersisted(replicaDB, 4);
cursor = replicaDB.generateCursorFrom(csns[0]);
assertTrue(cursor.next());
@@ -307,6 +320,8 @@
finally
{
StaticUtils.close(cursor);
+ if (replicaDB != null)
+ replicaDB.shutdown();
remove(replicationServer);
}
}
@@ -334,7 +349,7 @@
testGetOldestNewestCSNs(4000, 1000);
}
- private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception
+ private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception
{
String tn = "testDBCount("+max+","+counterWindow+")";
debugInfo(tn, "Starting test");
@@ -363,7 +378,7 @@
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
mySeqnum+=2;
}
- replicaDB.flush();
+ waitChangesArePersisted(replicaDB, max, counterWindow);
assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN");
@@ -387,15 +402,13 @@
replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid"));
mySeqnum+=2;
}
- replicaDB.flush();
+ waitChangesArePersisted(replicaDB, 2 * max, counterWindow);
assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN");
assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN");
//
-
- replicaDB.setPurgeDelay(100);
- sleep(1000);
+ replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0));
String testcase = "AFTER PURGE (oldest, newest)=";
debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());
--
Gitblit v1.10.0