/* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at * trunk/opends/resource/legal-notices/OpenDS.LICENSE * or https://OpenDS.dev.java.net/OpenDS.LICENSE. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at * trunk/opends/resource/legal-notices/OpenDS.LICENSE. If applicable, * add the following below this CDDL HEADER, with the fields enclosed * by brackets "[]" replaced with your own identifying information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * * Portions Copyright 2006-2007 Sun Microsystems, Inc. */ package org.opends.server.backends.jeb; import org.opends.messages.Message; import org.opends.server.types.*; import java.util.ArrayList; import java.util.TimerTask; import java.util.Timer; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReentrantLock; import com.sleepycat.je.DatabaseException; import com.sleepycat.je.StatsConfig; import com.sleepycat.je.EnvironmentStats; import static org.opends.server.loggers.ErrorLogger.logError; import static org.opends.server.loggers.debug.DebugLogger.*; import org.opends.server.loggers.debug.DebugTracer; import org.opends.server.core.DirectoryServer; import static org.opends.messages.JebMessages. ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED; import static org.opends.messages.JebMessages. INFO_JEB_REBUILD_PROGRESS_REPORT; import static org.opends.messages.JebMessages. INFO_JEB_REBUILD_FINAL_STATUS; import static org.opends.messages.JebMessages. INFO_JEB_REBUILD_CACHE_AND_MEMORY_REPORT; import static org.opends.messages.JebMessages. ERR_JEB_REBUILD_INDEX_CONFLICT; import static org.opends.messages.JebMessages. INFO_JEB_REBUILD_START; import static org.opends.messages.JebMessages. ERR_JEB_VLV_INDEX_NOT_CONFIGURED; /** * Runs a index rebuild process on the backend. Each index selected for rebuild * will be done from scratch by first clearing out the database for that index. * Different threads will be used to rebuild each index. * The rebuild process can run concurrently with the backend online and * performing write and read operations. However, during the rebuild process, * other reader and writer activeThreads might notice inconsistencies in index * databases being rebuilt. They can safely ignore these inconsistencies as long * as a rebuild is in progress. */ public class RebuildJob { /** * The tracer object for the debug logger. */ private static final DebugTracer TRACER = getTracer(); /** * The rebuild configuraiton. */ private RebuildConfig rebuildConfig; /** * The root container used for the verify job. */ private RootContainer rootContainer; /** * The number of milliseconds between job progress reports. */ private long progressInterval = 10000; /** * The waiting rebuild threads created to process the rebuild. */ private CopyOnWriteArrayList waitingThreads = new CopyOnWriteArrayList(); /** * The active rebuild threads created to process the rebuild. */ private CopyOnWriteArrayList activeThreads = new CopyOnWriteArrayList(); /** * The completed rebuild threads used to process the rebuild. */ private CopyOnWriteArrayList completedThreads = new CopyOnWriteArrayList(); /** * Rebuild jobs currently running. */ private static CopyOnWriteArrayList rebuildJobs = new CopyOnWriteArrayList(); /** * A mutex that will be used to provide threadsafe access to methods changing * the set of currently running rebuild jobs. */ private static ReentrantLock jobsMutex = new ReentrantLock(); /** * This class reports progress of the rebuild job at fixed intervals. */ class ProgressTask extends TimerTask { /** * The number of records that had been processed at the time of the * previous progress report. */ private long previousProcessed = 0; /** * The time in milliseconds of the previous progress report. */ private long previousTime; /** * The environment statistics at the time of the previous report. */ private EnvironmentStats prevEnvStats; /** * The number of bytes in a megabyte. * Note that 1024*1024 bytes may eventually become known as a mebibyte(MiB). */ private static final int bytesPerMegabyte = 1024*1024; /** * Create a new verify progress task. * @throws DatabaseException An error occurred while accessing the JE * database. */ public ProgressTask() throws DatabaseException { previousTime = System.currentTimeMillis(); prevEnvStats = rootContainer.getEnvironmentStats(new StatsConfig()); } /** * The action to be performed by this timer task. */ public void run() { long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; if (deltaTime == 0) { return; } long totalEntries = 0; long latestProcessed = 0; ArrayList allThreads = new ArrayList(waitingThreads); allThreads.addAll(activeThreads); allThreads.addAll(completedThreads); for(IndexRebuildThread thread : allThreads) { try { totalEntries += thread.getTotalEntries(); latestProcessed += thread.getProcessedEntries(); if(debugEnabled()) { TRACER.debugVerbose("Rebuild thread %s stats: total %d " + "processed %d rebuilt %d duplicated %d skipped %d", thread.getTotalEntries(), thread.getProcessedEntries(), thread.getRebuiltEntries(), thread.getDuplicatedEntries(), thread.getSkippedEntries()); } } catch(Exception e) { if(debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } } long deltaCount = (latestProcessed - previousProcessed); float rate = 1000f*deltaCount / deltaTime; float completed = 0; if(totalEntries > 0) { completed = 100f*latestProcessed / totalEntries; } Message message = INFO_JEB_REBUILD_PROGRESS_REPORT.get( completed, latestProcessed, totalEntries, rate); logError(message); try { Runtime runtime = Runtime.getRuntime(); long freeMemory = runtime.freeMemory() / bytesPerMegabyte; EnvironmentStats envStats = rootContainer.getEnvironmentStats(new StatsConfig()); long nCacheMiss = envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); float cacheMissRate = 0; if (deltaCount > 0) { cacheMissRate = nCacheMiss/(float)deltaCount; } message = INFO_JEB_REBUILD_CACHE_AND_MEMORY_REPORT.get( freeMemory, cacheMissRate); logError(message); prevEnvStats = envStats; } catch (DatabaseException e) { if (debugEnabled()) { TRACER.debugCaught(DebugLogLevel.ERROR, e); } } previousProcessed = latestProcessed; previousTime = latestTime; } } /** * Construct a new rebuild job. * * @param rebuildConfig The configuration to use for this rebuild job. */ public RebuildJob(RebuildConfig rebuildConfig) { this.rebuildConfig = rebuildConfig; } private static void addJob(RebuildJob job) throws DatabaseException, JebException { //Make sure there are no running rebuild jobs jobsMutex.lock(); try { for(RebuildJob otherJob : rebuildJobs) { String conflictIndex = job.rebuildConfig.checkConflicts(otherJob.rebuildConfig); if(conflictIndex != null) { if(debugEnabled()) { TRACER.debugError("Conflit detected. This job config: %s, " + "That job config: %s.", job.rebuildConfig, otherJob.rebuildConfig); } Message msg = ERR_JEB_REBUILD_INDEX_CONFLICT.get(conflictIndex); throw new JebException(msg); } } //No conflicts are found. Add the job to the list of currently running // jobs. rebuildJobs.add(job); } finally { jobsMutex.unlock(); } } private static void removeJob(RebuildJob job) { jobsMutex.lock(); rebuildJobs.remove(job); jobsMutex.unlock(); } /** * Initiate the rebuild process on a backend. * * @param rootContainer The root container to rebuild in. * @throws DirectoryException If an error occurs during the rebuild process. * @throws DatabaseException If a JE database error occurs during the rebuild * process. * @throws JebException If a JE database error occurs during the rebuild * process. */ public void rebuildBackend(RootContainer rootContainer) throws DirectoryException, DatabaseException, JebException { //TODO: Add check for only performing internal indexType rebuilds when // backend is offline. addJob(this); try { this.rootContainer = rootContainer; EntryContainer entryContainer = rootContainer.getEntryContainer(rebuildConfig.getBaseDN()); ArrayList rebuildList = rebuildConfig.getRebuildList(); if(!rebuildList.isEmpty()) { for (String index : rebuildList) { IndexRebuildThread rebuildThread; String lowerName = index.toLowerCase(); if (lowerName.equals("dn2id")) { rebuildThread = new IndexRebuildThread(entryContainer, IndexRebuildThread.IndexType.DN2ID); } else if (lowerName.equals("dn2uri")) { rebuildThread = new IndexRebuildThread(entryContainer, IndexRebuildThread.IndexType.DN2URI); } else if (lowerName.equals("id2children")) { rebuildThread = new IndexRebuildThread(entryContainer, IndexRebuildThread.IndexType.ID2CHILDREN); } else if (lowerName.equals("id2subtree")) { rebuildThread = new IndexRebuildThread(entryContainer, IndexRebuildThread.IndexType.ID2SUBTREE); } else if (lowerName.startsWith("vlv.")) { if(lowerName.length() < 5) { Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName); throw new JebException(msg); } VLVIndex vlvIndex = entryContainer.getVLVIndex(lowerName.substring(4)); if(vlvIndex == null) { Message msg = ERR_JEB_VLV_INDEX_NOT_CONFIGURED.get(lowerName.substring(4)); throw new JebException(msg); } rebuildThread = new IndexRebuildThread(entryContainer, vlvIndex); } else { String[] attrIndexParts = lowerName.split("\\."); if(attrIndexParts.length <= 0) { Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index); throw new JebException(msg); } AttributeType attrType = DirectoryServer.getAttributeType(attrIndexParts[0]); if (attrType == null) { Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index); throw new JebException(msg); } AttributeIndex attrIndex = entryContainer.getAttributeIndex(attrType); if (attrIndex == null) { Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index); throw new JebException(msg); } if(attrIndexParts.length > 1) { Index partialAttrIndex = null; if(attrIndexParts[1].equals("presence")) { partialAttrIndex = attrIndex.presenceIndex; } else if(attrIndexParts[1].equals("equality")) { partialAttrIndex = attrIndex.equalityIndex; } else if(attrIndexParts[1].equals("substring")) { partialAttrIndex = attrIndex.substringIndex; } else if(attrIndexParts[1].equals("ordering")) { partialAttrIndex = attrIndex.orderingIndex; } else if(attrIndexParts[1].equals("approximate")) { partialAttrIndex = attrIndex.approximateIndex; } if(partialAttrIndex == null) { Message msg = ERR_JEB_ATTRIBUTE_INDEX_NOT_CONFIGURED.get(index); throw new JebException(msg); } rebuildThread = new IndexRebuildThread(entryContainer, partialAttrIndex); } else { rebuildThread = new IndexRebuildThread(entryContainer, attrIndex); } } waitingThreads.add(rebuildThread); if(debugEnabled()) { TRACER.debugInfo("Created rebuild thread %s", rebuildThread.getName()); } } //Log a start message. long totalToProcess = 0; for(IndexRebuildThread thread : waitingThreads) { totalToProcess += thread.getTotalEntries(); } StringBuilder sb = new StringBuilder(); for(String index : rebuildList) { if(sb.length() > 0) { sb.append(", "); } sb.append(index); } Message message = INFO_JEB_REBUILD_START.get(sb.toString(), totalToProcess); logError(message); // Make a note of the time we started. long startTime = System.currentTimeMillis(); // Start a timer for the progress report. Timer timer = new Timer(); TimerTask progressTask = new ProgressTask(); timer.scheduleAtFixedRate(progressTask, progressInterval, progressInterval); entryContainer.exclusiveLock.lock(); try { for(IndexRebuildThread thread : waitingThreads) { thread.clearDatabase(); } } finally { if(!rebuildConfig.includesSystemIndex()) { entryContainer.exclusiveLock.unlock(); } } if(!rebuildConfig.includesSystemIndex()) { entryContainer.sharedLock.lock(); } try { while(!waitingThreads.isEmpty()) { dispatchThreads(); joinThreads(); } } finally { timer.cancel(); if(rebuildConfig.includesSystemIndex()) { entryContainer.exclusiveLock.unlock(); } else { entryContainer.sharedLock.unlock(); } } long totalProcessed = 0; long totalRebuilt = 0; long totalDuplicated = 0; long totalSkipped = 0; for(IndexRebuildThread thread : completedThreads) { totalProcessed += thread.getProcessedEntries(); totalRebuilt += thread.getRebuiltEntries(); totalDuplicated += thread.getDuplicatedEntries(); totalSkipped += thread.getSkippedEntries(); } long finishTime = System.currentTimeMillis(); long totalTime = (finishTime - startTime); float rate = 0; if (totalTime > 0) { rate = 1000f*totalProcessed / totalTime; } message = INFO_JEB_REBUILD_FINAL_STATUS.get( totalProcessed, totalTime/1000, rate); logError(message); if(debugEnabled()) { TRACER.debugInfo("Detailed overall rebuild job stats: rebuilt %d, " + "duplicated %d, skipped %d", totalRebuilt, totalDuplicated, totalSkipped); } } } finally { removeJob(this); } } /** * Dispatch a set of threads based on their dependency and ordering. */ private void dispatchThreads() throws DatabaseException { for(IndexRebuildThread t : waitingThreads) { boolean start = true; //Check to see if we have exceeded the max number of threads to use at //one time. if(rebuildConfig.getMaxRebuildThreads() > 0 && activeThreads.size() > rebuildConfig.getMaxRebuildThreads()) { if(debugEnabled()) { TRACER.debugInfo("Delaying the start of thread %s because " + "the max number of rebuild threads has been reached."); } start = false; } /** * We may need to start the threads in stages since the rebuild process * of some index types (id2children, id2subtree) depends on another * index being rebuilt to be completed first. */ if(t.getIndexType() == IndexRebuildThread.IndexType.ID2CHILDREN || t.getIndexType() == IndexRebuildThread.IndexType.ID2SUBTREE) { //Check to see if we have any waiting threads that needs to go //first for(IndexRebuildThread t2 : waitingThreads) { if(t2.getIndexType() == IndexRebuildThread.IndexType.DN2ID || t2.getIndexType() == IndexRebuildThread.IndexType.DN2URI) { //We gotta wait for these to start before running the //rebuild on ID2CHILDREN or ID2SUBTREE if(debugEnabled()) { TRACER.debugInfo("Delaying the start of thread %s because " + "it depends on another index rebuilt to " + "go first.", t.getName()); } start = false; break; } } //Check to see if we have any active threads that needs to //finish first for(IndexRebuildThread t3 : activeThreads) { if(t3.getIndexType() == IndexRebuildThread.IndexType.DN2ID || t3.getIndexType() == IndexRebuildThread.IndexType.DN2URI) { //We gotta wait for these to start before running the //rebuild on ID2CHILDREN or ID2SUBTREE if(debugEnabled()) { TRACER.debugInfo("Delaying the start of thread %s because " + "it depends on another index being rebuilt to " + "finish.", t.getName()); } start = false; break; } } } if(start) { if(debugEnabled()) { TRACER.debugInfo("Starting rebuild thread %s.", t.getName()); } waitingThreads.remove(t); activeThreads.add(t); t.start(); } } } /** * Wait for all worker activeThreads to exit. */ private void joinThreads() { for (IndexRebuildThread t : activeThreads) { try { t.join(); if(debugEnabled()) { TRACER.debugInfo("Rebuild thread %s finished.", t.getName()); } activeThreads.remove(t); completedThreads.add(t); } catch (InterruptedException ie) { // No action needed? } } } }