opends/src/server/org/opends/server/replication/server/ReplicationServer.java
@@ -22,7 +22,7 @@ * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.server; @@ -722,7 +722,7 @@ * @return The time after which changes must be deleted from the * persistent storage (in milliseconds). */ public long getTrimAge() public long getPurgeDelay() { return this.config.getReplicationPurgeDelay() * 1000; } @@ -780,7 +780,7 @@ final long newPurgeDelay = config.getReplicationPurgeDelay(); if (newPurgeDelay != oldConfig.getReplicationPurgeDelay()) { this.changelogDB.setPurgeDelay(getTrimAge()); this.changelogDB.setPurgeDelay(getPurgeDelay()); } final boolean computeCN = config.isComputeChangeNumber(); if (computeCN != oldConfig.isComputeChangeNumber()) opends/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexer.java
@@ -487,13 +487,6 @@ // OK, the oldest change is older than the medium consistency point // let's publish it to the CNIndexDB. // Next if statement is ugly but ensures the first change will not be // immediately trimmed from the CNIndexDB. Yuck! if (mediumConsistencyRUV.isEmpty()) { mediumConsistencyRUV.replace(baseDN, new ServerState()); } final String previousCookie = mediumConsistencyRUV.toString(); final ChangeNumberIndexRecord record = new ChangeNumberIndexRecord(previousCookie, baseDN, csn); @@ -620,20 +613,21 @@ } /** * Asks the current thread to clear its state. * Asks the current thread to clear its state and blocks until state is * cleared. * <p> * This method is only useful for unit tests. */ public void clear() { doClear.set(true); synchronized (this) while (doClear.get() && !State.TERMINATED.equals(getState())) { notify(); } while (doClear.get()) { // wait until clear() has been done by thread // wait until clear() has been done by thread, always waking it up synchronized (this) { notify(); } // ensures unit tests wait that this thread's state is cleaned up Thread.yield(); } opends/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDB.java
@@ -28,29 +28,21 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.DirectoryThread; import org.opends.server.api.MonitorProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.*; import org.opends.server.replication.server.changelog.je.DraftCNDB.*; import org.opends.server.types.*; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.util.StaticUtils.*; /** * This class is used for managing the replicationServer database for each @@ -62,7 +54,7 @@ * This class publishes some monitoring information below <code> * cn=monitor</code>. */ public class JEChangeNumberIndexDB implements ChangeNumberIndexDB, Runnable public class JEChangeNumberIndexDB implements ChangeNumberIndexDB { /** * The tracer object for the debug logger. @@ -74,7 +66,7 @@ /** FIXME What is this field used for? */ private volatile long oldestChangeNumber = NO_KEY; /** * The newest changenumber stored in the DB. It is used to avoid trimming the * The newest changenumber stored in the DB. It is used to avoid purging the * record with the newest changenumber. The newest record in the changenumber * index DB is used to persist the {@link #lastGeneratedChangeNumber} which is * then retrieved on server startup. @@ -86,48 +78,25 @@ * condition between: * <ol> * <li>this atomic long being incremented for a new record ('recordB')</li> * <li>the current newest record ('recordA') being trimmed from the DB</li> * <li>the current newest record ('recordA') being purged from the DB</li> * <li>'recordB' failing to be inserted in the DB</li> * </ol> */ private final AtomicLong lastGeneratedChangeNumber; private DbMonitorProvider dbMonitor = new DbMonitorProvider(); private final AtomicBoolean shutdown = new AtomicBoolean(false); /** * A dedicated thread loops trim(). * <p> * trim() : deletes from the DB a number of changes that are older than a * certain date. */ private DirectoryThread trimmingThread; /** * The trim age in milliseconds. Changes record in the change DB that are * older than this age are removed. * <p> * FIXME it never gets updated even when the replication server purge delay is * updated */ private volatile long trimAge; private ReplicationServer replicationServer; /** * Creates a new JEChangeNumberIndexDB associated to a given LDAP server. * * @param replicationServer The ReplicationServer that creates this instance. * @param dbenv the Database Env to use to create the ReplicationServer DB. * @param dbEnv the Database Env to use to create the ReplicationServer DB. * server for this domain. * @throws ChangelogException If a database problem happened */ public JEChangeNumberIndexDB(ReplicationServer replicationServer, ReplicationDbEnv dbenv) throws ChangelogException public JEChangeNumberIndexDB(ReplicationDbEnv dbEnv) throws ChangelogException { this.replicationServer = replicationServer; this.trimAge = replicationServer.getTrimAge(); // DB initialization db = new DraftCNDB(dbenv); db = new DraftCNDB(dbEnv); final ChangeNumberIndexRecord oldestRecord = db.readFirstRecord(); final ChangeNumberIndexRecord newestRecord = db.readLastRecord(); oldestChangeNumber = getChangeNumber(oldestRecord); @@ -142,16 +111,6 @@ DirectoryServer.registerMonitorProvider(dbMonitor); } /** * Creates and starts the thread trimming the CNIndexDB. */ public void startTrimmingThread() { trimmingThread = new DirectoryThread(this, "Replication ChangeNumberIndexDB Trimmer"); trimmingThread.start(); } private long getChangeNumber(ChangeNumberIndexRecord record) throws ChangelogException { @@ -251,77 +210,82 @@ notifyAll(); } if (trimmingThread != null) { try { trimmingThread.join(); } catch (InterruptedException ignored) { // Nothing can be done about it, just proceed } } db.shutdown(); DirectoryServer.deregisterMonitorProvider(dbMonitor); } /** * Run method for this class. * Periodically Flushes the ReplicationServerDomain cache from memory to the * stable storage and trims the old updates. * Synchronously purges the change number index DB up to and excluding the * provided timestamp. * * @param purgeTimestamp * the timestamp up to which purging must happen * @return the {@link MultiDomainServerState} object that drives purging the * replicaDBs. * @throws ChangelogException * if a database problem occurs. */ @Override public void run() public MultiDomainServerState purgeUpTo(long purgeTimestamp) throws ChangelogException { while (!shutdown.get()) if (isEmpty()) { try { trim(shutdown); return null; } synchronized (this) { if (!shutdown.get()) { try { wait(1000); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } } } catch (Exception end) final CSN purgeCSN = new CSN(purgeTimestamp, 0, 0); final DraftCNDBCursor cursor = db.openDeleteCursor(); try { while (!mustShutdown(shutdown) && cursor.next()) { logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH .get(stackTraceToSingleLineString(end))); if (replicationServer != null) final ChangeNumberIndexRecord record = cursor.currentRecord(); if (record.getChangeNumber() != oldestChangeNumber) { replicationServer.shutdown(); oldestChangeNumber = record.getChangeNumber(); } break; if (record.getChangeNumber() == newestChangeNumber) { // do not purge the newest record to avoid having the last generated // changenumber dropping back to 0 if the server restarts return getPurgeCookie(record); } if (record.getCSN().isOlderThan(purgeCSN)) { cursor.delete(); } else { // Current record is not old enough to purge. return getPurgeCookie(record); } } return null; } catch (ChangelogException e) { cursor.abort(); throw e; } catch (Exception e) { cursor.abort(); throw new ChangelogException(e); } finally { cursor.close(); } } /** * Trim old changes from this database. * * @param shutdown * AtomicBoolean telling whether the current run must be stopped * @throws ChangelogException * In case of database problem. */ public void trim(AtomicBoolean shutdown) throws ChangelogException private MultiDomainServerState getPurgeCookie( final ChangeNumberIndexRecord record) throws DirectoryException { if (trimAge == 0) return; clear(null, shutdown); // Do not include the record's CSN to avoid having it purged return new MultiDomainServerState(record.getPreviousCookie()); } /** @@ -334,127 +298,49 @@ * @throws ChangelogException * if a database problem occurs. */ public void clear(DN baseDNToClear) throws ChangelogException { clear(baseDNToClear, null); } private void clear(DN baseDNToClear, AtomicBoolean shutdown) throws ChangelogException public void removeDomain(DN baseDNToClear) throws ChangelogException { if (isEmpty()) { return; } for (int i = 0; i < 100; i++) final DraftCNDBCursor cursor = db.openDeleteCursor(); try { if (mustShutdown(shutdown)) boolean isOldestRecord = true; while (!mustShutdown(shutdown) && cursor.next()) { return; } final DraftCNDBCursor cursor = db.openDeleteCursor(); try { for (int j = 0; j < 50; j++) final ChangeNumberIndexRecord record = cursor.currentRecord(); if (isOldestRecord && record.getChangeNumber() != oldestChangeNumber) { // let's traverse the CNIndexDB if (mustShutdown(shutdown) || !cursor.next()) { cursor.close(); return; } final ChangeNumberIndexRecord record = cursor.currentRecord(); if (record.getChangeNumber() != oldestChangeNumber) { oldestChangeNumber = record.getChangeNumber(); } if (record.getChangeNumber() == newestChangeNumber) { // do not trim the newest record to avoid having the last generated // changenumber dropping back to 0 if the server restarts cursor.close(); return; } if (baseDNToClear != null && baseDNToClear.equals(record.getBaseDN())) { cursor.delete(); continue; } final ReplicationServerDomain domain = replicationServer.getReplicationServerDomain(record.getBaseDN()); if (domain == null) { // the domain has been removed since the record was written in the // CNIndexDB, thus it makes no sense to keep this record in the DB. cursor.delete(); continue; } // FIXME there is an opportunity for a phantom record in the CNIndexDB // if the replicaDB gets purged after call to domain.getOldestState(). final CSN csn = record.getCSN(); final ServerState oldestState = domain.getOldestState(); final CSN fcsn = oldestState.getCSN(csn.getServerId()); if (csn.isOlderThan(fcsn)) { // This change which has already been purged from the corresponding // replicaDB => purge it from CNIndexDB cursor.delete(); continue; } ServerState csnVector; try { Map<DN, ServerState> csnStartStates = MultiDomainServerState.splitGenStateToServerStates( record.getPreviousCookie()); csnVector = csnStartStates.get(record.getBaseDN()); if (debugEnabled()) TRACER.debugInfo("JEChangeNumberIndexDB:clear() - ChangeVector:" + csnVector + " -- StartState:" + oldestState); } catch(Exception e) { // We could not parse the MultiDomainServerState from the record // FIXME this is quite an aggressive delete() cursor.delete(); continue; } if (csnVector == null || (csnVector.getCSN(csn.getServerId()) != null && !csnVector.cover(oldestState))) { cursor.delete(); if (debugEnabled()) TRACER.debugInfo("JEChangeNumberIndexDB:clear() - deleted " + csn + "Not covering startState"); continue; } oldestChangeNumber = record.getChangeNumber(); cursor.close(); } if (record.getChangeNumber() == newestChangeNumber) { // do not purge the newest record to avoid having the last generated // changenumber dropping back to 0 if the server restarts return; } cursor.close(); if (baseDNToClear == null || record.getBaseDN().equals(baseDNToClear)) { cursor.delete(); } else { isOldestRecord = false; } } catch (ChangelogException e) { cursor.abort(); throw e; } catch (Exception e) { cursor.abort(); throw new ChangelogException(e); } } catch (ChangelogException e) { cursor.abort(); throw e; } finally { cursor.close(); } } @@ -469,9 +355,7 @@ */ private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg> { /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public List<Attribute> getMonitorData() { @@ -509,18 +393,14 @@ return 0; } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public String getMonitorInstanceName() { return "ChangeNumber Index Database"; } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public void initializeMonitorProvider(MonitorProviderCfg configuration) throws ConfigException,InitializationException @@ -529,9 +409,7 @@ } } /** * {@inheritDoc} */ /** {@inheritDoc} */ @Override public String toString() { @@ -540,15 +418,6 @@ } /** * Set the Purge delay for this db Handler. * @param delay The purge delay in Milliseconds. */ public void setPurgeDelay(long delay) { trimAge = delay; } /** * Clear the changes from this DB (from both memory cache and DB storage). * * @throws ChangelogException opends/src/server/org/opends/server/replication/server/changelog/je/JEChangelogDB.java
@@ -21,12 +21,13 @@ * CDDL HEADER END * * * Copyright 2013 ForgeRock AS * Copyright 2013-2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.io.File; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -35,9 +36,11 @@ import org.opends.messages.Message; import org.opends.messages.MessageBuilder; import org.opends.server.admin.std.server.ReplicationServerCfg; import org.opends.server.api.DirectoryThread; import org.opends.server.config.ConfigException; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.common.ServerState; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ChangelogState; @@ -46,6 +49,7 @@ import org.opends.server.types.DN; import org.opends.server.types.DebugLogLevel; import org.opends.server.util.StaticUtils; import org.opends.server.util.TimeThread; import com.forgerock.opendj.util.Pair; @@ -87,7 +91,7 @@ * The handler of the changelog database, the database stores the relation * between a change number and the associated cookie. * <p> * Guarded by cnIndexDBLock * @GuardedBy("cnIndexDBLock") */ private JEChangeNumberIndexDB cnIndexDB; private final AtomicReference<ChangeNumberIndexer> cnIndexer = @@ -96,6 +100,15 @@ /** Used for protecting {@link ChangeNumberIndexDB} related state. */ private final Object cnIndexDBLock = new Object(); /** * The purge delay (in milliseconds). Records in the changelog DB that are * older than this delay might be removed. */ private long purgeDelayInMillis; private final AtomicReference<ChangelogDBPurger> cnPurger = new AtomicReference<ChangelogDBPurger>(); private volatile long latestPurgeDate; /** The local replication server. */ private final ReplicationServer replicationServer; private AtomicBoolean shutdown = new AtomicBoolean(); @@ -312,13 +325,9 @@ initializeChangelogState(changelogState); if (config.isComputeChangeNumber()) { final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState); if (cnIndexer.compareAndSet(null, indexer)) { indexer.start(); } startIndexer(changelogState); } setPurgeDelay(replicationServer.getPurgeDelay()); } catch (ChangelogException e) { @@ -374,12 +383,17 @@ // - then throw the first encountered exception ChangelogException firstException = null; final ChangeNumberIndexer indexer = cnIndexer.get(); final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); if (indexer != null) { indexer.initiateShutdown(); cnIndexer.compareAndSet(indexer, null); } final ChangelogDBPurger purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); } try { shutdownCNIndexDB(); @@ -581,7 +595,7 @@ { try { cnIndexDB.clear(baseDN); cnIndexDB.removeDomain(baseDN); } catch (ChangelogException e) { @@ -607,7 +621,9 @@ firstException = e; } else if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } if (firstException != null) @@ -618,18 +634,24 @@ /** {@inheritDoc} */ @Override public void setPurgeDelay(long delay) public void setPurgeDelay(long purgeDelayInMillis) { final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB; if (cnIndexDB != null) this.purgeDelayInMillis = purgeDelayInMillis; final ChangelogDBPurger purger; if (purgeDelayInMillis > 0) { cnIndexDB.setPurgeDelay(delay); } for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) { for (JEReplicaDB replicaDB : domainMap.values()) purger = new ChangelogDBPurger(); if (cnPurger.compareAndSet(null, purger)) { replicaDB.setPurgeDelay(delay); purger.start(); } // otherwise a purger was already running } else { purger = cnPurger.getAndSet(null); if (purger != null) { purger.initiateShutdown(); } } } @@ -639,19 +661,13 @@ public void setComputeChangeNumber(boolean computeChangeNumber) throws ChangelogException { final ChangeNumberIndexer indexer; if (computeChangeNumber) { final ChangelogState changelogState = dbEnv.readChangelogState(); indexer = new ChangeNumberIndexer(this, changelogState); if (cnIndexer.compareAndSet(null, indexer)) { indexer.start(); } startIndexer(dbEnv.readChangelogState()); } else { indexer = cnIndexer.getAndSet(null); final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); if (indexer != null) { indexer.initiateShutdown(); @@ -659,48 +675,34 @@ } } private void startIndexer(final ChangelogState changelogState) { final ChangeNumberIndexer indexer = new ChangeNumberIndexer(this, changelogState); if (cnIndexer.compareAndSet(null, indexer)) { indexer.start(); } } /** {@inheritDoc} */ @Override public long getDomainLatestTrimDate(DN baseDN) { long latest = 0; for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) { if (latest == 0 || latest < replicaDB.getLatestTrimDate()) { latest = replicaDB.getLatestTrimDate(); } } return latest; return latestPurgeDate; } /** {@inheritDoc} */ @Override public ChangeNumberIndexDB getChangeNumberIndexDB() { return getChangeNumberIndexDB(true); } /** * Returns the {@link ChangeNumberIndexDB} object. * * @param startTrimmingThread * whether the trimming thread should be started * @return the {@link ChangeNumberIndexDB} object */ ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread) { synchronized (cnIndexDBLock) { if (cnIndexDB == null) { try { cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv); if (startTrimmingThread) { cnIndexDB.startTrimmingThread(); } cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv); } catch (Exception e) { @@ -830,4 +832,83 @@ } // TODO save this state in the changelogStateDB? } /** * The thread purging the changelogDB on a regular interval. Records are * purged from the changelogDB is they are older than a delay specified in * seconds. The purge process works in two steps: * <ol> * <li>first purge the changeNumberIndexDB and retrieve information to drive * replicaDBs purging</li> * <li>proceed to purge each replicaDBs based on the information collected * when purging the changeNumberIndexDB</li> * </ol> */ private final class ChangelogDBPurger extends DirectoryThread { protected ChangelogDBPurger() { super("changelog DB purger"); } /** {@inheritDoc} */ @Override public void run() { // initialize CNIndexDB getChangeNumberIndexDB(); while (!isShutdownInitiated()) { try { final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB; if (localCNIndexDB == null) { // shutdown has been called return; } final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis; final MultiDomainServerState purgeUpToCookie = localCNIndexDB.purgeUpTo(purgeTimestamp); if (purgeUpToCookie == null) { // this can happen when the change number index DB is empty continue; } /* * Drive purge of the replica DBs by the oldest non purged cookie in * the change number index DB. */ for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1 : domainToReplicaDBs.entrySet()) { final DN baseDN = entry1.getKey(); final Map<Integer, JEReplicaDB> domainMap = entry1.getValue(); for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet()) { final Integer serverId = entry2.getKey(); final JEReplicaDB replicaDB = entry2.getValue(); replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId)); } } latestPurgeDate = purgeTimestamp; // purge delay is specified in seconds so it should not be a problem // to sleep for 500 millis sleep(500); } catch (Exception e) { logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH .get(stackTraceToSingleLineString(e))); if (replicationServer != null) { replicationServer.shutdown(); } } } } } } opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDB.java
@@ -50,7 +50,6 @@ import org.opends.server.types.Attributes; import org.opends.server.types.DN; import org.opends.server.types.InitializationException; import org.opends.server.util.TimeThread; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; @@ -95,16 +94,8 @@ * <p> * This blocking queue is only used as a temporary placeholder so that the * write in the stable storage can be grouped for efficiency reason. Adding an * update synchronously add the update to this list. A dedicated thread loops * on {@link #flush()} and {@link #trim()}. * <dl> * <dt>flush()</dt> * <dd>get a number of changes from the in memory list by block and write them * to the db.</dd> * <dt>trim()</dt> * <dd>deletes from the DB a number of changes that are older than a certain * date.</dd> * </dl> * update synchronously add the update to this list. A dedicated thread * flushes this blocking queue. * <p> * Changes are not read back by replicationServer threads that are responsible * for pushing the changes to other replication server or to LDAP server @@ -133,22 +124,12 @@ private DbMonitorProvider dbMonitor = new DbMonitorProvider(); private DirectoryThread thread; /** * Used to prevent race conditions between threads calling {@link #clear()} * {@link #flush()} or {@link #trim()}. This can happen with the thread * flushing the queue, on shutdown or on cursor opening, a thread calling * clear(), etc. * Used to prevent race conditions between threads calling {@link #flush()}. * This can happen with the thread flushing the queue, or else on shutdown. */ private final Object flushLock = new Object(); private ReplicationServer replicationServer; private long latestTrimDate = 0; /** * The trim age in milliseconds. Changes record in the change DB that * are older than this age are removed. */ private long trimAge; /** * Creates a new ReplicaDB associated to a given LDAP server. * @@ -166,15 +147,14 @@ this.replicationServer = replicationServer; this.serverId = serverId; this.baseDN = baseDN; trimAge = replicationServer.getTrimAge(); queueMaxBytes = replicationServer.getQueueSize() * 200; queueSizeBytes = new Semaphore(queueMaxBytes); db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv); csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN()); thread = new DirectoryThread(this, "Replication server RS(" + replicationServer.getServerId() + ") changelog checkpointer for Replica DS(" + serverId + ") for domain \"" + baseDN + "\""); + replicationServer.getServerId() + ") flusher thread for Replica DS(" + serverId + ") for domain \"" + baseDN + "\""); thread.start(); DirectoryServer.deregisterMonitorProvider(dbMonitor); @@ -334,9 +314,7 @@ } /** * Run method for this class. * Periodically Flushes the ReplicationServerDomain cache from memory to the * stable storage and trims the old updates. * Flushes the replicaDB queue from memory to stable storage. */ @Override public void run() @@ -350,7 +328,6 @@ try { flush(); trim(); } catch (ChangelogException end) { @@ -390,55 +367,26 @@ } /** * Retrieves the latest trim date. * @return the latest trim date. * Synchronously purge changes older than purgeCSN from this replicaDB. * * @param purgeCSN * The CSN up to which changes can be purged. No purging happens when * it is null. * @throws ChangelogException * In case of database problem. */ public long getLatestTrimDate() void purgeUpTo(final CSN purgeCSN) throws ChangelogException { return latestTrimDate; } /** * Trim old changes from this replicationServer database. * @throws ChangelogException In case of database problem. */ private void trim() throws ChangelogException { if (trimAge == 0) if (purgeCSN == null) { return; } latestTrimDate = TimeThread.getTime() - trimAge; CSN trimDate = new CSN(latestTrimDate, 0, 0); // Find the last CSN before the trimDate, in the Database. CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate); if (lastBeforeTrimDate != null) { // If we found it, we want to stop trimming when reaching it. trimDate = lastBeforeTrimDate; } for (int i = 0; i < 100; i++) { /* * Perform at least some trimming regardless of the flush backlog. Then * continue trim iterations while the flush backlog is low (below the * lowmark). Once the flush backlog increases, stop trimming and start * flushing more eagerly. */ if (i > 20 && isQueueAboveLowMark()) { break; } /* * the trim is done by group in order to save some CPU, IO bandwidth and * DB caches: start the transaction then do a bunch of remove then * commit. * the purge is done by group in order to save some CPU, IO bandwidth and * DB caches: start the transaction then do a bunch of remove then commit. */ /* * Matt wrote: The record removal is done as a DB transaction and the @@ -464,7 +412,7 @@ return; } if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate)) if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(purgeCSN)) { cursor.delete(); } @@ -490,37 +438,31 @@ } } private boolean isQueueAboveLowMark() { final int lowMarkBytes = queueMaxBytes / 5; final int bytesUsed = queueMaxBytes - queueSizeBytes.availablePermits(); return bytesUsed > lowMarkBytes; } /** * Flush a number of updates from the memory list to the stable storage. * <p> * Flush is done by chunk sized to 500 messages, starting from the beginning * of the list. * * <p> * @GuardedBy("flushLock") * @throws ChangelogException * If a database problem happened */ public void flush() throws ChangelogException private void flush() throws ChangelogException { try { synchronized (flushLock) { final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS); final UpdateMsg change = msgQueue.poll(100, TimeUnit.MILLISECONDS); if (change == null) { // nothing to persist, move on to the trim phase // nothing to persist, check if shutdown was invoked return; } // Try to see if there are more changes and persist them all. final List<UpdateMsg> changes = new LinkedList<UpdateMsg>(); changes.add(change); msgQueue.drainTo(changes); @@ -604,15 +546,6 @@ } /** * Set the Purge delay for this db Handler. * @param delay The purge delay in Milliseconds. */ public void setPurgeDelay(long delay) { trimAge = delay; } /** * Clear the changes from this DB (from both memory cache and DB storage). * @throws ChangelogException When an exception occurs while removing the * changes from the DB. @@ -636,13 +569,15 @@ } /** * Return the size of the msgQueue (the memory cache of the ReplicaDB). * Return the number of records of this replicaDB. * <p> * For test purpose. * @return The memory queue size. * * @return The number of records of this replicaDB. */ int getQueueSize() long getNumberRecords() { return this.msgQueue.size(); return db.getNumberRecords(); } /** opends/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBCursor.java
@@ -22,7 +22,7 @@ * * * Copyright 2006-2009 Sun Microsystems, Inc. * Portions Copyright 2011-2013 ForgeRock AS * Portions Copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; @@ -73,15 +73,6 @@ // we didn't find it in the db cursor = null; } if (cursor == null) { // flush the queue into the db replicaDB.flush(); // look again in the db cursor = db.openReadCursor(startAfterCSN); } } /** {@inheritDoc} */ @@ -96,15 +87,7 @@ public boolean next() throws ChangelogException { final ReplServerDBCursor localCursor = cursor; if (localCursor != null) { currentChange = localCursor.next(); } else { currentChange = null; } currentChange = localCursor != null ? localCursor.next() : null; if (currentChange != null) { @@ -114,12 +97,8 @@ { synchronized (this) { if (cursor != null) { cursor.close(); cursor = null; } replicaDB.flush(); closeCursor(); // previously exhausted cursor must be able to reinitialize themselves cursor = db.openReadCursor(lastNonNullCurrentCSN); currentChange = cursor.next(); if (currentChange != null) @@ -137,13 +116,17 @@ { synchronized (this) { if (cursor != null) { cursor.close(); cursor = null; } closeCursor(); this.replicaDB = null; this.db = null; } } private void closeCursor() { if (cursor != null) { cursor.close(); cursor = null; } } opends/src/server/org/opends/server/replication/server/changelog/je/ReplicationDB.java
@@ -943,4 +943,13 @@ return db == null || !db.getEnvironment().isValid(); } /** * Returns the number of records in this DB. * * @return the number of records in this DB. */ long getNumberRecords() { return db.count(); } } opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/ExternalChangeLogTest.java
@@ -67,7 +67,10 @@ import org.opends.server.workflowelement.externalchangelog.ECLSearchOperation; import org.opends.server.workflowelement.externalchangelog.ECLWorkflowElement; import org.opends.server.workflowelement.localbackend.LocalBackendModifyDNOperation; import org.testng.annotations.*; import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.assertj.core.api.Assertions.*; import static org.opends.messages.ReplicationMessages.*; @@ -168,7 +171,7 @@ @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerPreTest"}) public void ECLReplicationServerTest() throws Exception { getCNIndexDB().setPurgeDelay(0); replicationServer.getChangelogDB().setPurgeDelay(0); // let's enable ECl manually now that we tested that ECl is not available ECLWorkflowElement wfe = (ECLWorkflowElement) DirectoryServer @@ -189,7 +192,7 @@ @Test(enabled=false, dependsOnMethods = { "ECLReplicationServerTest"}) public void ECLReplicationServerTest1() throws Exception { getCNIndexDB().setPurgeDelay(0); replicationServer.getChangelogDB().setPurgeDelay(0); // Test with a mix of domains, a mix of DSes ECLTwoDomains(); } @@ -204,7 +207,7 @@ @Test(enabled=true, dependsOnMethods = { "ECLReplicationServerTest"}) public void ECLReplicationServerTest3() throws Exception { getCNIndexDB().setPurgeDelay(0); replicationServer.getChangelogDB().setPurgeDelay(0); // Write changes and read ECL from start ECLCompatWriteReadAllOps(1); @@ -263,7 +266,7 @@ @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"}) public void ECLReplicationServerFullTest3() throws Exception { getCNIndexDB().setPurgeDelay(0); replicationServer.getChangelogDB().setPurgeDelay(0); // Test all types of ops. ECLAllOps(); // Do not clean the db for the next test @@ -347,8 +350,7 @@ @Test(enabled=false, groups="slow", dependsOnMethods = { "ECLReplicationServerTest"}) public void ECLReplicationServerFullTest15() throws Exception { final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); cnIndexDB.setPurgeDelay(0); replicationServer.getChangelogDB().setPurgeDelay(0); // Write 4 changes and read ECL from start ECLCompatWriteReadAllOps(1); @@ -369,8 +371,9 @@ ECLCompatTestLimitsAndAdd(1, 8, 4); // Test CNIndexDB is purged when replication change log is purged cnIndexDB.setPurgeDelay(1); cnIndexDB.trim(null); final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); cnIndexDB.purgeUpTo(Long.MAX_VALUE); assertTrue(cnIndexDB.isEmpty()); ECLPurgeCNIndexDBAfterChangelogClear(); // Test first and last are updated @@ -896,7 +899,7 @@ null); cnt++; } while (cnt < 100 // wait at most 1s while (cnt < 300 // wait at most 3s && op.getSearchEntries().size() != expectedNbEntries); final List<SearchResultEntry> entries = op.getSearchEntries(); assertThat(entries).hasSize(expectedNbEntries); @@ -1951,16 +1954,6 @@ clearChangelogDB(replicationServer); } @AfterTest public void setPurgeDelayToInitialValue() throws Exception { JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); if (cnIndexDB != null) { cnIndexDB.setPurgeDelay(1); } } /** * After the tests stop the replicationServer. */ @@ -2461,10 +2454,9 @@ String tn = "ECLPurgeCNIndexDBAfterChangelogClear"; debugInfo(tn, "Starting test\n\n"); JEChangeNumberIndexDB cnIndexDB = (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB(); final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(); assertEquals(cnIndexDB.count(), 8); cnIndexDB.setPurgeDelay(1000); replicationServer.getChangelogDB().setPurgeDelay(1000); clearChangelogDB(replicationServer); @@ -2620,11 +2612,7 @@ private JEChangeNumberIndexDB getCNIndexDB() { if (replicationServer != null) { return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB(); } return null; return (JEChangeNumberIndexDB) replicationServer.getChangeNumberIndexDB(); } /** opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/ChangeNumberIndexerTest.java
@@ -447,11 +447,6 @@ { final ReplicatedUpdateMsg msg = msgs[i]; final ChangeNumberIndexRecord record = allValues.get(i); if (previousCookie.isEmpty()) { // ugly hack to go round strange legacy code @see OPENDJ-67 previousCookie.replace(record.getBaseDN(), new ServerState()); } // check content in order String desc2 = "actual was:<" + record + ">, but expected was:<" + msg + ">"; assertThat(record.getBaseDN()).as(desc2).isEqualTo(msg.getBaseDN()); opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEChangeNumberIndexDBTest.java
@@ -26,16 +26,22 @@ */ package org.opends.server.replication.server.changelog.je; import java.util.ArrayList; import java.util.List; import org.opends.server.TestCaseUtils; import org.opends.server.replication.ReplicationTestCase; import org.opends.server.replication.common.CSN; import org.opends.server.replication.common.MultiDomainServerState; import org.opends.server.replication.server.ReplServerFakeConfiguration; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.changelog.api.ChangeNumberIndexRecord; import org.opends.server.replication.server.changelog.api.ChangelogDB; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.types.DN; import org.opends.server.util.StaticUtils; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import static org.opends.server.replication.server.changelog.je.JEReplicaDBTest.*; @@ -48,9 +54,16 @@ @SuppressWarnings("javadoc") public class JEChangeNumberIndexDBTest extends ReplicationTestCase { private static final String value1 = "value1"; private static final String value2 = "value2"; private static final String value3 = "value3"; private final MultiDomainServerState previousCookie = new MultiDomainServerState(); private final List<String> cookies = new ArrayList<String>(); @BeforeMethod public void clearCookie() { previousCookie.clear(); cookies.clear(); } /** * This test makes basic operations of a JEChangeNumberIndexDB: @@ -67,12 +80,11 @@ void testTrim() throws Exception { ReplicationServer replicationServer = null; JEChangeNumberIndexDB cnIndexDB = null; try { replicationServer = newReplicationServer(); cnIndexDB = getCNIndexDBNoTrimming(replicationServer); cnIndexDB.setPurgeDelay(0); final ChangelogDB changelogDB = replicationServer.getChangelogDB(); changelogDB.setPurgeDelay(0); // disable purging // Prepare data to be stored in the db DN baseDN1 = DN.decode("o=baseDN1"); @@ -82,9 +94,10 @@ CSN[] csns = newCSNs(1, 0, 3); // Add records long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]); addRecord(cnIndexDB, value2, baseDN2, csns[1]); long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]); final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer); long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]); addRecord(cnIndexDB, baseDN2, csns[1]); long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]); // The ChangeNumber should not get purged final long oldestCN = cnIndexDB.getOldestRecord().getChangeNumber(); @@ -94,11 +107,11 @@ DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(oldestCN); try { assertEqualTo(cursor.getRecord(), csns[0], baseDN1, value1); assertEqualTo(cursor.getRecord(), csns[0], baseDN1, cookies.get(0)); assertTrue(cursor.next()); assertEqualTo(cursor.getRecord(), csns[1], baseDN2, value2); assertEqualTo(cursor.getRecord(), csns[1], baseDN2, cookies.get(1)); assertTrue(cursor.next()); assertEqualTo(cursor.getRecord(), csns[2], baseDN3, value3); assertEqualTo(cursor.getRecord(), csns[2], baseDN3, cookies.get(2)); assertFalse(cursor.next()); } finally @@ -106,14 +119,13 @@ StaticUtils.close(cursor); } // Now test that the trimming thread does its job => start it cnIndexDB.setPurgeDelay(100); cnIndexDB.startTrimmingThread(); // Check the db is cleared. while (cnIndexDB.count() > 1) // Now test that purging removes all changes bar the last one changelogDB.setPurgeDelay(1); int count = 0; while (cnIndexDB.count() > 1 && count < 100) { Thread.yield(); Thread.sleep(10); count++; } assertOnlyNewestRecordIsLeft(cnIndexDB, 3); } @@ -123,10 +135,14 @@ } } private long addRecord(JEChangeNumberIndexDB cnIndexDB, String cookie, DN baseDN, CSN csn) throws ChangelogException private long addRecord(JEChangeNumberIndexDB cnIndexDB, DN baseDN, CSN csn) throws ChangelogException { return cnIndexDB.addRecord(new ChangeNumberIndexRecord(cookie, baseDN, csn)); final String cookie = previousCookie.toString(); cookies.add(cookie); final long changeNumber = cnIndexDB.addRecord( new ChangeNumberIndexRecord(cookie, baseDN, csn)); previousCookie.update(baseDN, csn); return changeNumber; } private void assertEqualTo(ChangeNumberIndexRecord record, CSN csn, DN baseDN, String cookie) @@ -136,11 +152,11 @@ assertEquals(record.getPreviousCookie(), cookie); } private JEChangeNumberIndexDB getCNIndexDBNoTrimming(ReplicationServer rs) throws ChangelogException private JEChangeNumberIndexDB getCNIndexDB(ReplicationServer rs) throws ChangelogException { final JEChangelogDB changelogDB = (JEChangelogDB) rs.getChangelogDB(); final JEChangeNumberIndexDB cnIndexDB = (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB(false); (JEChangeNumberIndexDB) changelogDB.getChangeNumberIndexDB(); assertTrue(cnIndexDB.isEmpty()); return cnIndexDB; } @@ -160,12 +176,11 @@ void testClear() throws Exception { ReplicationServer replicationServer = null; JEChangeNumberIndexDB cnIndexDB = null; try { replicationServer = newReplicationServer(); cnIndexDB = getCNIndexDBNoTrimming(replicationServer); cnIndexDB.setPurgeDelay(0); final ChangelogDB changelogDB = replicationServer.getChangelogDB(); changelogDB.setPurgeDelay(0); // Prepare data to be stored in the db @@ -176,9 +191,10 @@ CSN[] csns = newCSNs(1, 0, 3); // Add records long cn1 = addRecord(cnIndexDB, value1, baseDN1, csns[0]); long cn2 = addRecord(cnIndexDB, value2, baseDN2, csns[1]); long cn3 = addRecord(cnIndexDB, value3, baseDN3, csns[2]); final JEChangeNumberIndexDB cnIndexDB = getCNIndexDB(replicationServer); long cn1 = addRecord(cnIndexDB, baseDN1, csns[0]); long cn2 = addRecord(cnIndexDB, baseDN2, csns[1]); long cn3 = addRecord(cnIndexDB, baseDN3, csns[2]); // Checks assertEquals(cnIndexDB.getOldestRecord().getChangeNumber(), cn1); @@ -187,9 +203,9 @@ assertEquals(cnIndexDB.count(), 3, "Db count"); assertFalse(cnIndexDB.isEmpty()); assertEquals(getPreviousCookie(cnIndexDB, cn1), value1); assertEquals(getPreviousCookie(cnIndexDB, cn2), value2); assertEquals(getPreviousCookie(cnIndexDB, cn3), value3); assertEquals(getPreviousCookie(cnIndexDB, cn1), cookies.get(0)); assertEquals(getPreviousCookie(cnIndexDB, cn2), cookies.get(1)); assertEquals(getPreviousCookie(cnIndexDB, cn3), cookies.get(2)); DBCursor<ChangeNumberIndexRecord> cursor = cnIndexDB.getCursorFrom(cn1); assertCursorReadsInOrder(cursor, cn1, cn2, cn3); @@ -200,7 +216,7 @@ cursor = cnIndexDB.getCursorFrom(cn3); assertCursorReadsInOrder(cursor, cn3); cnIndexDB.clear(null); cnIndexDB.removeDomain(null); assertOnlyNewestRecordIsLeft(cnIndexDB, 3); // Check the db is cleared. opends/tests/unit-tests-testng/src/server/org/opends/server/replication/server/changelog/je/JEReplicaDBTest.java
@@ -97,7 +97,7 @@ //-- // Iterator tests with changes persisted waitChangesArePersisted(replicaDB); waitChangesArePersisted(replicaDB, 3); assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2]); assertNotFound(replicaDB, csns[4]); @@ -108,7 +108,7 @@ //-- // Cursor tests with changes persisted replicaDB.add(update4); waitChangesArePersisted(replicaDB); waitChangesArePersisted(replicaDB, 4); assertFoundInOrder(replicaDB, csns[0], csns[1], csns[2], csns[3]); // Test cursor from existing CSN @@ -116,7 +116,7 @@ assertFoundInOrder(replicaDB, csns[3]); assertNotFound(replicaDB, csns[4]); replicaDB.setPurgeDelay(1); replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); int count = 0; boolean purgeSucceeded = false; @@ -141,16 +141,27 @@ } } private void waitChangesArePersisted(JEReplicaDB replicaDB) throws Exception private void waitChangesArePersisted(JEReplicaDB replicaDB, int nbRecordsInserted) throws Exception { final int expected = 0; waitChangesArePersisted(replicaDB, nbRecordsInserted, 1000); } private void waitChangesArePersisted(JEReplicaDB replicaDB, int nbRecordsInserted, int counterWindow) throws Exception { // one counter record is inserted every time "counterWindow" // records have been inserted int expectedNbRecords = nbRecordsInserted + (nbRecordsInserted - 1) / counterWindow; int count = 0; while (replicaDB.getQueueSize() != expected && count < 100) while (replicaDB.getNumberRecords() != expectedNbRecords && count < 100) { Thread.sleep(10); count++; } assertEquals(replicaDB.getQueueSize(), expected); assertEquals(replicaDB.getNumberRecords(), expectedNbRecords); } static CSN[] newCSNs(int serverId, long timestamp, int number) @@ -204,8 +215,9 @@ assertNull(cursor.getRecord()); for (int i = 1; i < csns.length; i++) { assertTrue(cursor.next()); assertEquals(cursor.getRecord().getCSN(), csns[i]); final String msg = "i=" + i + ", csns[i]=" + csns[i].toStringUI(); assertTrue(cursor.next(), msg); assertEquals(cursor.getRecord().getCSN(), csns[i], msg); } assertFalse(cursor.next()); assertNull(cursor.getRecord(), "Actual change=" + cursor.getRecord() @@ -274,11 +286,12 @@ { ReplicationServer replicationServer = null; DBCursor<UpdateMsg> cursor = null; JEReplicaDB replicaDB = null; try { TestCaseUtils.startServer(); replicationServer = configureReplicationServer(100000, 10); JEReplicaDB replicaDB = newReplicaDB(replicationServer); replicaDB = newReplicaDB(replicationServer); CSN[] csns = newCSNs(1, System.currentTimeMillis(), 6); for (int i = 0; i < 5; i++) @@ -288,7 +301,7 @@ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); } } replicaDB.flush(); waitChangesArePersisted(replicaDB, 4); cursor = replicaDB.generateCursorFrom(csns[0]); assertTrue(cursor.next()); @@ -307,6 +320,8 @@ finally { StaticUtils.close(cursor); if (replicaDB != null) replicaDB.shutdown(); remove(replicationServer); } } @@ -334,7 +349,7 @@ testGetOldestNewestCSNs(4000, 1000); } private void testGetOldestNewestCSNs(int max, int counterWindow) throws Exception private void testGetOldestNewestCSNs(final int max, final int counterWindow) throws Exception { String tn = "testDBCount("+max+","+counterWindow+")"; debugInfo(tn, "Starting test"); @@ -363,7 +378,7 @@ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); mySeqnum+=2; } replicaDB.flush(); waitChangesArePersisted(replicaDB, max, counterWindow); assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); assertEquals(replicaDB.getNewestCSN(), csns[max], "Wrong newest CSN"); @@ -387,15 +402,13 @@ replicaDB.add(new DeleteMsg(TEST_ROOT_DN, csns[i], "uid")); mySeqnum+=2; } replicaDB.flush(); waitChangesArePersisted(replicaDB, 2 * max, counterWindow); assertEquals(replicaDB.getOldestCSN(), csns[1], "Wrong oldest CSN"); assertEquals(replicaDB.getNewestCSN(), csns[2 * max], "Wrong newest CSN"); // replicaDB.setPurgeDelay(100); sleep(1000); replicaDB.purgeUpTo(new CSN(Long.MAX_VALUE, 0, 0)); String testcase = "AFTER PURGE (oldest, newest)="; debugInfo(tn, testcase + replicaDB.getOldestCSN() + replicaDB.getNewestCSN());