package org.opends.server.backends.jeb.importLDIF;

import static org.opends.messages.JebMessages.*;
import static org.opends.server.loggers.ErrorLogger.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.getFileForPath;

import java.io.*;
import java.nio.*;
import java.nio.channels.FileChannel;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import org.opends.messages.Category;
import org.opends.messages.Message;
import org.opends.messages.Severity;
import org.opends.server.admin.std.meta.LocalDBIndexCfgDefn;
import org.opends.server.admin.std.meta.LocalDBIndexCfgDefn.IndexType;
import org.opends.server.admin.std.server.LocalDBBackendCfg;
import org.opends.server.admin.std.server.LocalDBIndexCfg;
import org.opends.server.backends.jeb.*;
import org.opends.server.config.ConfigException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.*;
import org.opends.server.util.*;

import com.sleepycat.je.*;
import com.sleepycat.util.PackedInteger;
| | |
 * This class provides the engine that performs both importing of LDIF files
 * and the rebuilding of indexes.
 */
public final class Importer
{
  private static final int TIMER_INTERVAL = 10000;
  private static final int KB = 1024;
  private static final int MB = (KB * KB);
  private static final String DEFAULT_TMP_DIR = "import-tmp";
  private static final String TMPENV_DIR = "tmp-env";
| | |

  //Defaults for LDIF reader buffers, min memory required to import and default
  //size for byte buffers.
  private static final int READER_WRITER_BUFFER_SIZE = 8 * KB;
  private static final int MIN_DB_CACHE_MEMORY = MAX_DB_CACHE_SIZE +
      MAX_DB_LOG_SIZE;
  private static final int BYTE_BUFFER_CAPACITY = 128;

  //Min and max sizes of the phase one buffer.
  private static final int MAX_BUFFER_SIZE = 2 * MB;
  private static final int MIN_BUFFER_SIZE = 4 * KB;

  //Min size of the phase two read-ahead cache.
  private static final int MIN_READ_AHEAD_CACHE_SIZE = 2 * KB;

  //Small heap threshold used to give more memory to the JVM in an attempt to
  //avoid OOM errors.
  private static final int SMALL_HEAP_SIZE = 256 * MB;

  //The DN attribute type.
  private static AttributeType dnType;
  static final IndexOutputBuffer.IndexComparator indexComparator =
      new IndexOutputBuffer.IndexComparator();

  //Phase one buffer and imported entries counts.
  private final AtomicInteger bufferCount = new AtomicInteger(0);
| | |
  //Import configuration.
  private final LDIFImportConfig importConfiguration;

  //Backend configuration.
  private final LocalDBBackendCfg backendConfiguration;

  //LDIF reader.
  private LDIFReader reader;

  //Migrated entry count.
  private int migratedCount;

  // Size in bytes of temporary env.
  private long tmpEnvCacheSize;

  // Size in bytes of DB cache.
  private long dbCacheSize;

  //The executor service used for the buffer sort tasks.
  private ExecutorService bufferSortService;
| | |
  private ExecutorService scratchFileWriterService;

  //Queue of free index buffers -- used to recycle index buffers.
  private final BlockingQueue<IndexOutputBuffer> freeBufferQueue =
      new LinkedBlockingQueue<IndexOutputBuffer>();

  //Map of index keys to index buffers. Used to allocate sorted
  //index buffers to an index writer thread.
  private final
      Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueMap =
      new ConcurrentHashMap<IndexKey, BlockingQueue<IndexOutputBuffer>>();

  //Map of DB containers to index managers. Used to start phase 2.
  private final List<IndexManager> indexMgrList =
| | |
| | | } |
| | | } |
| | | |
  /**
   * Create a new import job with the specified rebuild index config.
   *
   * @param rebuildConfig
   *          The rebuild index configuration.
   * @param cfg
   *          The local DB back-end configuration.
   * @param envConfig
   *          The JEB environment config.
   * @throws InitializationException
   *           If a problem occurs during initialization.
   * @throws JebException
   *           If an error occurred when opening the DB.
   * @throws ConfigException
   *           If a problem occurs during initialization.
   */
  public Importer(RebuildConfig rebuildConfig, LocalDBBackendCfg cfg,
      EnvironmentConfig envConfig) throws InitializationException,
      JebException, ConfigException
  {
    this.importConfiguration = null;
    this.backendConfiguration = cfg;
    this.tmpEnv = null;
    this.threadCount = 1;
    this.rebuildManager = new RebuildIndexManager(rebuildConfig, cfg);
    this.indexCount = rebuildManager.getIndexCount();
    this.scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(
        indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();

    File parentDir;
    if (rebuildConfig.getTmpDirectory() == null)
    {
      parentDir = getFileForPath(DEFAULT_TMP_DIR);
    }
    else
    {
      parentDir = getFileForPath(rebuildConfig.getTmpDirectory());
    }

    this.tempDir = new File(parentDir, cfg.getBackendId());
    recursiveDelete(tempDir);
    if (!tempDir.exists() && !tempDir.mkdirs())
    {
      Message message = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String
          .valueOf(tempDir));
      throw new InitializationException(message);
    }

    this.skipDNValidation = true;
    initializeDBEnv(envConfig);
  }
| | | |
| | | |
| | | |
  /**
   * Create a new import job with the specified ldif import config.
   *
   * @param importConfiguration
   *          The LDIF import configuration.
   * @param localDBBackendCfg
   *          The local DB back-end configuration.
   * @param envConfig
   *          The JEB environment config.
   * @throws InitializationException
   *           If a problem occurs during initialization.
   * @throws ConfigException
   *           If a problem occurs reading the configuration.
   * @throws DatabaseException
   *           If an error occurred when opening the DB.
   */
  public Importer(LDIFImportConfig importConfiguration,
      LocalDBBackendCfg localDBBackendCfg, EnvironmentConfig envConfig)
      throws InitializationException, ConfigException, DatabaseException
  {
    this.rebuildManager = null;
    this.importConfiguration = importConfiguration;
    this.backendConfiguration = localDBBackendCfg;

    if (importConfiguration.getThreadCount() == 0)
    {
      this.threadCount = Runtime.getRuntime().availableProcessors() * 2;
    }
    else
    {
      this.threadCount = importConfiguration.getThreadCount();
    }

    // Determine the number of indexes.
    int indexes = 2; // dn2id + dn2uri
    for (String indexName : localDBBackendCfg.listLocalDBIndexes())
    {
      LocalDBIndexCfg index = localDBBackendCfg.getLocalDBIndex(indexName);
      SortedSet<IndexType> types = index.getIndexType();
      if (types.contains(IndexType.EXTENSIBLE))
      {
        indexes += types.size() - 1
            + index.getIndexExtensibleMatchingRule().size();
      }
      else
      {
        indexes += types.size();
      }
    }
    this.indexCount = indexes;

    if (!importConfiguration.appendToExistingData())
    {
      if (importConfiguration.clearBackend()
          || localDBBackendCfg.getBaseDN().size() <= 1)
      {
        this.clearedBackend = true;
      }
    }
    this.scratchFileWriterList = new ArrayList<ScratchFileWriterTask>(
        indexCount);
    this.scratchFileWriterFutures = new CopyOnWriteArrayList<Future<?>>();

    File parentDir;
    if (importConfiguration.getTmpDirectory() == null)
    {
      parentDir = getFileForPath(DEFAULT_TMP_DIR);
    }
    else
    {
      parentDir = getFileForPath(importConfiguration.getTmpDirectory());
    }
    this.tempDir = new File(parentDir, localDBBackendCfg.getBackendId());
    recursiveDelete(tempDir);
    if (!tempDir.exists() && !tempDir.mkdirs())
    {
      Message message = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(String
          .valueOf(tempDir));
      throw new InitializationException(message);
    }
    skipDNValidation = importConfiguration.getSkipDNValidation();
    initializeDBEnv(envConfig);

    // Set up temporary environment.
    if (!skipDNValidation)
    {
      File envPath = new File(tempDir, TMPENV_DIR);
      envPath.mkdirs();
      this.tmpEnv = new TmpEnv(envPath);
    }
    else
    {
      this.tmpEnv = null;
    }
  }
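
  // Worked example (illustrative only, not part of the original comments):
  // a backend configured with three attribute indexes, each enabling the
  // equality and substring index types and no extensible matching rules,
  // gives indexCount = 2 (dn2id + dn2uri) + 3 * 2 = 8 with the calculation
  // performed in the constructor above.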
| | | |
| | | |
  /**
   * Return an import LDIF instance using the specified arguments.
   *
   * @param importCfg The import config to use.
   * @param localDBBackendCfg The local DB backend config to use.
   * @param envCfg The JEB environment config to use.
   * @return An import LDIF instance.
   *
   * @throws InitializationException If the instance cannot be initialized.
   * @throws ConfigException If the instance cannot be configured.
   */
  public static
  Importer getInstance(LDIFImportConfig importCfg,
                       LocalDBBackendCfg localDBBackendCfg,
                       EnvironmentConfig envCfg)
          throws InitializationException, ConfigException
  {
    return new Importer(importCfg, localDBBackendCfg, envCfg);
  }


  /**
   * Return an import rebuild index instance using the specified arguments.
   *
   * @param rebuildCfg The rebuild config to use.
   * @param localDBBackendCfg The local DB backend config to use.
   * @param envCfg The JEB environment config to use.
   * @return An import rebuild index instance.
   *
   * @throws InitializationException If the instance cannot be initialized.
   * @throws JebException If a JEB exception occurs.
   * @throws ConfigException If the instance cannot be configured.
   */
  public static synchronized
  Importer getInstance(RebuildConfig rebuildCfg,
                       LocalDBBackendCfg localDBBackendCfg,
                       EnvironmentConfig envCfg)
          throws InitializationException, JebException, ConfigException
  {
    return new Importer(rebuildCfg, localDBBackendCfg, envCfg);
  }
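
  // Usage sketch (an assumption, not taken from this file): a backend would
  // typically obtain an importer via one of the factories above and then run
  // the import against its root container, roughly:
  //
  //   Importer importer =
  //       Importer.getInstance(importCfg, backendCfg, envConfig);
  //   LDIFImportResult result = importer.processImport(rootContainer);
  //
  // The name of the entry point taking the RootContainer is elided in this
  // listing, so processImport(...) is shown here only as an assumed example.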
| | | |
| | | |
| | | /** |
| | | * Return the suffix instance in the specified map that matches the specified |
| | |
| | | } |
| | | |
| | | |
  /**
   * Calculate buffer sizes and initialize JEB properties based on memory.
   *
   * @param envConfig
   *          The environment config to use in the calculations.
   * @throws InitializationException
   *           If a problem occurs during calculation.
   */
  private void initializeDBEnv(EnvironmentConfig envConfig)
      throws InitializationException
  {
    // Calculate amount of usable memory. This will need to take into account
    // various fudge factors, including the number of IO buffers used by the
    // scratch writers (1 per index).
    final long availableMemory = calculateAvailableMemory();
    final long usableMemory = availableMemory
        - (indexCount * READER_WRITER_BUFFER_SIZE);

    if (!skipDNValidation)
    {
      // DN validation: calculate memory for the DB cache, the DN2ID temporary
      // cache, and the buffers.
      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
      {
        dbCacheSize = 500 * KB;
        tmpEnvCacheSize = 500 * KB;
      }
      else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
      {
        dbCacheSize = MIN_DB_CACHE_SIZE;
        tmpEnvCacheSize = MIN_DB_CACHE_SIZE;
      }
      else if (!clearedBackend)
      {
        // Appending to existing data so reserve extra memory for the DB cache
        // since it will be needed for dn2id queries.
        dbCacheSize = usableMemory * 33 / 100;
        tmpEnvCacheSize = usableMemory * 33 / 100;
      }
      else
      {
        dbCacheSize = MAX_DB_CACHE_SIZE;
        tmpEnvCacheSize = usableMemory * 66 / 100;
      }
    }
    else
    {
      // No DN validation: calculate memory for the DB cache and the buffers.

      // No need for a DN2ID cache.
      tmpEnvCacheSize = 0;

      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
      {
        dbCacheSize = 500 * KB;
      }
      else if (usableMemory < MIN_DB_CACHE_MEMORY)
      {
        dbCacheSize = MIN_DB_CACHE_SIZE;
      }
      else
      {
        // No need to differentiate between append/clear backend, since dn2id
        // is not being queried.
        dbCacheSize = MAX_DB_CACHE_SIZE;
      }
    }

    final long phaseOneBufferMemory = usableMemory - dbCacheSize
        - tmpEnvCacheSize;
    final int oldThreadCount = threadCount;
    while (true)
    {
      phaseOneBufferCount = 2 * indexCount * threadCount;

      // Scratch writers allocate 4 buffers per index as well.
      final int totalPhaseOneBufferCount = phaseOneBufferCount
          + (4 * indexCount);
      bufferSize = (int) (phaseOneBufferMemory / totalPhaseOneBufferCount);

      if (bufferSize > MAX_BUFFER_SIZE)
      {
        if (!skipDNValidation)
        {
          // The buffers are big enough: the memory is best used for the DN2ID
          // temp DB.
          bufferSize = MAX_BUFFER_SIZE;

          final long extraMemory = phaseOneBufferMemory
              - (totalPhaseOneBufferCount * bufferSize);
          if (!clearedBackend)
          {
            dbCacheSize += extraMemory / 2;
            tmpEnvCacheSize += extraMemory / 2;
          }
          else
          {
            tmpEnvCacheSize += extraMemory;
          }
        }

        break;
      }
      else if (bufferSize > MIN_BUFFER_SIZE)
      {
        // This is acceptable.
        break;
      }
      else if (threadCount > 1)
      {
        // Retry using fewer threads.
        threadCount--;
      }
      else
      {
        // Not enough memory.
        final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount
            * MIN_BUFFER_SIZE;
        Message message = ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
            minimumPhaseOneBufferMemory + dbCacheSize + tmpEnvCacheSize);
        throw new InitializationException(message);
      }
    }

    if (oldThreadCount != threadCount)
    {
      Message message = NOTE_JEB_IMPORT_ADJUST_THREAD_COUNT.get(oldThreadCount,
          threadCount);
      logError(message);
    }

    Message message = NOTE_JEB_IMPORT_LDIF_TOT_MEM_BUF.get(
        phaseOneBufferMemory, phaseOneBufferCount);
    logError(message);
    if (tmpEnvCacheSize > 0)
    {
      message = NOTE_JEB_IMPORT_LDIF_TMP_ENV_MEM.get(tmpEnvCacheSize);
      logError(message);
    }
    envConfig.setConfigParam(EnvironmentConfig.ENV_RUN_CLEANER, "true");
    envConfig.setConfigParam(EnvironmentConfig.MAX_MEMORY, Long
        .toString(dbCacheSize));
    message = NOTE_JEB_IMPORT_LDIF_DB_MEM_BUF_INFO.get(dbCacheSize, bufferSize);
    logError(message);
  }
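
  // Worked example for the sizing loop above (illustrative numbers only):
  // with phaseOneBufferMemory = 120 MB, indexCount = 10 and threadCount = 4,
  // phaseOneBufferCount = 2 * 10 * 4 = 80 and totalPhaseOneBufferCount =
  // 80 + 4 * 10 = 120, so bufferSize = 120 MB / 120 = 1 MB. That lies between
  // MIN_BUFFER_SIZE (4 KB) and MAX_BUFFER_SIZE (2 MB), so it is accepted on
  // the first pass without reducing the thread count.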
| | | |
| | | |
| | | |
  /**
   * Returns the amount of available memory which can be used by this import,
   * taking into account whether or not the import is running offline or online
   * as a task.
   */
  private long calculateAvailableMemory()
  {
    final long availableMemory;
    if (DirectoryServer.isRunning())
    {
      // Online import/rebuild.
      Runtime runTime = Runtime.getRuntime();
      runTime.gc();
      runTime.gc();
      final long usedMemory = runTime.totalMemory() - runTime.freeMemory();
      final long maxUsableMemory = Platform.getUsableMemoryForCaching();
      final long usableMemory = maxUsableMemory - usedMemory;

      final long configuredMemory;
      if (backendConfiguration.getDBCacheSize() > 0)
      {
        configuredMemory = backendConfiguration.getDBCacheSize();
      }
      else
      {
        configuredMemory = backendConfiguration.getDBCachePercent()
            * Runtime.getRuntime().maxMemory() / 100;
      }
      availableMemory = Math.min(usableMemory, configuredMemory);
    }
    else
    {
      // Offline import/rebuild.
      availableMemory = Platform.getUsableMemoryForCaching();
    }

    // Now take into account various fudge factors.
    int importMemPct = 90;
    if (availableMemory <= SMALL_HEAP_SIZE)
    {
      // Be pessimistic when memory is low.
      importMemPct -= 25;
    }
    if (rebuildManager != null)
    {
      // Rebuild seems to require more overhead.
      importMemPct -= 15;
    }

    return (availableMemory * importMemPct / 100);
  }
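
  // Example (illustrative only): for an offline rebuild where
  // Platform.getUsableMemoryForCaching() reports 2 GB, the fudge factors give
  // importMemPct = 90 - 15 = 75, so roughly 1.5 GB is returned; an available
  // amount at or below SMALL_HEAP_SIZE (256 MB) would subtract a further 25.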
| | | |
| | | |
| | |
  {
    for (int i = 0; i < phaseOneBufferCount; i++)
    {
      IndexOutputBuffer b = new IndexOutputBuffer(bufferSize);
      freeBufferQueue.add(b);
    }
  }
| | |
    this.rootContainer = rootContainer;
    try
    {
      reader = new LDIFReader(importConfiguration, rootContainer,
          READER_WRITER_BUFFER_SIZE);
    }
    catch (IOException ioe)
    {
      Message message = ERR_JEB_IMPORT_LDIF_READER_IO_ERROR.get();
      throw new InitializationException(message, ioe);
| | |
| | | |
    try
    {
      Message message = NOTE_JEB_IMPORT_STARTING.get(
          DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER);
      logError(message);
      message = NOTE_JEB_IMPORT_THREAD_COUNT.get(threadCount);
      logError(message);
| | |
      long startTime = System.currentTimeMillis();
      phaseOne();
      long phaseOneFinishTime = System.currentTimeMillis();
      if (!skipDNValidation)
      {
        tmpEnv.shutdown();
      }
      if (isPhaseOneCanceled)
      {
        throw new InterruptedException("Import processing canceled.");
      }
| | |
      long finishTime = System.currentTimeMillis();
      long importTime = (finishTime - startTime);
      float rate = 0;
      message = NOTE_JEB_IMPORT_PHASE_STATS.get(importTime / 1000,
          (phaseOneFinishTime - startTime) / 1000,
          (phaseTwoFinishTime - phaseTwoTime) / 1000);
      logError(message);
      if (importTime > 0) rate = 1000f * reader.getEntriesRead() / importTime;
      message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(),
          importCount.get(), reader.getEntriesIgnored(),
          reader.getEntriesRejected(), migratedCount, importTime / 1000, rate);
      logError(message);
    }
    finally
    {
      reader.close();
    }
    return new LDIFImportResult(reader.getEntriesRead(),
        reader.getEntriesRejected(), reader.getEntriesIgnored());
  }
| | | |
| | | |
| | |
| | | } |
| | | |
| | | |
| | | |
  private void phaseOne() throws InterruptedException, ExecutionException
  {
    initializeIndexBuffers();
    FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(
        1);
    timerService.scheduleAtFixedRate(progressTask, TIMER_INTERVAL,
        TIMER_INTERVAL, TimeUnit.MILLISECONDS);
    scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount);
    bufferSortService = Executors.newFixedThreadPool(threadCount);
    ExecutorService execService = Executors.newFixedThreadPool(threadCount);
    List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);

    tasks.add(new MigrateExistingTask());
    List<Future<Void>> results = execService.invokeAll(tasks);
    for (Future<Void> result : results)
    {
      if (!result.isDone())
      {
        result.get();
      }
    }
    tasks.clear();
    results.clear();

    if (importConfiguration.appendToExistingData()
        && importConfiguration.replaceExistingEntries())
    {
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(new AppendReplaceTask());
      }
| | |
    }
    results = execService.invokeAll(tasks);
    for (Future<Void> result : results)
    {
      if (!result.isDone())
      {
        result.get();
      }
    }
    tasks.clear();
    results.clear();

    tasks.add(new MigrateExcludedTask());
    results = execService.invokeAll(tasks);
    for (Future<Void> result : results)
    {
      if (!result.isDone())
      {
        result.get();
      }
    }

    stopScratchFileWriters();
    for (Future<?> result : scratchFileWriterFutures)
    {
      if (!result.isDone())
      {
        result.get();
      }
    }

    // Shutdown the executor services
    timerService.shutdown();
    timerService.awaitTermination(30, TimeUnit.SECONDS);
| | |
    scratchFileWriterService.shutdown();
    scratchFileWriterService.awaitTermination(30, TimeUnit.SECONDS);

    // Try to clear as much memory as possible.
    scratchFileWriterList.clear();
    scratchFileWriterFutures.clear();
    indexKeyQueMap.clear();
| | |
| | | |
| | | |
| | | |
  private void phaseTwo() throws InitializationException, InterruptedException,
      JebException, ExecutionException
  {
    SecondPhaseProgressTask progress2Task = new SecondPhaseProgressTask(
        reader.getEntriesRead());
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(
        1);
    timerService.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL,
        TIMER_INTERVAL, TimeUnit.MILLISECONDS);
    try
    {
      processIndexFiles();
    }
    finally
    {
      timerService.shutdown();
      timerService.awaitTermination(30, TimeUnit.SECONDS);
    }
  }
| | | |
| | | |
  private void processIndexFiles() throws InitializationException,
      InterruptedException, JebException, ExecutionException
  {
    if (bufferCount.get() == 0)
    {
      return;
| | |
    {
      dbThreads = 4;
    }

    // Calculate memory / buffer counts.
    final long availableMemory = calculateAvailableMemory();
    final long usableMemory = availableMemory - dbCacheSize;

    int readAheadSize;
    int buffers;
    while (true)
    {
      final List<IndexManager> totList = new ArrayList<IndexManager>(
          DNIndexMgrList);
      totList.addAll(indexMgrList);
      Collections.sort(totList, Collections.reverseOrder());

      buffers = 0;
      final int limit = Math.min(dbThreads, totList.size());
      for (int i = 0; i < limit; i++)
      {
        buffers += totList.get(i).bufferIndexCount;
      }

      readAheadSize = (int) (usableMemory / buffers);
      if (readAheadSize > bufferSize)
      {
        // Cache size is never larger than the buffer size.
        readAheadSize = bufferSize;
        break;
      }
      else if (readAheadSize > MIN_READ_AHEAD_CACHE_SIZE)
      {
        // This is acceptable.
        break;
      }
      else if (dbThreads > 1)
      {
        // Reduce thread count.
        dbThreads--;
      }
      else
      {
        // Not enough memory.
        final long minimumPhaseTwoBufferMemory = buffers
            * MIN_READ_AHEAD_CACHE_SIZE;
        Message message = ERR_IMPORT_LDIF_LACK_MEM.get(usableMemory,
            minimumPhaseTwoBufferMemory + dbCacheSize);
        throw new InitializationException(message);
      }
    }

    Message message = NOTE_JEB_IMPORT_LDIF_PHASE_TWO_MEM_REPORT.get(
        availableMemory, readAheadSize, buffers);
    logError(message);

    // Start indexing tasks.
    List<Future<Void>> futures = new LinkedList<Future<Void>>();
    ExecutorService dbService = Executors.newFixedThreadPool(dbThreads);

    // Start DN processing first.
    for (IndexManager dnMgr : DNIndexMgrList)
    {
      futures.add(dbService.submit(new IndexDBWriteTask(dnMgr, readAheadSize)));
    }
    for (IndexManager mgr : indexMgrList)
    {
      futures.add(dbService.submit(new IndexDBWriteTask(mgr, readAheadSize)));
    }

    for (Future<Void> result : futures)
    {
      if (!result.isDone())
      {
        result.get();
      }
    }

    dbService.shutdown();
  }
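
  // Worked example for the phase two read-ahead sizing above (illustrative
  // numbers only): with usableMemory = 100 MB and the busiest dbThreads index
  // managers holding a combined 400 scratch-file buffers, readAheadSize =
  // 100 MB / 400 = 256 KB. The value is capped at bufferSize when it would be
  // larger, and the thread count is only reduced when the result would fall
  // below MIN_READ_AHEAD_CACHE_SIZE (2 KB).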
| | | |
| | | |
  private void stopScratchFileWriters()
  {
    IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
    for (ScratchFileWriterTask task : scratchFileWriterList)
    {
      task.queue.add(indexBuffer);
| | |
    {
      if (importConfiguration.isCancelled() || isPhaseOneCanceled)
      {
        IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
        freeBufferQueue.add(indexBuffer);
        return null;
      }
| | |
   */
  private class ImportTask implements Callable<Void>
  {
    private final Map<IndexKey, IndexOutputBuffer> indexBufferMap =
        new HashMap<IndexKey, IndexOutputBuffer>();
    private final Set<byte[]> insertKeySet = new HashSet<byte[]>();
    private final EntryInformation entryInfo = new EntryInformation();
    private DatabaseEntry keyEntry = new DatabaseEntry(),
| | |
    {
      if (importConfiguration.isCancelled() || isPhaseOneCanceled)
      {
        IndexOutputBuffer indexBuffer = new IndexOutputBuffer(0);
        freeBufferQueue.add(indexBuffer);
        return null;
      }
| | |
    void flushIndexBuffers() throws InterruptedException,
        ExecutionException
    {
      Set<Map.Entry<IndexKey, IndexOutputBuffer>> set =
          indexBufferMap.entrySet();
      Iterator<Map.Entry<IndexKey, IndexOutputBuffer>> setIterator =
          set.iterator();
      while (setIterator.hasNext())
      {
        Map.Entry<IndexKey, IndexOutputBuffer> e = setIterator.next();
        IndexKey indexKey = e.getKey();
        IndexOutputBuffer indexBuffer = e.getValue();
        setIterator.remove();
        ImportIndexType indexType = indexKey.getIndexType();
        indexBuffer.setComparator(indexComparator);
        indexBuffer.setIndexKey(indexKey);
        indexBuffer.setDiscard();
| | |
| | | |
    int processKey(DatabaseContainer container, byte[] key, EntryID entryID,
        IndexOutputBuffer.ComparatorBuffer<byte[]> comparator,
        IndexKey indexKey, boolean insert)
        throws ConfigException, InterruptedException
    {
      IndexOutputBuffer indexBuffer = indexBufferMap.get(indexKey);
      if (indexBuffer == null)
      {
        indexBuffer = getNewIndexBuffer();
        indexBufferMap.put(indexKey, indexBuffer);
      }
      else if (!indexBuffer.isSpaceAvailable(key, entryID.longValue()))
      {
        indexBuffer.setComparator(comparator);
        indexBuffer.setIndexKey(indexKey);
        bufferSortService.submit(new SortTask(indexBuffer));
        indexBuffer = getNewIndexBuffer();
        indexBufferMap.put(indexKey, indexBuffer);
      }
      int id = System.identityHashCode(container);
| | |
| | | } |
| | | |
| | | |
    IndexOutputBuffer getNewIndexBuffer() throws ConfigException,
        InterruptedException
    {
      IndexOutputBuffer indexBuffer = freeBufferQueue.take();
      if (indexBuffer == null)
      {
        Message message = Message.raw(Category.JEB, Severity.SEVERE_ERROR,
| | |
| | | } |
| | | |
| | | |
    private NavigableSet<IndexInputBuffer> initializeBuffers()
        throws IOException
    {
      NavigableSet<IndexInputBuffer> bufferSet =
          new TreeSet<IndexInputBuffer>();
      for (int i = 0; i < indexMgr.bufferIndexCount; i++)
      {
        IndexInputBuffer b = new IndexInputBuffer(indexMgr,
            indexMgr.bufferIndexBegin[i], indexMgr.bufferIndexEnd[i],
            indexMgr.bufferIndexID[i]);
        b.initializeCache(cacheSize);
        bufferSet.add(b);
      }

      // GC arrays.
      indexMgr.bufferIndexBegin = null;
      indexMgr.bufferIndexEnd = null;
      indexMgr.bufferIndexID = null;

      return bufferSet;
    }
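
    // Note (editorial, based on the surrounding code): each scratch-file
    // segment recorded by the IndexManager becomes one IndexInputBuffer, and
    // the NavigableSet keeps the buffers ordered by their current record, so
    // call() below can repeatedly pollFirst() the buffer with the smallest
    // key, merge its ID sets, advance it with getNextRecord() and re-insert
    // it -- a standard k-way merge over the sorted scratch files.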
| | | |
| | |
| | | */ |
    public Void call() throws Exception
    {
      Thread.setDefaultUncaughtExceptionHandler(new DefaultExceptionHandler());
      indexMgr.setStarted();
      Message message = NOTE_JEB_IMPORT_LDIF_INDEX_STARTED.get(
          indexMgr.getFileName(), indexMgr.bufferIndexCount);
      logError(message);

      ByteBuffer key = null;
      ImportIDSet insertIDSet = null;
      ImportIDSet deleteIDSet = null;
      Integer indexID = null;

      try
      {
        indexMgr.openIndexFile();
        NavigableSet<IndexInputBuffer> bufferSet = initializeBuffers();
        while (!bufferSet.isEmpty())
        {
          IndexInputBuffer b = bufferSet.pollFirst();
          if (key == null)
          {
            indexID = b.getIndexID();

            if (indexMgr.isDN2ID())
            {
              insertIDSet = new ImportIDSet(1, 1, false);
              deleteIDSet = new ImportIDSet(1, 1, false);
            }
            else
            {
              Index index = (Index) idContainerMap.get(indexID);
              int limit = index.getIndexEntryLimit();
              boolean doCount = index.getMaintainCount();
              insertIDSet = new ImportIDSet(1, limit, doCount);
              deleteIDSet = new ImportIDSet(1, limit, doCount);
            }

            key = ByteBuffer.allocate(b.getKeyLen());
            key.flip();
            b.getKey(key);

            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
            insertIDSet.setKey(key);
            deleteIDSet.setKey(key);
          }
          else if (b.compare(key, indexID) != 0)
          {
            addToDB(insertIDSet, deleteIDSet, indexID);
            indexMgr.incrementKeyCount();

            indexID = b.getIndexID();

            if (indexMgr.isDN2ID())
            {
              insertIDSet = new ImportIDSet(1, 1, false);
              deleteIDSet = new ImportIDSet(1, 1, false);
            }
            else
            {
              Index index = (Index) idContainerMap.get(indexID);
              int limit = index.getIndexEntryLimit();
              boolean doCount = index.getMaintainCount();
              insertIDSet = new ImportIDSet(1, limit, doCount);
              deleteIDSet = new ImportIDSet(1, limit, doCount);
            }

            key.clear();
            if (b.getKeyLen() > key.capacity())
            {
              key = ByteBuffer.allocate(b.getKeyLen());
            }
            key.flip();
            b.getKey(key);

            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
            insertIDSet.setKey(key);
            deleteIDSet.setKey(key);
          }
          else
          {
            b.mergeIDSet(insertIDSet);
            b.mergeIDSet(deleteIDSet);
          }

          if (b.hasMoreData())
          {
            b.getNextRecord();
            bufferSet.add(b);
          }
        }

        if (key != null)
        {
          addToDB(insertIDSet, deleteIDSet, indexID);
        }
      }
| | | catch (Exception e) |
| | |
| | | |
    private ByteBuffer parentDN, lastDN;
    private EntryID parentID, lastID, entryID;
    private final DatabaseEntry dnKey, dnValue;
    private final TreeMap<ByteBuffer, EntryID> parentIDMap;
    private final EntryContainer entryContainer;
    private final Map<byte[], ImportIDSet> id2childTree;
| | |
      subTreeLimit = entryContainer.getID2Subtree().getIndexEntryLimit();
      subTreeDoCount = entryContainer.getID2Subtree().getMaintainCount();
      id2subtreeTree = new TreeMap<byte[], ImportIDSet>(subComparator);
      dnKey = new DatabaseEntry();
      dnValue = new DatabaseEntry();
      lastDN = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY);
    }
| | | |
| | |
    private boolean checkParent(ImportIDSet record) throws DirectoryException,
        DatabaseException
    {
      dnKey.setData(record.getKey().array(), 0, record.getKey().limit());
      byte[] v = record.toDatabase();
      long v1 = JebFormat.entryIDFromDatabase(v);
      dnValue.setData(v);

      entryID = new EntryID(v1);
      parentDN = getParent(record.getKey());
| | |
      //If null is returned then this is a suffix DN.
      if (parentDN != null)
      {
        DatabaseEntry key =
            new DatabaseEntry(parentDN.array(), 0, parentDN.limit());
        DatabaseEntry value = new DatabaseEntry();
        OperationStatus status;
        status =
| | |
      if (importConfiguration != null &&
          importConfiguration.appendToExistingData())
      {
        DatabaseEntry key = new DatabaseEntry(dn.array(), 0, dn.limit());
        DatabaseEntry value = new DatabaseEntry();
        OperationStatus status;
        status =
| | |
| | | |
    public void writeToDB() throws DatabaseException, DirectoryException
    {
      entryContainer.getDN2ID().put(null, dnKey, dnValue);
      indexMgr.addTotDNCount(1);
      if (parentDN != null)
      {
| | |
        {
          byte[] key = e.getKey();
          ImportIDSet idSet = e.getValue();
          dnKey.setData(key);
          index.insert(dnKey, idSet, dnValue);
        }
        index.closeCursor();
        if (clearMap)
| | |
  {
    private final int DRAIN_TO = 3;
    private final IndexManager indexMgr;
    private final BlockingQueue<IndexOutputBuffer> queue;
    private final ByteArrayOutputStream insetByteStream =
        new ByteArrayOutputStream(2 * bufferSize);
    private final ByteArrayOutputStream deleteByteStream =
| | |
    private final byte[] tmpArray = new byte[8];
    private int insertKeyCount = 0, deleteKeyCount = 0;
    private final DataOutputStream dataStream;
    private int bufferCount = 0;
    private final File file;
    private final SortedSet<IndexOutputBuffer> indexSortedSet;
    private boolean poisonSeen = false;


    public ScratchFileWriterTask(BlockingQueue<IndexOutputBuffer> queue,
        IndexManager indexMgr) throws FileNotFoundException
    {
      this.queue = queue;
| | |
          new BufferedOutputStream(new FileOutputStream(file),
              READER_WRITER_BUFFER_SIZE);
      dataStream = new DataOutputStream(bufferedStream);
      indexSortedSet = new TreeSet<IndexOutputBuffer>();
    }
| | | |
| | | |
| | |
    public Void call() throws IOException
    {
      long offset = 0;
      List<IndexOutputBuffer> l = new LinkedList<IndexOutputBuffer>();
      try
      {
        while (true)
        {
          IndexOutputBuffer indexBuffer = queue.poll();
          if (indexBuffer != null)
          {
            long beginOffset = offset;
| | |
            queue.drainTo(l, DRAIN_TO);
            l.add(indexBuffer);
            bufferLen = writeIndexBuffers(l);
            for (IndexOutputBuffer id : l)
            {
              if (!id.isDiscard())
              {
| | |
              }
            }
            offset += bufferLen;
            indexMgr.addBuffer(beginOffset, offset, bufferCount);
            bufferCount++;
            Importer.this.bufferCount.incrementAndGet();
            if (poisonSeen)
| | |
| | | } |
| | | |
| | | |
    private long writeIndexBuffer(IndexOutputBuffer indexBuffer)
        throws IOException
    {
      int numberKeys = indexBuffer.getNumberKeys();
      indexBuffer.setPosition(-1);
| | |
| | | } |
| | | |
| | | |
    private long writeIndexBuffers(List<IndexOutputBuffer> buffers)
        throws IOException
    {
      long id = 0;
      long bufferLen = 0;
      insetByteStream.reset(); insertKeyCount = 0;
      deleteByteStream.reset(); deleteKeyCount = 0;
      for (IndexOutputBuffer b : buffers)
      {
        if (b.isPoison())
        {
| | |
      int saveIndexID = 0;
      while (!indexSortedSet.isEmpty())
      {
        IndexOutputBuffer b = indexSortedSet.first();
        indexSortedSet.remove(b);
        if (saveKey == null)
        {
| | |
| | | } |
| | | |
| | | |
    private int writeRecord(IndexOutputBuffer b) throws IOException
    {
      int keySize = b.getKeySize();
      int packedSize = writeHeader(b.getIndexID(), keySize);
| | |
  private final class SortTask implements Callable<Void>
  {

    private final IndexOutputBuffer indexBuffer;

    public SortTask(IndexOutputBuffer indexBuffer)
    {
      this.indexBuffer = indexBuffer;
    }
| | |
| | | */ |
| | | public Void call() throws Exception |
| | | { |
| | | if (importConfiguration != null && |
| | | importConfiguration.isCancelled() || isPhaseOneCanceled) |
| | | if (importConfiguration != null && importConfiguration.isCancelled() |
| | | || isPhaseOneCanceled) |
| | | { |
| | | isPhaseOneCanceled =true; |
| | | isPhaseOneCanceled = true; |
| | | return null; |
| | | } |
| | | indexBuffer.sort(); |
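| | | // Route the sorted buffer to the scratch file writer queue for its index
| | | // key, creating the writer task and its queue on first use.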
| | | if(indexKeyQueMap.containsKey(indexBuffer.getIndexKey())) { |
| | | BlockingQueue<IndexBuffer> q = |
| | | indexKeyQueMap.get(indexBuffer.getIndexKey()); |
| | | if (indexKeyQueMap.containsKey(indexBuffer.getIndexKey())) |
| | | { |
| | | BlockingQueue<IndexOutputBuffer> q = indexKeyQueMap.get(indexBuffer |
| | | .getIndexKey()); |
| | | q.add(indexBuffer); |
| | | } |
| | | else |
| | | { |
| | | createIndexWriterTask(indexBuffer.getIndexKey()); |
| | | BlockingQueue<IndexBuffer> q = |
| | | indexKeyQueMap.get(indexBuffer.getIndexKey()); |
| | | BlockingQueue<IndexOutputBuffer> q = indexKeyQueMap.get(indexBuffer |
| | | .getIndexKey()); |
| | | q.add(indexBuffer); |
| | | } |
| | | return null; |
| | |
| | | { |
| | | indexMgrList.add(indexMgr); |
| | | } |
| | | BlockingQueue<IndexBuffer> newQue = |
| | | new ArrayBlockingQueue<IndexBuffer>(phaseOneBufferCount); |
| | | BlockingQueue<IndexOutputBuffer> newQue = |
| | | new ArrayBlockingQueue<IndexOutputBuffer>(phaseOneBufferCount); |
| | | ScratchFileWriterTask indexWriter = |
| | | new ScratchFileWriterTask(newQue, indexMgr); |
| | | scratchFileWriterList.add(indexWriter); |
| | |
| | | } |
| | | |
| | | /** |
| | | * The buffer class is used to read and process one buffered region of a
| | | * temporary (scratch) index file during phase 2 processing.
| | | */ |
| | | private final class Buffer implements Comparable<Buffer> |
| | | { |
| | | private IndexManager indexMgr; |
| | | private final long begin, end, id; |
| | | private long offset; |
| | | private ByteBuffer cache; |
| | | private int limit; |
| | | private ImportIDSet insertIDSet = null, deleteIDSet = null; |
| | | private Integer indexID = null; |
| | | private boolean doCount; |
| | | private ByteBuffer keyBuf = ByteBuffer.allocate(BYTE_BUFFER_CAPACITY); |
| | | |
| | | |
| | | public Buffer(long begin, long end, long id) |
| | | { |
| | | this.begin = begin; |
| | | this.end = end; |
| | | this.offset = 0; |
| | | this.id = id; |
| | | } |
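| | | // begin and end delimit this buffer's slice of the scratch file, offset
| | | // tracks how far into the slice has been read, and id serves as a final
| | | // tie-breaker when buffers are ordered in compareTo().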
| | | |
| | | |
| | | private void initializeCache(IndexManager indexMgr, ByteBuffer b, |
| | | long cacheSize) throws IOException |
| | | { |
| | | this.indexMgr = indexMgr; |
| | | if(b == null) |
| | | { |
| | | cache = ByteBuffer.allocate((int)cacheSize); |
| | | } |
| | | else |
| | | { |
| | | cache = b; |
| | | } |
| | | loadCache(); |
| | | cache.flip(); |
| | | keyBuf.flip(); |
| | | } |
| | | |
| | | |
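| | | // Reads the next chunk of this buffer's [begin, end) file region into the
| | | // cache, advancing offset and reporting the bytes read to the index manager
| | | // for progress statistics.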
| | | private void loadCache() throws IOException |
| | | { |
| | | FileChannel fileChannel = indexMgr.getChannel(); |
| | | fileChannel.position(begin + offset); |
| | | long leftToRead = end - (begin + offset); |
| | | long bytesToRead; |
| | | if(leftToRead < cache.remaining()) |
| | | { |
| | | cache.limit((int) (cache.position() + leftToRead)); |
| | | bytesToRead = (int)leftToRead; |
| | | } |
| | | else |
| | | { |
| | | bytesToRead = Math.min((end - offset),cache.remaining()); |
| | | } |
| | | int bytesRead = 0; |
| | | while(bytesRead < bytesToRead) |
| | | { |
| | | bytesRead += fileChannel.read(cache); |
| | | } |
| | | offset += bytesRead; |
| | | indexMgr.addBytesRead(bytesRead); |
| | | } |
| | | |
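| | | // True while unread data remains either in the cache or in the file region.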
| | | public boolean hasMoreData() throws IOException |
| | | { |
| | | boolean ret = ((begin + offset) >= end); |
| | | return !(cache.remaining() == 0 && ret); |
| | | } |
| | | |
| | | public int getKeyLen() |
| | | { |
| | | return keyBuf.limit(); |
| | | } |
| | | |
| | | public void getKey(ByteBuffer b) |
| | | { |
| | | keyBuf.get(b.array(), 0, keyBuf.limit()); |
| | | b.limit(keyBuf.limit()); |
| | | } |
| | | |
| | | ByteBuffer getKeyBuf() |
| | | { |
| | | return keyBuf; |
| | | } |
| | | |
| | | public ImportIDSet getInsertIDSet() |
| | | { |
| | | return insertIDSet; |
| | | } |
| | | |
| | | public ImportIDSet getDeleteIDSet() |
| | | { |
| | | return deleteIDSet; |
| | | } |
| | | |
| | | public long getBufferID() |
| | | { |
| | | return id; |
| | | } |
| | | |
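| | | // Lazily reads the first record so that the index ID is available; an I/O
| | | // failure at this point is treated as fatal.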
| | | public Integer getIndexID() |
| | | { |
| | | if(indexID == null) |
| | | { |
| | | try { |
| | | getNextRecord(); |
| | | } catch(IOException ex) { |
| | | Message message = |
| | | ERR_JEB_IMPORT_BUFFER_IO_ERROR.get(indexMgr.getFileName()); |
| | | logError(message); |
| | | ex.printStackTrace(); |
| | | System.exit(1); |
| | | } |
| | | } |
| | | return indexID; |
| | | } |
| | | |
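| | | // Based on the reader methods below, each scratch-file record appears to be
| | | // laid out as: a 4-byte index ID, a packed key length followed by the key
| | | // bytes, then the insert IDs and the delete IDs, each written as a packed
| | | // count followed by that many packed entry IDs.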
| | | public void getNextRecord() throws IOException |
| | | { |
| | | getNextIndexID(); |
| | | getContainerParameters(); |
| | | getNextKey(); |
| | | getNextIDSet(true); //get insert ids |
| | | getNextIDSet(false); //get delete ids |
| | | } |
| | | |
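| | | // Looks up the entry limit and maintain-count flag for the current index ID
| | | // and lazily allocates the insert/delete ID sets; the DN2ID index uses
| | | // single-entry ID sets (limit 1, no count maintenance).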
| | | private void getContainerParameters() |
| | | { |
| | | limit = 1; |
| | | doCount = false; |
| | | if(!indexMgr.isDN2ID()) |
| | | { |
| | | Index index = (Index) idContainerMap.get(indexID); |
| | | limit = index.getIndexEntryLimit(); |
| | | doCount = index.getMaintainCount(); |
| | | if(insertIDSet == null) |
| | | { |
| | | insertIDSet = new ImportIDSet(128, limit, doCount); |
| | | deleteIDSet = new ImportIDSet(128, limit, doCount); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if(insertIDSet == null) |
| | | { |
| | | insertIDSet = new ImportIDSet(1, limit, doCount); |
| | | deleteIDSet = new ImportIDSet(1, limit, doCount); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | private int getInt() throws IOException |
| | | { |
| | | ensureData(4); |
| | | return cache.getInt(); |
| | | } |
| | | |
| | | private void getNextIndexID() throws IOException, BufferUnderflowException |
| | | { |
| | | indexID = getInt(); |
| | | } |
| | | |
| | | private void getNextKey() throws IOException, BufferUnderflowException |
| | | { |
| | | ensureData(20); |
| | | byte[] ba = cache.array(); |
| | | int p = cache.position(); |
| | | int len = PackedInteger.getReadIntLength(ba, p); |
| | | int keyLen = PackedInteger.readInt(ba, p); |
| | | cache.position(p + len); |
| | | if(keyLen > keyBuf.capacity()) |
| | | { |
| | | keyBuf = ByteBuffer.allocate(keyLen); |
| | | } |
| | | ensureData(keyLen); |
| | | keyBuf.clear(); |
| | | cache.get(keyBuf.array(), 0, keyLen); |
| | | keyBuf.limit(keyLen); |
| | | } |
| | | |
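| | | // Reads a packed count followed by that many packed entry IDs, adding them
| | | // to either the insert or the delete ID set.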
| | | private void getNextIDSet(boolean insert) |
| | | throws IOException, BufferUnderflowException |
| | | { |
| | | ensureData(20); |
| | | int p = cache.position(); |
| | | byte[] ba = cache.array(); |
| | | int len = PackedInteger.getReadIntLength(ba, p); |
| | | int keyCount = PackedInteger.readInt(ba, p); |
| | | p += len; |
| | | cache.position(p); |
| | | if(insert) |
| | | { |
| | | insertIDSet.clear(false); |
| | | } |
| | | else |
| | | { |
| | | deleteIDSet.clear(false); |
| | | } |
| | | for(int k = 0; k < keyCount; k++) |
| | | { |
| | | if(ensureData(9)) |
| | | { |
| | | p = cache.position(); |
| | | } |
| | | len = PackedInteger.getReadLongLength(ba, p); |
| | | long l = PackedInteger.readLong(ba, p); |
| | | p += len; |
| | | cache.position(p); |
| | | if(insert) |
| | | { |
| | | insertIDSet.addEntryID(l); |
| | | } |
| | | else |
| | | { |
| | | deleteIDSet.addEntryID(l); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
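| | | // Refills the cache when fewer than len bytes remain, returning true when
| | | // more data was loaded so that callers can re-read the cache position.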
| | | private boolean ensureData(int len) throws IOException |
| | | { |
| | | boolean ret = false; |
| | | if(cache.remaining() == 0) |
| | | { |
| | | cache.clear(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | ret = true; |
| | | } |
| | | else if(cache.remaining() < len) |
| | | { |
| | | cache.compact(); |
| | | loadCache(); |
| | | cache.flip(); |
| | | ret = true; |
| | | } |
| | | return ret; |
| | | } |
| | | |
| | | |
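| | | // This is an equality test rather than an ordering: it returns 0 only when
| | | // both the key and the index ID match the supplied values, and 1 otherwise.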
| | | private int compare(ByteBuffer cKey, Integer cIndexID) |
| | | { |
| | | int returnCode, rc; |
| | | if(keyBuf.limit() == 0) |
| | | { |
| | | getIndexID(); |
| | | } |
| | | rc = indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), |
| | | cKey.array(), cKey.limit()); |
| | | if(rc != 0) { |
| | | returnCode = 1; |
| | | } |
| | | else |
| | | { |
| | | returnCode = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1; |
| | | } |
| | | return returnCode; |
| | | } |
| | | |
| | | |
| | | |
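| | | // Orders buffers by key, then index ID, then by whether the insert ID set is
| | | // defined, then by insert set size, and finally by buffer id, apparently so
| | | // that two distinct buffers never compare as equal when kept in a sorted
| | | // collection.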
| | | public int compareTo(Buffer o) { |
| | | // Used in remove().
| | | if(this.equals(o)) |
| | | { |
| | | return 0; |
| | | } |
| | | if(keyBuf.limit() == 0) { |
| | | getIndexID(); |
| | | } |
| | | if(o.getKeyBuf().limit() == 0) |
| | | { |
| | | o.getIndexID(); |
| | | } |
| | | int returnCode; |
| | | byte[] oKey = o.getKeyBuf().array(); |
| | | int oLen = o.getKeyBuf().limit(); |
| | | returnCode = indexComparator.compare(keyBuf.array(), 0, keyBuf.limit(), |
| | | oKey, oLen); |
| | | if(returnCode == 0) |
| | | { |
| | | if(indexID.intValue() == o.getIndexID().intValue()) |
| | | { |
| | | if(insertIDSet.isDefined()) |
| | | { |
| | | returnCode = -1; |
| | | } |
| | | else if(o.getInsertIDSet().isDefined()) |
| | | { |
| | | returnCode = 1; |
| | | } |
| | | else if(insertIDSet.size() == o.getInsertIDSet().size()) |
| | | { |
| | | returnCode = id > o.getBufferID() ? 1 : -1; |
| | | } |
| | | else |
| | | { |
| | | returnCode = insertIDSet.size() - o.getInsertIDSet().size(); |
| | | } |
| | | } |
| | | else if(indexID > o.getIndexID()) |
| | | { |
| | | returnCode = 1; |
| | | } |
| | | else |
| | | { |
| | | returnCode = -1; |
| | | } |
| | | } |
| | | return returnCode; |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * The index manager class has several functions: |
| | | * |
| | | * 1. It is used to carry information about index processing created in phase
| | |
| | | * |
| | | * 3. It manages opening and closing the scratch index files. |
| | | */ |
| | | private final class IndexManager implements Comparable<IndexManager> |
| | | final class IndexManager implements Comparable<IndexManager> |
| | | { |
| | | private static final int BUFFER_SIZE = 128; |
| | | |
| | | private final File file; |
| | | private RandomAccessFile rFile = null; |
| | | private final List<Buffer> bufferList = new LinkedList<Buffer>(); |
| | | private long fileLength, bytesRead = 0; |
| | | private boolean done = false, started = false; |
| | | private long totalDNS; |
| | |
| | | private final boolean isDN; |
| | | private final int limit; |
| | | |
| | | private long[] bufferIndexBegin = new long[BUFFER_SIZE]; |
| | | private long[] bufferIndexEnd = new long[BUFFER_SIZE]; |
| | | private int[] bufferIndexID = new int[BUFFER_SIZE]; |
| | | private int bufferIndexCount = 0; |
| | | |
| | | IndexManager(String fileName, boolean isDN, int limit) |
| | | private IndexManager(String fileName, boolean isDN, int limit) |
| | | { |
| | | file = new File(tempDir, fileName); |
| | | this.fileName = fileName; |
| | |
| | | } |
| | | |
| | | |
| | | void openIndexFile() throws FileNotFoundException |
| | | private void openIndexFile() throws FileNotFoundException |
| | | { |
| | | rFile = new RandomAccessFile(file, "r"); |
| | | } |
| | | |
| | | |
| | | public FileChannel getChannel() |
| | | /** |
| | | * Returns the file channel associated with this index manager. |
| | | * |
| | | * @return The file channel associated with this index manager. |
| | | */ |
| | | FileChannel getChannel() |
| | | { |
| | | return rFile.getChannel(); |
| | | } |
| | | |
| | | |
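| | | // Records the scratch-file region written for one output buffer. The newer
| | | // implementation below stores the begin/end offsets and id in parallel
| | | // arrays that grow in BUFFER_SIZE increments rather than keeping a list of
| | | // Buffer objects.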
| | | public void addBuffer(Buffer o) |
| | | |
| | | private void addBuffer(long begin, long end, int id) |
| | | { |
| | | this.bufferList.add(o); |
| | | int size = bufferIndexBegin.length; |
| | | if (bufferIndexCount >= size) |
| | | { |
| | | size += BUFFER_SIZE; |
| | | bufferIndexBegin = Arrays.copyOf(bufferIndexBegin, size); |
| | | bufferIndexEnd = Arrays.copyOf(bufferIndexEnd, size); |
| | | bufferIndexID = Arrays.copyOf(bufferIndexID, size); |
| | | } |
| | | bufferIndexBegin[bufferIndexCount] = begin; |
| | | bufferIndexEnd[bufferIndexCount] = end; |
| | | bufferIndexID[bufferIndexCount] = id; |
| | | bufferIndexCount++; |
| | | } |
| | | |
| | | |
| | | public List<Buffer> getBufferList() |
| | | { |
| | | return bufferList; |
| | | } |
| | | |
| | | |
| | | public File getFile() |
| | | private File getFile() |
| | | { |
| | | return file; |
| | | } |
| | | |
| | | |
| | | public boolean deleteIndexFile() |
| | | private boolean deleteIndexFile() |
| | | { |
| | | return file.delete(); |
| | | } |
| | | |
| | | |
| | | public void close() throws IOException |
| | | private void close() throws IOException |
| | | { |
| | | rFile.close(); |
| | | } |
| | | |
| | | |
| | | public void setFileLength() |
| | | private void setFileLength() |
| | | { |
| | | this.fileLength = file.length(); |
| | | } |
| | | |
| | | |
| | | public void addBytesRead(int bytesRead) |
| | | |
| | | /** |
| | | * Updates the bytes read counter. |
| | | * |
| | | * @param bytesRead |
| | | * The number of bytes read. |
| | | */ |
| | | void addBytesRead(int bytesRead) |
| | | { |
| | | this.bytesRead += bytesRead; |
| | | } |
| | | |
| | | |
| | | public void setDone() |
| | | private void setDone() |
| | | { |
| | | this.done = true; |
| | | } |
| | | |
| | | |
| | | public void setStarted() |
| | | private void setStarted() |
| | | { |
| | | started = true; |
| | | } |
| | | |
| | | |
| | | public void addTotDNCount(int delta) |
| | | private void addTotDNCount(int delta) |
| | | { |
| | | this.totalDNS += delta; |
| | | } |
| | | |
| | | |
| | | public long getDNCount() |
| | | private long getDNCount() |
| | | { |
| | | return totalDNS; |
| | | } |
| | | |
| | | |
| | | public boolean isDN2ID() |
| | | private boolean isDN2ID() |
| | | { |
| | | return isDN; |
| | | } |
| | | |
| | | |
| | | public void printStats(long deltaTime) |
| | | private void printStats(long deltaTime) |
| | | { |
| | | if(!done && started) |
| | | { |
| | |
| | | } |
| | | |
| | | |
| | | public void incrementKeyCount() |
| | | private void incrementKeyCount() |
| | | { |
| | | keyCount.incrementAndGet(); |
| | | } |
| | | |
| | | |
| | | public String getFileName() |
| | | /** |
| | | * Returns the file name associated with this index manager. |
| | | * |
| | | * @return The file name associated with this index manager. |
| | | */ |
| | | String getFileName() |
| | | { |
| | | return fileName; |
| | | } |
| | | |
| | | |
| | | public int getLimit() |
| | | private int getLimit() |
| | | { |
| | | return limit; |
| | | } |
| | | |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public int compareTo(IndexManager mgr) |
| | | { |
| | | if(bufferList.size() == mgr.getBufferList().size()) |
| | | { |
| | | return 0; |
| | | } |
| | | else if (bufferList.size() < mgr.getBufferList().size()) |
| | | { |
| | | return -1; |
| | | } |
| | | else |
| | | { |
| | | return 1; |
| | | } |
| | | return bufferIndexCount - mgr.bufferIndexCount; |
| | | } |
| | | } |
| | | |
| | |
| | | /** |
| | | * The rebuild index manager handles all processing related to rebuilding indexes.
| | | */ |
| | | class RebuildIndexManager extends ImportTask { |
| | | private class RebuildIndexManager extends ImportTask { |
| | | |
| | | //Rebuild index configuration. |
| | | private final RebuildConfig rebuildConfig; |
| | |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Perform rebuild index processing. |
| | | * |
| | | * @throws DatabaseException If a database error occurred.
| | | * @throws InterruptedException If processing was interrupted.
| | | * @throws ExecutionException If an execution error occurred.
| | | * @throws JebException If a JEB error occurred.
| | | * @throws InitializationException
| | | * If an initialization error occurred.
| | | * @throws DatabaseException
| | | * If a database error occurred.
| | | * @throws InterruptedException
| | | * If processing was interrupted.
| | | * @throws ExecutionException
| | | * If an execution error occurred.
| | | * @throws JebException
| | | * If a JEB error occurred.
| | | */ |
| | | public void rebuldIndexes() throws DatabaseException, InterruptedException, |
| | | ExecutionException, JebException |
| | | public void rebuldIndexes() throws InitializationException, |
| | | DatabaseException, InterruptedException, ExecutionException, |
| | | JebException |
| | | { |
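| | | // Runs phase one, aborts if it was canceled, runs phase two, and then marks
| | | // all indexes trusted when a full rebuild was requested.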
| | | phaseOne(); |
| | | if(isPhaseOneCanceled) |
| | | if (isPhaseOneCanceled) |
| | | { |
| | | throw new InterruptedException("Rebuild Index canceled."); |
| | | } |
| | | phaseTwo(); |
| | | if(rebuildAll) |
| | | if (rebuildAll) |
| | | { |
| | | setAllIndexesTrusted(); |
| | | } |
| | |
| | | } |
| | | |
| | | |
| | | private void phaseTwo() throws InterruptedException, JebException, |
| | | ExecutionException |
| | | |
| | | private void phaseTwo() throws InitializationException, |
| | | InterruptedException, JebException, ExecutionException |
| | | { |
| | | SecondPhaseProgressTask progressTask = |
| | | new SecondPhaseProgressTask(entriesProcessed.get()); |
| | | SecondPhaseProgressTask progressTask = new SecondPhaseProgressTask( |
| | | entriesProcessed.get()); |
| | | Timer timer2 = new Timer(); |
| | | timer2.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL); |
| | | processIndexFiles(); |
| | |
| | | * This class reports progress of the first phase of rebuild index
| | | * processing at fixed intervals.
| | | */ |
| | | class RebuildFirstPhaseProgressTask extends TimerTask |
| | | private class RebuildFirstPhaseProgressTask extends TimerTask |
| | | { |
| | | /** |
| | | * The number of records that had been processed at the time of the |
| | |
| | | * This class reports progress of the second phase of import processing at |
| | | * fixed intervals. |
| | | */ |
| | | class SecondPhaseProgressTask extends TimerTask |
| | | private class SecondPhaseProgressTask extends TimerTask |
| | | { |
| | | /** |
| | | * The number of entries that had been read at the time of the |