/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2008 Sun Microsystems, Inc. */ package org.opends.server.extensions; import org.opends.messages.Message; import java.util.Collections; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import org.opends.server.api.Backend; import org.opends.server.api.DirectoryThread; import org.opends.server.api.ServerShutdownListener; import org.opends.server.core.DirectoryServer; import org.opends.server.types.DN; import org.opends.server.types.DebugLogLevel; import org.opends.server.types.DirectoryException; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.types.LockManager; import static org.opends.server.util.StaticUtils.*; import static org.opends.server.loggers.debug.DebugLogger.*; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.messages.ExtensionMessages.*; /** * This class defines a utility that will be used to pre-load the Directory * Server entry cache. Pre-loader is multi-threaded and consist of the * following threads: * * - The Arbiter thread which monitors overall pre-load progress and manages * pre-load worker threads by adding or removing them as deemed necessary. * * - The Collector thread which collects all entry DNs stored within every * configured and active backend to a shared object workers consume from. * * - Worker threads which are responsible for monitoring the collector feed * and requesting the actual entries for retrieval and in cache storage. * * This implementation is entry cache and backend independent and can be * used to pre-load from any backend to any entry cache as long as both * are capable of initiating and sustaining such pre-load activity. * * This implementation is fully synchronized and safe to use with the server * online and pre-load activities going in parallel with server operations. * * This implementation is self-adjusting to any system workload and does not * require any configuration parameters to optimize for initial system * resources availability and/or any subsequent fluctuations. */ public class EntryCachePreloader extends DirectoryThread implements ServerShutdownListener { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * Interrupt flag for the arbiter to terminate worker threads. */ private AtomicBoolean interruptFlag = new AtomicBoolean(false); /** * Processed entries counter. */ private AtomicLong processedEntries = new AtomicLong(0); /** * Progress report resolution. */ private static final long progressInterval = 5000; /** * Default arbiter resolution time. */ public static final long PRELOAD_ARBITER_DEFAULT_SLEEP_TIME = 1000; /** * Effective arbiter resolution time. */ private static long arbiterSleepTime; /** * Pre-load arbiter thread name. */ private String preloadArbiterThreadName; /** * Pre-load arbiter thread. */ private Thread preloadArbiterThread; /** * Worker threads. */ private List preloadThreads = Collections.synchronizedList( new LinkedList()); /** * DN Collector thread. */ private EntryCacheDNCollector dnCollector = new EntryCacheDNCollector(); /** * This queue is for workers to take from. */ private LinkedBlockingQueue dnQueue = new LinkedBlockingQueue(); /** * The number of bytes in a megabyte. */ private static final int bytesPerMegabyte = 1024*1024; /** * Default constructor. */ public EntryCachePreloader() { super("Entry Cache Preload Arbiter"); preloadArbiterThreadName = getName(); DirectoryServer.registerShutdownListener(this); // This should not be exposed as configuration // parameter and is only useful for testing. arbiterSleepTime = Long.getLong( "org.opends.server.entrycache.preload.sleep", PRELOAD_ARBITER_DEFAULT_SLEEP_TIME); } /** * The Arbiter thread. */ @Override public void run() { preloadArbiterThread = Thread.currentThread(); logError(NOTE_CACHE_PRELOAD_PROGRESS_START.get()); // Start DN collector thread first. dnCollector.start(); // Kick off a single worker. EntryCachePreloadWorker singleWorkerThread = new EntryCachePreloadWorker(); singleWorkerThread.start(); preloadThreads.add(singleWorkerThread); // Progress report timer task. Timer timer = new Timer(); TimerTask progressTask = new TimerTask() { // Persistent state restore progress report. public void run() { if (processedEntries.get() > 0) { long freeMemory = Runtime.getRuntime().freeMemory() / bytesPerMegabyte; Message message = NOTE_CACHE_PRELOAD_PROGRESS_REPORT.get( processedEntries.get(), freeMemory); logError(message); } } }; timer.scheduleAtFixedRate(progressTask, progressInterval, progressInterval); // Cycle to monitor progress and adjust workers. long processedEntriesDeltaLow = 0; long processedEntriesDeltaHigh = 0; long lastKnownProcessedEntries = 0; try { while (!dnQueue.isEmpty() || dnCollector.isAlive()) { long processedEntriesCycle = processedEntries.get(); long processedEntriesDelta = processedEntriesCycle - lastKnownProcessedEntries; lastKnownProcessedEntries = processedEntriesCycle; // Spawn another worker if scaling up. if (processedEntriesDelta > processedEntriesDeltaHigh) { processedEntriesDeltaLow = processedEntriesDeltaHigh; processedEntriesDeltaHigh = processedEntriesDelta; EntryCachePreloadWorker workerThread = new EntryCachePreloadWorker(); workerThread.start(); preloadThreads.add(workerThread); } // Interrupt random worker if scaling down. if (processedEntriesDelta < processedEntriesDeltaLow) { processedEntriesDeltaHigh = processedEntriesDeltaLow; processedEntriesDeltaLow = processedEntriesDelta; // Leave at least one worker to progress. if (preloadThreads.size() > 1) { interruptFlag.set(true); } } Thread.sleep(arbiterSleepTime); } // Join the collector. dnCollector.join(); // Join all spawned workers. for (Thread workerThread : preloadThreads) { workerThread.join(); } // Cancel progress report task and report done. timer.cancel(); Message message = NOTE_CACHE_PRELOAD_PROGRESS_DONE.get( processedEntries.get()); logError(message); } catch (InterruptedException ex) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, ex); } // Interrupt the collector. dnCollector.interrupt(); // Interrupt all preload threads. for (Thread thread : preloadThreads) { thread.interrupt(); } logError(WARN_CACHE_PRELOAD_INTERRUPTED.get()); } finally { // Kill the task in case of exception. timer.cancel(); } } /** * The worker thread. */ private class EntryCachePreloadWorker extends DirectoryThread { public EntryCachePreloadWorker() { super("Entry Cache Preload Worker"); } @Override public void run() { while (!dnQueue.isEmpty() || dnCollector.isAlive()) { // Check if interrupted. if (Thread.interrupted()) { break; } if (interruptFlag.compareAndSet(true, false)) { break; } // Dequeue the next entry DN. try { DN entryDN = dnQueue.take(); Lock readLock = null; try { // Acquire a read lock on the entry. readLock = LockManager.lockRead(entryDN); if (readLock == null) { // It is cheaper to put this DN back on the // queue then pick it up and process later. dnQueue.add(entryDN); continue; } // Even if getEntry() below fails the entry is // still treated as a processed entry anyways. processedEntries.getAndIncrement(); // getEntry() will trigger putEntryIfAbsent() to the // cache if given entry is not in the cache already. DirectoryServer.getEntry(entryDN); } catch (DirectoryException ex) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, ex); } Message message = ERR_CACHE_PRELOAD_ENTRY_FAILED.get( entryDN.toNormalizedString(), (ex.getCause() != null ? ex.getCause().getMessage() : stackTraceToSingleLineString(ex))); logError(message); } finally { LockManager.unlock(entryDN, readLock); } } catch (InterruptedException ex) { break; } } preloadThreads.remove(Thread.currentThread()); } } /** * The Collector thread. */ private class EntryCacheDNCollector extends DirectoryThread { public EntryCacheDNCollector() { super("Entry Cache Preload Collector"); } @Override public void run() { Map baseDNMap = DirectoryServer.getPublicNamingContexts(); Set proccessedBackends = new HashSet(); // Collect all DNs from every active public backend. for (Backend backend : baseDNMap.values()) { // Check if interrupted. if (Thread.interrupted()) { return; } if (!proccessedBackends.contains(backend)) { proccessedBackends.add(backend); try { if (!backend.collectStoredDNs(dnQueue)) { // DN collection is incomplete, likely // due to some backend problem occured. // Log an error message and carry on. Message message = ERR_CACHE_PRELOAD_COLLECTOR_FAILED.get( backend.getBackendID()); logError(message); } } catch (UnsupportedOperationException ex) { // Some backends dont have collectStoredDNs() // method implemented, log a warning, skip // such backend and continue. if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, ex); } Message message = WARN_CACHE_PRELOAD_BACKEND_FAILED.get( backend.getBackendID()); logError(message); } } } } } /** * {@inheritDoc} */ public String getShutdownListenerName() { return preloadArbiterThreadName; } /** * {@inheritDoc} */ public void processServerShutdown(Message reason) { if ((preloadArbiterThread != null) && preloadArbiterThread.isAlive()) { // Interrupt the arbiter so it can interrupt // the collector and all spawned workers. preloadArbiterThread.interrupt(); try { // This should be quick although if it // gets interrupted it is no big deal. preloadArbiterThread.join(); } catch (InterruptedException ex) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, ex); } } } DirectoryServer.deregisterShutdownListener(this); } }