OPENDJ-1177 (CR-3304) Re-implement changelog purging logic
After OPENDJ-1174, change number index DB is now populated eagerly (was populated lazily).
This means we can reap the benefits by changing how purging is done for changelogDB.
The previous purge process was driven by the replicaDBs: it first purged the replicaDBs and then purged the change number index DB, which caused many problems such as stale CNIndexDB records.
The new purge process is driven by the change number index DB: it first purges the change number index DB and then purges the replicaDBs based on the oldest valid referenced record in the change number index DB.
Moved JEChangeNumberIndexDB purge thread to JEChangelogDB + made it responsible for purging the ChangeNumberIndexDB and all the ReplicaDBs.
In JEReplicaDB, the thread is now only responsible for flushing; previously it was responsible for both trimming and flushing, which complicated the design and made both operations less efficient.
JEChangelogDB.java:
Added inner class ChangelogDBPurger.
Added fields purgeDelayInMillis, cnPurger and latestPurgeDate.
Extracted method startIndexer()
Removed getChangeNumberIndexDB(boolean) + associated code.
Reimplemented getDomainLatestTrimDate() and setPurgeDelay().
JEChangeNumberIndexDB.java:
No longer implements Runnable + removed run().
Removed fields trimmingThread, trimAge and replicationServer.
In ctor, removed ReplicationServer parameter.
Removed startTrimmingThread(), setPurgeDelay() and clear(DN, AtomicBoolean).
Renamed clear(DN) to removeDomain(DN).
Renamed trim(AtomicBoolean) to purgeUpTo(long).
Added getPurgeCookie(ChangeNumberIndexRecord).
JEReplicaDB.java:
Removed fields latestTrimDate and trimAge.
In run(), no longer call trim().
Removed getLatestTrimDate(), isQueueAboveLowMark(), setPurgeDelay() and getQueueSize().
Renamed trim() to purgeUpTo(CSN).
Made flush() private + reduced wait time on polling the msgQueue to speed up shutdown.
Added getNumberRecords() for unit tests.
JEReplicaDBCursor.java:
Since flushing is now eager, removed all calls to JEReplicaDB.flush().
Extracted method closeCursor().
ReplicationServer.java:
Renamed getTrimAge() to getPurgeDelay() for consistency.
ReplicationDB.java:
Added getNumberRecords().
ChangeNumberIndexer.java, ChangeNumberIndexerTest.java:
Removed hacks due to old purging code.
ExternalChangeLogTest.java:
Called ChangelogDB.setPurgeDelay() instead of ChangeNumberIndexDB.setPurgeDelay(0).
Consequence of the changes to JEReplicaDB.
Removed method setPurgeDelayToInitialValue() that was not doing anything (JEChangeNumberIndexDB was always null).
In getCNIndexDB(), removed useless null check.
JEChangeNumberIndexDBTest.java:
Removed constants value1, value2, value3 which are invalid cookies.
Replaced them with fields previousCookie and cookies.
Added clearCookie() method.
In addRecord(), removed cookie parameter and build the cookie from the new fields.
Consequence of the changes to JEChangeNumberIndexDB.
JEReplicaDBTest.java:
In waitChangesArePersisted(), used JEReplicaDB.getNumberRecords() instead of JEReplicaDB.getQueueSize() + added parameters describing the number of expected records + the counter record window.
Replaced all calls to JEReplicaDB.flush() with calls to waitChangesArePersisted().
Consequence of the changes to JEReplicaDB.
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2010 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server; |
| | | |
| | |
| | | * @return The time after which changes must be deleted from the |
| | | * persistent storage (in milliseconds). |
| | | */ |
| | | public long getTrimAge() |
| | | public long getPurgeDelay() |
| | | { |
| | | return this.config.getReplicationPurgeDelay() * 1000; |
| | | } |
| | |
| | | final long newPurgeDelay = config.getReplicationPurgeDelay(); |
| | | if (newPurgeDelay != oldConfig.getReplicationPurgeDelay()) |
| | | { |
| | | this.changelogDB.setPurgeDelay(getTrimAge()); |
| | | this.changelogDB.setPurgeDelay(getPurgeDelay()); |
| | | } |
| | | final boolean computeCN = config.isComputeChangeNumber(); |
| | | if (computeCN != oldConfig.isComputeChangeNumber()) |
| | |
| | | |
| | | // OK, the oldest change is older than the medium consistency point |
| | | // let's publish it to the CNIndexDB. |
| | | |
| | | // Next if statement is ugly but ensures the first change will not be |
| | | // immediately trimmed from the CNIndexDB. Yuck! |
| | | if (mediumConsistencyRUV.isEmpty()) |
| | | { |
| | | mediumConsistencyRUV.replace(baseDN, new ServerState()); |
| | | } |
| | | final String previousCookie = mediumConsistencyRUV.toString(); |
| | | final ChangeNumberIndexRecord record = |
| | | new ChangeNumberIndexRecord(previousCookie, baseDN, csn); |
| | |
| | | } |
| | | |
| | | /** |
| | | * Asks the current thread to clear its state. |
| | | * Asks the current thread to clear its state and blocks until state is |
| | | * cleared. |
| | | * <p> |
| | | * This method is only useful for unit tests. |
| | | */ |
| | | public void clear() |
| | | { |
| | | doClear.set(true); |
| | | while (doClear.get() && !State.TERMINATED.equals(getState())) |
| | | { |
| | | // wait until clear() has been done by thread, always waking it up |
| | | synchronized (this) |
| | | { |
| | | notify(); |
| | | } |
| | | while (doClear.get()) |
| | | { |
| | | // wait until clear() has been done by thread |
| | | // ensures unit tests wait that this thread's state is cleaned up |
| | | Thread.yield(); |
| | | } |
| | |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | import org.opends.server.admin.std.server.MonitorProviderCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.api.MonitorProvider; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.ReplicationServerDomain; |
| | | import org.opends.server.replication.server.changelog.api.*; |
| | | import org.opends.server.replication.server.changelog.je.DraftCNDB.*; |
| | | import org.opends.server.types.*; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | /** |
| | | * This class is used for managing the replicationServer database for each |
| | |
| | | * This class publishes some monitoring information below <code> |
| | | * cn=monitor</code>. |
| | | */ |
| | | public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable |
| | | public class JEChangeNumberIndexDB implements ChangeNumberIndexDB |
| | | { |
| | | /** |
| | | * The tracer object for the debug logger. |
| | |
| | | /** FIXME What is this field used for? */ |
| | | private volatile long oldestChangeNumber = NO_KEY; |
| | | /** |
| | | * The newest changenumber stored in the DB. It is used to avoid trimming the |
| | | * The newest changenumber stored in the DB. It is used to avoid purging the |
| | | * record with the newest changenumber. The newest record in the changenumber |
| | | * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is |
| | | * then retrieved on server startup. |
| | |
| | | * condition between: |
| | | * <ol> |
| | | * <li>this atomic long being incremented for a new record ('recordB')</li> |
| | | * <li>the current newest record ('recordA') being trimmed from the DB</li> |
| | | * <li>the current newest record ('recordA') being purged from the DB</li> |
| | | * <li>'recordB' failing to be inserted in the DB</li> |
| | | * </ol> |
| | | */ |
| | | private final AtomicLong lastGeneratedChangeNumber; |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private final AtomicBoolean shutdown = new AtomicBoolean(false); |
| | | /** |
| | | * A dedicated thread loops trim(). |
| | | * <p> |
| | | * trim() : deletes from the DB a number of changes that are older than a |
| | | * certain date. |
| | | */ |
| | | private DirectoryThread trimmingThread; |
| | | /** |
| | | * The trim age in milliseconds. Changes record in the change DB that are |
| | | * older than this age are removed. |
| | | * <p> |
| | | * FIXME it never gets updated even when the replication server purge delay is |
| | | * updated |
| | | */ |
| | | private volatile long trimAge; |
| | | |
| | | private ReplicationServer replicationServer; |
| | | |
| | | |
| | | /** |
| | | * Creates a new JEChangeNumberIndexDB associated to a given LDAP server. |
| | | * |
| | | * @param replicationServer The ReplicationServer that creates this instance. |
| | | * @param dbenv the Database Env to use to create the ReplicationServer DB. |
| | | * @param dbEnv the Database Env to use to create the ReplicationServer DB. |
| | | * server for this domain. |
| | | * @throws ChangelogException If a database problem happened |
| | | */ |
| | | public JEChangeNumberIndexDB(ReplicationServer replicationServer, |
| | | ReplicationDbEnv dbenv) throws ChangelogException |
| | | public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException |
| | | { |
| | | this.replicationServer = replicationServer; |
| | | this.trimAge = replicationServer.getTrimAge(); |
| | | |
| | | // DB initialization |
| | | db = new DraftCNDB(dbenv); |
| | | db = new DraftCNDB(dbEnv); |
| | | final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord(); |
| | | final ChangeNumberIndexRecord newestRecord = db.readLastRecord(); |
| | | oldestChangeNumber = getChangeNumber(oldestRecord); |
| | |
| | | DirectoryServer.registerMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | /** |
| | | * Creates and starts the thread trimming the CNIndexDB. |
| | | */ |
| | | public void startTrimmingThread() |
| | | { |
| | | trimmingThread = |
| | | new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer"); |
| | | trimmingThread.start(); |
| | | } |
| | | |
| | | private long getChangeNumber(ChangeNumberIndexRecord record) |
| | | throws ChangelogException |
| | | { |
| | |
| | | notifyAll(); |
| | | } |
| | | |
| | | if (trimmingThread != null) |
| | | { |
| | | try |
| | | { |
| | | trimmingThread.join(); |
| | | } |
| | | catch (InterruptedException ignored) |
| | | { |
| | | // Nothing can be done about it, just proceed |
| | | } |
| | | } |
| | | |
| | | db.shutdown(); |
| | | DirectoryServer.deregisterMonitorProvider(dbMonitor); |
| | | } |
| | | |
| | | /** |
| | | * Run method for this class. |
| | | * Periodically Flushes the ReplicationServerDomain cache from memory to the |
| | | * stable storage and trims the old updates. |
| | | * Synchronously purges the change number index DB up to and excluding the |
| | | * provided timestamp. |
| | | * |
| | | * @param purgeTimestamp |
| | | * the timestamp up to which purging must happen |
| | | * @return the {@link MultiDomainServerState} object that drives purging the |
| | | * replicaDBs. |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | public MultiDomainServerState purgeUpTo(long purgeTimestamp) |
| | | throws ChangelogException |
| | | { |
| | | while (!shutdown.get()) |
| | | if (isEmpty()) |
| | | { |
| | | try { |
| | | trim(shutdown); |
| | | return null; |
| | | } |
| | | |
| | | synchronized (this) |
| | | { |
| | | if (!shutdown.get()) |
| | | { |
| | | final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); |
| | | |
| | | final DraftCNDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | { |
| | | wait(1000); |
| | | } |
| | | catch (InterruptedException e) |
| | | while (!mustShutdown(shutdown) && cursor.next()) |
| | | { |
| | | Thread.currentThread().interrupt(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | catch (Exception end) |
| | | final ChangeNumberIndexRecord record = cursor.currentRecord(); |
| | | if (record.getChangeNumber() != oldestChangeNumber) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH |
| | | .get(stackTraceToSingleLineString(end))); |
| | | if (replicationServer != null) |
| | | oldestChangeNumber = record.getChangeNumber(); |
| | | } |
| | | if (record.getChangeNumber() == newestChangeNumber) |
| | | { |
| | | replicationServer.shutdown(); |
| | | // do not purge the newest record to avoid having the last generated |
| | | // changenumber dropping back to 0 if the server restarts |
| | | return getPurgeCookie(record); |
| | | } |
| | | break; |
| | | |
| | | if (record.getCSN().isOlderThan(purgeCSN)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | | else |
| | | { |
| | | // Current record is not old enough to purge. |
| | | return getPurgeCookie(record); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Trim old changes from this database. |
| | | * |
| | | * @param shutdown |
| | | * AtomicBoolean telling whether the current run must be stopped |
| | | * @throws ChangelogException |
| | | * In case of database problem. |
| | | */ |
| | | public void trim(AtomicBoolean shutdown) throws ChangelogException |
| | | return null; |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | if (trimAge == 0) |
| | | return; |
| | | cursor.abort(); |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | cursor.abort(); |
| | | throw new ChangelogException(e); |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | } |
| | | } |
| | | |
| | | clear(null, shutdown); |
| | | private MultiDomainServerState getPurgeCookie( |
| | | final ChangeNumberIndexRecord record) throws DirectoryException |
| | | { |
| | | // Do not include the record's CSN to avoid having it purged |
| | | return new MultiDomainServerState(record.getPreviousCookie()); |
| | | } |
| | | |
| | | /** |
| | |
| | | * @throws ChangelogException |
| | | * if a database problem occurs. |
| | | */ |
| | | public void clear(DN baseDNToClear) throws ChangelogException |
| | | { |
| | | clear(baseDNToClear, null); |
| | | } |
| | | |
| | | private void clear(DN baseDNToClear, AtomicBoolean shutdown) |
| | | throws ChangelogException |
| | | public void removeDomain(DN baseDNToClear) throws ChangelogException |
| | | { |
| | | if (isEmpty()) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | for (int i = 0; i < 100; i++) |
| | | { |
| | | if (mustShutdown(shutdown)) |
| | | { |
| | | return; |
| | | } |
| | | final DraftCNDBCursor cursor = db.openDeleteCursor(); |
| | | try |
| | | { |
| | | for (int j = 0; j < 50; j++) |
| | | boolean isOldestRecord = true; |
| | | while (!mustShutdown(shutdown) && cursor.next()) |
| | | { |
| | | // let's traverse the CNIndexDB |
| | | if (mustShutdown(shutdown) || !cursor.next()) |
| | | { |
| | | cursor.close(); |
| | | return; |
| | | } |
| | | |
| | | final ChangeNumberIndexRecord record = cursor.currentRecord(); |
| | | if (record.getChangeNumber() != oldestChangeNumber) |
| | | if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber) |
| | | { |
| | | oldestChangeNumber = record.getChangeNumber(); |
| | | } |
| | | if (record.getChangeNumber() == newestChangeNumber) |
| | | { |
| | | // do not trim the newest record to avoid having the last generated |
| | | // do not purge the newest record to avoid having the last generated |
| | | // changenumber dropping back to 0 if the server restarts |
| | | cursor.close(); |
| | | return; |
| | | } |
| | | |
| | | if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN())) |
| | | if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear)) |
| | | { |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | | |
| | | final ReplicationServerDomain domain = |
| | | replicationServer.getReplicationServerDomain(record.getBaseDN()); |
| | | if (domain == null) |
| | | else |
| | | { |
| | | // the domain has been removed since the record was written in the |
| | | // CNIndexDB, thus it makes no sense to keep this record in the DB. |
| | | cursor.delete(); |
| | | continue; |
| | | isOldestRecord = false; |
| | | } |
| | | |
| | | // FIXME there is an opportunity for a phantom record in the CNIndexDB |
| | | // if the replicaDB gets purged after call to domain.getOldestState(). |
| | | final CSN csn = record.getCSN(); |
| | | final ServerState oldestState = domain.getOldestState(); |
| | | final CSN fcsn = oldestState.getCSN(csn.getServerId()); |
| | | if (csn.isOlderThan(fcsn)) |
| | | { |
| | | // This change which has already been purged from the corresponding |
| | | // replicaDB => purge it from CNIndexDB |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | | |
| | | ServerState csnVector; |
| | | try |
| | | { |
| | | Map<DN, ServerState> csnStartStates = |
| | | MultiDomainServerState.splitGenStateToServerStates( |
| | | record.getPreviousCookie()); |
| | | csnVector = csnStartStates.get(record.getBaseDN()); |
| | | |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:" |
| | | + csnVector + " -- StartState:" + oldestState); |
| | | } |
| | | catch(Exception e) |
| | | { |
| | | // We could not parse the MultiDomainServerState from the record |
| | | // FIXME this is quite an aggressive delete() |
| | | cursor.delete(); |
| | | continue; |
| | | } |
| | | |
| | | if (csnVector == null |
| | | || (csnVector.getCSN(csn.getServerId()) != null |
| | | && !csnVector.cover(oldestState))) |
| | | { |
| | | cursor.delete(); |
| | | if (debugEnabled()) |
| | | TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn |
| | | + "Not covering startState"); |
| | | continue; |
| | | } |
| | | |
| | | oldestChangeNumber = record.getChangeNumber(); |
| | | cursor.close(); |
| | | return; |
| | | } |
| | | |
| | | cursor.close(); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | | cursor.abort(); |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | finally |
| | | { |
| | | cursor.abort(); |
| | | throw new ChangelogException(e); |
| | | } |
| | | cursor.close(); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> |
| | | { |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public List<Attribute> getMonitorData() |
| | | { |
| | |
| | | return 0; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String getMonitorInstanceName() |
| | | { |
| | | return "ChangeNumber Index Database"; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void initializeMonitorProvider(MonitorProviderCfg configuration) |
| | | throws ConfigException,InitializationException |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set the Purge delay for this db Handler. |
| | | * @param delay The purge delay in Milliseconds. |
| | | */ |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | trimAge = delay; |
| | | } |
| | | |
| | | /** |
| | | * Clear the changes from this DB (from both memory cache and DB storage). |
| | | * |
| | | * @throws ChangelogException |
| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.io.File; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | |
| | | * The handler of the changelog database, the database stores the relation |
| | | * between a change number and the associated cookie. |
| | | * <p> |
| | | * Guarded by cnIndexDBLock |
| | | * @GuardedBy("cnIndexDBLock") |
| | | */ |
| | | private JEChangeNumberIndexDB cnIndexDB; |
| | | private final AtomicReference<ChangeNumberIndexer> cnIndexer = |
| | |
| | | /** Used for protecting {@link ChangeNumberIndexDB} related state. */ |
| | | private final Object cnIndexDBLock = new Object(); |
| | | |
| | | /** |
| | | * The purge delay (in milliseconds). Records in the changelog DB that are |
| | | * older than this delay might be removed. |
| | | */ |
| | | private long purgeDelayInMillis; |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = |
| | | new AtomicReference<ChangelogDBPurger>(); |
| | | private volatile long latestPurgeDate; |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private AtomicBoolean shutdown = new AtomicBoolean(); |
| | |
| | | initializeChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | | final ChangeNumberIndexer indexer = |
| | | new ChangeNumberIndexer(this, changelogState); |
| | | if (cnIndexer.compareAndSet(null, indexer)) |
| | | { |
| | | indexer.start(); |
| | | startIndexer(changelogState); |
| | | } |
| | | } |
| | | setPurgeDelay(replicationServer.getPurgeDelay()); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | | cnIndexer.compareAndSet(indexer, null); |
| | | } |
| | | final ChangelogDBPurger purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | purger.initiateShutdown(); |
| | | } |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB.clear(baseDN); |
| | | cnIndexDB.removeDomain(baseDN); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | firstException = e; |
| | | } |
| | | else if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | |
| | | if (firstException != null) |
| | | { |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setPurgeDelay(long delay) |
| | | public void setPurgeDelay(long purgeDelayInMillis) |
| | | { |
| | | final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB; |
| | | if (cnIndexDB != null) |
| | | this.purgeDelayInMillis = purgeDelayInMillis; |
| | | final ChangelogDBPurger purger; |
| | | if (purgeDelayInMillis > 0) |
| | | { |
| | | cnIndexDB.setPurgeDelay(delay); |
| | | purger = new ChangelogDBPurger(); |
| | | if (cnPurger.compareAndSet(null, purger)) |
| | | { |
| | | purger.start(); |
| | | } // otherwise a purger was already running |
| | | } |
| | | for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) |
| | | else |
| | | { |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | replicaDB.setPurgeDelay(delay); |
| | | purger.initiateShutdown(); |
| | | } |
| | | } |
| | | } |
| | |
| | | public void setComputeChangeNumber(boolean computeChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexer indexer; |
| | | if (computeChangeNumber) |
| | | { |
| | | final ChangelogState changelogState = dbEnv.readChangelogState(); |
| | | indexer = new ChangeNumberIndexer(this, changelogState); |
| | | if (cnIndexer.compareAndSet(null, indexer)) |
| | | { |
| | | indexer.start(); |
| | | } |
| | | startIndexer(dbEnv.readChangelogState()); |
| | | } |
| | | else |
| | | { |
| | | indexer = cnIndexer.getAndSet(null); |
| | | final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Creates a change number indexer for the provided changelog state and |
| | |  * starts it, unless an indexer is already registered in {@code cnIndexer}. |
| | |  * |
| | |  * @param changelogState |
| | |  *          the changelog state handed to the new indexer |
| | |  */ |
| | | private void startIndexer(final ChangelogState changelogState) |
| | | { |
| | |   final ChangeNumberIndexer candidate = |
| | |       new ChangeNumberIndexer(this, changelogState); |
| | |   final boolean registered = cnIndexer.compareAndSet(null, candidate); |
| | |   if (!registered) |
| | |   { |
| | |     // an indexer is already registered: the candidate is never started |
| | |     return; |
| | |   } |
| | |   candidate.start(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | | { |
| | | long latest = 0; |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | if (latest == 0 || latest < replicaDB.getLatestTrimDate()) |
| | | { |
| | | latest = replicaDB.getLatestTrimDate(); |
| | | } |
| | | } |
| | | return latest; |
| | | return latestPurgeDate; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public ChangeNumberIndexDB getChangeNumberIndexDB() |
| | | { |
| | | return getChangeNumberIndexDB(true); |
| | | } |
| | | |
| | | /** |
| | | * Returns the {@link ChangeNumberIndexDB} object. |
| | | * |
| | | * @param startTrimmingThread |
| | | * whether the trimming thread should be started |
| | | * @return the {@link ChangeNumberIndexDB} object |
| | | */ |
| | | ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread) |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | | if (cnIndexDB == null) |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv); |
| | | if (startTrimmingThread) |
| | | { |
| | | cnIndexDB.startTrimmingThread(); |
| | | } |
| | | cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | } |
| | | // TODO save this state in the changelogStateDB? |
| | | } |
| | | |
| | | /** |
| | |  * The thread purging the changelogDB on a regular interval. Records are |
| | |  * purged from the changelogDB if they are older than the configured purge |
| | |  * delay. The purge process works in two steps: |
| | |  * <ol> |
| | |  * <li>first purge the changeNumberIndexDB and retrieve the cookie that |
| | |  * drives the purging of the replicaDBs</li> |
| | |  * <li>then purge each replicaDB up to the CSN this cookie holds for its |
| | |  * domain and server id</li> |
| | |  * </ol> |
| | |  * The thread pauses between two purge passes, including the passes where |
| | |  * the changeNumberIndexDB returned no cookie, so that it does not spin. |
| | |  */ |
| | | private final class ChangelogDBPurger extends DirectoryThread |
| | | { |
| | | |
| | |   /** Creates the purger thread, named "changelog DB purger". */ |
| | |   protected ChangelogDBPurger() |
| | |   { |
| | |     super("changelog DB purger"); |
| | |   } |
| | | |
| | |   /** {@inheritDoc} */ |
| | |   @Override |
| | |   public void run() |
| | |   { |
| | |     // initialize CNIndexDB |
| | |     getChangeNumberIndexDB(); |
| | |     while (!isShutdownInitiated()) |
| | |     { |
| | |       try |
| | |       { |
| | |         final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB; |
| | |         if (localCNIndexDB == null) |
| | |         { // shutdown has been called |
| | |           return; |
| | |         } |
| | | |
| | |         final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis; |
| | |         final MultiDomainServerState purgeUpToCookie = |
| | |             localCNIndexDB.purgeUpTo(purgeTimestamp); |
| | |         // a null cookie can happen when the change number index DB is empty: |
| | |         // there is then nothing to drive the purge of the replica DBs |
| | |         if (purgeUpToCookie != null) |
| | |         { |
| | |           /* |
| | |            * Drive purge of the replica DBs by the oldest non purged cookie in |
| | |            * the change number index DB. |
| | |            */ |
| | |           for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1 |
| | |               : domainToReplicaDBs.entrySet()) |
| | |           { |
| | |             final DN baseDN = entry1.getKey(); |
| | |             final Map<Integer, JEReplicaDB> domainMap = entry1.getValue(); |
| | |             for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet()) |
| | |             { |
| | |               final Integer serverId = entry2.getKey(); |
| | |               final JEReplicaDB replicaDB = entry2.getValue(); |
| | |               replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId)); |
| | |             } |
| | |           } |
| | | |
| | |           latestPurgeDate = purgeTimestamp; |
| | |         } |
| | | |
| | |         // Always pause before the next pass, even when nothing was purged, |
| | |         // otherwise this loop would busy-spin on an empty DB. |
| | |         // purge delay is specified in seconds so it should not be a problem |
| | |         // to sleep for 500 millis |
| | |         sleep(500); |
| | |       } |
| | |       catch (InterruptedException e) |
| | |       { |
| | |         // Being interrupted while sleeping is not a purge failure: restore |
| | |         // the interrupt flag and stop this thread without logging an error |
| | |         // or shutting down the replication server. |
| | |         Thread.currentThread().interrupt(); |
| | |         return; |
| | |       } |
| | |       catch (Exception e) |
| | |       { |
| | |         logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH |
| | |             .get(stackTraceToSingleLineString(e))); |
| | |         if (replicationServer != null) |
| | |         { |
| | |           replicationServer.shutdown(); |
| | |         } |
| | |       } |
| | |     } |
| | |   } |
| | | } |
| | | } |
| | |
| | | import org.opends.server.types.Attributes; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | | import static org.opends.server.loggers.ErrorLogger.*; |
| | |
| | | * <p> |
| | | * This blocking queue is only used as a temporary placeholder so that the |
| | | * write in the stable storage can be grouped for efficiency reason. Adding an |
| | | * update synchronously add the update to this list. A dedicated thread loops |
| | | * on {@link #flush()} and {@link #trim()}. |
| | | * <dl> |
| | | * <dt>flush()</dt> |
| | | * <dd>get a number of changes from the in memory list by block and write them |
| | | * to the db.</dd> |
| | | * <dt>trim()</dt> |
| | | * <dd>deletes from the DB a number of changes that are older than a certain |
| | | * date.</dd> |
| | | * </dl> |
| | | * update synchronously adds the update to this list. A dedicated thread |
| | | * flushes this blocking queue. |
| | | * <p> |
| | | * Changes are not read back by replicationServer threads that are responsible |
| | | * for pushing the changes to other replication server or to LDAP server |
| | |
| | | private DbMonitorProvider dbMonitor = new DbMonitorProvider(); |
| | | private DirectoryThread thread; |
| | | /** |
| | | * Used to prevent race conditions between threads calling {@link #clear()} |
| | | * {@link #flush()} or {@link #trim()}. This can happen with the thread |
| | | * flushing the queue, on shutdown or on cursor opening, a thread calling |
| | | * clear(), etc. |
| | | * Used to prevent race conditions between threads calling {@link #flush()}. |
| | | * This can happen with the thread flushing the queue, or else on shutdown. |
| | | */ |
| | | private final Object flushLock = new Object(); |
| | | private ReplicationServer replicationServer; |
| | | |
| | | private long latestTrimDate = 0; |
| | | |
| | | /** |
| | | * The trim age in milliseconds. Changes record in the change DB that |
| | | * are older than this age are removed. |
| | | */ |
| | | private long trimAge; |
| | | |
| | | /** |
| | | * Creates a new ReplicaDB associated to a given LDAP server. |
| | | * |
| | |
| | | this.replicationServer = replicationServer; |
| | | this.serverId = serverId; |
| | | this.baseDN = baseDN; |
| | | trimAge = replicationServer.getTrimAge(); |
| | | queueMaxBytes = replicationServer.getQueueSize() * 200; |
| | | queueSizeBytes = new Semaphore(queueMaxBytes); |
| | | db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv); |
| | | csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); |
| | | thread = new DirectoryThread(this, "Replication server RS(" |
| | | + replicationServer.getServerId() |
| | | + ") changelog checkpointer for Replica DS(" + serverId |
| | | + ") flusher thread for Replica DS(" + serverId |
| | | + ") for domain \"" + baseDN + "\""); |
| | | thread.start(); |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Run method for this class. |
| | | * Periodically Flushes the ReplicationServerDomain cache from memory to the |
| | | * stable storage and trims the old updates. |
| | | * Flushes the replicaDB queue from memory to stable storage. |
| | | */ |
| | | @Override |
| | | public void run() |
| | |
| | | try |
| | | { |
| | | flush(); |
| | | trim(); |
| | | } |
| | | catch (ChangelogException end) |
| | | { |
| | |
| | | } |
| | | |
| | | /** |
| | | * Retrieves the latest trim date. |
| | | * @return the latest trim date. |
| | | * Synchronously purge changes older than purgeCSN from this replicaDB. |
| | | * |
| | | * @param purgeCSN |
| | | * The CSN up to which changes can be purged. No purging happens when |
| | | * it is null. |
| | | * @throws ChangelogException |
| | | * In case of database problem. |
| | | */ |
| | | public long getLatestTrimDate() |
| | | void purgeUpTo(final CSN purgeCSN) throws ChangelogException |
| | | { |
| | | return latestTrimDate; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Trim old changes from this replicationServer database. |
| | | * @throws ChangelogException In case of database problem. |
| | | */ |
| | | private void trim() throws ChangelogException |
| | | { |
| | | if (trimAge == 0) |
| | | if (purgeCSN == null) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | latestTrimDate = TimeThread.getTime() - trimAge; |
| | | |
| | | CSN trimDate = new CSN(latestTrimDate, 0, 0); |
| | | |
| | | // Find the last CSN before the trimDate, in the Database. |
| | | CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate); |
| | | if (lastBeforeTrimDate != null) |
| | | { |
| | | // If we found it, we want to stop trimming when reaching it. |
| | | trimDate = lastBeforeTrimDate; |
| | | } |
| | | |
| | | for (int i = 0; i < 100; i++) |
| | | { |
| | | /* |
| | | * Perform at least some trimming regardless of the flush backlog. Then |
| | | * continue trim iterations while the flush backlog is low (below the |
| | | * lowmark). Once the flush backlog increases, stop trimming and start |
| | | * flushing more eagerly. |
| | | */ |
| | | if (i > 20 && isQueueAboveLowMark()) |
| | | { |
| | | break; |
| | | } |
| | | |
| | | /* |
| | | * the trim is done by group in order to save some CPU, IO bandwidth and |
| | | * DB caches: start the transaction then do a bunch of remove then |
| | | * commit. |
| | | * The purge is done in groups in order to save CPU, IO bandwidth and |
| | | * DB caches: start the transaction, do a bunch of removes, then commit. |
| | | */ |
| | | /* |
| | | * Matt wrote: The record removal is done as a DB transaction and the |
| | |
| | | return; |
| | | } |
| | | |
| | | if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate)) |
| | | if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN)) |
| | | { |
| | | cursor.delete(); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private boolean isQueueAboveLowMark() |
| | | { |
| | | final int lowMarkBytes = queueMaxBytes / 5; |
| | | final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits(); |
| | | return bytesUsed > lowMarkBytes; |
| | | } |
| | | |
| | | /** |
| | | * Flush a number of updates from the memory list to the stable storage. |
| | | * <p> |
| | | * Flush is done by chunk sized to 500 messages, starting from the beginning |
| | | * of the list. |
| | | * |
| | | * <p> |
| | | * @GuardedBy("flushLock") |
| | | * @throws ChangelogException |
| | | * If a database problem happened |
| | | */ |
| | | public void flush() throws ChangelogException |
| | | private void flush() throws ChangelogException |
| | | { |
| | | try |
| | | { |
| | | synchronized (flushLock) |
| | | { |
| | | final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); |
| | | final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS); |
| | | final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS); |
| | | if (change == null) |
| | | { |
| | | // nothing to persist, move on to the trim phase |
| | | // nothing to persist, check if shutdown was invoked |
| | | return; |
| | | } |
| | | |
| | | // Try to see if there are more changes and persist them all. |
| | | final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); |
| | | changes.add(change); |
| | | msgQueue.drainTo(changes); |
| | | |
| | |
| | | } |
| | | |
| | | /** |
| | | * Set the Purge delay for this db Handler. |
| | | * @param delay The purge delay in Milliseconds. |
| | | */ |
| | | public void setPurgeDelay(long delay) |
| | | { |
| | | trimAge = delay; |
| | | } |
| | | |
| | | /** |
| | | * Clear the changes from this DB (from both memory cache and DB storage). |
| | | * @throws ChangelogException When an exception occurs while removing the |
| | | * changes from the DB. |
| | |
| | | } |
| | | |
| | | /** |
| | | * Return the size of the msgQueue (the memory cache of the ReplicaDB). |
| | | * Return the number of records of this replicaDB. |
| | | * <p> |
| | | * For test purpose. |
| | | * @return The memory queue size. |
| | | * |
| | | * @return The number of records of this replicaDB. |
| | | */ |
| | | int getQueueSize() |
| | | long getNumberRecords() |
| | | { |
| | | return this.msgQueue.size(); |
| | | return db.getNumberRecords(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * |
| | | * |
| | | * Copyright 2006-2009 Sun Microsystems, Inc. |
| | | * Portions Copyright 2011-2013 ForgeRock AS |
| | | * Portions Copyright 2011-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | |
| | | // we didn't find it in the db |
| | | cursor = null; |
| | | } |
| | | |
| | | if (cursor == null) |
| | | { |
| | | // flush the queue into the db |
| | | replicaDB.flush(); |
| | | |
| | | // look again in the db |
| | | cursor = db.openReadCursor(startAfterCSN); |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | public boolean next() throws ChangelogException |
| | | { |
| | | final ReplServerDBCursor localCursor = cursor; |
| | | if (localCursor != null) |
| | | { |
| | | currentChange = localCursor.next(); |
| | | } |
| | | else |
| | | { |
| | | currentChange = null; |
| | | } |
| | | |
| | | currentChange = localCursor != null ? localCursor.next() : null; |
| | | |
| | | if (currentChange != null) |
| | | { |
| | |
| | | { |
| | | synchronized (this) |
| | | { |
| | | if (cursor != null) |
| | | { |
| | | cursor.close(); |
| | | cursor = null; |
| | | } |
| | | replicaDB.flush(); |
| | | closeCursor(); |
| | | // previously exhausted cursors must be able to reinitialize themselves |
| | | cursor = db.openReadCursor(lastNonNullCurrentCSN); |
| | | currentChange = cursor.next(); |
| | | if (currentChange != null) |
| | |
| | | { |
| | | synchronized (this) |
| | | { |
| | | closeCursor(); |
| | | this.replicaDB = null; |
| | | } |
| | | } |
| | | |
| | | private void closeCursor() |
| | | { |
| | | if (cursor != null) |
| | | { |
| | | cursor.close(); |
| | | cursor = null; |
| | | } |
| | | this.replicaDB = null; |
| | | this.db = null; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | return db == null || !db.getEnvironment().isValid(); |
| | | } |
| | | |
| | | /** |
| | | * Returns the number of records in this DB. |
| | | * |
| | | * @return the number of records in this DB. |
| | | */ |
| | | long getNumberRecords() |
| | | { |
| | | return db.count(); |
| | | } |
| | | } |
| | |
| | | import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation; |
| | | import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; |
| | | import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation; |
| | | import org.testng.annotations.*; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.assertj.core.api.Assertions.*; |
| | | import static org.opends.messages.ReplicationMessages.*; |
| | |
| | | @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerPreTest"}) |
| | | public void ECLReplicationServerTest() throws Exception |
| | | { |
| | | getCNIndexDB().setPurgeDelay(0); |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // let's enable the ECL manually now that we have tested that the ECL is not available |
| | | ECLWorkflowElement wfe = |
| | | (ECLWorkflowElement) DirectoryServer |
| | |
| | | @Test(enabled=false, dependsOnMethods = { "ECLReplicationServerTest"}) |
| | | public void ECLReplicationServerTest1() throws Exception |
| | | { |
| | | getCNIndexDB().setPurgeDelay(0); |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // Test with a mix of domains, a mix of DSes |
| | | ECLTwoDomains(); |
| | | } |
| | |
| | | @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"}) |
| | | public void ECLReplicationServerTest3() throws Exception |
| | | { |
| | | getCNIndexDB().setPurgeDelay(0); |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // Write changes and read ECL from start |
| | | ECLCompatWriteReadAllOps(1); |
| | | |
| | |
| | | @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"}) |
| | | public void ECLReplicationServerFullTest3() throws Exception |
| | | { |
| | | getCNIndexDB().setPurgeDelay(0); |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // Test all types of ops. |
| | | ECLAllOps(); // Do not clean the db for the next test |
| | | |
| | |
| | | @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"}) |
| | | public void ECLReplicationServerFullTest15() throws Exception |
| | | { |
| | | final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); |
| | | cnIndexDB.setPurgeDelay(0); |
| | | replicationServer.getChangelogDB().setPurgeDelay(0); |
| | | // Write 4 changes and read ECL from start |
| | | ECLCompatWriteReadAllOps(1); |
| | | |
| | |
| | | ECLCompatTestLimitsAndAdd(1, 8, 4); |
| | | |
| | | // Test CNIndexDB is purged when replication change log is purged |
| | | cnIndexDB.setPurgeDelay(1); |
| | | cnIndexDB.trim(null); |
| | | final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); |
| | | cnIndexDB.purgeUpTo(Long.MAX_VALUE); |
| | | assertTrue(cnIndexDB.isEmpty()); |
| | | ECLPurgeCNIndexDBAfterChangelogClear(); |
| | | |
| | | // Test first and last are updated |
| | |
| | | null); |
| | | cnt++; |
| | | } |
| | | while (cnt < 100 // wait at most 1s |
| | | while (cnt < 300 // wait at most 3s |
| | | && op.getSearchEntries().size() != expectedNbEntries); |
| | | final List<SearchResultEntry> entries = op.getSearchEntries(); |
| | | assertThat(entries).hasSize(expectedNbEntries); |
| | |
| | | clearChangelogDB(replicationServer); |
| | | } |
| | | |
| | | @AfterTest |
| | | public void setPurgeDelayToInitialValue() throws Exception |
| | | { |
| | | JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); |
| | | if (cnIndexDB != null) |
| | | { |
| | | cnIndexDB.setPurgeDelay(1); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * After the tests stop the replicationServer. |
| | | */ |
| | |
| | | String tn = "ECLPurgeCNIndexDBAfterChangelogClear"; |
| | | debugInfo(tn, "Starting test\n\n"); |
| | | |
| | | JEChangeNumberIndexDB cnIndexDB = |
| | | (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB(); |
| | | final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); |
| | | assertEquals(cnIndexDB.count(), 8); |
| | | cnIndexDB.setPurgeDelay(1000); |
| | | replicationServer.getChangelogDB().setPurgeDelay(1000); |
| | | |
| | | clearChangelogDB(replicationServer); |
| | | |
| | |
| | | |
| | | /** |
| | | * Returns the change number index DB of {@code replicationServer} cast to |
| | | * its JE implementation, or {@code null} when no replication server is set. |
| | | */ |
| | | private JEChangeNumberIndexDB getCNIndexDB() |
| | | { |
| | | return replicationServer == null |
| | | ? null |
| | | : (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB(); |
| | | } |
| | | |
| | | /** |
| | | * Test ECL entry attributes, and their configuration. |
| | |
| | | { |
| | | final ReplicatedUpdateMsg msg = msgs[i]; |
| | | final ChangeNumberIndexRecord record = allValues.get(i); |
| | | if (previousCookie.isEmpty()) |
| | | { |
| | | // ugly hack to go round strange legacy code @see OPENDJ-67 |
| | | previousCookie.replace(record.getBaseDN(), new ServerState()); |
| | | } |
| | | // check content in order |
| | | String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">"; |
| | | assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN()); |
| | |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.util.ArrayList; |
| | | import java.util.List; |
| | | |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.replication.ReplicationTestCase; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.server.ReplServerFakeConfiguration; |
| | | import org.opends.server.replication.server.ReplicationServer; |
| | | import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogDB; |
| | | import org.opends.server.replication.server.changelog.api.ChangelogException; |
| | | import org.opends.server.replication.server.changelog.api.DBCursor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.Test; |
| | | |
| | | import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*; |
| | |
| | | @SuppressWarnings("javadoc") |
| | | public class JEChangeNumberIndexDBTest extends ReplicationTestCase |
| | | { |
| | | private static final String value1 = "value1"; |
| | | private static final String value2 = "value2"; |
| | | private static final String value3 = "value3"; |
| | | private final MultiDomainServerState previousCookie = |
| | | new MultiDomainServerState(); |
| | | private final List<String> cookies = new ArrayList<String>(); |
| | | |
| | | /** Empties the recorded cookies and the previous cookie state before each test method. */ |
| | | @BeforeMethod |
| | | public void clearCookie() |
| | | { |
| | | cookies.clear(); |
| | | previousCookie.clear(); |
| | | } |
| | | |
| | | /** |
| | | * This test makes basic operations of a JEChangeNumberIndexDB: |
| | |
| | | void testTrim() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | JEChangeNumberIndexDB cnIndexDB = null; |
| | | try |
| | | { |
| | | replicationServer = newReplicationServer(); |
| | | cnIndexDB = getCNIndexDBNoTrimming(replicationServer); |
| | | cnIndexDB.setPurgeDelay(0); |
| | | final ChangelogDB changelogDB = replicationServer.getChangelogDB(); |
| | | changelogDB.setPurgeDelay(0); // disable purging |
| | | |
| | | // Prepare data to be stored in the db |
| | | DN baseDN1 = DN.decode("o=baseDN1"); |
| | |
| | | CSN[] csns = newCSNs(1, 0, 3); |
| | | |
| | | // Add records |
| | | long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]); |
| | | addRecord(cnIndexDB, value2, baseDN2, csns[1]); |
| | | long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]); |
| | | final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer); |
| | | long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]); |
| | | addRecord(cnIndexDB, baseDN2, csns[1]); |
| | | long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]); |
| | | |
| | | // The ChangeNumber should not get purged |
| | | final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber(); |
| | |
| | | DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN); |
| | | try |
| | | { |
| | | assertEqualTo(cursor.getRecord(), csns[0], baseDN1, value1); |
| | | assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0)); |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[1], baseDN2, value2); |
| | | assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1)); |
| | | assertTrue(cursor.next()); |
| | | assertEqualTo(cursor.getRecord(), csns[2], baseDN3, value3); |
| | | assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2)); |
| | | assertFalse(cursor.next()); |
| | | } |
| | | finally |
| | |
| | | StaticUtils.close(cursor); |
| | | } |
| | | |
| | | // Now test that the trimming thread does its job => start it |
| | | cnIndexDB.setPurgeDelay(100); |
| | | cnIndexDB.startTrimmingThread(); |
| | | |
| | | // Check the db is cleared. |
| | | while (cnIndexDB.count() > 1) |
| | | // Now test that purging removes all changes bar the last one |
| | | changelogDB.setPurgeDelay(1); |
| | | int count = 0; |
| | | while (cnIndexDB.count() > 1 && count < 100) |
| | | { |
| | | Thread.yield(); |
| | | Thread.sleep(10); |
| | | count++; |
| | | } |
| | | assertOnlyNewestRecordIsLeft(cnIndexDB, 3); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private long addRecord(JEChangeNumberIndexDB cnIndexDB, String cookie, DN baseDN, CSN csn) |
| | | throws ChangelogException |
| | | private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException |
| | | { |
| | | return cnIndexDB.addRecord(new ChangeNumberIndexRecord(cookie, baseDN, csn)); |
| | | final String cookie = previousCookie.toString(); |
| | | cookies.add(cookie); |
| | | final long changeNumber = cnIndexDB.addRecord( |
| | | new ChangeNumberIndexRecord(cookie, baseDN, csn)); |
| | | previousCookie.update(baseDN, csn); |
| | | return changeNumber; |
| | | } |
| | | |
| | | private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie) |
| | |
| | | assertEquals(record.getPreviousCookie(), cookie); |
| | | } |
| | | |
| | | private JEChangeNumberIndexDB getCNIndexDBNoTrimming(ReplicationServer rs) throws ChangelogException |
| | | private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException |
| | | { |
| | | final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB(); |
| | | final JEChangeNumberIndexDB cnIndexDB = |
| | | (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB(false); |
| | | (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB(); |
| | | assertTrue(cnIndexDB.isEmpty()); |
| | | return cnIndexDB; |
| | | } |
| | |
| | | void testClear() throws Exception |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | JEChangeNumberIndexDB cnIndexDB = null; |
| | | try |
| | | { |
| | | replicationServer = newReplicationServer(); |
| | | cnIndexDB = getCNIndexDBNoTrimming(replicationServer); |
| | | cnIndexDB.setPurgeDelay(0); |
| | | final ChangelogDB changelogDB = replicationServer.getChangelogDB(); |
| | | changelogDB.setPurgeDelay(0); |
| | | |
| | | // Prepare data to be stored in the db |
| | | |
| | |
| | | CSN[] csns = newCSNs(1, 0, 3); |
| | | |
| | | // Add records |
| | | long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]); |
| | | long cn2 = addRecord(cnIndexDB, value2, baseDN2, csns[1]); |
| | | long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]); |
| | | final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer); |
| | | long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]); |
| | | long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]); |
| | | long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]); |
| | | |
| | | // Checks |
| | | assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1); |
| | |
| | | assertEquals(cnIndexDB.count(), 3, "Db count"); |
| | | assertFalse(cnIndexDB.isEmpty()); |
| | | |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn1), value1); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn2), value2); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn3), value3); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0)); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1)); |
| | | assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2)); |
| | | |
| | | DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1); |
| | | assertCursorReadsInOrder(cursor, cn1, cn2, cn3); |
| | |
| | | cursor = cnIndexDB.getCursorFrom(cn3); |
| | | assertCursorReadsInOrder(cursor, cn3); |
| | | |
| | | cnIndexDB.clear(null); |
| | | cnIndexDB.removeDomain(null); |
| | | assertOnlyNewestRecordIsLeft(cnIndexDB, 3); |
| | | |
| | | // Check the db is cleared. |
| | |
| | | |
| | | //-- |
| | | // Iterator tests with changes persisted |
| | | waitChangesArePersisted(replicaDB); |
| | | waitChangesArePersisted(replicaDB, 3); |
| | | |
| | | assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]); |
| | | assertNotFound(replicaDB, csns[4]); |
| | |
| | | //-- |
| | | // Cursor tests with changes persisted |
| | | replicaDB.add(update4); |
| | | waitChangesArePersisted(replicaDB); |
| | | waitChangesArePersisted(replicaDB, 4); |
| | | |
| | | assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]); |
| | | // Test cursor from existing CSN |
| | |
| | | assertFoundInOrder(replicaDB, csns[3]); |
| | | assertNotFound(replicaDB, csns[4]); |
| | | |
| | | replicaDB.setPurgeDelay(1); |
| | | replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); |
| | | |
| | | int count = 0; |
| | | boolean purgeSucceeded = false; |
| | |
| | | } |
| | | } |
| | | |
| | | private void waitChangesArePersisted(JEReplicaDB replicaDB) throws Exception |
| | | private void waitChangesArePersisted(JEReplicaDB replicaDB, |
| | | int nbRecordsInserted) throws Exception |
| | | { |
| | | final int expected = 0; |
| | | waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000); |
| | | } |
| | | |
| | | private void waitChangesArePersisted(JEReplicaDB replicaDB, |
| | | int nbRecordsInserted, int counterWindow) throws Exception |
| | | { |
| | | // one counter record is inserted every time "counterWindow" |
| | | // records have been inserted |
| | | int expectedNbRecords = |
| | | nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow; |
| | | |
| | | int count = 0; |
| | | while (replicaDB.getQueueSize() != expected && count < 100) |
| | | while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100) |
| | | { |
| | | Thread.sleep(10); |
| | | count++; |
| | | } |
| | | assertEquals(replicaDB.getQueueSize(), expected); |
| | | assertEquals(replicaDB.getNumberRecords(), expectedNbRecords); |
| | | } |
| | | |
| | | static CSN[] newCSNs(int serverId, long timestamp, int number) |
| | |
| | | assertNull(cursor.getRecord()); |
| | | for (int i = 1; i < csns.length; i++) |
| | | { |
| | | assertTrue(cursor.next()); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[i]); |
| | | final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI(); |
| | | assertTrue(cursor.next(), msg); |
| | | assertEquals(cursor.getRecord().getCSN(), csns[i], msg); |
| | | } |
| | | assertFalse(cursor.next()); |
| | | assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord() |
| | |
| | | { |
| | | ReplicationServer replicationServer = null; |
| | | DBCursor<UpdateMsg> cursor = null; |
| | | JEReplicaDB replicaDB = null; |
| | | try |
| | | { |
| | | TestCaseUtils.startServer(); |
| | | replicationServer = configureReplicationServer(100000, 10); |
| | | JEReplicaDB replicaDB = newReplicaDB(replicationServer); |
| | | replicaDB = newReplicaDB(replicationServer); |
| | | |
| | | CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6); |
| | | for (int i = 0; i < 5; i++) |
| | |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | } |
| | | } |
| | | replicaDB.flush(); |
| | | waitChangesArePersisted(replicaDB, 4); |
| | | |
| | | cursor = replicaDB.generateCursorFrom(csns[0]); |
| | | assertTrue(cursor.next()); |
| | |
| | | finally |
| | | { |
| | | StaticUtils.close(cursor); |
| | | if (replicaDB != null) |
| | | replicaDB.shutdown(); |
| | | remove(replicationServer); |
| | | } |
| | | } |
| | |
| | | testGetOldestNewestCSNs(4000, 1000); |
| | | } |
| | | |
| | | private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception |
| | | private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception |
| | | { |
| | | String tn = "testDBCount("+max+","+counterWindow+")"; |
| | | debugInfo(tn, "Starting test"); |
| | |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | mySeqnum+=2; |
| | | } |
| | | replicaDB.flush(); |
| | | waitChangesArePersisted(replicaDB, max, counterWindow); |
| | | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); |
| | |
| | | replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); |
| | | mySeqnum+=2; |
| | | } |
| | | replicaDB.flush(); |
| | | waitChangesArePersisted(replicaDB, 2 * max, counterWindow); |
| | | |
| | | assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); |
| | | assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN"); |
| | | |
| | | // |
| | | |
| | | replicaDB.setPurgeDelay(100); |
| | | sleep(1000); |
| | | replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); |
| | | |
| | | String testcase = "AFTER PURGE (oldest, newest)="; |
| | | debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN()); |