| | |
| | | /** The DN attribute type. */ |
| | | private static final AttributeType DN_TYPE; |
| | | |
| | | /** Root container. */ |
| | | private final RootContainer rootContainer; |
| | | /** Import configuration. */ |
| | | private final LDIFImportConfig importConfiguration; |
| | | private final ServerContext serverContext; |
| | | |
| | | /** LDIF reader. */ |
| | | private ImportLDIFReader reader; |
| | | /** Phase one buffer count. */ |
| | | private final AtomicInteger bufferCount = new AtomicInteger(0); |
| | | /** Phase one imported entries count. */ |
| | | private final AtomicLong importCount = new AtomicLong(0); |
| | | /** Migrated entry count. */ |
| | | private int migratedCount; |
| | | |
| | | /** Phase one buffer size in bytes. */ |
| | | private int bufferSize; |
| | | |
| | | /** Temp scratch directory. */ |
| | | private final File tempDir; |
| | | |
| | | /** Index count. */ |
| | | private final int indexCount; |
| | | /** Thread count. */ |
| | |
| | | /** Set to true when validation is skipped. */ |
| | | private final boolean skipDNValidation; |
| | | |
| | | /** Temporary environment used when DN validation is done in first phase. */ |
| | | private final DNCache tmpEnv; |
| | | |
| | | /** Size in bytes of temporary env. */ |
| | | private long tmpEnvCacheSize; |
| | | /** Available memory at the start of the import. */ |
| | |
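| | | /** The executor service used for the scratch file writer tasks. */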
| | | private ExecutorService scratchFileWriterService; |
| | | |
| | | /** Queue of free index buffers -- used to recycle index buffers. */
| | | private final BlockingQueue<IndexOutputBuffer> freeBufferQueue = new LinkedBlockingQueue<>();
| | | |
| | | /** |
| | | * Map of index keys to index buffers. Used to allocate sorted index buffers |
| | |  * to an index writer thread.
| | | */ |
| | | private final Map<IndexKey, BlockingQueue<IndexOutputBuffer>> indexKeyQueueMap = new ConcurrentHashMap<>();
| | | |
| | | /** List of index managers. Used to start phase 2. */
| | | private final List<IndexManager> indexMgrList = new LinkedList<>();
| | | /** List of DN-based index managers. Used to start phase 2. */
| | | private final List<IndexManager> DNIndexMgrList = new LinkedList<>();
| | | |
| | | /** |
| | | * Futures used to indicate when the index file writers are done flushing |
| | | * their work queues and have exited. End of phase one. |
| | | */ |
| | | private final List<Future<Void>> scratchFileWriterFutures = new CopyOnWriteArrayList<>();
| | | /** |
| | | * List of index file writer tasks. Used to signal stopScratchFileWriters to |
| | |  * the index file writer tasks when the LDIF file has been fully read.
| | |
| | | private final List<ScratchFileWriterTask> scratchFileWriterList; |
| | | |
| | | /** Map of DNs to Suffix objects. */ |
| | | private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<>();
| | | /** Map of indexIDs to indexes. */
| | | private final ConcurrentHashMap<Integer, Index> indexIDToIndexMap = new ConcurrentHashMap<>();
| | | /** Map of indexIDs to entry containers. */ |
| | | private final ConcurrentHashMap<Integer, EntryContainer> indexIDToECMap = new ConcurrentHashMap<>();
| | | |
| | | /** Used to synchronize when a scratch file index writer is first set up. */
| | | private final Object synObj = new Object(); |
| | |
| | | this.rootContainer = rootContainer; |
| | | this.importConfiguration = null; |
| | | this.serverContext = serverContext; |
| | | this.tmpEnv = null; |
| | | this.threadCount = 1; |
| | | this.rebuildManager = new RebuildIndexManager(rootContainer.getStorage(), rebuildConfig, cfg); |
| | | this.indexCount = rebuildManager.getIndexCount(); |
| | | this.clearedBackend = false; |
| | | this.scratchFileWriterList = new ArrayList<>(indexCount);
| | | |
| | | this.tempDir = prepareTempDir(cfg, rebuildConfig.getTmpDirectory());
| | | computeMemoryRequirements(); |
| | | this.skipDNValidation = true; |
| | | } |
| | | |
| | | /** |
| | |
| | | this.indexCount = getTotalIndexCount(backendCfg); |
| | | |
| | | this.clearedBackend = mustClearBackend(importConfiguration, backendCfg); |
| | | this.scratchFileWriterList = new ArrayList<>(indexCount);
| | | |
| | | this.tempDir = prepareTempDir(backendCfg, importConfiguration.getTmpDirectory());
| | | computeMemoryRequirements(); |
| | | |
| | | skipDNValidation = importConfiguration.getSkipDNValidation(); |
| | | |
| | | // Set up temporary environment. |
| | | if (!skipDNValidation) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
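| | | /**
| | |  * Re-creates an empty scratch directory named after the backend ID under the requested
| | |  * temporary directory, or under the default temporary directory when none is given.
| | |  */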
| | | private File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException |
| | | { |
| | | File parentDir = getFileForPath(tmpDirectory != null ? tmpDirectory : DEFAULT_TMP_DIR); |
| | | File tempDir = new File(parentDir, backendCfg.getBackendId()); |
| | | recursiveDelete(tempDir); |
| | | if (!tempDir.exists() && !tempDir.mkdirs()) |
| | | { |
| | | throw new InitializationException(ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get(tempDir)); |
| | | } |
| | | return tempDir; |
| | | } |
| | | |
| | | /** |
| | | * Returns whether the backend must be cleared. |
| | | * |
| | |
| | | */ |
| | | } |
| | | |
| | | private static int getTotalIndexCount(PluggableBackendCfg backendCfg) throws ConfigException |
| | | { |
| | | int indexes = 2; // dn2id, dn2uri |
| | |
| | | } |
| | | |
| | | /** |
| | |  * Calculate buffer sizes and initialize properties based on memory.
| | | * |
| | | * @throws InitializationException |
| | | * If a problem occurs during calculation. |
| | | */ |
| | | private void computeMemoryRequirements() throws InitializationException
| | | { |
| | | // Calculate amount of usable memory. This will need to take into account |
| | | // various fudge factors, including the number of IO buffers used by the |
| | |
| | | // We need caching when doing DN validation or rebuilding indexes. |
| | | if (!skipDNValidation || rebuildManager != null) |
| | | { |
| | | // No DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers.
| | | if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null) |
| | | { |
| | | dbCacheSize = 500 * KB; |
| | |
| | | { |
| | | DN baseDN = entryContainer.getBaseDN(); |
| | | EntryContainer sourceEntryContainer = null; |
| | | List<DN> includeBranches = new ArrayList<>();
| | | List<DN> excludeBranches = new ArrayList<>();
| | | |
| | | if (!importConfiguration.appendToExistingData() |
| | | && !importConfiguration.clearBackend()) |
| | |
| | | final long startTime = System.currentTimeMillis(); |
| | | importPhaseOne(); |
| | | final long phaseOneFinishTime = System.currentTimeMillis(); |
| | | |
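| | | // The temporary DN cache is only needed for phase-one DN validation, so release it once phase one completes.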
| | | if (!skipDNValidation) |
| | | { |
| | | tmpEnv.shutdown(); |
| | | } |
| | | |
| | | if (isCanceled) |
| | | { |
| | | throw new InterruptedException("Import processing canceled."); |
| | |
| | | final Storage storage = rootContainer.getStorage(); |
| | | execService.submit(new MigrateExistingTask(storage)).get(); |
| | | |
| | | final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
| | | if (importConfiguration.appendToExistingData() |
| | | && importConfiguration.replaceExistingEntries()) |
| | | { |
| | |
| | | int buffers; |
| | | while (true) |
| | | { |
| | | final List<IndexManager> allIndexMgrs = new ArrayList<>(DNIndexMgrList);
| | | allIndexMgrs.addAll(indexMgrList); |
| | | Collections.sort(allIndexMgrs, Collections.reverseOrder()); |
| | | |
| | |
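| | | // Permits shared by the index write tasks to bound how many scratch-file buffers are in use at once.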
| | | Semaphore permits = new Semaphore(buffers); |
| | | |
| | | // Start DN processing first. |
| | | List<Future<Void>> futures = new LinkedList<>();
| | | submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures); |
| | | submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures); |
| | | getAll(futures); |
| | |
| | | |
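| | | /** Returns the suffix's include branches that lie under its base DN, in byte-string form. */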
| | | private List<ByteString> includeBranchesAsBytes(Suffix suffix) |
| | | { |
| | | List<ByteString> includeBranches = new ArrayList<>(suffix.getIncludeBranches().size());
| | | for (DN includeBranch : suffix.getIncludeBranches()) |
| | | { |
| | | if (includeBranch.isDescendantOf(suffix.getBaseDN())) |
| | |
| | | super(storage); |
| | | } |
| | | |
| | | private final Set<ByteString> insertKeySet = new HashSet<>();
| | | private final Set<ByteString> deleteKeySet = new HashSet<>();
| | | private final EntryInformation entryInfo = new EntryInformation(); |
| | | private Entry oldEntry; |
| | | private EntryID entryID; |
| | |
| | | } |
| | | } |
| | | |
| | | void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix)
| | | throws DirectoryException, StorageRuntimeException, InterruptedException
| | | { |
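| | | // Look up any existing entry with this DN so that an append/replace import can reuse its entry ID.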
| | | DN entryDN = entry.getName(); |
| | | EntryID oldID = suffix.getDN2ID().get(txn, entryDN);
| | | if (oldID != null) |
| | | { |
| | | oldEntry = suffix.getID2Entry().get(txn, oldID); |
| | | } |
| | | |
| | | if (oldEntry == null) |
| | | { |
| | | if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix)) |
| | |
| | | suffix.removePending(entryDN); |
| | | entryID = oldID; |
| | | } |
| | | |
| | | processDN2URI(txn, suffix, oldEntry, entry); |
| | | suffix.getID2Entry().put(txn, entryID, entry); |
| | | if (oldEntry != null) |
| | |
| | | private class ImportTask implements Callable<Void> |
| | | { |
| | | private final Storage storage; |
| | | private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<>();
| | | private final Set<ByteString> insertKeySet = new HashSet<>();
| | | private final EntryInformation entryInfo = new EntryInformation(); |
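| | | /** Index key identifying the dn2id index, used when buffering DN records. */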
| | | private final IndexKey dnIndexKey = new IndexKey(DN_TYPE, DN2ID_INDEX_NAME, 1); |
| | | |
| | |
| | | |
| | | void flushIndexBuffers() throws InterruptedException, ExecutionException |
| | | { |
| | | final ArrayList<Future<Void>> futures = new ArrayList<>();
| | | for (IndexOutputBuffer indexBuffer : indexBufferMap.values()) |
| | | { |
| | | indexBuffer.discard(); |
| | |
| | | private final IndexManager indexMgr; |
| | | private final int cacheSize; |
| | | /** Map of indexID to DNState. */
| | | private final Map<Integer, DNState> dnStateMap = new HashMap<>();
| | | private final Semaphore permits; |
| | | private final int maxPermits; |
| | | private final AtomicLong bytesRead = new AtomicLong(); |
| | |
| | | batchNumber.incrementAndGet(); |
| | | |
| | | // Create all the index buffers for the next batch. |
| | | final NavigableSet<IndexInputBuffer> buffers = new TreeSet<>();
| | | for (int i = 0; i < permitRequest; i++) |
| | | { |
| | | final long bufferBegin = bufferIndexFile.readLong(); |
| | |
| | | |
| | | private final EntryContainer entryContainer; |
| | | private final TreeName dn2id; |
| | | private final TreeMap<ByteString, EntryID> parentIDMap = new TreeMap<>();
| | | private final Map<EntryID, AtomicLong> id2childrenCountTree = new TreeMap<>();
| | | private ByteSequence parentDN; |
| | | private final ByteStringBuilder lastDN = new ByteStringBuilder(); |
| | | private EntryID parentID, lastID, entryID; |
| | |
| | | } |
| | | id2childrenCountTree.clear(); |
| | | } |
| | | 
| | | } |
| | | } |
| | | |
| | |
| | | private final ByteArrayOutputStream deleteByteStream = new ByteArrayOutputStream(2 * bufferSize); |
| | | private final DataOutputStream bufferStream; |
| | | private final DataOutputStream bufferIndexStream; |
| | | private final TreeSet<IndexOutputBuffer> indexSortedSet = new TreeSet<>();
| | | private int insertKeyCount, deleteKeyCount; |
| | | private int bufferCount; |
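| | | /** Set once the poison buffer signalling the end of input for this writer has been received. */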
| | | private boolean poisonSeen; |
| | |
| | | public Void call() throws IOException, InterruptedException |
| | | { |
| | | long offset = 0; |
| | | List<IndexOutputBuffer> l = new LinkedList<>();
| | | try |
| | | { |
| | | while (true) |
| | |
| | | { |
| | | indexMgrList.add(indexMgr); |
| | | } |
| | | BlockingQueue<IndexOutputBuffer> newQueue = new ArrayBlockingQueue<>(phaseOneBufferCount);
| | | ScratchFileWriterTask indexWriter = new ScratchFileWriterTask(newQueue, indexMgr); |
| | | scratchFileWriterList.add(indexWriter); |
| | | scratchFileWriterFutures.add(scratchFileWriterService.submit(indexWriter)); |
| | |
| | | |
| | | /** Rebuild index configuration. */ |
| | | private final RebuildConfig rebuildConfig; |
| | | |
| | | /** Pluggable backend configuration. */
| | | private final PluggableBackendCfg cfg; |
| | | |
| | | /** Map of index keys to indexes. */ |
| | | private final Map<IndexKey, MatchingRuleIndex> indexMap = new LinkedHashMap<>();
| | | /** List of VLV indexes. */ |
| | | private final List<VLVIndex> vlvIndexes = new LinkedList<>();
| | | |
| | | /** The suffix instance. */ |
| | | private Suffix suffix; |
| | | /** The entry container. */ |
| | | private EntryContainer entryContainer; |
| | | /** The DN2ID index. */ |
| | | private DN2ID dn2id; |
| | | /** The DN2URI index. */ |
| | |
| | | /** Total entries processed. */ |
| | | private final AtomicLong entriesProcessed = new AtomicLong(0); |
| | | |
| | | /** |
| | | * Create an instance of the rebuild index manager using the specified |
| | | * parameters. |
| | |
| | | scratchFileWriterService = Executors.newFixedThreadPool(2 * indexCount); |
| | | bufferSortService = Executors.newFixedThreadPool(threadCount); |
| | | ExecutorService rebuildIndexService = Executors.newFixedThreadPool(threadCount); |
| | | List<Callable<Void>> tasks = new ArrayList<>(threadCount);
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(this); |
| | |
| | | */ |
| | | private TmpEnv(File envPath) throws StorageRuntimeException |
| | | { |
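| | | // Configuration values for the temporary DN-cache storage, keyed by the config getter they stand in for.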
| | | final Map<String, Object> returnValues = new HashMap<>();
| | | returnValues.put("getDBDirectory", envPath.getAbsolutePath()); |
| | | returnValues.put("getBackendId", DB_NAME); |
| | | returnValues.put("getDBCacheSize", 0L); |