package org.opends.server.backends.jeb.importLDIF;

import static org.opends.messages.JebMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.loggers.debug.DebugLogger.debugEnabled;
import static org.opends.server.loggers.debug.DebugLogger.getTracer;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.getFileForPath;

import java.io.*;
import java.nio.*;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.opends.messages.Category;
import org.opends.messages.JebMessages;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.std.server.LocalDBBackendCfg;
import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.loggers.debug.DebugTracer;
import org.opends.server.types.*;
import org.opends.server.util.*;

import com.sleepycat.je.*;

/**
 * Performs an LDIF import.
 */
public class Importer implements Thread.UncaughtExceptionHandler
{
  private final int DRAIN_TO = 3;
  private final int TIMER_INTERVAL = 10000;
  private final int MB = (1024 * 1024);
  private final int LDIF_READER_BUF_SIZE = 2 * MB;
  private final int MIN_IMPORT_MEM_REQUIRED = 16 * MB;
  private final int MAX_BUFFER_SIZE = 48 * MB;
  private final int MIN_BUFFER_SIZE = 1024 * 100;
  private final int MIN_READ_AHEAD_CACHE_SIZE = 4096;
  private final int MAX_DB_CACHE_SIZE = 128 * MB;
  private final int MIN_DB_CACHE_SIZE = 16 * MB;
  private final int MAX_DB_LOG_BUF_BYTES = 100 * MB;
  private final int MEM_PCT_PHASE_1 = 60;
  private final int MEM_PCT_PHASE_2 = 50;

  private final String DIRECT_PROPERTY = "import.directphase2";

  private final AtomicInteger bufferCount = new AtomicInteger(0);
  private final File tempDir;
  private final int indexCount, threadCount;
  private final boolean dn2idPhase2;
  private final LDIFImportConfig config;
  private final ByteBuffer directBuffer;

  /**
   * The tracer object for the debug logger.
   */
  private static final DebugTracer TRACER = getTracer();

  /**
   * The JE backend configuration.
   */
  private LocalDBBackendCfg backendCfg;

  /**
   * The root container used for this import job.
   */
  private RootContainer rootContainer;

  /**
   * The LDIF import configuration.
   */
  private LDIFImportConfig ldifImportConfig;

  /**
   * The LDIF reader.
   */
  private LDIFReader reader;

  /**
   * Map of base DNs to their import context.
   */
  private LinkedHashMap<DN, DNContext> importMap =
      new LinkedHashMap<DN, DNContext>();
  private int bufferSize;
  private long dbCacheSize = 0, dbLogBufSize = 0;

  /**
   * The number of entries migrated.
   */
  private int migratedCount;
  // The executor service used for the sort tasks.
  private ExecutorService sortService;

  /**
   * The number of entries imported.
   */
  private int importedCount;
  // The executor service used for the index processing tasks.
  private ExecutorService indexProcessService;

  /**
   * The number of milliseconds between job progress reports.
   */
  private long progressInterval = 10000;
  // Queue of free index buffers, used to recycle index buffers.
  private final BlockingQueue<IndexBuffer> freeBufQue =
      new LinkedBlockingQueue<IndexBuffer>();

  /**
   * The progress report timer.
   */
  private Timer timer;
  // Map of DB containers to queues of index buffers. Used to allocate sorted
  // index buffers to an index writer thread.
  private final
  Map<DatabaseContainer, BlockingQueue<IndexBuffer>> containerQueMap =
      new LinkedHashMap<DatabaseContainer, BlockingQueue<IndexBuffer>>();

  // Thread array.
  private CopyOnWriteArrayList<WorkThread> threads;
  // Map of DB containers to index managers. Used to start phase 2.
  private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap =
      new LinkedHashMap<DatabaseContainer, IndexManager>();

  // Progress task.
  private ProgressTask pTask;
  // Futures used to indicate when the index file writers are done flushing
  // their work queues and have exited. End of phase one.
  private final List<Future<?>> indexWriterFutures;

  // Number of entries imported before checking if cleaning is needed after
  // eviction has been detected.
  private static final int entryCleanInterval = 250000;
  // List of index file writer tasks. Used by stopIndexWriterTasks to signal
  // the index file writer tasks when the LDIF file has been fully read.
  private final List<IndexFileWriterTask> indexWriterList;

  // Minimum buffer amount to give to a buffer manager.
  private static final long minBuffer = 1024 * 1024;

  // Total available memory for the buffer managers.
  private long totalAvailBufferMemory = 0;

  // Memory size to be used for the DB cache, in string format.
  private String dbCacheSizeStr;

  // Used to do an initial clean after eviction has been detected.
  private boolean firstClean = false;

  // Set to true if a worker thread threw a runtime exception; stops the
  // import.
  private boolean unCaughtExceptionThrown = false;

  // Set to true if substring indexes are defined.
  private boolean hasSubIndexes = false;

  // Work thread 0, used to add the first 20 or so entries single threaded.
  private WorkThread workThread0;

  // Counter for thread 0.
  private int worker0Proc = 0;

  // Max thread 0 adds.
  private static final int maxWorker0 = 20;
  // Map of DNs to Suffix objects. Placeholder for when multiple suffixes are
  // supported.
  private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>();

  /**
   * Create a new import job with the specified LDIF import config and local
   * DB backend config.
   *
   * @param config The LDIF import config.
   * @param cfg The local DB backend config.
   * @throws IOException If a problem occurs while opening the LDIF file for
   *                     reading.
   */
  public Importer(LDIFImportConfig config, LocalDBBackendCfg cfg)
      throws IOException
  {
    this.config = config;
    this.ldifImportConfig = config;
    this.backendCfg = cfg;
    this.threads = new CopyOnWriteArrayList<WorkThread>();
    calcMemoryLimits();
    threadCount = cfg.getImportThreadCount();
    indexCount = cfg.listLocalDBIndexes().length + 2;
    indexWriterList = new ArrayList<IndexFileWriterTask>(indexCount);
    indexWriterFutures = new CopyOnWriteArrayList<Future<?>>();
    File parentDir;
    if(config.getTmpDirectory() == null)
    {
      parentDir = getFileForPath("import-tmp");
    }
    else
    {
      parentDir = getFileForPath(config.getTmpDirectory());
    }
    tempDir = new File(parentDir, cfg.getBackendId());
    if(!tempDir.exists() && !tempDir.mkdirs())
    {
      Message msg = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(
          String.valueOf(tempDir));
      throw new IOException(msg.toString());
    }
    if (tempDir.listFiles() != null)
    {
      for (File f : tempDir.listFiles())
      {
        f.delete();
      }
    }
    dn2idPhase2 = config.getDNCheckPhase2();
    String propString = System.getProperty(DIRECT_PROPERTY);
    if(propString != null)
    {
      int directSize = Integer.valueOf(propString);
      directBuffer = ByteBuffer.allocateDirect(directSize);
    }
    else
    {
      directBuffer = null;
    }
  }
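
  /*
   * A minimal usage sketch, inferred from the methods in this file rather
   * than from any documented API; the exact call sequence is an assumption:
   *
   *   Importer importer = new Importer(ldifImportConfig, backendCfg);
   *   importer.init(envConfig);    // sizes je.maxMemory and JE log buffers
   *   LDIFImportResult result = importer.processImport(rootContainer);
   */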

  private void getBufferSizes(long availMem, int buffers)
  {
    long mem = availMem - (MAX_DB_CACHE_SIZE + MAX_DB_LOG_BUF_BYTES);
    bufferSize = (int) (mem / buffers);
    if(bufferSize >= MIN_BUFFER_SIZE)
    {
      dbCacheSize = MAX_DB_CACHE_SIZE;
      dbLogBufSize = MAX_DB_LOG_BUF_BYTES;
      if(bufferSize > MAX_BUFFER_SIZE)
      {
        bufferSize = MAX_BUFFER_SIZE;
      }
    }
    else
    {
      mem = availMem - MIN_DB_CACHE_SIZE - (MIN_DB_CACHE_SIZE * 7) / 100;
      bufferSize = (int) (mem / buffers);
      dbCacheSize = MIN_DB_CACHE_SIZE;
      if(bufferSize < MIN_BUFFER_SIZE)
      {
        System.out.println("Log size less than default -- give it a try");
        bufferSize = MIN_BUFFER_SIZE;
      }
      else
      {
        long constrainedMem = mem - (buffers * MIN_BUFFER_SIZE);
        bufferSize = (int) ((buffers * MIN_BUFFER_SIZE) +
            (constrainedMem * 50 / 100));
        bufferSize /= buffers;
        dbCacheSize = MIN_DB_CACHE_SIZE + (constrainedMem * 50 / 100);
      }
    }
  }
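
  /*
   * A worked example of the sizing above, using assumed numbers (not from
   * the source): with availMem = 1024 MB and buffers = 160, the first branch
   * applies because (1024 - 128 - 100) MB / 160 is roughly 5 MB per buffer,
   * which is above MIN_BUFFER_SIZE (100 KB) and below MAX_BUFFER_SIZE
   * (48 MB), so the DB cache gets MAX_DB_CACHE_SIZE (128 MB) and the JE log
   * buffers get MAX_DB_LOG_BUF_BYTES (100 MB).
   */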

  /**
   * Return the suffix instance in the specified map that matches the
   * specified DN.
   *
   * @param dn The DN to search for.
   * @param map The map to search.
   * @return The suffix instance that matches the DN, or null if no match is
   *         found.
   */
  public static Suffix getMatchSuffix(DN dn, Map<DN, Suffix> map)
  {
    Suffix suffix = null;
    DN nodeDN = dn;

    while (suffix == null && nodeDN != null) {
      suffix = map.get(nodeDN);
      if (suffix == null)
      {
        nodeDN = nodeDN.getParentDNInSuffix();
      }
    }
    return suffix;
  }
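
  /*
   * For example (hypothetical values): if the map contains the single key
   * "dc=example,dc=com", looking up "uid=user,ou=People,dc=example,dc=com"
   * misses twice and walks up the parent chain until it reaches
   * "dc=example,dc=com", whose Suffix is returned; a DN outside every
   * registered suffix walks to the root and yields null.
   */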

  /**
   * Calculate buffer sizes and initialize JE properties based on memory.
   *
   * @param envConfig The environment config to use in the calculations.
   * @throws InitializationException If a problem occurs during calculation.
   */
  public void init(EnvironmentConfig envConfig)
      throws InitializationException
  {
    Message msg;
    Runtime runtime = Runtime.getRuntime();
    long freeMemory = runtime.freeMemory();
    long availMemImport = (freeMemory * MEM_PCT_PHASE_1) / 100;
    int phaseOneBuffers = 2 * (indexCount * threadCount);
    msg = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(availMemImport,
        phaseOneBuffers);
    logError(msg);
    if (availMemImport < MIN_IMPORT_MEM_REQUIRED)
    {
      msg = ERR_IMPORT_LDIF_LACK_MEM.get(16);
      throw new InitializationException(msg);
    }
    getBufferSizes(availMemImport, phaseOneBuffers);
    envConfig.setConfigParam("je.maxMemory", Long.toString(dbCacheSize));
    msg = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize, bufferSize);
    logError(msg);
    if(dbLogBufSize != 0)
    {
      envConfig.setConfigParam("je.log.totalBufferBytes",
          Long.toString(dbLogBufSize));
      msg = NOTE_JEB_IMPORT_LDIF_LOG_BYTES.get(dbLogBufSize);
      logError(msg);
    }
  }

  /**
   * Start the worker threads.
   *
   * @throws DatabaseException If a DB problem occurs.
   */
  private void startWorkerThreads()
      throws DatabaseException {

    int importThreadCount = backendCfg.getImportThreadCount();
    // Figure out how much buffer memory to give to each context.
    int contextCount = importMap.size();
    long memoryPerContext = totalAvailBufferMemory / contextCount;
    // Below the minimum, use the minimum value.
    if(memoryPerContext < minBuffer) {
      Message msg =
          NOTE_JEB_IMPORT_LDIF_BUFFER_CONTEXT_AVAILMEM.get(memoryPerContext,
              minBuffer);
      logError(msg);
      memoryPerContext = minBuffer;
    }
    // Create one set of worker threads/buffer managers for each base DN.
    for (DNContext context : importMap.values()) {
      BufferManager bufferManager =
          new BufferManager(memoryPerContext);
      context.setBufferManager(bufferManager);
      for (int i = 0; i < importThreadCount; i++) {
        WorkThread t = new WorkThread(context.getWorkQueue(), i,
            bufferManager, rootContainer, importMap);
        t.setUncaughtExceptionHandler(this);
        threads.add(t);
        if(i == 0) {
          workThread0 = t;
        }
        t.start();
      }
    }
    // Start a timer for the progress report.
    timer = new Timer();
    TimerTask progressTask = new ProgressTask();
    // Used to get at extra functionality such as eviction detection.
    pTask = (ProgressTask) progressTask;
    timer.scheduleAtFixedRate(progressTask, progressInterval,
        progressInterval);
  }

  private void initIndexBuffers(int threadCount)
  {
    int bufferCount = 2 * (indexCount * threadCount);
    for(int i = 0; i < bufferCount; i++)
    {
      IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize);
      freeBufQue.add(b);
    }
  }
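
  /*
   * Two buffers per index per thread are pre-allocated here; presumably this
   * lets a worker keep filling a fresh buffer while its predecessor is still
   * queued for sorting and writing, so phase one rarely blocks on the
   * free-buffer queue. This rationale is an inference, not taken from the
   * source.
   */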

  private void initSuffixes()
      throws ConfigException, InitializationException
  {
    Iterator<EntryContainer> i = rootContainer.getEntryContainers().iterator();
    EntryContainer ec = i.next();
    Suffix suffix = Suffix.createSuffixContext(ec, config, rootContainer);
    dnSuffixMap.put(ec.getBaseDN(), suffix);
  }
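
  /*
   * Note that only the first entry container is consumed here, which matches
   * the "placeholder for when multiple suffixes are supported" comment on
   * dnSuffixMap above; with several base DNs configured, all but the first
   * would currently be ignored.
   */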

  /**
   * Import an LDIF file using the specified root container.
   *
   * @param rootContainer The root container to use during the import.
   * @return An LDIF result.
   * @throws ConfigException If the import failed because of a configuration
   *                         error.
   * @throws IOException If the import failed because of an IO error.
   * @throws InitializationException If the import failed because of an
   *                                 initialization error.
   * @throws JebException If the import failed due to a database error.
   * @throws InterruptedException If the import failed due to an interrupted
   *                              error.
   * @throws ExecutionException If the import failed due to an execution
   *                            error.
   */
  public LDIFImportResult
  processImport(RootContainer rootContainer) throws ConfigException,
      InitializationException, IOException, JebException,
      InterruptedException, ExecutionException
  {
    this.rootContainer = rootContainer;
    this.backendCfg = rootContainer.getConfiguration();
    this.reader = new LDIFReader(config, rootContainer, LDIF_READER_BUF_SIZE);
    Message message =
        NOTE_JEB_IMPORT_STARTING.get(DirectoryServer.getVersionString(),
            BUILD_ID, REVISION_NUMBER);
    logError(message);
    message = NOTE_JEB_IMPORT_THREAD_COUNT.get(threadCount);
    logError(message);
    RuntimeInformation.logInfo();
    initSuffixes();
    long startTime = System.currentTimeMillis();
    processPhaseOne();
    processPhaseTwo();
    setIndexesTrusted();
    tempDir.delete();
    long finishTime = System.currentTimeMillis();
    long importTime = (finishTime - startTime);
    float rate = 0;
    if (importTime > 0)
      rate = 1000f * reader.getEntriesRead() / importTime;
    message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
        reader.getEntriesRead(), reader.getEntriesIgnored(), reader
            .getEntriesRejected(), 0, importTime / 1000, rate);
    logError(message);
    return new LDIFImportResult(reader.getEntriesRead(), reader
        .getEntriesRejected(), reader.getEntriesIgnored());
  }

  /**
   * Switch containers if the migrated entries were written to the temporary
   * container.
   *
   * @throws DatabaseException If a DB problem occurs.
   * @throws JebException If a JEB problem occurs.
   */
  private void switchContainers() throws DatabaseException, JebException {

    for(DNContext importContext : importMap.values()) {
      DN baseDN = importContext.getBaseDN();
      EntryContainer srcEntryContainer =
          importContext.getSrcEntryContainer();
      if(srcEntryContainer != null) {
        if (debugEnabled()) {
          TRACER.debugInfo("Deleting old entry container for base DN " +
              "%s and renaming temp entry container", baseDN);
        }
        EntryContainer unregEC =
            rootContainer.unregisterEntryContainer(baseDN);
        // Make sure the unregistered EC for the base DN is the same as
        // the one in the import context.
        if(unregEC != srcEntryContainer) {
          if(debugEnabled()) {
            TRACER.debugInfo("Current entry container used for base DN " +
                "%s is not the same as the source entry container used " +
                "during the migration process.", baseDN);
          }
          rootContainer.registerEntryContainer(baseDN, unregEC);
          continue;
        }
        srcEntryContainer.lock();
        srcEntryContainer.close();
        srcEntryContainer.delete();
        srcEntryContainer.unlock();
        EntryContainer newEC = importContext.getEntryContainer();
        newEC.lock();
        newEC.setDatabasePrefix(baseDN.toNormalizedString());
        newEC.unlock();
        rootContainer.registerEntryContainer(baseDN, newEC);
      }
    }
  }

  /**
   * Create and log messages at the end of the successful import.
   *
   * @param startTime The time the import started.
   */
  private void importProlog(long startTime) {
    Message message;
    long finishTime = System.currentTimeMillis();
    long importTime = (finishTime - startTime);
    float rate = 0;
    if (importTime > 0)
      rate = 1000f * importedCount / importTime;

    message = NOTE_JEB_IMPORT_FINAL_STATUS.
        get(reader.getEntriesRead(), importedCount,
            reader.getEntriesIgnored(), reader.getEntriesRejected(),
            migratedCount, importTime / 1000, rate);
    logError(message);

    message = NOTE_JEB_IMPORT_ENTRY_LIMIT_EXCEEDED_COUNT.get(
        getEntryLimitExceededCount());
    logError(message);
  }


  private void setIndexesTrusted() throws JebException
  {
    try {
      for(Suffix s : dnSuffixMap.values()) {
        s.setIndexesTrusted();
      }
    }
    catch (DatabaseException ex)
    {
      Message msg = NOTE_JEB_IMPORT_LDIF_TRUSTED_FAILED.get(ex.getMessage());
      throw new JebException(msg);
    }
  }

  private void processPhaseOne()
      throws InterruptedException, ExecutionException
  {
    initIndexBuffers(threadCount);
    FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
    Timer timer = new Timer();
    timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL);
    indexProcessService = Executors.newFixedThreadPool(2 * indexCount);
    sortService = Executors.newFixedThreadPool(threadCount);

    // Import tasks are collective tasks.
    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
    for (int i = 0; i < threadCount; i++)
    {
      tasks.add(new ImportTask());
    }
    ExecutorService execService = Executors.newFixedThreadPool(threadCount);
    List<Future<Void>> results = execService.invokeAll(tasks);
    for (Future<Void> result : results)
      assert result.isDone();
    stopIndexWriterTasks();
    for (Future<?> result : indexWriterFutures)
    {
      result.get();
    }
    execService.shutdown();
    freeBufQue.clear();
    sortService.shutdown();
    timer.cancel();
  }

  private void processPhaseTwo() throws InterruptedException
  {
    SecondPhaseProgressTask progress2Task =
        new SecondPhaseProgressTask(containerIndexMgrMap);
    Timer timer2 = new Timer();
    timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL);
    processIndexFiles();
    timer2.cancel();
  }

  private void processIndexFiles() throws InterruptedException
  {
    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(indexCount);
    if(bufferCount.get() == 0)
    {
      return;
    }
    int cacheSize = cacheSizeFromFreeMemory();
    int p = 0;
    int offSet = 0;
    if(directBuffer != null)
    {
      cacheSize = cacheSizeFromDirectMemory();
    }
    for(Map.Entry<DatabaseContainer, IndexManager> e :
        containerIndexMgrMap.entrySet())
    {
      DatabaseContainer container = e.getKey();
      IndexManager indexMgr = e.getValue();
      boolean isDN2ID = false;
      if(container instanceof DN2ID)
      {
        isDN2ID = true;
      }
      if(directBuffer != null)
      {
        int cacheSizes = cacheSize * indexMgr.getBufferList().size();
        offSet += cacheSizes;
        directBuffer.limit(offSet);
        directBuffer.position(p);
        ByteBuffer b = directBuffer.slice();
        tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, b, cacheSize));
        p += cacheSizes;
      }
      else
      {
        tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, cacheSize));
      }
    }
    List<Future<Void>> results = indexProcessService.invokeAll(tasks);
    for (Future<Void> result : results)
      assert result.isDone();
    indexProcessService.shutdown();
  }
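
  /*
   * Sketch of the direct-buffer carve-up above, with assumed numbers: given
   * cacheSize = 1 MB and an index manager holding 4 scratch-file buffers,
   * that task is handed a 4 MB window. limit() is advanced to the end of the
   * window and position() to its start before slice() is taken, so each task
   * sees an independent slice over the single shared direct byte buffer.
   */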

  private int cacheSizeFromDirectMemory()
  {
    int cap = directBuffer.capacity();
    int cacheSize = cap / bufferCount.get();
    if(cacheSize > bufferSize)
    {
      cacheSize = bufferSize;
    }
    System.out.println("Direct indexes begin Total bufferCount: " +
        bufferCount.get() + " cacheSize: " + cacheSize);
    return cacheSize;
  }

  private int cacheSizeFromFreeMemory()
  {
    Runtime runtime = Runtime.getRuntime();
    long availMemory = runtime.freeMemory() * MEM_PCT_PHASE_2 / 100;
    int avgBufSize = (int)(availMemory / bufferCount.get());
    int cacheSize = Math.max(MIN_READ_AHEAD_CACHE_SIZE, avgBufSize);
    if(cacheSize > bufferSize)
    {
      cacheSize = bufferSize;
    }
    System.out.println("Indirect indexes begin Total bufferCount: " +
        bufferCount.get() + " avgBufSize: "
        + avgBufSize + " cacheSize: " + cacheSize);
    return cacheSize;
  }

  private void stopIndexWriterTasks()
  {
    IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0);
    for(IndexFileWriterTask task : indexWriterList)
    {
      task.que.add(idxBuffer);
    }
  }
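
  /*
   * This is a poison-pill shutdown: a zero-sized IndexBuffer is appended to
   * each writer's queue, presumably recognized by IndexFileWriterTask as an
   * end-of-input marker so the writer drains its remaining work and exits,
   * completing the futures collected in indexWriterFutures. (IndexBuffer
   * also exposes isPoison(), checked in getNewIndexBuffer below.)
   */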

  /**
   * Run the cleaner if it is needed.
   *
   * @param entriesRead The number of entries read so far.
   * @param evictEntryNumber The number of entries to run the cleaner after
   *                         being read.
   * @throws DatabaseException If a DB problem occurs.
   */
  private void
  runCleanerIfNeeded(long entriesRead, long evictEntryNumber)
      throws DatabaseException {
    if(!firstClean || (entriesRead % evictEntryNumber) == 0) {
      // Make sure the work queue is empty before starting.
      drainWorkQueue();
      Message msg = NOTE_JEB_IMPORT_LDIF_CLEAN.get();
      runCleaner(msg);
      if(!firstClean) {
        firstClean = true;
      }
    }
  }

  /**
   * Run the cleaner, pausing the progress task output.
   *
   * @param header Message to be printed before cleaning.
   * @throws DatabaseException If a DB problem occurs.
   */
  private void runCleaner(Message header) throws DatabaseException {
    Message msg;
    long startTime = System.currentTimeMillis();
    // Need to force a checkpoint.
    rootContainer.importForceCheckPoint();
    logError(header);
    pTask.setPause(true);
    // Actually clean the files.
    int cleaned = rootContainer.cleanedLogFiles();
    // This checkpoint removes the files if any were cleaned.
    if(cleaned > 0) {
      msg = NOTE_JEB_IMPORT_LDIF_CLEANER_REMOVE_LOGS.get(cleaned);
      logError(msg);
      rootContainer.importForceCheckPoint();
    }
    pTask.setPause(false);
    long finishTime = System.currentTimeMillis();
    long cleanTime = (finishTime - startTime) / 1000;
    msg = NOTE_JEB_IMPORT_LDIF_CLEANER_RUN_DONE.get(cleanTime, cleaned);
    logError(msg);
  }

  /**
   * Process the LDIF reader.
   *
   * @throws JebException If a JEB problem occurs.
   * @throws DatabaseException If a DB problem occurs.
   * @throws IOException If an IO exception occurs.
   */
  private void
  processLDIF() throws JebException, DatabaseException, IOException {
    Message message = NOTE_JEB_IMPORT_LDIF_START.get();
    logError(message);
    do {
      if (ldifImportConfig.isCancelled()) {
        break;
      }
      if(threads.size() <= 0) {
        message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
        throw new JebException(message);
      }
      if(unCaughtExceptionThrown) {
        abortImport();
      }
      try {
        // Read the next entry.
        Entry entry = reader.readEntry();
        // Check for end of file.
        if (entry == null) {
          message = NOTE_JEB_IMPORT_LDIF_END.get();
          logError(message);
          break;
        }
        // Route it according to base DN.
        DNContext DNContext = getImportConfig(entry.getDN());
        processEntry(DNContext, entry);
        // If the progress task has noticed eviction proceeding, start
        // running the cleaner.
        if(pTask.isEvicting()) {
          runCleanerIfNeeded(reader.getEntriesRead(), entryCleanInterval);
        }
      } catch (LDIFException e) {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      } catch (DirectoryException e) {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      } catch (DatabaseException e) {
        if (debugEnabled()) {
          TRACER.debugCaught(DebugLogLevel.ERROR, e);
        }
      }
    } while (true);
  }

  /**
   * Process an entry using the specified import context.
   *
   * @param DNContext The import context.
   * @param entry The entry to process.
   */
  private void processEntry(DNContext DNContext, Entry entry)
      throws DirectoryException, DatabaseException, JebException {
    if(worker0Proc < maxWorker0) {
      DNContext.addPending(entry.getDN());
      WorkElement element =
          WorkElement.decode(entry, DNContext);
      workThread0.process(element);
      worker0Proc++;
    } else {
      // Add this DN to the pending map.
      DNContext.addPending(entry.getDN());
      addEntryQueue(DNContext, entry);
    }
  }

  /**
   * Add a work item to the specified import context's queue.
   *
   * @param context The import context.
   * @param item The work item to add.
   * @return <CODE>True</CODE> if the work item was added to the queue.
   */
  private boolean
  addQueue(DNContext context, WorkElement item) {
    try {
      while(!context.getWorkQueue().offer(item, 1000,
          TimeUnit.MILLISECONDS)) {
        if(threads.size() <= 0) {
          // All worker threads died. We must stop now.
          return false;
        }
      }
    } catch (InterruptedException e) {
      if (debugEnabled()) {
        TRACER.debugCaught(DebugLogLevel.ERROR, e);
      }
    }
    return true;
  }


  /**
   * Wait until the work queue is empty.
   */
  private void drainWorkQueue() {
    if(threads.size() > 0) {
      for (DNContext context : importMap.values()) {
        while (context.getWorkQueue().size() > 0) {
          try {
            Thread.sleep(100);
          } catch (Exception e) {
            // No action needed.
          }
        }
      }
    }
  }

  private void abortImport() throws JebException {
    // Stop the work threads, telling them to skip the substring flush.
    stopWorkThreads(false);
    timer.cancel();
    Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
    throw new JebException(message);
  }

  /**
   * Stop the work threads.
   *
   * @param abort <CODE>True</CODE> if stop work threads was called from an
   *              abort.
   * @throws JebException If a JEB error occurs.
   */
  private void
  stopWorkThreads(boolean abort) throws JebException {
    for (WorkThread t : threads) {
      t.stopProcessing();
    }
    // Wait for each thread to stop.
    for (WorkThread t : threads) {
      try {
        if(!abort && unCaughtExceptionThrown) {
          timer.cancel();
          Message message = ERR_JEB_IMPORT_LDIF_ABORT.get();
          throw new JebException(message);
        }
        t.join();
        importedCount += t.getImportedCount();
      } catch (InterruptedException ie) {
        // No action needed.
      }
    }
  }

  /**
   * Clean up after a successful import.
   *
   * @throws DatabaseException If a DB error occurs.
   * @throws JebException If a JEB error occurs.
   */
  private void cleanUp() throws DatabaseException, JebException {
    Message msg;
    // Drain the work queue.
    drainWorkQueue();
    pTask.setPause(true);
    long startTime = System.currentTimeMillis();
    stopWorkThreads(true);
    // Flush the buffer managers.
    for(DNContext context : importMap.values()) {
      context.getBufferManager().prepareFlush();
      context.getBufferManager().flushAll();
    }
    long finishTime = System.currentTimeMillis();
    long flushTime = (finishTime - startTime) / 1000;
    msg = NOTE_JEB_IMPORT_LDIF_BUFFER_FLUSH_COMPLETED.get(flushTime);
    logError(msg);
    timer.cancel();
    for(DNContext context : importMap.values()) {
      context.setIndexesTrusted();
    }
    msg = NOTE_JEB_IMPORT_LDIF_FINAL_CLEAN.get();
    // Run the cleaner.
    runCleaner(msg);
    closeIndexCursors();
  }


  private void closeIndexCursors() throws DatabaseException {
    for (DNContext ic : importMap.values())
    {
      ic.getEntryContainer().closeIndexCursors();
    }
  }

  /**
   * Uncaught exception handler.
   *
   * @param t The thread working when the exception was thrown.
   * @param e The exception.
   */
  public void uncaughtException(Thread t, Throwable e) {
    unCaughtExceptionThrown = true;
    threads.remove(t);
    Message msg = ERR_JEB_IMPORT_THREAD_EXCEPTION.get(
        t.getName(), StaticUtils.stackTraceToSingleLineString(e.getCause()));
    logError(msg);
  }

  /**
   * Get the entry limit exceeded counts from the indexes.
   *
   * @return Count of the indexes with entry limit exceeded values.
   */
  private int getEntryLimitExceededCount() {
    int count = 0;
    for (DNContext ic : importMap.values())
    {
      count += ic.getEntryContainer().getEntryLimitExceededCount();
    }
    return count;
  }

  /**
   * Return an import context related to the specified DN.
   *
   * @param dn The DN.
   * @return An import context.
   * @throws DirectoryException If a directory error occurs.
   */
  private DNContext getImportConfig(DN dn) throws DirectoryException {
    DNContext DNContext = null;
    DN nodeDN = dn;

    while (DNContext == null && nodeDN != null) {
      DNContext = importMap.get(nodeDN);
      if (DNContext == null)
      {
        nodeDN = nodeDN.getParentDNInSuffix();
      }
    }

    if (nodeDN == null) {
      // The entry should not have been given to this backend.
      Message message =
          JebMessages.ERR_JEB_INCORRECT_ROUTING.get(String.valueOf(dn));
      throw new DirectoryException(ResultCode.NO_SUCH_OBJECT, message);
    }

    return DNContext;
  }

  /**
   * Creates an import context for the specified entry container.
   *
   * @param entryContainer The entry container.
   * @return Import context to use during import.
   * @throws DatabaseException If a database error occurs.
   * @throws JebException If a JEB error occurs.
   * @throws ConfigException If a configuration contains an error.
   */
  private DNContext getImportContext(EntryContainer entryContainer)
      throws DatabaseException, JebException, ConfigException {
    DN baseDN = entryContainer.getBaseDN();
    EntryContainer srcEntryContainer = null;
    List<DN> includeBranches = new ArrayList<DN>();
    List<DN> excludeBranches = new ArrayList<DN>();

    if(!ldifImportConfig.appendToExistingData() &&
        !ldifImportConfig.clearBackend())
    {
      for(DN dn : ldifImportConfig.getExcludeBranches())
      {
        if(baseDN.equals(dn))
        {
          // This entire base DN was explicitly excluded. Skip.
          return null;
        }
        if(baseDN.isAncestorOf(dn))
        {
          excludeBranches.add(dn);
        }
      }

      if(!ldifImportConfig.getIncludeBranches().isEmpty())
      {
        for(DN dn : ldifImportConfig.getIncludeBranches())
        {
          if(baseDN.isAncestorOf(dn))
          {
            includeBranches.add(dn);
          }
        }

        if(includeBranches.isEmpty())
        {
          // There are no branches in the explicitly defined include list
          // under this base DN. Skip this base DN altogether.
          return null;
        }

        // Remove any overlapping include branches.
        Iterator<DN> includeBranchIterator = includeBranches.iterator();
        while(includeBranchIterator.hasNext())
        {
          DN includeDN = includeBranchIterator.next();
          boolean keep = true;
          for(DN dn : includeBranches)
          {
            if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN))
            {
              keep = false;
              break;
            }
          }
          if(!keep)
          {
            includeBranchIterator.remove();
          }
        }

        // Remove any exclude branches that are not under an include branch,
        // since they will be migrated as part of the existing entries
        // outside of the include branches anyway.
        Iterator<DN> excludeBranchIterator = excludeBranches.iterator();
        while(excludeBranchIterator.hasNext())
        {
          DN excludeDN = excludeBranchIterator.next();
          boolean keep = false;
          for(DN includeDN : includeBranches)
          {
            if(includeDN.isAncestorOf(excludeDN))
            {
              keep = true;
              break;
            }
          }
          if(!keep)
          {
            excludeBranchIterator.remove();
          }
        }

        if(includeBranches.size() == 1 && excludeBranches.size() == 0 &&
            includeBranches.get(0).equals(baseDN))
        {
          // This entire base DN is explicitly included in the import with
          // no exclude branches that we need to migrate. Just clear the
          // entry container.
          entryContainer.lock();
          entryContainer.clear();
          entryContainer.unlock();
        }
        else
        {
          // Create a temp entry container.
          srcEntryContainer = entryContainer;
          entryContainer =
              rootContainer.openEntryContainer(baseDN,
                  baseDN.toNormalizedString() +
                      "_importTmp");
        }
      }
    }

    // Create an import context.
    DNContext DNContext = new DNContext();
    DNContext.setConfig(backendCfg);
    DNContext.setLDIFImportConfig(this.ldifImportConfig);
    DNContext.setLDIFReader(reader);

    DNContext.setBaseDN(baseDN);
    DNContext.setEntryContainer(entryContainer);
    DNContext.setSrcEntryContainer(srcEntryContainer);

    // Create the work queue.
    LinkedBlockingQueue<WorkElement> works =
        new LinkedBlockingQueue<WorkElement>
            (backendCfg.getImportQueueSize());
    DNContext.setWorkQueue(works);

    // Set the include and exclude branches.
    DNContext.setIncludeBranches(includeBranches);
    DNContext.setExcludeBranches(excludeBranches);

    return DNContext;
  }

  /**
   * Add the specified context and entry to the work queue.
   *
   * @param context The context related to the entry DN.
   * @param entry The entry to work on.
   * @return <CODE>True</CODE> if the element was added to the work queue.
   */
  private boolean
  addEntryQueue(DNContext context, Entry entry) {
    WorkElement element =
        WorkElement.decode(entry, context);
    return addQueue(context, element);
  }

  /**
   * Calculate the memory usage for the substring buffer and the DB cache.
   */
  private void calcMemoryLimits() {
    Message msg;
    Runtime runtime = Runtime.getRuntime();
    long freeMemory = runtime.freeMemory();
    long maxMemory = runtime.maxMemory();
    long totMemory = runtime.totalMemory();
    long totFreeMemory = (freeMemory + (maxMemory - totMemory));
    long dbCacheLimit = (totFreeMemory * 60) / 100;
    // If there are no substring indexes defined, set the DB cache
    // size to 75% and take a minimal substring buffer.
    if(!hasSubIndexes) {
      dbCacheLimit = (totFreeMemory * 75) / 100;
    }
    dbCacheSizeStr = Long.toString(dbCacheLimit);
    totalAvailBufferMemory = (totFreeMemory * 10) / 100;
    if(totalAvailBufferMemory < (10 * minBuffer)) {
      msg =
          NOTE_JEB_IMPORT_LDIF_BUFFER_TOT_AVAILMEM.get(totalAvailBufferMemory,
              (10 * minBuffer));
      logError(msg);
      totalAvailBufferMemory = (10 * minBuffer);
    } else if(!hasSubIndexes) {
      totalAvailBufferMemory = (10 * minBuffer);
    }
    msg = NOTE_JEB_IMPORT_LDIF_MEMORY_INFO.get(dbCacheLimit,
        totalAvailBufferMemory);
    logError(msg);
  }
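
  /*
   * A worked example with assumed JVM numbers: if free plus unallocated heap
   * (totFreeMemory) is 1000 MB, the DB cache limit is 600 MB (60%), or
   * 750 MB (75%) when no substring indexes are defined, and the buffer
   * managers get 100 MB (10%); the 10 * minBuffer = 10 MB floor only takes
   * effect on very small heaps.
   */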

  /**
   * Return the string representation of the DB cache size.
   *
   * @return DB cache size string.
   */
  public String getDBCacheSize() {
    return dbCacheSizeStr;
  }

  /**
   * Migrate any existing entries.
   *
   * @throws JebException If a JEB error occurs.
   * @throws DatabaseException If a DB error occurs.
   * @throws DirectoryException If a directory error occurs.
   */
  private void migrateExistingEntries()
      throws JebException, DatabaseException, DirectoryException {
    for(DNContext context : importMap.values()) {
      EntryContainer srcEntryContainer = context.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !context.getIncludeBranches().isEmpty()) {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
            "existing", String.valueOf(context.getBaseDN()));
        logError(message);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                CursorConfig.READ_COMMITTED);
        try {
          status = cursor.getFirst(key, data, lockMode);
          while(status == OperationStatus.SUCCESS &&
              !ldifImportConfig.isCancelled()) {
            if(threads.size() <= 0) {
              message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
              throw new JebException(message);
            }
            DN dn = DN.decode(ByteString.wrap(key.getData()));
            if(!context.getIncludeBranches().contains(dn)) {
              EntryID id = new EntryID(data);
              Entry entry =
                  srcEntryContainer.getID2Entry().get(null,
                      id, LockMode.DEFAULT);
              processEntry(context, entry);
              migratedCount++;
              status = cursor.getNext(key, data, lockMode);
            } else {
              // This is the base entry for a branch that will be included
              // in the import so we don't want to copy the branch to the
              // new entry container.

              /**
               * Advance the cursor to the next entry at the same level in
               * the DIT, skipping all the entries in this branch. Set the
               * next starting value to a value of equal length but slightly
               * greater than the previous DN. Since keys are compared in
               * reverse order we must set the first byte (the comma). No
               * possibility of overflow here.
               */
              byte[] begin =
                  StaticUtils.getBytes("," + dn.toNormalizedString());
              begin[0] = (byte) (begin[0] + 1);
              key.setData(begin);
              status = cursor.getSearchKeyRange(key, data, lockMode);
            }
          }
        } finally {
          cursor.close();
        }
      }
    }
  }

  private void processID2SC(EntryID entryID, Entry entry, Suffix suffix)
      throws DatabaseException
  {
    Set<byte[]> childKeySet = new HashSet<byte[]>();
    Set<byte[]> subtreeKeySet = new HashSet<byte[]>();
    Index id2children = suffix.getEntryContainer().getID2Children();
    Index id2subtree = suffix.getEntryContainer().getID2Subtree();
    id2children.indexer.indexEntry(entry, childKeySet);
    id2subtree.indexer.indexEntry(entry, subtreeKeySet);

    DatabaseEntry dbKey = new DatabaseEntry();
    DatabaseEntry dbVal = new DatabaseEntry();
    ImportIDSet idSet = new ImportIDSet();
    idSet.addEntryID(entryID, id2children.getIndexEntryLimit(),
        id2children.getMaintainCount());
    id2children.insert(idSet, childKeySet, dbKey, dbVal);

    DatabaseEntry dbSubKey = new DatabaseEntry();
    DatabaseEntry dbSubVal = new DatabaseEntry();
    ImportIDSet idSubSet = new ImportIDSet();
    idSubSet.addEntryID(entryID, id2subtree.getIndexEntryLimit(),
        id2subtree.getMaintainCount());
    id2subtree.insert(idSubSet, subtreeKeySet, dbSubKey, dbSubVal);
  }

  private EntryID getAncestorID(DN2ID dn2id, DN dn)
      throws DatabaseException
  {
    int i = 0;
    EntryID nodeID = dn2id.get(null, dn, LockMode.DEFAULT);
    if(nodeID == null) {
      while((nodeID = dn2id.get(null, dn, LockMode.DEFAULT)) == null) {
        try {
          Thread.sleep(50);
          if(i == 3) {
            return null;
          }
          i++;
        } catch (Exception e) {
          return null;
        }
      }
    }
    return nodeID;
  }
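
  /*
   * The retry loop above polls dn2id for an ancestor that has not been
   * written yet, sleeping 50 ms between attempts and giving up after three
   * retries; presumably this covers the window in which another import task
   * has claimed the ancestor entry but has not yet inserted its DN-to-ID
   * mapping. This interpretation of the timing is an inference.
   */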

  /**
   * Migrate excluded entries.
   *
   * @throws JebException If a JEB error occurs.
   * @throws DatabaseException If a DB error occurs.
   * @throws DirectoryException If a directory error occurs.
   */
  private void migrateExcludedEntries()
      throws JebException, DatabaseException, DirectoryException {
    for(DNContext importContext : importMap.values()) {
      EntryContainer srcEntryContainer = importContext.getSrcEntryContainer();
      if(srcEntryContainer != null &&
          !importContext.getExcludeBranches().isEmpty()) {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry data = new DatabaseEntry();
        LockMode lockMode = LockMode.DEFAULT;
        OperationStatus status;
        Message message = NOTE_JEB_IMPORT_MIGRATION_START.get(
            "excluded", String.valueOf(importContext.getBaseDN()));
        logError(message);
        Cursor cursor =
            srcEntryContainer.getDN2ID().openCursor(null,
                CursorConfig.READ_COMMITTED);
        Comparator<byte[]> dn2idComparator =
            srcEntryContainer.getDN2ID().getComparator();
        try {
          for(DN excludedDN : importContext.getExcludeBranches()) {
            byte[] suffix =
                StaticUtils.getBytes(excludedDN.toNormalizedString());
            key.setData(suffix);
            status = cursor.getSearchKeyRange(key, data, lockMode);
            if(status == OperationStatus.SUCCESS &&
                Arrays.equals(key.getData(), suffix)) {
              // This is the base entry for a branch that was excluded in the
              // import so we must migrate all entries in this branch over to
              // the new entry container.
              byte[] end =
                  StaticUtils.getBytes("," + excludedDN.toNormalizedString());
              end[0] = (byte) (end[0] + 1);

              while(status == OperationStatus.SUCCESS &&
                  dn2idComparator.compare(key.getData(), end) < 0 &&
                  !ldifImportConfig.isCancelled()) {
                if(threads.size() <= 0) {
                  message = ERR_JEB_IMPORT_NO_WORKER_THREADS.get();
                  throw new JebException(message);
                }
                EntryID id = new EntryID(data);
                Entry entry = srcEntryContainer.getID2Entry().get(null,
                    id, LockMode.DEFAULT);
                processEntry(importContext, entry);
                migratedCount++;
                status = cursor.getNext(key, data, lockMode);
              }
            }
          }
        } finally {
          cursor.close();
        }
      }
    }
  }

  /**
   * This task processes the LDIF file during phase 1.
   */
  private final class ImportTask implements Callable<Void> {
    private final Map<Suffix, Map<DatabaseContainer, IndexBuffer>> suffixMap =
        new HashMap<Suffix, Map<DatabaseContainer, IndexBuffer>>();
    private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
    private final IndexBuffer.DNComparator dnComparator
        = new IndexBuffer.DNComparator();
    private final IndexBuffer.IndexComparator indexComparator =
        new IndexBuffer.IndexComparator();

    /**
     * {@inheritDoc}
     */
    public Void call() throws Exception
    {
      Suffix suffix = null;
      while (true)
      {
        if (config.isCancelled())
        {
          IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0);
          freeBufQue.add(idxBuffer);
          return null;
        }
        Entry entry = reader.readEntry(dnSuffixMap);

        if (entry == null)
        {
          break;
        }
        DN entryDN = entry.getDN();
        EntryID entryID = (EntryID) entry.getAttachment();
        suffix = getMatchSuffix(entryDN, dnSuffixMap);
        if(!suffixMap.containsKey(suffix))
        {
          suffixMap.put(suffix,
              new HashMap<DatabaseContainer, IndexBuffer>());
        }
        if(!dn2idPhase2)
        {
          if(!processParent(entryDN, entryID, entry, suffix))
          {
            suffix.removePending(entryDN);
            continue;
          }
          if(!suffix.getDN2ID().insert(null, entryDN, entryID))
          {
            suffix.removePending(entryDN);
            Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
            reader.rejectEntry(entry, msg);
            continue;
          }
          suffix.removePending(entryDN);
          processID2SC(entryID, entry, suffix);
        }
        else
        {
          processDN2ID(suffix, entryDN, entryID);
          suffix.removePending(entryDN);
        }
        suffix.getID2Entry().put(null, entryID, entry);
        processIndexes(suffix, entry, entryID);
      }
      flushIndexBuffers();
      if(!dn2idPhase2)
      {
        suffix.getEntryContainer().getID2Children().closeCursor();
        suffix.getEntryContainer().getID2Subtree().closeCursor();
      }
      return null;
    }


    private boolean processParent(DN entryDN, EntryID entryID, Entry entry,
        Suffix suffix) throws DatabaseException
    {
      EntryID parentID = null;
      DN parentDN =
          suffix.getEntryContainer().getParentWithinBase(entryDN);
      DN2ID dn2id = suffix.getDN2ID();
      if(dn2id.get(null, entryDN, LockMode.DEFAULT) != null)
      {
        Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get();
        reader.rejectEntry(entry, msg);
        return false;
      }

      if (parentDN != null) {
        parentID = suffix.getParentID(parentDN);
        if (parentID == null) {
          dn2id.remove(null, entryDN);
          Message msg =
              ERR_JEB_IMPORT_PARENT_NOT_FOUND.get(parentDN.toString());
          reader.rejectEntry(entry, msg);
          return false;
        }
      }

      ArrayList<EntryID> IDs;
      if (parentDN != null && suffix.getParentDN() != null &&
          parentDN.equals(suffix.getParentDN())) {
        IDs = new ArrayList<EntryID>(suffix.getIDs());
        IDs.set(0, entryID);
      }
      else
      {
        EntryID nodeID;
        IDs = new ArrayList<EntryID>(entryDN.getNumComponents());
        IDs.add(entryID);
        if (parentID != null)
        {
          IDs.add(parentID);
          EntryContainer ec = suffix.getEntryContainer();
          for (DN dn = ec.getParentWithinBase(parentDN); dn != null;
               dn = ec.getParentWithinBase(dn)) {
            if((nodeID = getAncestorID(dn2id, dn)) == null) {
              return false;
            } else {
              IDs.add(nodeID);
            }
          }
        }
      }
      suffix.setParentDN(parentDN);
      suffix.setIDs(IDs);
      entry.setAttachment(IDs);
      return true;
    }


    private void
    processIndexes(Suffix ctx, Entry entry, EntryID entryID) throws
        DatabaseException, DirectoryException, JebException, ConfigException
    {
      Transaction txn = null;
      Map<AttributeType, AttributeIndex> attrMap = ctx.getAttrIndexMap();
      for(Map.Entry<AttributeType, AttributeIndex> mapEntry :
          attrMap.entrySet()) {
        AttributeType attrType = mapEntry.getKey();
        if(entry.hasAttribute(attrType)) {
          AttributeIndex attributeIndex = mapEntry.getValue();
          Index index;
          if((index = attributeIndex.getEqualityIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          if((index = attributeIndex.getPresenceIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          if((index = attributeIndex.getSubstringIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          if((index = attributeIndex.getOrderingIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          if((index = attributeIndex.getApproximateIndex()) != null) {
            indexAttr(ctx, index, entry, entryID);
          }
          for(VLVIndex vlvIdx : ctx.getEntryContainer().getVLVIndexes()) {
            vlvIdx.addEntry(txn, entryID, entry);
          }
          Map<String,Collection<Index>> extensibleMap =
              attributeIndex.getExtensibleIndexes();
          if(!extensibleMap.isEmpty()) {
            Collection<Index> subIndexes =
                attributeIndex.getExtensibleIndexes().get(
                    EXTENSIBLE_INDEXER_ID_SUBSTRING);
            if(subIndexes != null) {
              for(Index subIndex : subIndexes) {
                indexAttr(ctx, subIndex, entry, entryID);
              }
            }
            Collection<Index> sharedIndexes =
                attributeIndex.getExtensibleIndexes().get(
                    EXTENSIBLE_INDEXER_ID_SHARED);
            if(sharedIndexes != null) {
              for(Index sharedIndex : sharedIndexes) {
                indexAttr(ctx, sharedIndex, entry, entryID);
              }
            }
          }
        }
      }
    }

    private void indexAttr(Suffix ctx, Index index, Entry entry,
        EntryID entryID)
        throws DatabaseException, ConfigException
    {
      insertKeySet.clear();
      index.indexer.indexEntry(entry, insertKeySet);
      for(byte[] key : insertKeySet)
      {
        processKey(ctx, index, key, entryID, indexComparator, null);
      }
    }

    private void flushIndexBuffers() throws InterruptedException,
        ExecutionException
    {
      Iterator<Suffix> i = dnSuffixMap.values().iterator();
      Suffix suffix = i.next();
      for(Map<DatabaseContainer, IndexBuffer> map : suffixMap.values())
      {
        for(Map.Entry<DatabaseContainer, IndexBuffer> e : map.entrySet())
        {
          DatabaseContainer container = e.getKey();
          IndexBuffer indexBuffer = e.getValue();
          if(container instanceof DN2ID)
          {
            indexBuffer.setComparator(dnComparator);
          }
          else
          {
            indexBuffer.setComparator(indexComparator);
          }
          indexBuffer.setContainer(container);
          indexBuffer.setEntryContainer(suffix.getEntryContainer());
          Future<Void> future = sortService.submit(new SortTask(indexBuffer));
          future.get();
        }
      }
    }
| | | |
| | | |
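| | | /**
| | | * Adds a key/entry ID pair to the index buffer tracked for the given
| | | * database container. When the current buffer has no room left for the
| | | * key, it is submitted to the sort service and replaced with a fresh
| | | * buffer from the free buffer queue.
| | | */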
| | | private void |
| | | processKey(Suffix ctx, DatabaseContainer container, byte[] key, |
| | | EntryID entryID, IndexBuffer.ComparatorBuffer<byte[]> comparator,
| | | EntryContainer entryContainer) throws ConfigException |
| | | { |
| | | IndexBuffer indexBuffer; |
| | | Map<DatabaseContainer, IndexBuffer> conMap = suffixMap.get(ctx); |
| | | if(!conMap.containsKey(container)) |
| | | { |
| | | indexBuffer = getNewIndexBuffer(); |
| | | conMap.put(container, indexBuffer); |
| | | } |
| | | else |
| | | { |
| | | indexBuffer = conMap.get(container); |
| | | } |
| | | if(!indexBuffer.isSpaceAvailable(key)) |
| | | { |
| | | indexBuffer.setContainer(container); |
| | | indexBuffer.setComparator(comparator); |
| | | indexBuffer.setEntryContainer(entryContainer); |
| | | sortService.submit(new SortTask(indexBuffer)); |
| | | indexBuffer = getNewIndexBuffer(); |
| | | conMap.put(container, indexBuffer);
| | | } |
| | | indexBuffer.add(key, entryID); |
| | | } |
| | | |
| | | |
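| | | /**
| | | * Takes the next available buffer from the free buffer queue. A poison
| | | * buffer, or an exhausted queue, signals that the import is aborting.
| | | */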
| | | private IndexBuffer getNewIndexBuffer() throws ConfigException |
| | | { |
| | | IndexBuffer indexBuffer = freeBufQue.poll();
| | | if(indexBuffer == null || indexBuffer.isPoison())
| | | { |
| | | Message msg = Message.raw(Category.JEB, Severity.SEVERE_ERROR, |
| | | "Abort import - MPD"); |
| | | throw new ConfigException(msg); |
| | | } |
| | | return indexBuffer; |
| | | } |
| | | |
| | | |
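| | | /**
| | | * Buffers the normalized DN bytes to entry ID mapping so it can be
| | | * sorted and inserted into the dn2id database during phase two.
| | | */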
| | | private void processDN2ID(Suffix suffix, DN dn, EntryID entryID) |
| | | throws ConfigException |
| | | { |
| | | DatabaseContainer dn2id = suffix.getDN2ID(); |
| | | byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString()); |
| | | processKey(suffix, dn2id, dnBytes, entryID, dnComparator, |
| | | suffix.getEntryContainer()); |
| | | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The task reads the temporary index files and writes their results to the |
| | | * index database. |
| | | */ |
| | | private final class IndexWriteDBTask implements Callable<Void> { |
| | | |
| | | private final IndexManager indexMgr; |
| | | private final boolean isDN2ID; |
| | | private final DatabaseEntry dbKey, dbValue; |
| | | private final DN2ID dn2id; |
| | | private final Index index; |
| | | |
| | | private final EntryContainer entryContainer; |
| | | private final int id2ChildLimit; |
| | | private final boolean id2ChildMCount; |
| | | |
| | | private TreeMap<DN,EntryID> parentIDMap = new TreeMap<DN,EntryID>(); |
| | | private DN parentDN, lastDN; |
| | | private EntryID parentID, lastID; |
| | | private final Map<byte[], ImportIDSet> id2childTree; |
| | | private final Map<byte[], ImportIDSet> id2subtreeTree; |
| | | private final int cacheSize; |
| | | private ByteBuffer directBuffer = null; |
| | | |
| | | public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID, |
| | | ByteBuffer b, int cacheSize) |
| | | { |
| | | this(indexMgr, isDN2ID, cacheSize); |
| | | directBuffer = b; |
| | | } |
| | | |
| | | public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID, |
| | | int cacheSize) |
| | | { |
| | | this.indexMgr = indexMgr; |
| | | this.entryContainer = indexMgr.entryContainer; |
| | | this.isDN2ID = isDN2ID; |
| | | this.dbKey = new DatabaseEntry(); |
| | | this.dbValue = new DatabaseEntry(); |
| | | this.cacheSize = cacheSize; |
| | | if(isDN2ID) |
| | | { |
| | | this.dn2id = indexMgr.dn2id; |
| | | this.index = null; |
| | | id2ChildLimit = entryContainer.getID2Children().getIndexEntryLimit(); |
| | | id2ChildMCount = entryContainer.getID2Subtree().getMaintainCount(); |
| | | Comparator<byte[]> id2ChildComparator = |
| | | entryContainer.getID2Children().getComparator(); |
| | | Comparator<byte[]> id2SubtreeComparator = |
| | | entryContainer.getID2Subtree().getComparator(); |
| | | id2childTree = |
| | | new TreeMap<byte[], ImportIDSet>(id2ChildComparator); |
| | | id2subtreeTree = |
| | | new TreeMap<byte[], ImportIDSet>(id2SubtreeComparator); |
| | | } |
| | | else |
| | | { |
| | | this.dn2id = null; |
| | | this.index = indexMgr.getIndex(); |
| | | id2subtreeTree = null; |
| | | id2childTree = null; |
| | | id2ChildLimit = 0; |
| | | id2ChildMCount = false; |
| | | } |
| | | } |
| | | |
| | | |
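| | | /**
| | | * Performs the phase two merge for this index. Every temporary-file
| | | * buffer is kept in a TreeSet ordered by its current key (see
| | | * Buffer.compareTo); the buffer with the smallest key is repeatedly
| | | * removed, records with equal keys are merged into a single
| | | * ImportIDSet, and each completed key is then written to the index
| | | * database.
| | | */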
| | | public Void call() throws Exception |
| | | { |
| | | |
| | | Comparator<byte[]> comparator = indexMgr.getComparator(); |
| | | int limit = indexMgr.getLimit(); |
| | | boolean maintainCount = indexMgr.getMaintainCount(); |
| | | byte[] cKey = null; |
| | | ImportIDSet cIDSet = null; |
| | | indexMgr.init(); |
| | | List<Buffer> bufferList = indexMgr.getBufferList(); |
| | | SortedSet<Buffer> bufferSet = new TreeSet<Buffer>(); |
| | | int p = 0; |
| | | int offSet = cacheSize; |
| | | for(Buffer b : bufferList) |
| | | { |
| | | if(directBuffer != null) |
| | | { |
| | | directBuffer.position(p); |
| | | directBuffer.limit(offSet); |
| | | ByteBuffer slice = directBuffer.slice(); |
| | | b.init(indexMgr, slice, cacheSize); |
| | | p += cacheSize; |
| | | offSet += cacheSize; |
| | | } |
| | | else |
| | | { |
| | | b.init(indexMgr, null, cacheSize); |
| | | } |
| | | bufferSet.add(b); |
| | | } |
| | | while(!bufferSet.isEmpty()) |
| | | { |
| | | Buffer b = bufferSet.first();
| | | bufferSet.remove(b);
| | | byte[] key = b.getKey(); |
| | | ImportIDSet idSet = b.getIDSet(); |
| | | if(cKey == null) |
| | | { |
| | | cKey = key; |
| | | cIDSet = idSet; |
| | | } |
| | | else |
| | | { |
| | | if(comparator.compare(key, cKey) != 0) |
| | | { |
| | | addToDB(cKey, cIDSet); |
| | | indexMgr.incrKeyCount(); |
| | | cKey = key; |
| | | cIDSet = idSet; |
| | | } |
| | | else |
| | | { |
| | | cIDSet.setKey(cKey); |
| | | cIDSet.merge(idSet, limit, maintainCount); |
| | | } |
| | | } |
| | | if(b.hasMoreData()) |
| | | { |
| | | b.getNextRecord(); |
| | | bufferSet.add(b); |
| | | } |
| | | } |
| | | if(cKey != null) |
| | | { |
| | | addToDB(cKey, cIDSet); |
| | | } |
| | | cleanUp();
| | | return null; |
| | | } |
| | | |
| | | |
| | | private void cleanUp() throws DatabaseException, DirectoryException,
| | | IOException |
| | | { |
| | | if(!isDN2ID) { |
| | | index.closeCursor(); |
| | | Message msg = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(index.getName()); |
| | | logError(msg); |
| | | |
| | | } |
| | | else |
| | | { |
| | | if(dn2idPhase2) |
| | | { |
| | | flushSubTreeChildIndexes(); |
| | | } |
| | | } |
| | | indexMgr.setDone(); |
| | | indexMgr.close(); |
| | | indexMgr.deleteIndexFile(); |
| | | } |
| | | |
| | | |
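| | | /**
| | | * Writes the id2children and id2subtree ID sets accumulated in memory
| | | * to their databases once all DN records have been merged.
| | | */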
| | | private void flushSubTreeChildIndexes() |
| | | throws DatabaseException, DirectoryException |
| | | { |
| | | Index id2child = entryContainer.getID2Children(); |
| | | Set<Map.Entry<byte[], ImportIDSet>> id2childSet = |
| | | id2childTree.entrySet(); |
| | | for(Map.Entry<byte[], ImportIDSet> e : id2childSet) |
| | | { |
| | | byte[] key = e.getKey(); |
| | | ImportIDSet idSet = e.getValue(); |
| | | dbKey.setData(key); |
| | | id2child.insert(dbKey, idSet, dbValue); |
| | | } |
| | | id2child.closeCursor(); |
| | | Index id2subtree = entryContainer.getID2Subtree(); |
| | | Set<Map.Entry<byte[], ImportIDSet>> subtreeSet = |
| | | id2subtreeTree.entrySet(); |
| | | for(Map.Entry<byte[], ImportIDSet> e : subtreeSet) |
| | | { |
| | | byte[] key = e.getKey(); |
| | | ImportIDSet idSet = e.getValue(); |
| | | dbKey.setData(key); |
| | | id2subtree.insert(dbKey, idSet, dbValue); |
| | | } |
| | | id2subtree.closeCursor(); |
| | | Message msg = |
| | | NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getTotDNCount()); |
| | | logError(msg); |
| | | } |
| | | |
| | | |
| | | private void addToDB(byte[] key, ImportIDSet record) |
| | | throws InterruptedException, DatabaseException, DirectoryException |
| | | { |
| | | record.setKey(key); |
| | | if(!this.isDN2ID) |
| | | { |
| | | addIndex(record); |
| | | } |
| | | else |
| | | { |
| | | if(dn2idPhase2) |
| | | { |
| | | addDN2ID(record); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
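| | | /**
| | | * Adds the child entry ID to the id2subtree ID set of its parent and
| | | * of every ancestor of the parent within the base DN.
| | | */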
| | | private void id2Subtree(EntryContainer ec, EntryID childID, |
| | | int limit, boolean mCount) throws DatabaseException |
| | | { |
| | | ImportIDSet idSet; |
| | | if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData())) |
| | | { |
| | | idSet = new ImportIDSet(); |
| | | id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData()); |
| | | } |
| | | idSet.addEntryID(childID, limit, mCount); |
| | | for (DN dn = ec.getParentWithinBase(parentDN); dn != null; |
| | | dn = ec.getParentWithinBase(dn)) |
| | | { |
| | | EntryID nodeID = parentIDMap.get(dn); |
| | | if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData())) |
| | | { |
| | | idSet = new ImportIDSet(); |
| | | id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData()); |
| | | } |
| | | idSet.addEntryID(childID, limit, mCount); |
| | | } |
| | | } |
| | | |
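| | | /**
| | | * Adds the child entry ID to the id2children ID set of its immediate
| | | * parent.
| | | */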
| | | private void id2child(EntryID childID, int limit, boolean mCount) |
| | | { |
| | | ImportIDSet idSet; |
| | | if(!id2childTree.containsKey(parentID.getDatabaseEntry().getData())) |
| | | { |
| | | idSet = new ImportIDSet(); |
| | | id2childTree.put(parentID.getDatabaseEntry().getData(), idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2childTree.get(parentID.getDatabaseEntry().getData()); |
| | | } |
| | | idSet.addEntryID(childID, limit, mCount); |
| | | } |
| | | |
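| | | /**
| | | * Verifies that the parent of the given DN has already been processed,
| | | * using the parentIDMap cache. Because the phase two merge delivers
| | | * DNs in sorted key order, the previous DN is usually the parent or a
| | | * sibling of the current one. If no parent can be found the entry is
| | | * rejected and false is returned.
| | | */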
| | | private boolean checkParent(DN dn, EntryID id, EntryContainer ec) |
| | | { |
| | | if(parentIDMap.isEmpty()) |
| | | { |
| | | parentIDMap.put(dn, id); |
| | | return true; |
| | | } |
| | | else if(lastDN != null && lastDN.isAncestorOf(dn)) |
| | | { |
| | | parentIDMap.put(lastDN, lastID); |
| | | parentDN = lastDN; |
| | | parentID = lastID; |
| | | lastDN = dn; |
| | | lastID = id; |
| | | return true; |
| | | } |
| | | else if(parentIDMap.lastKey().isAncestorOf(dn)) |
| | | { |
| | | parentDN = parentIDMap.lastKey(); |
| | | parentID = parentIDMap.get(parentDN); |
| | | lastDN = dn; |
| | | lastID = id; |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | DN pDN = ec.getParentWithinBase(dn);
| | | if(parentIDMap.containsKey(pDN)) {
| | | parentDN = pDN;
| | | parentID = parentIDMap.get(pDN);
| | | lastDN = dn;
| | | lastID = id;
| | | // Prune cached IDs for DNs deeper than the new parent, keeping pDN
| | | // itself. Iterator.remove() is required here because removing
| | | // through the map while iterating it throws
| | | // ConcurrentModificationException.
| | | Iterator<DN> prune = parentIDMap.tailMap(pDN).keySet().iterator();
| | | prune.next(); // skip pDN so its ID stays cached
| | | while(prune.hasNext())
| | | {
| | | prune.next();
| | | prune.remove();
| | | }
| | | }
| | | else |
| | | { |
| | | Message msg = NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString()); |
| | | Entry e = new Entry(dn, null, null, null); |
| | | reader.rejectEntry(e, msg); |
| | | return false; |
| | | } |
| | | } |
| | | return true; |
| | | } |
| | | |
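| | | /**
| | | * Inserts a merged DN record into the dn2id database and, when the
| | | * entry has a parent within the base DN, records its ID in the
| | | * in-memory id2children and id2subtree sets.
| | | */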
| | | private void addDN2ID(ImportIDSet record) |
| | | throws DatabaseException, DirectoryException |
| | | { |
| | | DatabaseEntry idVal = new DatabaseEntry(); |
| | | dbKey.setData(record.getKey()); |
| | | idVal.setData(record.toDatabase()); |
| | | DN dn = DN.decode(ByteString.wrap(dbKey.getData())); |
| | | EntryID entryID = new EntryID(idVal); |
| | | if(!checkParent(dn, entryID, entryContainer)) |
| | | { |
| | | return; |
| | | } |
| | | dn2id.putRaw(null, dbKey, idVal); |
| | | indexMgr.addTotDNCount(1); |
| | | if(parentDN != null) |
| | | { |
| | | id2child(entryID, id2ChildLimit, id2ChildMCount); |
| | | id2Subtree(entryContainer, |
| | | entryID, id2ChildLimit, id2ChildMCount); |
| | | } |
| | | } |
| | | |
| | | |
| | | private void addIndex(ImportIDSet record) throws DatabaseException |
| | | { |
| | | dbKey.setData(record.getKey()); |
| | | index.insert(dbKey, record, dbValue); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * This task writes the temporary index files using the sorted buffers read |
| | | * from a blocking queue. |
| | | */ |
| | | private final class IndexFileWriterTask implements Runnable |
| | | { |
| | | private final IndexManager indexMgr; |
| | | private final BlockingQueue<IndexBuffer> que; |
| | | private final ByteArrayOutputStream byteStream = |
| | | new ByteArrayOutputStream(2 * bufferSize); |
| | | private final DataOutputStream dataStream; |
| | | private long bufCount = 0; |
| | | private final File file; |
| | | private final SortedSet<IndexBuffer> indexSortedSet; |
| | | private boolean poisonSeen = false; |
| | | |
| | | public IndexFileWriterTask(BlockingQueue<IndexBuffer> que, |
| | | IndexManager indexMgr) throws FileNotFoundException |
| | | { |
| | | this.que = que; |
| | | file = indexMgr.getFile(); |
| | | this.indexMgr = indexMgr; |
| | | BufferedOutputStream bufferedStream = |
| | | new BufferedOutputStream(new FileOutputStream(file), 2 * MB); |
| | | dataStream = new DataOutputStream(bufferedStream); |
| | | indexSortedSet = new TreeSet<IndexBuffer>(); |
| | | } |
| | | |
| | | |
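| | | /**
| | | * Drains sorted index buffers from this writer's queue and appends
| | | * them to the index's temporary file, merging several buffers into one
| | | * record stream per write when the queue holds more than one. Drained
| | | * buffers are recycled onto the free buffer queue, and a poison buffer
| | | * signals that phase one has finished.
| | | */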
| | | public void run() |
| | | { |
| | | long offset = 0; |
| | | List<IndexBuffer> l = new LinkedList<IndexBuffer>(); |
| | | try { |
| | | while(true) |
| | | { |
| | | // Wait briefly rather than busy-spin when the queue is empty.
| | | IndexBuffer indexBuffer = que.poll(100, TimeUnit.MILLISECONDS);
| | | if(indexBuffer != null) |
| | | { |
| | | long beginOffset = offset; |
| | | long bufLen; |
| | | if(!que.isEmpty()) |
| | | { |
| | | que.drainTo(l, DRAIN_TO); |
| | | l.add(indexBuffer); |
| | | bufLen = writeIndexBuffers(l); |
| | | for(IndexBuffer id : l) |
| | | { |
| | | id.reset(); |
| | | } |
| | | freeBufQue.addAll(l); |
| | | l.clear(); |
| | | if(poisonSeen) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if(indexBuffer.isPoison()) |
| | | { |
| | | break; |
| | | } |
| | | bufLen = writeIndexBuffer(indexBuffer); |
| | | indexBuffer.reset(); |
| | | freeBufQue.add(indexBuffer); |
| | | } |
| | | offset += bufLen; |
| | | indexMgr.addBuffer(new Buffer(beginOffset, offset, bufCount)); |
| | | bufCount++; |
| | | bufferCount.incrementAndGet(); |
| | | } |
| | | } |
| | | dataStream.close(); |
| | | indexMgr.setFileLength(); |
| | | } |
| | | catch (IOException e) {
| | | Message msg =
| | | ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(),
| | | e.getMessage());
| | | logError(msg);
| | | }
| | | catch (InterruptedException e) {
| | | // Interrupted while waiting for a buffer; treat as a shutdown
| | | // request and let the thread exit.
| | | Thread.currentThread().interrupt();
| | | }
| | | } |
| | | |
| | | |
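| | | // Each record written to the temporary file uses the layout that
| | | // Buffer.getNextKey()/getNextIDSet() read back in phase two:
| | | //
| | | //   int    keyLen   length of the index key in bytes
| | | //   byte[] key      the index key
| | | //   int    idLen    length of the ID block in bytes
| | | //   long[] ids      idLen/8 entry IDs, 8 bytes each
| | | //
| | | // IndexBuffer.writeKey() is assumed to emit the length-prefixed key,
| | | // matching the explicit writeInt()/write() calls in
| | | // writeIndexBuffers() below; the "+ 8" added to each record length
| | | // accounts for the two int headers.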
| | | private long writeIndexBuffer(IndexBuffer indexBuffer) throws IOException |
| | | { |
| | | int numKeys = indexBuffer.getNumberKeys(); |
| | | indexBuffer.setPos(-1); |
| | | long bufLen = 0; |
| | | byteStream.reset(); |
| | | for(int i = 0; i < numKeys; i++) |
| | | { |
| | | if(indexBuffer.getPos() == -1) |
| | | { |
| | | indexBuffer.setPos(i); |
| | | byteStream.write(indexBuffer.getID(i)); |
| | | continue; |
| | | } |
| | | |
| | | if(!indexBuffer.compare(i)) |
| | | { |
| | | int recLen = indexBuffer.getKeySize(); |
| | | recLen += byteStream.size(); |
| | | recLen += 8; |
| | | bufLen += recLen; |
| | | indexBuffer.writeKey(dataStream); |
| | | dataStream.writeInt(byteStream.size()); |
| | | byteStream.writeTo(dataStream); |
| | | indexBuffer.setPos(i); |
| | | byteStream.reset(); |
| | | } |
| | | byteStream.write(indexBuffer.getID(i)); |
| | | } |
| | | |
| | | if(indexBuffer.getPos() != -1) |
| | | { |
| | | int recLen = indexBuffer.getKeySize(); |
| | | recLen += byteStream.size(); |
| | | recLen += 8; |
| | | bufLen += recLen; |
| | | indexBuffer.writeKey(dataStream); |
| | | dataStream.writeInt(byteStream.size()); |
| | | byteStream.writeTo(dataStream); |
| | | } |
| | | return bufLen; |
| | | } |
| | | |
| | | |
| | | private long writeIndexBuffers(List<IndexBuffer> buffers) |
| | | throws IOException |
| | | { |
| | | long id = 0; |
| | | long bufLen = 0; |
| | | byteStream.reset(); |
| | | for(IndexBuffer b : buffers) |
| | | { |
| | | if(b.isPoison()) |
| | | { |
| | | poisonSeen = true; |
| | | } |
| | | else |
| | | { |
| | | b.setPos(0); |
| | | b.setID(id++); |
| | | indexSortedSet.add(b); |
| | | } |
| | | } |
| | | byte[] saveKey = null; |
| | | while(!indexSortedSet.isEmpty()) |
| | | { |
| | | IndexBuffer b = indexSortedSet.first(); |
| | | indexSortedSet.remove(b); |
| | | byte[] key = b.getKeyBytes(b.getPos()); |
| | | if(saveKey == null) |
| | | { |
| | | saveKey = key; |
| | | byteStream.write(b.getID(b.getPos())); |
| | | } |
| | | else |
| | | { |
| | | if(!b.compare(saveKey)) |
| | | { |
| | | int recLen = saveKey.length; |
| | | recLen += byteStream.size(); |
| | | recLen += 8; |
| | | bufLen += recLen; |
| | | dataStream.writeInt(saveKey.length); |
| | | dataStream.write(saveKey); |
| | | dataStream.writeInt(byteStream.size()); |
| | | byteStream.writeTo(dataStream); |
| | | byteStream.reset(); |
| | | saveKey = key; |
| | | byteStream.write(b.getID(b.getPos())); |
| | | } |
| | | else |
| | | { |
| | | byteStream.write(b.getID(b.getPos())); |
| | | } |
| | | } |
| | | if(b.hasMoreData()) |
| | | { |
| | | b.getNextRecord(); |
| | | indexSortedSet.add(b); |
| | | } |
| | | } |
| | | if(saveKey != null) |
| | | { |
| | | int recLen = saveKey.length; |
| | | recLen += byteStream.size(); |
| | | recLen += 8; |
| | | bufLen += recLen; |
| | | dataStream.writeInt(saveKey.length); |
| | | dataStream.write(saveKey); |
| | | dataStream.writeInt(byteStream.size()); |
| | | byteStream.writeTo(dataStream); |
| | | } |
| | | return bufLen; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This task's main function is to sort the index buffers handed to it by
| | | * the import tasks reading the LDIF file. It also creates an index file
| | | * writer task and a corresponding queue if needed. The sorted index
| | | * buffers are put on the index file writer queues for writing to a
| | | * temporary file.
| | | */ |
| | | private final class SortTask implements Callable<Void> |
| | | { |
| | | |
| | | private final IndexBuffer indexBuffer; |
| | | |
| | | public SortTask(IndexBuffer indexBuffer) |
| | | { |
| | | this.indexBuffer = indexBuffer; |
| | | } |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public Void call() throws Exception |
| | | { |
| | | if (config.isCancelled()) |
| | | { |
| | | return null; |
| | | } |
| | | indexBuffer.sort(); |
| | | if(containerQueMap.containsKey(indexBuffer.getContainer())) { |
| | | BlockingQueue<IndexBuffer> q = |
| | | containerQueMap.get(indexBuffer.getContainer()); |
| | | q.add(indexBuffer); |
| | | } |
| | | else |
| | | { |
| | | DatabaseContainer container = indexBuffer.getContainer(); |
| | | EntryContainer entryContainer = indexBuffer.getEntryContainer(); |
| | | createIndexWriterTask(container, entryContainer); |
| | | BlockingQueue<IndexBuffer> q = containerQueMap.get(container); |
| | | q.add(indexBuffer); |
| | | } |
| | | return null; |
| | | } |
| | | |
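| | | /**
| | | * Lazily creates the index file writer task and buffer queue for a
| | | * container the first time one of its buffers is sorted. The check is
| | | * synchronized on the container so only one writer per container is
| | | * ever started.
| | | */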
| | | private void createIndexWriterTask(DatabaseContainer container, |
| | | EntryContainer entryContainer) |
| | | throws FileNotFoundException |
| | | { |
| | | synchronized(container) { |
| | | if(containerQueMap.containsKey(container)) |
| | | { |
| | | return; |
| | | } |
| | | IndexManager indexMgr; |
| | | if(container instanceof Index) |
| | | { |
| | | Index index = (Index) container; |
| | | indexMgr = new IndexManager(index); |
| | | } |
| | | else |
| | | { |
| | | DN2ID dn2id = (DN2ID) container; |
| | | indexMgr = new IndexManager(dn2id, entryContainer); |
| | | } |
| | | containerIndexMgrMap.put(container, indexMgr); |
| | | BlockingQueue<IndexBuffer> newQue = |
| | | new ArrayBlockingQueue<IndexBuffer>(threadCount + 5); |
| | | IndexFileWriterTask indexWriter = |
| | | new IndexFileWriterTask(newQue, indexMgr); |
| | | indexWriterList.add(indexWriter); |
| | | indexWriterFutures.add(indexProcessService.submit(indexWriter)); |
| | | containerQueMap.put(container, newQue); |
| | | } |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The buffer class is used to process a buffer from the temporary index files |
| | | * during phase 2 processing. |
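| | | * Each buffer streams one sorted region [begin, end) of its temporary
| | | * index file through a fixed size cache and exposes the current
| | | * record's key and ID set to the merge loop.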
| | | */ |
| | | private final class Buffer implements Comparable<Buffer> |
| | | { |
| | | private IndexManager indexMgr; |
| | | private final long begin, end, id; |
| | | private long offset; |
| | | private ByteBuffer cache; |
| | | private int keyLen, idLen; |
| | | private byte[] key; |
| | | private ImportIDSet idSet; |
| | | |
| | | |
| | | public Buffer(long begin, long end, long id) |
| | | { |
| | | this.begin = begin; |
| | | this.end = end; |
| | | this.offset = 0; |
| | | this.id = id; |
| | | } |
| | | |
| | | |
| | | private void init(IndexManager indexMgr, ByteBuffer b, |
| | | long cacheSize) throws IOException |
| | | { |
| | | this.indexMgr = indexMgr; |
| | | if(b == null) |
| | | { |
| | | cache = ByteBuffer.allocate((int)cacheSize); |
| | | } |
| | | else |
| | | { |
| | | cache = b; |
| | | } |
| | | loadCache(); |
| | | cache.flip(); |
| | | getNextRecord(); |
| | | } |
| | | |
| | | |
| | | private void loadCache() throws IOException |
| | | { |
| | | FileChannel fileChannel = indexMgr.getChannel(); |
| | | fileChannel.position(begin + offset); |
| | | long leftToRead = end - (begin + offset); |
| | | long bytesToRead; |
| | | if(leftToRead < cache.remaining()) |
| | | { |
| | | int pos = cache.position(); |
| | | cache.limit((int) (pos + leftToRead)); |
| | | bytesToRead = (int)leftToRead; |
| | | } |
| | | else |
| | | { |
| | | bytesToRead = Math.min((end - offset), cache.remaining());
| | | } |
| | | int bytesRead = 0; |
| | | while(bytesRead < bytesToRead) |
| | | { |
| | | bytesRead += fileChannel.read(cache); |
| | | } |
| | | offset += bytesRead; |
| | | indexMgr.addBytesRead(bytesRead); |
| | | } |
| | | |
| | | public boolean hasMoreData() throws IOException
| | | {
| | | boolean fullyRead = (begin + offset) >= end;
| | | return !(fullyRead && cache.remaining() == 0);
| | | }
| | | |
| | | public byte[] getKey() |
| | | { |
| | | return key; |
| | | } |
| | | |
| | | public ImportIDSet getIDSet() |
| | | { |
| | | return idSet; |
| | | } |
| | | |
| | | public long getBufID() |
| | | { |
| | | return id; |
| | | } |
| | | |
| | | public void getNextRecord() throws IOException |
| | | { |
| | | getNextKey(); |
| | | getNextIDSet(); |
| | | } |
| | | |
| | | private int getInt() throws IOException |
| | | { |
| | | ensureData(4); |
| | | return cache.getInt(); |
| | | } |
| | | |
| | | private long getLong() throws IOException |
| | | { |
| | | ensureData(8); |
| | | return cache.getLong(); |
| | | } |
| | | |
| | | private void getBytes(byte[] b) throws IOException |
| | | { |
| | | ensureData(b.length); |
| | | cache.get(b); |
| | | } |
| | | |
| | | private void getNextKey() throws IOException, BufferUnderflowException |
| | | { |
| | | keyLen = getInt(); |
| | | key = new byte[keyLen]; |
| | | getBytes(key); |
| | | } |
| | | |
| | | |
| | | private void getNextIDSet() throws IOException, BufferUnderflowException |
| | | { |
| | | idLen = getInt(); |
| | | int idCount = idLen/8; |
| | | idSet = new ImportIDSet(idCount); |
| | | for(int i = 0; i < idCount; i++) |
| | | { |
| | | long l = getLong(); |
| | | idSet.addEntryID(l, indexMgr.getLimit(), indexMgr.getMaintainCount()); |
| | | } |
| | | } |
| | | |
| | | |
| | | private void ensureData(int len) throws IOException |
| | | { |
| | | if(cache.remaining() == 0) |
| | | { |
| | | cache.clear(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | } |
| | | else if(cache.remaining() < len) |
| | | { |
| | | cache.compact(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | } |
| | | } |
| | | |
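| | | /**
| | | * Orders buffers by their current key so the phase two merge always
| | | * sees the smallest key first. Ties on the key are broken by ID set
| | | * definedness, then ID set size, and finally buffer ID.
| | | */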
| | | public int compareTo(Buffer o) { |
| | | if(key == null) { |
| | | if(id == o.getBufID()) |
| | | { |
| | | return 0; |
| | | } |
| | | else |
| | | { |
| | | return id > o.getBufID() ? 1 : -1; |
| | | } |
| | | } |
| | | if(this.equals(o)) |
| | | { |
| | | return 0; |
| | | } |
| | | int rc = indexMgr.getComparator().compare(key, o.getKey()); |
| | | if(rc == 0) |
| | | { |
| | | if(idSet.isDefined()) |
| | | { |
| | | return -1; |
| | | } |
| | | else if(o.getIDSet().isDefined()) |
| | | { |
| | | return 1; |
| | | } |
| | | else if(idSet.size() == o.getIDSet().size()) |
| | | { |
| | | rc = id > o.getBufID() ? 1 : -1; |
| | | } |
| | | else |
| | | { |
| | | rc = idSet.size() - o.getIDSet().size(); |
| | | } |
| | | } |
| | | return rc; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * The index manager class is used to carry information about index processing |
| | | * from phase 1 to phase 2. |
| | | */ |
| | | private final class IndexManager |
| | | { |
| | | private final Index index; |
| | | private final DN2ID dn2id; |
| | | private final EntryContainer entryContainer; |
| | | private final File file; |
| | | |
| | | |
| | | private RandomAccessFile raf = null; |
| | | private final List<Buffer> bufferList = new LinkedList<Buffer>(); |
| | | private final int limit; |
| | | private long fileLength, bytesRead = 0; |
| | | private final boolean maintainCount; |
| | | private final Comparator<byte[]> comparator; |
| | | private boolean done = false; |
| | | private long totalDNS; |
| | | private AtomicInteger keyCount = new AtomicInteger(0); |
| | | private final String name; |
| | | |
| | | public IndexManager(Index index) |
| | | { |
| | | this.index = index; |
| | | dn2id = null; |
| | | file = new File(tempDir, index.getName()); |
| | | name = index.getName(); |
| | | limit = index.getIndexEntryLimit(); |
| | | maintainCount = index.getMaintainCount(); |
| | | comparator = index.getComparator(); |
| | | entryContainer = null; |
| | | } |
| | | |
| | | |
| | | public IndexManager(DN2ID dn2id, EntryContainer entryContainer) |
| | | { |
| | | index = null; |
| | | this.dn2id = dn2id; |
| | | file = new File(tempDir, dn2id.getName()); |
| | | limit = 1; |
| | | maintainCount = false; |
| | | comparator = dn2id.getComparator(); |
| | | this.entryContainer = entryContainer; |
| | | name = dn2id.getName(); |
| | | } |
| | | |
| | | public void init() throws FileNotFoundException |
| | | { |
| | | raf = new RandomAccessFile(file, "r"); |
| | | } |
| | | |
| | | public FileChannel getChannel() |
| | | { |
| | | return raf.getChannel(); |
| | | } |
| | | |
| | | public void addBuffer(Buffer o) |
| | | { |
| | | this.bufferList.add(o); |
| | | } |
| | | |
| | | public List<Buffer> getBufferList() |
| | | { |
| | | return bufferList; |
| | | } |
| | | |
| | | public File getFile() |
| | | { |
| | | return file; |
| | | } |
| | | |
| | | public void deleteIndexFile() |
| | | { |
| | | file.delete(); |
| | | } |
| | | |
| | | public void close() throws IOException |
| | | { |
| | | raf.close(); |
| | | } |
| | | |
| | | public int getLimit() |
| | | { |
| | | return limit; |
| | | } |
| | | |
| | | public boolean getMaintainCount() |
| | | { |
| | | return maintainCount; |
| | | } |
| | | |
| | | public Comparator<byte[]> getComparator() |
| | | { |
| | | return comparator; |
| | | } |
| | | |
| | | public Index getIndex() |
| | | { |
| | | return index; |
| | | } |
| | | |
| | | public void setFileLength() |
| | | { |
| | | this.fileLength = file.length(); |
| | | } |
| | | |
| | | public void addBytesRead(int bytesRead) |
| | | { |
| | | this.bytesRead += bytesRead; |
| | | } |
| | | |
| | | public void setDone() |
| | | { |
| | | this.done = true; |
| | | } |
| | | |
| | | public void addTotDNCount(int delta) |
| | | { |
| | | this.totalDNS += delta; |
| | | } |
| | | |
| | | |
| | | public long getTotDNCount() |
| | | { |
| | | return totalDNS; |
| | | } |
| | | |
| | | |
| | | public void printStats(long deltaTime) |
| | | { |
| | | if(!done) |
| | | { |
| | | float rate = 1000f * keyCount.getAndSet(0) / deltaTime; |
| | | Message msg = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_REPORT.get(name, |
| | | (fileLength - bytesRead), rate); |
| | | logError(msg); |
| | | } |
| | | } |
| | | |
| | | public void incrKeyCount() |
| | | { |
| | | keyCount.incrementAndGet(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This class reports progress of the first phase of the import job
| | | * (reading the LDIF file) at fixed intervals.
| | | */ |
| | | private final class FirstPhaseProgressTask extends TimerTask
| | | {
| | | /**
| | | * The number of entries that had been read at the time of the
| | | * previous progress report.
| | | */
| | | private long previousCount = 0;
| | |
| | | /**
| | | * The time in milliseconds of the previous progress report.
| | | */
| | | private long previousTime;
| | |
| | | /**
| | | * The environment statistics at the time of the previous report.
| | | */
| | | private EnvironmentStats prevEnvStats;
| | |
| | | /**
| | | * The number of bytes in a megabyte. Note that 1024*1024 bytes may
| | | * eventually become known as a mebibyte(MiB).
| | | */
| | | public static final int bytesPerMegabyte = 1024 * 1024;
| | |
| | | // Determines if the ldif is being read.
| | | private boolean ldifRead = false;
| | |
| | | // Determines if eviction has been detected.
| | | private boolean evicting = false;
| | |
| | | // Entry count when eviction was detected.
| | | private long evictionEntryCount = 0;
| | |
| | | // Suspend output.
| | | private boolean pause = false;
| | |
| | |
| | | /**
| | | * Create a new import progress task.
| | | */
| | | public FirstPhaseProgressTask()
| | | {
| | | previousTime = System.currentTimeMillis();
| | | try
| | | {
| | | prevEnvStats =
| | | rootContainer.getEnvironmentStats(new StatsConfig());
| | | }
| | | catch (DatabaseException e)
| | | {
| | | throw new RuntimeException(e);
| | | }
| | | }
| | |
| | | /**
| | | * Mark the LDIF file as having been fully read.
| | | */
| | | public void ldifRead() {
| | | ldifRead = true;
| | | }
| | |
| | | /**
| | | * Return value of evicting flag.
| | | *
| | | * @return <CODE>True</CODE> if eviction is detected.
| | | */
| | | public boolean isEvicting() {
| | | return evicting;
| | | }
| | |
| | | /**
| | | * Return count of entries when eviction was detected.
| | | *
| | | * @return The entry count when eviction was detected.
| | | */
| | | public long getEvictionEntryCount() {
| | | return evictionEntryCount;
| | | }
| | |
| | | /**
| | | * Suspend output if true.
| | | *
| | | * @param v The value to set the suspend value to.
| | | */
| | | public void setPause(boolean v) {
| | | pause = v;
| | | }
| | |
| | | /**
| | | * The action to be performed by this timer task.
| | | */
| | | @Override
| | | public void run()
| | | {
| | | long latestCount = reader.getEntriesRead();
| | | long deltaCount = (latestCount - previousCount);
| | | long latestTime = System.currentTimeMillis();
| | | long deltaTime = latestTime - previousTime;
| | | Message message;
| | | if (deltaTime == 0)
| | | {
| | | return;
| | | }
| | | if (pause)
| | | {
| | | return;
| | | }
| | | if (!ldifRead)
| | | {
| | | long numRead = reader.getEntriesRead();
| | | long numIgnored = reader.getEntriesIgnored();
| | | long numRejected = reader.getEntriesRejected();
| | | float rate = 1000f * deltaCount / deltaTime;
| | | message =
| | | NOTE_JEB_IMPORT_PROGRESS_REPORT.get(numRead, numIgnored,
| | | numRejected, 0, rate);
| | | logError(message);
| | | }
| | | try
| | | {
| | | Runtime runtime = Runtime.getRuntime();
| | | long freeMemory = runtime.freeMemory() / bytesPerMegabyte;
| | | EnvironmentStats envStats =
| | | rootContainer.getEnvironmentStats(new StatsConfig());
| | | long nCacheMiss =
| | | envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
| | |
| | | float cacheMissRate = 0;
| | | if (deltaCount > 0)
| | | {
| | | cacheMissRate = nCacheMiss / (float) deltaCount;
| | | }
| | | message =
| | | NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory,
| | | cacheMissRate);
| | | logError(message);
| | | long evictPasses = envStats.getNEvictPasses();
| | | long evictNodes = envStats.getNNodesExplicitlyEvicted();
| | | long evictBinsStrip = envStats.getNBINsStripped();
| | | long cleanerRuns = envStats.getNCleanerRuns();
| | | long cleanerDeletions = envStats.getNCleanerDeletions();
| | | long cleanerEntriesRead = envStats.getNCleanerEntriesRead();
| | | long cleanerINCleaned = envStats.getNINsCleaned();
| | | long checkPoints = envStats.getNCheckpoints();
| | | if (evictPasses != 0)
| | | {
| | | if (!evicting)
| | | {
| | | evicting = true;
| | | if (!ldifRead)
| | | {
| | | evictionEntryCount = reader.getEntriesRead();
| | | message =
| | | NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED
| | | .get(evictionEntryCount);
| | | logError(message);
| | | }
| | | }
| | | message =
| | | NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get(
| | | evictPasses, evictNodes, evictBinsStrip);
| | | logError(message);
| | | }
| | | if (cleanerRuns != 0)
| | | {
| | | message =
| | | NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns,
| | | cleanerDeletions, cleanerEntriesRead,
| | | cleanerINCleaned);
| | | logError(message);
| | | }
| | | if (checkPoints > 1)
| | | {
| | | message =
| | | NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints);
| | | logError(message);
| | | }
| | | prevEnvStats = envStats;
| | | }
| | | catch (DatabaseException e)
| | | {
| | | // Unlikely to happen and not critical.
| | | }
| | | previousCount = latestCount;
| | | previousTime = latestTime;
| | | }
| | | }
| | | |
| | | |
| | | |
| | | /** |
| | | * This class reports progress of the second phase of the import job
| | | * (writing the index databases) at fixed intervals.
| | | */ |
| | | private final class SecondPhaseProgressTask extends TimerTask |
| | | { |
| | | /** |
| | | * The number of entries that had been read at the time of the |
| | | * previous progress report. |
| | | */ |
| | | private long previousCount = 0; |
| | | |
| | | /** |
| | | * The time in milliseconds of the previous progress report. |
| | | */ |
| | | private long previousTime; |
| | | |
| | | /** |
| | | * The environment statistics at the time of the previous report. |
| | | */ |
| | | private EnvironmentStats prevEnvStats; |
| | | |
| | | /** |
| | | * The number of bytes in a megabyte. Note that 1024*1024 bytes may |
| | | * eventually become known as a mebibyte(MiB). |
| | | */ |
| | | public static final int bytesPerMegabyte = 1024 * 1024; |
| | | |
| | | // Determines if eviction has been detected. |
| | | private boolean evicting = false; |
| | | |
| | | // Suspend output. |
| | | private boolean pause = false; |
| | | |
| | | private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap; |
| | | |
| | | |
| | | /** |
| | | * Create a new import progress task. |
| | | * @param containerIndexMgrMap Map of database container objects to |
| | | * index manager objects. |
| | | */ |
| | | public SecondPhaseProgressTask(Map<DatabaseContainer, |
| | | IndexManager> containerIndexMgrMap) |
| | | { |
| | | previousTime = System.currentTimeMillis(); |
| | | this.containerIndexMgrMap = containerIndexMgrMap; |
| | | try |
| | | { |
| | | prevEnvStats = |
| | | rootContainer.getEnvironmentStats(new StatsConfig()); |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | throw new RuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * The action to be performed by this timer task. |
| | | */ |
| | | @Override |
| | | public void run() |
| | | { |
| | | long latestCount = reader.getEntriesRead();
| | | long deltaCount = (latestCount - previousCount); |
| | | long latestTime = System.currentTimeMillis(); |
| | | long deltaTime = latestTime - previousTime; |
| | | Message message; |
| | | if (deltaTime == 0) |
| | | { |
| | | return; |
| | | } |
| | | if (pause) |
| | | { |
| | | return; |
| | | } |
| | | |
| | | try |
| | | { |
| | | Runtime runtime = Runtime.getRuntime(); |
| | | long freeMemory = runtime.freeMemory() / bytesPerMegabyte; |
| | | EnvironmentStats envStats = |
| | | rootContainer.getEnvironmentStats(new StatsConfig()); |
| | | long nCacheMiss = |
| | | envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); |
| | | |
| | | float cacheMissRate = 0; |
| | | if (deltaCount > 0) |
| | | { |
| | | cacheMissRate = nCacheMiss / (float) deltaCount; |
| | | } |
| | | message = |
| | | NOTE_JEB_IMPORT_CACHE_AND_MEMORY_REPORT.get(freeMemory, |
| | | cacheMissRate); |
| | | logError(message); |
| | | long evictPasses = envStats.getNEvictPasses(); |
| | | long evictNodes = envStats.getNNodesExplicitlyEvicted(); |
| | | long evictBinsStrip = envStats.getNBINsStripped(); |
| | | long cleanerRuns = envStats.getNCleanerRuns(); |
| | | long cleanerDeletions = envStats.getNCleanerDeletions(); |
| | | long cleanerEntriesRead = envStats.getNCleanerEntriesRead(); |
| | | long cleanerINCleaned = envStats.getNINsCleaned(); |
| | | long checkPoints = envStats.getNCheckpoints(); |
| | | if (evictPasses != 0) |
| | | { |
| | | if (!evicting) |
| | | { |
| | | evicting = true; |
| | | } |
| | | message = |
| | | NOTE_JEB_IMPORT_LDIF_EVICTION_DETECTED_STATS.get( |
| | | evictPasses, evictNodes, evictBinsStrip); |
| | | logError(message); |
| | | } |
| | | if (cleanerRuns != 0) |
| | | { |
| | | message = |
| | | NOTE_JEB_IMPORT_LDIF_CLEANER_STATS.get(cleanerRuns, |
| | | cleanerDeletions, cleanerEntriesRead, |
| | | cleanerINCleaned); |
| | | logError(message); |
| | | } |
| | | if (checkPoints > 1) |
| | | { |
| | | message = |
| | | NOTE_JEB_IMPORT_LDIF_BUFFER_CHECKPOINTS.get(checkPoints); |
| | | logError(message); |
| | | } |
| | | prevEnvStats = envStats; |
| | | } |
| | | catch (DatabaseException e) |
| | | { |
| | | // Unlikely to happen and not critical. |
| | | } |
| | | previousCount = latestCount; |
| | | previousTime = latestTime; |
| | | |
| | | for(Map.Entry<DatabaseContainer, IndexManager> e : |
| | | containerIndexMgrMap.entrySet()) |
| | | { |
| | | IndexManager indexMgr = e.getValue(); |
| | | indexMgr.printStats(deltaTime); |
| | | } |
| | | } |
| | | } |
| | | } |