/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Copyright 2006-2010 Sun Microsystems, Inc. * Portions copyright 2011-2014 ForgeRock AS */ package org.opends.server.replication.server.changelog.je; import java.util.ArrayList; import java.util.Date; import java.util.LinkedList; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.opends.server.admin.std.server.MonitorProviderCfg; import org.opends.server.api.DirectoryThread; import org.opends.server.api.MonitorProvider; import org.opends.server.config.ConfigException; import org.opends.server.core.DirectoryServer; import org.opends.server.replication.common.CSN; import org.opends.server.replication.protocol.UpdateMsg; import org.opends.server.replication.server.ReplicationServer; import org.opends.server.replication.server.ReplicationServerDomain; import org.opends.server.replication.server.changelog.api.ChangelogException; import org.opends.server.replication.server.changelog.api.DBCursor; import org.opends.server.replication.server.changelog.je.ReplicationDB.*; import org.opends.server.types.Attribute; import org.opends.server.types.Attributes; 
import org.opends.server.types.DN; import org.opends.server.types.InitializationException; import org.opends.server.util.TimeThread; import static org.opends.messages.ReplicationMessages.*; import static org.opends.server.loggers.ErrorLogger.*; import static org.opends.server.util.StaticUtils.*; /** * This class is used for managing the replicationServer database for each * server in the topology. *

* It is responsible for efficiently saving the updates that are received from * each master server into stable storage. *

* This class is also able to generate a {@link DBCursor} that can be used to * read all changes from a given {@link CSN}. *

* This class publishes some monitoring information below cn=monitor.
*/
public class JEReplicaDB implements Runnable
{

  /**
   * Class that allows atomically setting oldest and newest CSNs without
   * synchronization.
   *
   * @Immutable
   */
  private static final class CSNLimits
  {
    private final CSN oldestCSN;
    private final CSN newestCSN;

    public CSNLimits(CSN oldestCSN, CSN newestCSN)
    {
      this.oldestCSN = oldestCSN;
      this.newestCSN = newestCSN;
    }
  }

  /**
   * The msgQueue holds all the updates not yet saved to stable storage.
   * <p>
   * This blocking queue is only used as a temporary placeholder so that the
   * writes to stable storage can be grouped for efficiency reasons. Adding an
   * update synchronously adds the update to this list. A dedicated thread loops
   * on {@link #flush()} and {@link #trim()}:
   * <dl>
   * <dt>flush()</dt>
   * <dd>gets a number of changes from the in-memory list by block and writes
   * them to the db.</dd>
   * <dt>trim()</dt>
   * <dd>deletes from the DB a number of changes that are older than a certain
   * date.</dd>
   * </dl>
   * <p>
   * Changes are not read back by replicationServer threads that are responsible
   * for pushing the changes to other replication servers or to LDAP servers.
   */
  private final LinkedBlockingQueue<UpdateMsg> msgQueue =
      new LinkedBlockingQueue<UpdateMsg>();

  /**
   * Semaphore used to limit the number of bytes used in memory by the queue.
   * The threads calling {@link #add(UpdateMsg)} method will be blocked if the
   * size of msgQueue becomes larger than the available permits and will resume
   * only when the number of available permits allow it.
   */
  private final Semaphore queueSizeBytes;
  /**
   * Total number of permits of {@link #queueSizeBytes}, i.e. the maximum number
   * of bytes the in-memory queue may hold (queue size configured on the
   * replication server multiplied by 200).
   */
  private final int queueMaxBytes;
  private ReplicationDB db;
  /**
   * Holds the oldest and newest CSNs for this replicaDB for fast retrieval.
   *
   * @NonNull
   */
  private volatile CSNLimits csnLimits;
  private int serverId;
  private DN baseDN;
  private DbMonitorProvider dbMonitor = new DbMonitorProvider();
  private DirectoryThread thread;
  /**
   * Used to prevent race conditions between threads calling {@link #clear()}
   * {@link #flush()} or {@link #trim()}. This can happen with the thread
   * flushing the queue, on shutdown or on cursor opening, a thread calling
   * clear(), etc.
   */
  private final Object flushLock = new Object();
  private ReplicationServer replicationServer;

  /**
   * Date (in milliseconds) before which changes were last trimmed. Written by
   * the checkpointer thread in {@link #trim()} and read by other threads through
   * {@link #getLatestTrimDate()}, hence volatile.
   */
  private volatile long latestTrimDate = 0;

  /**
   * The trim age in milliseconds. Change records in the change DB that are
   * older than this age are removed. Written through
   * {@link #setPurgeDelay(long)} and read by the checkpointer thread, hence
   * volatile.
   */
  private volatile long trimAge;

  /**
   * Creates a new ReplicaDB associated to a given LDAP server and starts its
   * checkpointer thread.
   *
   * @param serverId The serverId for which changes will be stored in the DB.
   * @param baseDN the baseDN for which this DB was created.
   * @param replicationServer The ReplicationServer that creates this ReplicaDB.
   * @param dbenv the Database Env to use to create the ReplicationServer DB.
   * @throws ChangelogException If a database problem happened
   */
  public JEReplicaDB(int serverId, DN baseDN,
      ReplicationServer replicationServer, ReplicationDbEnv dbenv)
      throws ChangelogException
  {
    this.replicationServer = replicationServer;
    this.serverId = serverId;
    this.baseDN = baseDN;
    trimAge = replicationServer.getTrimAge();
    queueMaxBytes = replicationServer.getQueueSize() * 200;
    queueSizeBytes = new Semaphore(queueMaxBytes);
    db = new ReplicationDB(serverId, baseDN, replicationServer, dbenv);
    csnLimits = new CSNLimits(db.readOldestCSN(), db.readNewestCSN());

    thread = new DirectoryThread(this, "Replication server RS("
        + replicationServer.getServerId()
        + ") changelog checkpointer for Replica DS(" + serverId
        + ") for domain \"" + baseDN + "\"");
    thread.start();

    DirectoryServer.deregisterMonitorProvider(dbMonitor);
    DirectoryServer.registerMonitorProvider(dbMonitor);
  }

  /**
   * Add an update to the list of messages that must be saved to the db managed
   * by this db handler. This method is blocking if the size of the list of
   * message is larger than its maximum.
   *
   * @param updateMsg
   *          The update message that must be saved to the db managed by this db
   *          handler.
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void add(UpdateMsg updateMsg) throws ChangelogException
  {
    if (thread.isShutdownInitiated())
    {
      throw new ChangelogException(
          ERR_COULD_NOT_ADD_CHANGE_TO_SHUTTING_DOWN_REPLICA_DB.get(updateMsg
              .toString(), String.valueOf(baseDN), String.valueOf(serverId)));
    }

    final int msgSize = updateMsg.size();
    if (msgSize < queueMaxBytes)
    {
      try
      {
        queueSizeBytes.acquire(msgSize);
      }
      catch (InterruptedException e)
      {
        throw new ChangelogException(
            ERR_EXCEPTION_COULD_NOT_ADD_CHANGE_TO_REPLICA_DB.get(updateMsg
                .toString(), String.valueOf(baseDN), String.valueOf(serverId),
                stackTraceToSingleLineString(e)));
      }
    }
    else
    {
      // edge case with a very large message
      collectAllPermits();
    }
    msgQueue.add(updateMsg);

    final CSNLimits limits = csnLimits;
    final boolean updateNew = limits.newestCSN == null
        || limits.newestCSN.isOlderThan(updateMsg.getCSN());
    final boolean updateOld = limits.oldestCSN == null;
    if (updateOld || updateNew)
    {
      csnLimits = new CSNLimits(
          updateOld ? updateMsg.getCSN() : limits.oldestCSN,
          updateNew ? updateMsg.getCSN() : limits.newestCSN);
    }
  }

  /** Collects all the permits from the {@link #queueSizeBytes} semaphore. */
  private void collectAllPermits()
  {
    int collectedPermits = queueSizeBytes.drainPermits();
    while (collectedPermits != queueMaxBytes)
    {
      // Do not use Thread.sleep() because:
      // 1) it is expected the permits will be released very soon
      // 2) we want to collect all the permits, so do not leave a chance to
      // other threads to steal them from us.
      // 3) we want to keep low latency
      Thread.yield();
      collectedPermits += queueSizeBytes.drainPermits();
    }
  }

  /**
   * Returns the number of bytes currently accounted to changes waiting in the
   * in-memory queue, i.e. the number of permits of {@link #queueSizeBytes}
   * that are currently taken.
   */
  private int getQueuedBytes()
  {
    return queueMaxBytes - queueSizeBytes.availablePermits();
  }

  /**
   * Get the oldest CSN that has not been purged yet.
   *
   * @return the oldest CSN that has not been purged yet.
   */
  public CSN getOldestCSN()
  {
    return csnLimits.oldestCSN;
  }

  /**
   * Get the newest CSN that has not been purged yet.
   *
   * @return the newest CSN that has not been purged yet.
   */
  public CSN getNewestCSN()
  {
    return csnLimits.newestCSN;
  }

  /**
   * Get the number of changes, computed from the sequence numbers of the
   * oldest and newest CSNs.
   *
   * @return Returns the number of changes, or 0 if either limit is unknown.
   */
  public long getChangesCount()
  {
    final CSNLimits limits = csnLimits;
    if (limits.newestCSN != null && limits.oldestCSN != null)
    {
      return limits.newestCSN.getSeqnum() - limits.oldestCSN.getSeqnum() + 1;
    }
    return 0;
  }

  /**
   * Generate a new {@link DBCursor} that allows to browse the db managed by
   * this ReplicaDB and starting at the position defined by a given CSN.
   *
   * @param startAfterCSN
   *          The position where the cursor must start. If null, start from the
   *          oldest CSN
   * @return a new {@link DBCursor} that allows to browse the db managed by this
   *         ReplicaDB and starting at the position defined by a given CSN.
   * @throws ChangelogException
   *           if a database problem happened
   */
  public DBCursor<UpdateMsg> generateCursorFrom(CSN startAfterCSN)
      throws ChangelogException
  {
    if (startAfterCSN == null)
    {
      // flush any potential changes before opening the cursor
      flush();
    }
    return new JEReplicaDBCursor(db, startAfterCSN, this);
  }

  /**
   * Shutdown this ReplicaDB: stops the checkpointer thread, flushes the
   * remaining queued changes, then closes the db and deregisters the monitor.
   */
  public void shutdown()
  {
    if (thread.isShutdownInitiated())
    {
      return;
    }

    thread.initiateShutdown();

    while (msgQueue.size() != 0)
    {
      try
      {
        flush();
      }
      catch (ChangelogException e)
      {
        // We are already shutting down
        logError(e.getMessageObject());
      }
    }

    db.shutdown();
    DirectoryServer.deregisterMonitorProvider(dbMonitor);
  }

  /**
   * Run method for this class.
   * Periodically flushes the ReplicationServerDomain cache from memory to the
   * stable storage and trims the old updates.
   */
  @Override
  public void run()
  {
    thread.startWork();

    try
    {
      while (!thread.isShutdownInitiated())
      {
        try
        {
          flush();
          trim();
        }
        catch (ChangelogException end)
        {
          stop(end);
          break;
        }
      }

      try
      {
        // call flush a last time before exiting to make sure that
        // no change was forgotten in the msgQueue
        flush();
      }
      catch (ChangelogException e)
      {
        stop(e);
      }
    }
    finally
    {
      thread.stopWork();
    }
  }

  /**
   * Logs the error, marks this ReplicaDB as shutting down and shuts down the
   * whole replication server.
   */
  private void stop(Exception e)
  {
    logError(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
        .get(stackTraceToSingleLineString(e)));

    thread.initiateShutdown();

    if (replicationServer != null)
    {
      replicationServer.shutdown();
    }
  }

  /**
   * Retrieves the latest trim date.
   * @return the latest trim date.
   */
  public long getLatestTrimDate()
  {
    return latestTrimDate;
  }

  /**
   * Trim old changes from this replicationServer database.
   * @throws ChangelogException In case of database problem.
   */
  private void trim() throws ChangelogException
  {
    // read the volatile once so that a concurrent setPurgeDelay() cannot make
    // the check and the computation below disagree
    final long age = trimAge;
    if (age == 0)
    {
      return;
    }

    final long trimTime = TimeThread.getTime() - age;
    latestTrimDate = trimTime;
    CSN trimDate = new CSN(trimTime, 0, 0);

    // Find the last CSN before the trimDate, in the Database.
    final CSN lastBeforeTrimDate = db.getPreviousCSN(trimDate);
    if (lastBeforeTrimDate != null)
    {
      // If we found it, we want to stop trimming when reaching it.
      trimDate = lastBeforeTrimDate;
    }

    final int queueLowMarkBytes = queueMaxBytes / 5;
    for (int i = 0; i < 100; i++)
    {
      /*
       * Perform at least some trimming regardless of the flush backlog. Then
       * continue trim iterations while the flush backlog is low (below the
       * low mark). Once the flush backlog increases, stop trimming and start
       * flushing more eagerly.
       * The backlog is measured in bytes, like the low mark it is compared to.
       */
      if (i > 20 && getQueuedBytes() >= queueLowMarkBytes)
      {
        break;
      }

      synchronized (flushLock)
      {
        /*
         * the trim is done by group in order to save some CPU, IO bandwidth and
         * DB caches: start the transaction then do a bunch of remove then
         * commit.
         */
        /*
         * Matt wrote: The record removal is done as a DB transaction and the
         * deleted records are only "deleted" on commit. While the txn/cursor is
         * open the records to be deleted will, I think, be pinned in the DB
         * cache. In other words, the larger the transaction (the more records
         * deleted during a single batch) the more DB cache will be used to
         * process the transaction.
         */
        final ReplServerDBCursor cursor = db.openDeleteCursor();
        try
        {
          for (int j = 0; j < 50; j++)
          {
            if (thread.isShutdownInitiated())
            {
              return;
            }

            final CSN csn = cursor.nextCSN();
            if (csn == null)
            {
              return;
            }

            if (!csn.equals(csnLimits.newestCSN) && csn.isOlderThan(trimDate))
            {
              cursor.delete();
            }
            else
            {
              // NOTE(review): this read-modify-write of csnLimits is not atomic
              // with respect to add(), which may publish a newer newestCSN
              // between the read and the write below.
              csnLimits = new CSNLimits(csn, csnLimits.newestCSN);
              return;
            }
          }
        }
        catch (ChangelogException e)
        {
          // mark shutdown for this db so that we don't try again to
          // stop it from cursor.close() or methods called by cursor.close()
          cursor.abort();
          thread.initiateShutdown();
          throw e;
        }
        finally
        {
          cursor.close();
        }
      }
    }
  }

  /**
   * Flush the updates currently waiting in the in-memory queue to the stable
   * storage.
   * <p>
   * Waits up to 500 milliseconds for a first change, then drains all the
   * changes queued at that point and writes them in one batch.
   *
   * @throws ChangelogException
   *           If a database problem happened
   */
  public void flush() throws ChangelogException
  {
    try
    {
      synchronized (flushLock)
      {
        final List<UpdateMsg> changes = new LinkedList<UpdateMsg>();
        final UpdateMsg change = msgQueue.poll(500, TimeUnit.MILLISECONDS);
        if (change == null)
        {
          // nothing to persist, move on to the trim phase
          return;
        }

        // Try to see if there are more changes and persist them all.
        changes.add(change);
        msgQueue.drainTo(changes);

        int totalSize = db.addEntries(changes);
        // do not release more than queue max size permits
        // (be careful of the edge case with the very large message)
        queueSizeBytes.release(Math.min(totalSize, queueMaxBytes));
      }
    }
    catch (InterruptedException e)
    {
      throw new ChangelogException(ERR_EXCEPTION_CHANGELOG_TRIM_FLUSH
          .get(stackTraceToSingleLineString(e)));
    }
  }

  /**
   * This internal class is used to implement the Monitoring capabilities of the
   * ReplicaDB.
   */
  private class DbMonitorProvider extends MonitorProvider<MonitorProviderCfg>
  {
    /**
     * {@inheritDoc}
     */
    @Override
    public List<Attribute> getMonitorData()
    {
      List<Attribute> attributes = new ArrayList<Attribute>();
      create(attributes, "replicationServer-database",
          String.valueOf(serverId));
      create(attributes, "domain-name", baseDN.toNormalizedString());
      final CSNLimits limits = csnLimits;
      if (limits.oldestCSN != null)
      {
        create(attributes, "first-change", encode(limits.oldestCSN));
      }
      if (limits.newestCSN != null)
      {
        create(attributes, "last-change", encode(limits.newestCSN));
      }
      create(attributes, "queue-size", String.valueOf(msgQueue.size()));
      create(attributes, "queue-size-bytes", String.valueOf(getQueuedBytes()));
      return attributes;
    }

    private void create(List<Attribute> attributes, String name, String value)
    {
      attributes.add(Attributes.create(name, value));
    }

    private String encode(CSN csn)
    {
      return csn + " " + new Date(csn.getTime());
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String getMonitorInstanceName()
    {
      ReplicationServerDomain domain = replicationServer
          .getReplicationServerDomain(baseDN);
      return "Changelog for DS(" + serverId + "),cn="
          + domain.getMonitorInstanceName();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void initializeMonitorProvider(MonitorProviderCfg configuration)
                            throws ConfigException,InitializationException
    {
      // Nothing to do for now
    }
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public String toString()
  {
    final CSNLimits limits = csnLimits;
    return getClass().getSimpleName() + " " + baseDN + " " + serverId + " "
        + limits.oldestCSN + " " + limits.newestCSN;
  }

  /**
   * Set the Purge delay for this db Handler.
   * @param delay The purge delay in Milliseconds.
   */
  public void setPurgeDelay(long delay)
  {
    trimAge = delay;
  }

  /**
   * Clear the changes from this DB (from both memory cache and DB storage).
   * <p>
   * The changes discarded from the memory cache give back the queue permits
   * they were holding, so that {@link #add(UpdateMsg)} keeps working
   * afterwards. flush() is the only other place releasing these permits, and
   * it cannot run while the flush lock is held here.
   *
   * @throws ChangelogException When an exception occurs while removing the
   * changes from the DB.
   */
  public void clear() throws ChangelogException
  {
    synchronized(flushLock)
    {
      final List<UpdateMsg> discarded = new ArrayList<UpdateMsg>();
      msgQueue.drainTo(discarded);
      releasePermitsOf(discarded);
      db.clear();
      csnLimits = new CSNLimits(null, null);
    }
  }

  /**
   * Releases the {@link #queueSizeBytes} permits held by the given messages
   * that were removed from the queue without being persisted. Each message
   * holds {@code min(size, queueMaxBytes)} permits, mirroring the acquisition
   * done in {@link #add(UpdateMsg)}.
   */
  private void releasePermitsOf(List<UpdateMsg> removedMsgs)
  {
    long heldPermits = 0;
    for (UpdateMsg msg : removedMsgs)
    {
      heldPermits += Math.min(msg.size(), queueMaxBytes);
    }
    // never give back more than the semaphore was created with
    queueSizeBytes.release((int) Math.min(heldPermits, queueMaxBytes));
  }

  /**
   * Getter for the serverID of the server for which this database is managed.
   *
   * @return the serverId.
   */
  public int getServerId()
  {
    return this.serverId;
  }

  /**
   * Return the size of the msgQueue (the memory cache of the ReplicaDB).
   * For test purpose.
   * @return The memory queue size.
   */
  int getQueueSize()
  {
    return this.msgQueue.size();
  }

  /**
   * Set the window size for writing counter records in the DB.
   * <p>
   * for unit tests only!!
   *
   * @param size
   *          window size in number of records.
   */
  void setCounterRecordWindowSize(int size)
  {
    db.setCounterRecordWindowSize(size);
  }
}