| | |
| | | * CDDL HEADER END |
| | | * |
| | | * |
| | | * Copyright 2013 ForgeRock AS |
| | | * Copyright 2013-2014 ForgeRock AS |
| | | */ |
| | | package org.opends.server.replication.server.changelog.je; |
| | | |
| | | import java.io.File; |
| | | import java.util.*; |
| | | import java.util.Map.Entry; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | |
| | | import org.opends.messages.Message; |
| | | import org.opends.messages.MessageBuilder; |
| | | import org.opends.server.admin.std.server.ReplicationServerCfg; |
| | | import org.opends.server.api.DirectoryThread; |
| | | import org.opends.server.config.ConfigException; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import org.opends.server.replication.common.CSN; |
| | | import org.opends.server.replication.common.MultiDomainServerState; |
| | | import org.opends.server.replication.common.ServerState; |
| | | import org.opends.server.replication.protocol.UpdateMsg; |
| | | import org.opends.server.replication.server.ChangelogState; |
| | |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.util.StaticUtils; |
| | | import org.opends.server.util.TimeThread; |
| | | |
| | | import com.forgerock.opendj.util.Pair; |
| | | |
| | |
| | | * The handler of the changelog database, the database stores the relation |
| | | * between a change number and the associated cookie. |
| | | * <p> |
| | | * Guarded by cnIndexDBLock |
| | | * @GuardedBy("cnIndexDBLock") |
| | | */ |
| | | private JEChangeNumberIndexDB cnIndexDB; |
| | | private final AtomicReference<ChangeNumberIndexer> cnIndexer = |
| | |
| | | /** Used for protecting {@link ChangeNumberIndexDB} related state. */ |
| | | private final Object cnIndexDBLock = new Object(); |
| | | |
| | | /** |
| | | * The purge delay (in milliseconds). Records in the changelog DB that are |
| | | * older than this delay might be removed. |
| | | */ |
| | | private long purgeDelayInMillis; |
| | | private final AtomicReference<ChangelogDBPurger> cnPurger = |
| | | new AtomicReference<ChangelogDBPurger>(); |
| | | private volatile long latestPurgeDate; |
| | | |
| | | /** The local replication server. */ |
| | | private final ReplicationServer replicationServer; |
| | | private AtomicBoolean shutdown = new AtomicBoolean(); |
| | |
| | | initializeChangelogState(changelogState); |
| | | if (config.isComputeChangeNumber()) |
| | | { |
| | | final ChangeNumberIndexer indexer = |
| | | new ChangeNumberIndexer(this, changelogState); |
| | | if (cnIndexer.compareAndSet(null, indexer)) |
| | | { |
| | | indexer.start(); |
| | | } |
| | | startIndexer(changelogState); |
| | | } |
| | | setPurgeDelay(replicationServer.getPurgeDelay()); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | // - then throw the first encountered exception |
| | | ChangelogException firstException = null; |
| | | |
| | | final ChangeNumberIndexer indexer = cnIndexer.get(); |
| | | final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | | cnIndexer.compareAndSet(indexer, null); |
| | | } |
| | | final ChangelogDBPurger purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | purger.initiateShutdown(); |
| | | } |
| | | |
| | | try |
| | | { |
| | | shutdownCNIndexDB(); |
| | |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB.clear(baseDN); |
| | | cnIndexDB.removeDomain(baseDN); |
| | | } |
| | | catch (ChangelogException e) |
| | | { |
| | |
| | | firstException = e; |
| | | } |
| | | else if (debugEnabled()) |
| | | { |
| | | TRACER.debugCaught(DebugLogLevel.ERROR, e); |
| | | } |
| | | } |
| | | |
| | | if (firstException != null) |
| | |
| | | |
| | | // NOTE(review): this span shows two signatures for setPurgeDelay (parameter |
| | | // "delay" and parameter "purgeDelayInMillis") and statements that refer to |
| | | // both names. It looks like two revisions of the method interleaved; confirm |
| | | // which lines belong to the current revision before relying on this text. |
| | | // The lines using "purgeDelayInMillis" store the delay in the field, start a |
| | | // ChangelogDBPurger through cnPurger.compareAndSet(null, purger) when the |
| | | // delay is positive, and otherwise clear cnPurger and call |
| | | // initiateShutdown() on any purger that was registered. |
| | | // The lines using "delay" forward the value to cnIndexDB and to every |
| | | // JEReplicaDB found in domainToReplicaDBs. |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void setPurgeDelay(long delay) |
| | | public void setPurgeDelay(long purgeDelayInMillis) |
| | | { |
| | | final JEChangeNumberIndexDB cnIndexDB = this.cnIndexDB; |
| | | if (cnIndexDB != null) |
| | | this.purgeDelayInMillis = purgeDelayInMillis; |
| | | final ChangelogDBPurger purger; |
| | | if (purgeDelayInMillis > 0) |
| | | { |
| | | cnIndexDB.setPurgeDelay(delay); |
| | | } |
| | | for (Map<Integer, JEReplicaDB> domainMap : domainToReplicaDBs.values()) |
| | | { |
| | | for (JEReplicaDB replicaDB : domainMap.values()) |
| | | purger = new ChangelogDBPurger(); |
| | | if (cnPurger.compareAndSet(null, purger)) |
| | | { |
| | | replicaDB.setPurgeDelay(delay); |
| | | purger.start(); |
| | | } // otherwise a purger was already running |
| | | } |
| | | else |
| | | { |
| | | purger = cnPurger.getAndSet(null); |
| | | if (purger != null) |
| | | { |
| | | purger.initiateShutdown(); |
| | | } |
| | | } |
| | | } |
| | |
| | | public void setComputeChangeNumber(boolean computeChangeNumber) |
| | | throws ChangelogException |
| | | { |
| | | final ChangeNumberIndexer indexer; |
| | | if (computeChangeNumber) |
| | | { |
| | | final ChangelogState changelogState = dbEnv.readChangelogState(); |
| | | indexer = new ChangeNumberIndexer(this, changelogState); |
| | | if (cnIndexer.compareAndSet(null, indexer)) |
| | | { |
| | | indexer.start(); |
| | | } |
| | | startIndexer(dbEnv.readChangelogState()); |
| | | } |
| | | else |
| | | { |
| | | indexer = cnIndexer.getAndSet(null); |
| | | final ChangeNumberIndexer indexer = cnIndexer.getAndSet(null); |
| | | if (indexer != null) |
| | | { |
| | | indexer.initiateShutdown(); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | |  * Builds a {@link ChangeNumberIndexer} from the supplied changelog state and |
| | |  * starts it, but only when no indexer is currently registered in |
| | |  * {@code cnIndexer}. When one is already registered, the newly built |
| | |  * instance is dropped without being started. |
| | |  * |
| | |  * @param changelogState |
| | |  *          the changelog state handed to the new indexer |
| | |  */ |
| | | private void startIndexer(final ChangelogState changelogState) |
| | | { |
| | |   final ChangeNumberIndexer candidate = |
| | |       new ChangeNumberIndexer(this, changelogState); |
| | |   final boolean registered = cnIndexer.compareAndSet(null, candidate); |
| | |   if (!registered) |
| | |   { |
| | |     // another indexer holds the slot: leave it alone |
| | |     return; |
| | |   } |
| | |   candidate.start(); |
| | | } |
| | | |
| | | // NOTE(review): the body below ends with two consecutive return statements |
| | | // ("return latest;" then "return latestPurgeDate;"), so the second one is |
| | | // unreachable as written. It looks like two revisions of the method |
| | | // interleaved; confirm which one is current. |
| | | // The loop computes the largest getLatestTrimDate() over the replica DBs of |
| | | // the domain (0 when there are none); the last line returns the |
| | | // latestPurgeDate field instead. |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public long getDomainLatestTrimDate(DN baseDN) |
| | | { |
| | | long latest = 0; |
| | | for (JEReplicaDB replicaDB : getDomainMap(baseDN).values()) |
| | | { |
| | | if (latest == 0 || latest < replicaDB.getLatestTrimDate()) |
| | | { |
| | | latest = replicaDB.getLatestTrimDate(); |
| | | } |
| | | } |
| | | return latest; |
| | | return latestPurgeDate; |
| | | } |
| | | |
| | | /** |
| | |  * {@inheritDoc} |
| | |  * <p> |
| | |  * Delegates to {@link #getChangeNumberIndexDB(boolean)} with the |
| | |  * {@code startTrimmingThread} flag set to {@code true}. |
| | |  */ |
| | | @Override |
| | | public ChangeNumberIndexDB getChangeNumberIndexDB() |
| | | { |
| | |   final boolean startTrimmingThread = true; |
| | |   return getChangeNumberIndexDB(startTrimmingThread); |
| | | } |
| | | |
| | | /** |
| | | * Returns the {@link ChangeNumberIndexDB} object. |
| | | * |
| | | * @param startTrimmingThread |
| | | * whether the trimming thread should be started |
| | | * @return the {@link ChangeNumberIndexDB} object |
| | | */ |
| | | ChangeNumberIndexDB getChangeNumberIndexDB(boolean startTrimmingThread) |
| | | { |
| | | synchronized (cnIndexDBLock) |
| | | { |
| | | if (cnIndexDB == null) |
| | | { |
| | | try |
| | | { |
| | | cnIndexDB = new JEChangeNumberIndexDB(replicationServer, this.dbEnv); |
| | | if (startTrimmingThread) |
| | | { |
| | | cnIndexDB.startTrimmingThread(); |
| | | } |
| | | cnIndexDB = new JEChangeNumberIndexDB(this.dbEnv); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | |
| | | } |
| | | // TODO save this state in the changelogStateDB? |
| | | } |
| | | |
| | | /** |
| | | * The thread purging the changelogDB on a regular interval. Records are |
| | | * purged from the changelogDB is they are older than a delay specified in |
| | | * seconds. The purge process works in two steps: |
| | | * <ol> |
| | | * <li>first purge the changeNumberIndexDB and retrieve information to drive |
| | | * replicaDBs purging</li> |
| | | * <li>proceed to purge each replicaDBs based on the information collected |
| | | * when purging the changeNumberIndexDB</li> |
| | | * </ol> |
| | | */ |
| | | private final class ChangelogDBPurger extends DirectoryThread |
| | | { |
| | | |
| | | protected ChangelogDBPurger() |
| | | { |
| | | super("changelog DB purger"); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | // initialize CNIndexDB |
| | | getChangeNumberIndexDB(); |
| | | while (!isShutdownInitiated()) |
| | | { |
| | | try |
| | | { |
| | | final JEChangeNumberIndexDB localCNIndexDB = cnIndexDB; |
| | | if (localCNIndexDB == null) |
| | | { // shutdown has been called |
| | | return; |
| | | } |
| | | |
| | | final long purgeTimestamp = TimeThread.getTime() - purgeDelayInMillis; |
| | | final MultiDomainServerState purgeUpToCookie = |
| | | localCNIndexDB.purgeUpTo(purgeTimestamp); |
| | | if (purgeUpToCookie == null) |
| | | { // this can happen when the change number index DB is empty |
| | | continue; |
| | | } |
| | | |
| | | /* |
| | | * Drive purge of the replica DBs by the oldest non purged cookie in |
| | | * the change number index DB. |
| | | */ |
| | | for (Entry<DN, ConcurrentMap<Integer, JEReplicaDB>> entry1 |
| | | : domainToReplicaDBs.entrySet()) |
| | | { |
| | | final DN baseDN = entry1.getKey(); |
| | | final Map<Integer, JEReplicaDB> domainMap = entry1.getValue(); |
| | | for (Entry<Integer, JEReplicaDB> entry2 : domainMap.entrySet()) |
| | | { |
| | | final Integer serverId = entry2.getKey(); |
| | | final JEReplicaDB replicaDB = entry2.getValue(); |
| | | replicaDB.purgeUpTo(purgeUpToCookie.getCSN(baseDN, serverId)); |
| | | } |
| | | } |
| | | |
| | | latestPurgeDate = purgeTimestamp; |
| | | |
| | | // purge delay is specified in seconds so it should not be a problem |
| | | // to sleep for 500 millis |
| | | sleep(500); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH |
| | | .get(stackTraceToSingleLineString(e))); |
| | | if (replicationServer != null) |
| | | { |
| | | replicationServer.shutdown(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |