| | |
| | | private final int MAX_DB_CACHE_SIZE = 128 * MB; |
| | | private final int MIN_DB_CACHE_SIZE = 16 * MB; |
| | | private final int MAX_DB_LOG_BUF_BYTES = 100 * MB; |
| | | private final int MEM_PCT_PHASE_1 = 60; |
| | | private final int MEM_PCT_PHASE_1 = 45; |
| | | private final int MEM_PCT_PHASE_2 = 50; |
| | | |
| | | private final String DIRECT_PROPERTY = "import.directphase2"; |
| | | private static AttributeType dnType; |
| | | private static IndexBuffer.DNComparator dnComparator |
| | | = new IndexBuffer.DNComparator(); |
| | | private static final IndexBuffer.IndexComparator indexComparator = |
| | | new IndexBuffer.IndexComparator(); |
| | | |
| | | private final AtomicInteger bufferCount = new AtomicInteger(0); |
| | | private final File tempDir; |
| | | private final int indexCount, threadCount; |
| | | private final boolean dn2idPhase2; |
| | | private final boolean skipDNValidation; |
| | | private final LDIFImportConfig config; |
| | | private final LocalDBBackendCfg dbCfg; |
| | | private final ByteBuffer directBuffer; |
| | | |
| | | private RootContainer rootContainer; |
| | | private LDIFReader reader; |
| | | private int bufferSize; |
| | | private int bufferSize, indexBufferCount; |
| | | private int migratedCount; |
| | | private long dbCacheSize = 0, dbLogBufSize = 0; |
| | | |
| | | |
| | | //The executor service used for the sort tasks. |
| | | private ExecutorService sortService; |
| | | |
| | |
| | | private final BlockingQueue<IndexBuffer> freeBufQue = |
| | | new LinkedBlockingQueue<IndexBuffer>(); |
| | | |
| | | //Map of DB containers to que of index buffers. Used to allocate sorted |
| | | //Map of index keys to index buffers. Used to allocate sorted |
| | | //index buffers to an index writer thread. |
| | | private final |
| | | Map<DatabaseContainer, BlockingQueue<IndexBuffer>> containerQueMap = |
| | | new LinkedHashMap<DatabaseContainer, BlockingQueue<IndexBuffer>>(); |
| | | Map<IndexKey, BlockingQueue<IndexBuffer>> indexKeyQueMap = |
| | | new ConcurrentHashMap<IndexKey, BlockingQueue<IndexBuffer>>(); |
| | | |
| | | //Map of DB containers to index managers. Used to start phase 2. |
| | | private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap = |
| | | new LinkedHashMap<DatabaseContainer, IndexManager>(); |
| | | private final List<IndexManager> indexMgrList = |
| | | new LinkedList<IndexManager>(); |
| | | |
| | | //Futures used to indicate when the index file writers are done flushing |
| | | //their work queues and have exited. End of phase one. |
| | |
| | | //index file writer tasks when the LDIF file has been done. |
| | | private final List<IndexFileWriterTask> indexWriterList; |
| | | |
| | | //Map of DNs to Suffix objects. Placeholder for when multiple suffixes are |
| | | //supported. |
| | | |
| | | //Map of DNs to Suffix objects. |
| | | private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>(); |
| | | |
| | | |
| | | private final ConcurrentHashMap<Integer, DatabaseContainer> idContainerMap = |
| | | new ConcurrentHashMap<Integer, DatabaseContainer>(); |
| | | |
| | | private final ConcurrentHashMap<Integer, EntryContainer> idECMap = |
| | | new ConcurrentHashMap<Integer, EntryContainer>(); |
| | | |
| | | private final Object synObj = new Object(); |
| | | |
| | | /* |
| | | * Resolves the "dn" attribute type when the class is initialized. If the |
| | | * lookup through DirectoryServer.getAttributeType returns null, the default |
| | | * attribute type for "dn" is used instead. |
| | | */ static |
| | | { |
| | | if ((dnType = DirectoryServer.getAttributeType("dn")) == null) |
| | | { |
| | | dnType = DirectoryServer.getDefaultAttributeType("dn"); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Create a new import job with the specified ldif import config. |
| | | * |
| | | * @param config The LDIF import config. |
| | | * @param cfg The local DB backend config. |
| | | * @param dbCfg The local DB backend config. |
| | | * @throws IOException If a problem occurs while opening the LDIF file for |
| | | * reading. |
| | | * @throws InitializationException If a problem occurs during initialization. |
| | | */ |
| | | public Importer(LDIFImportConfig config, |
| | | LocalDBBackendCfg cfg ) |
| | | throws IOException |
| | | public Importer(LDIFImportConfig config, LocalDBBackendCfg dbCfg ) |
| | | throws IOException, InitializationException |
| | | { |
| | | this.config = config; |
| | | threadCount = cfg.getImportThreadCount(); |
| | | indexCount = cfg.listLocalDBIndexes().length + 2; |
| | | this.dbCfg = dbCfg; |
| | | if(config.getThreadCount() == -1) |
| | | { |
| | | threadCount = Runtime.getRuntime().availableProcessors() * 2; |
| | | } |
| | | else |
| | | { |
| | | threadCount = config.getThreadCount(); |
| | | if(threadCount <= 0) |
| | | { |
| | | Message msg = ERR_IMPORT_LDIF_INVALID_THREAD_COUNT.get(threadCount); |
| | | throw new InitializationException(msg); |
| | | } |
| | | } |
| | | indexCount = dbCfg.listLocalDBIndexes().length + 2; |
| | | indexWriterList = new ArrayList<IndexFileWriterTask>(indexCount); |
| | | indexWriterFutures = new CopyOnWriteArrayList<Future<?>>(); |
| | | File parentDir; |
| | |
| | | { |
| | | parentDir = getFileForPath(config.getTmpDirectory()); |
| | | } |
| | | tempDir = new File(parentDir, cfg.getBackendId()); |
| | | |
| | | tempDir = new File(parentDir, dbCfg.getBackendId()); |
| | | if(!tempDir.exists() && !tempDir.mkdirs()) |
| | | { |
| | | Message msg = ERR_JEB_IMPORT_CREATE_TMPDIR_ERROR.get( |
| | |
| | | f.delete(); |
| | | } |
| | | } |
| | | dn2idPhase2 = config.getDNCheckPhase2(); |
| | | skipDNValidation = config.getSkipDNValidation(); |
| | | String propString = System.getProperty(DIRECT_PROPERTY); |
| | | if(propString != null) |
| | | { |
| | |
| | | |
| | | private void initIndexBuffers(int threadCount) |
| | | { |
| | | int bufferCount = 2 * (indexCount * threadCount); |
| | | for(int i = 0; i < bufferCount; i++) |
| | | indexBufferCount = 2 * (indexCount * threadCount); |
| | | for(int i = 0; i < indexBufferCount; i++) |
| | | { |
| | | IndexBuffer b = IndexBuffer.createIndexBuffer(bufferSize); |
| | | freeBufQue.add(b); |
| | |
| | | |
| | | |
| | | |
| | | private void initSuffixes() |
| | | throws ConfigException, InitializationException |
| | | private void initSuffixes() throws DatabaseException, JebException, |
| | | ConfigException, InitializationException |
| | | { |
| | | Iterator<EntryContainer> i = rootContainer.getEntryContainers().iterator(); |
| | | EntryContainer ec = i.next(); |
| | | Suffix suffix = Suffix.createSuffixContext(ec, config, rootContainer); |
| | | dnSuffixMap.put(ec.getBaseDN(), suffix); |
| | | for(EntryContainer ec : rootContainer.getEntryContainers()) |
| | | { |
| | | Suffix suffix = getSuffix(ec); |
| | | if(suffix != null) |
| | | { |
| | | dnSuffixMap.put(ec.getBaseDN(), suffix); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Builds the import Suffix for one entry container, or returns null when |
| | | * the container's base DN is left out of this import. |
| | | * <p> |
| | | * Branch handling only runs when the import config neither appends to |
| | | * existing data nor clears the backend: |
| | | * <ul> |
| | | * <li>If the base DN equals an exclude branch, null is returned. Exclude |
| | | * branches for which baseDN.isAncestorOf is true are collected.</li> |
| | | * <li>If include branches are configured, those for which |
| | | * baseDN.isAncestorOf is true are collected; if none match, null is |
| | | * returned. Include branches that have another collected include branch |
| | | * as an ancestor are dropped, as are exclude branches that are not under |
| | | * any remaining include branch.</li> |
| | | * <li>If the only include branch is the base DN itself and no exclude |
| | | * branches remain, the entry container is cleared in place. Otherwise the |
| | | * passed-in container is kept as the source container and a new container |
| | | * named with the normalized base DN plus "_importTmp" is opened through |
| | | * the root container and used as the import target.</li> |
| | | * </ul> |
| | | * |
| | | * @param entryContainer The entry container to build a suffix for. |
| | | * @return The suffix created by Suffix.createSuffixContext, or null if this |
| | | * base DN is skipped. |
| | | * @throws DatabaseException If a database error occurs. |
| | | * @throws JebException If a JEB error occurs. |
| | | * @throws ConfigException If a configuration error occurs. |
| | | * @throws InitializationException If an initialization error occurs. |
| | | */ private Suffix getSuffix(EntryContainer entryContainer) |
| | | throws DatabaseException, JebException, ConfigException, |
| | | InitializationException { |
| | | DN baseDN = entryContainer.getBaseDN(); |
| | | EntryContainer srcEntryContainer = null; |
| | | List<DN> includeBranches = new ArrayList<DN>(); |
| | | List<DN> excludeBranches = new ArrayList<DN>(); |
| | | |
| | | if(!config.appendToExistingData() && |
| | | !config.clearBackend()) |
| | | { |
| | | for(DN dn : config.getExcludeBranches()) |
| | | { |
| | | if(baseDN.equals(dn)) |
| | | { |
| | | // This entire base DN was explicitly excluded. Skip. |
| | | return null; |
| | | } |
| | | if(baseDN.isAncestorOf(dn)) |
| | | { |
| | | excludeBranches.add(dn); |
| | | } |
| | | } |
| | | |
| | | if(!config.getIncludeBranches().isEmpty()) |
| | | { |
| | | for(DN dn : config.getIncludeBranches()) |
| | | { |
| | | if(baseDN.isAncestorOf(dn)) |
| | | { |
| | | includeBranches.add(dn); |
| | | } |
| | | } |
| | | |
| | | if(includeBranches.isEmpty()) |
| | | { |
| | | // There are no branches in the explicitly defined include list under |
| | | // this base DN. Skip this base DN altogether. |
| | | |
| | | return null; |
| | | } |
| | | |
| | | // Remove any overlapping include branches. |
| | | Iterator<DN> includeBranchIterator = includeBranches.iterator(); |
| | | while(includeBranchIterator.hasNext()) |
| | | { |
| | | DN includeDN = includeBranchIterator.next(); |
| | | boolean keep = true; |
| | | for(DN dn : includeBranches) |
| | | { |
| | | if(!dn.equals(includeDN) && dn.isAncestorOf(includeDN)) |
| | | { |
| | | keep = false; |
| | | break; |
| | | } |
| | | } |
| | | if(!keep) |
| | | { |
| | | includeBranchIterator.remove(); |
| | | } |
| | | } |
| | | |
| | | // Remove any exclude branches that are not under an include branch, |
| | | // since they will be migrated as part of the existing entries |
| | | // outside of the include branches anyway. |
| | | Iterator<DN> excludeBranchIterator = excludeBranches.iterator(); |
| | | while(excludeBranchIterator.hasNext()) |
| | | { |
| | | DN excludeDN = excludeBranchIterator.next(); |
| | | boolean keep = false; |
| | | for(DN includeDN : includeBranches) |
| | | { |
| | | if(includeDN.isAncestorOf(excludeDN)) |
| | | { |
| | | keep = true; |
| | | break; |
| | | } |
| | | } |
| | | if(!keep) |
| | | { |
| | | excludeBranchIterator.remove(); |
| | | } |
| | | } |
| | | |
| | | if(includeBranches.size() == 1 && excludeBranches.size() == 0 && |
| | | includeBranches.get(0).equals(baseDN)) |
| | | { |
| | | // This entire base DN is explicitly included in the import with |
| | | // no exclude branches that we need to migrate. Just clear the entry |
| | | // container. |
| | | entryContainer.lock(); /* NOTE(review): no try/finally here, so |
| | | * unlock() is not reached if clear() throws. */ |
| | | entryContainer.clear(); |
| | | entryContainer.unlock(); |
| | | } |
| | | else |
| | | { |
| | | // Keep the current container as the migration source and import |
| | | // into a separate temporary entry container. |
| | | srcEntryContainer = entryContainer; |
| | | entryContainer = |
| | | rootContainer.openEntryContainer(baseDN, |
| | | baseDN.toNormalizedString() + |
| | | "_importTmp"); |
| | | } |
| | | } |
| | | } |
| | | return Suffix.createSuffixContext(entryContainer, srcEntryContainer, |
| | | includeBranches, excludeBranches); |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Import a ldif using the specified root container. |
| | |
| | | * @throws InterruptedException If the import failed due to an interrupted |
| | | * error. |
| | | * @throws ExecutionException If the import failed due to an execution error. |
| | | * @throws DatabaseException If the import failed due to a database error. |
| | | */ |
| | | public LDIFImportResult |
| | | processImport(RootContainer rootContainer) throws ConfigException, |
| | | InitializationException, IOException, JebException, |
| | | InitializationException, IOException, JebException, DatabaseException, |
| | | InterruptedException, ExecutionException |
| | | { |
| | | try { |
| | | this.rootContainer = rootContainer; |
| | | this.reader = new LDIFReader(config, rootContainer, LDIF_READER_BUF_SIZE); |
| | | Message message = |
| | |
| | | processPhaseOne(); |
| | | processPhaseTwo(); |
| | | setIndexesTrusted(); |
| | | switchContainers(); |
| | | tempDir.delete(); |
| | | long finishTime = System.currentTimeMillis(); |
| | | long importTime = (finishTime - startTime); |
| | |
| | | rate = 1000f * reader.getEntriesRead() / importTime; |
| | | message = NOTE_JEB_IMPORT_FINAL_STATUS.get(reader.getEntriesRead(), |
| | | reader.getEntriesRead(), reader.getEntriesIgnored(), reader |
| | | .getEntriesRejected(), 0, importTime / 1000, rate); |
| | | .getEntriesRejected(), migratedCount, importTime / 1000, rate); |
| | | logError(message); |
| | | } |
| | | finally |
| | | { |
| | | reader.close(); |
| | | } |
| | | return new LDIFImportResult(reader.getEntriesRead(), reader |
| | | .getEntriesRejected(), reader.getEntriesIgnored()); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Replaces source entry containers with the containers that were imported |
| | | * into. Suffixes whose source entry container is null are left untouched. |
| | | * <p> |
| | | * For each remaining suffix the container registered under the base DN is |
| | | * unregistered from the root container. If it is not the same instance as |
| | | * the suffix's source container it is registered again and the suffix is |
| | | * skipped. Otherwise the source container is closed and deleted, the |
| | | * suffix's entry container gets the normalized base DN as its database |
| | | * prefix, and that container is registered under the base DN. |
| | | * <p> |
| | | * NOTE(review): the lock()/unlock() pairs are not wrapped in try/finally, |
| | | * so an exception between them skips the unlock. |
| | | * |
| | | * @throws DatabaseException If a database error occurs. |
| | | * @throws JebException If a JEB error occurs. |
| | | */ private void switchContainers() throws DatabaseException, JebException { |
| | | |
| | | for(Suffix suffix : dnSuffixMap.values()) { |
| | | DN baseDN = suffix.getBaseDN(); |
| | | EntryContainer srcEntryContainer = |
| | | suffix.getSrcEntryContainer(); |
| | | if(srcEntryContainer != null) { |
| | | EntryContainer unregEC = |
| | | rootContainer.unregisterEntryContainer(baseDN); |
| | | //Make sure the unregistered EC for the base DN is the same as |
| | | //the one in the import context. |
| | | if(unregEC != srcEntryContainer) { |
| | | rootContainer.registerEntryContainer(baseDN, unregEC); |
| | | continue; |
| | | } |
| | | srcEntryContainer.lock(); |
| | | srcEntryContainer.close(); |
| | | srcEntryContainer.delete(); |
| | | srcEntryContainer.unlock(); |
| | | EntryContainer newEC = suffix.getEntryContainer(); |
| | | newEC.lock(); |
| | | newEC.setDatabasePrefix(baseDN.toNormalizedString()); |
| | | newEC.unlock(); |
| | | rootContainer.registerEntryContainer(baseDN, newEC); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | private void setIndexesTrusted() throws JebException |
| | | { |
| | | try { |
| | |
| | | timer.scheduleAtFixedRate(progressTask, TIMER_INTERVAL, TIMER_INTERVAL); |
| | | indexProcessService = Executors.newFixedThreadPool(2 * indexCount); |
| | | sortService = Executors.newFixedThreadPool(threadCount); |
| | | |
| | | //Import tasks are collective tasks. |
| | | List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount); |
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(new ImportTask()); |
| | | } |
| | | ExecutorService execService = Executors.newFixedThreadPool(threadCount); |
| | | List<Future<Void>> results = execService.invokeAll(tasks); |
| | | for (Future<Void> result : results) |
| | | assert result.isDone(); |
| | | List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount); |
| | | |
| | | tasks.add(new MigrateExistingTask()); |
| | | List<Future<Void>> results = execService.invokeAll(tasks); |
| | | for (Future<Void> result : results) |
| | | assert result.isDone(); |
| | | tasks.clear(); |
| | | results.clear(); |
| | | |
| | | if (config.appendToExistingData() && |
| | | config.replaceExistingEntries()) |
| | | { |
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(new AppendReplaceTask()); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(new ImportTask()); |
| | | } |
| | | } |
| | | results = execService.invokeAll(tasks); |
| | | for (Future<Void> result : results) |
| | | assert result.isDone(); |
| | | |
| | | |
| | | tasks.clear(); |
| | | results.clear(); |
| | | tasks.add(new MigrateExcludedTask()); |
| | | results = execService.invokeAll(tasks); |
| | | for (Future<Void> result : results) |
| | | assert result.isDone(); |
| | | |
| | | |
| | | stopIndexWriterTasks(); |
| | | for (Future<?> result : indexWriterFutures) |
| | | { |
| | | result.get(); |
| | | result.get(); |
| | | } |
| | | execService.shutdown(); |
| | | freeBufQue.clear(); |
| | |
| | | private void processPhaseTwo() throws InterruptedException |
| | | { |
| | | SecondPhaseProgressTask progress2Task = |
| | | new SecondPhaseProgressTask(containerIndexMgrMap); |
| | | new SecondPhaseProgressTask(indexMgrList); |
| | | Timer timer2 = new Timer(); |
| | | timer2.scheduleAtFixedRate(progress2Task, TIMER_INTERVAL, TIMER_INTERVAL); |
| | | processIndexFiles(); |
| | |
| | | { |
| | | cacheSize = cacheSizeFromDirectMemory(); |
| | | } |
| | | for(Map.Entry<DatabaseContainer, IndexManager> e : |
| | | containerIndexMgrMap.entrySet()) |
| | | for(IndexManager idxMgr : indexMgrList) |
| | | { |
| | | DatabaseContainer container = e.getKey(); |
| | | IndexManager indexMgr = e.getValue(); |
| | | boolean isDN2ID = false; |
| | | if(container instanceof DN2ID) |
| | | { |
| | | isDN2ID = true; |
| | | } |
| | | if(directBuffer != null) |
| | | { |
| | | int cacheSizes = cacheSize * indexMgr.getBufferList().size(); |
| | | int cacheSizes = cacheSize * idxMgr.getBufferList().size(); |
| | | offSet += cacheSizes; |
| | | directBuffer.limit(offSet); |
| | | directBuffer.position(p); |
| | | ByteBuffer b = directBuffer.slice(); |
| | | tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, b, cacheSize)); |
| | | tasks.add(new IndexWriteDBTask(idxMgr, b, cacheSize)); |
| | | p += cacheSizes; |
| | | } |
| | | else |
| | | { |
| | | tasks.add(new IndexWriteDBTask(indexMgr, isDN2ID, cacheSize)); |
| | | tasks.add(new IndexWriteDBTask(idxMgr, null, cacheSize)); |
| | | } |
| | | } |
| | | List<Future<Void>> results = indexProcessService.invokeAll(tasks); |
| | |
| | | |
| | | |
| | | /** |
| | | * This task processes the LDIF file during phase 1. |
| | | * Task used to migrate excluded branch. |
| | | */ |
| | | private final class ImportTask implements Callable<Void> { |
| | | private final Map<Suffix, Map<DatabaseContainer, IndexBuffer>> suffixMap = |
| | | new HashMap<Suffix, Map<DatabaseContainer, IndexBuffer>>(); |
| | | /** |
| | | * Task that copies the entries of excluded branches from a suffix's source |
| | | * entry container into the import. |
| | | * <p> |
| | | * Only suffixes with a non-null source entry container and a non-empty |
| | | * exclude branch list are handled. For each excluded DN the source DN2ID |
| | | * database is searched for the normalized DN; when that exact key exists, |
| | | * the cursor is walked forward and every entry found is read from the |
| | | * source ID2Entry database and passed to processEntry with a fresh entry |
| | | * ID, until the key reaches the computed end key, the cursor runs out, or |
| | | * the import is cancelled. |
| | | * <p> |
| | | * NOTE(review): insertKeySet, dnComparator, indexComparator and |
| | | * indexBufferMap are not referenced in the visible body of this class; |
| | | * confirm whether they are still needed. |
| | | */ private final class MigrateExcludedTask extends ImportTask |
| | | { |
| | | private final Set<byte[]> insertKeySet = new HashSet<byte[]>(); |
| | | private final IndexBuffer.DNComparator dnComparator |
| | | = new IndexBuffer.DNComparator(); |
| | | private final IndexBuffer.IndexComparator indexComparator = |
| | | new IndexBuffer.IndexComparator(); |
| | | |
| | | private final |
| | | Map<IndexKey, IndexBuffer> indexBufferMap = |
| | | new HashMap<IndexKey, IndexBuffer>(); |
| | | |
| | | public Void call() throws Exception |
| | | { |
| | | for(Suffix suffix : dnSuffixMap.values()) { |
| | | EntryContainer srcEntryContainer = suffix.getSrcEntryContainer(); |
| | | if(srcEntryContainer != null && |
| | | !suffix.getExcludeBranches().isEmpty()) { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | LockMode lockMode = LockMode.DEFAULT; |
| | | OperationStatus status; |
| | | Message message = NOTE_JEB_IMPORT_MIGRATION_START.get( |
| | | "excluded", String.valueOf(suffix.getBaseDN())); |
| | | logError(message); |
| | | Cursor cursor = |
| | | srcEntryContainer.getDN2ID().openCursor(null, |
| | | CursorConfig.READ_COMMITTED); |
| | | Comparator<byte[]> dn2idComparator = |
| | | srcEntryContainer.getDN2ID().getComparator(); |
| | | try { |
| | | for(DN excludedDN : suffix.getExcludeBranches()) { |
| | | byte[] bytes = |
| | | StaticUtils.getBytes(excludedDN.toNormalizedString()); |
| | | key.setData(bytes); |
| | | status = cursor.getSearchKeyRange(key, data, lockMode); |
| | | if(status == OperationStatus.SUCCESS && |
| | | Arrays.equals(key.getData(), bytes)) { |
| | | // This is the base entry for a branch that was excluded in the |
| | | // import so we must migrate all entries in this branch over to |
| | | // the new entry container. |
| | | byte[] end = |
| | | StaticUtils.getBytes("," + excludedDN.toNormalizedString()); |
| | | end[0] = (byte) (end[0] + 1); /* bump the leading comma to form the |
| | | * exclusive end key of the branch; presumably relies on the DN2ID |
| | | * comparator ordering keys from their last byte - confirm */ |
| | | |
| | | while(status == OperationStatus.SUCCESS && |
| | | dn2idComparator.compare(key.getData(), end) < 0 && |
| | | !config.isCancelled()) { |
| | | EntryID id = new EntryID(data); |
| | | Entry entry = srcEntryContainer.getID2Entry().get(null, |
| | | id, LockMode.DEFAULT); |
| | | processEntry(entry, rootContainer.getNextEntryID(), |
| | | suffix); |
| | | migratedCount++; |
| | | status = cursor.getNext(key, data, lockMode); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | cursor.close(); |
| | | flushIndexBuffers(); |
| | | closeCursors(); |
| | | } |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Task to migrate existing entries. |
| | | * <p> |
| | | * Only suffixes with a non-null source entry container and a non-empty |
| | | * include branch list are handled. The source DN2ID database is scanned |
| | | * from its first key. Every entry whose DN is not one of the include |
| | | * branches is read from the source ID2Entry database and passed to |
| | | * processEntry with a fresh entry ID. When a key decodes to an include |
| | | * branch DN, the cursor is repositioned past that branch instead, because |
| | | * the branch will come from the import. The scan ends when the cursor |
| | | * runs out or the import is cancelled. |
| | | */ |
| | | private final class MigrateExistingTask extends ImportTask |
| | | { |
| | | |
| | | private final |
| | | Map<IndexKey, IndexBuffer> indexBufferMap = |
| | | new HashMap<IndexKey, IndexBuffer>(); |
| | | private final Set<byte[]> insertKeySet = new HashSet<byte[]>(); |
| | | |
| | | public Void call() throws Exception |
| | | { |
| | | for(Suffix suffix : dnSuffixMap.values()) { |
| | | EntryContainer srcEntryContainer = suffix.getSrcEntryContainer(); |
| | | if(srcEntryContainer != null && |
| | | !suffix.getIncludeBranches().isEmpty()) { |
| | | DatabaseEntry key = new DatabaseEntry(); |
| | | DatabaseEntry data = new DatabaseEntry(); |
| | | LockMode lockMode = LockMode.DEFAULT; |
| | | OperationStatus status; |
| | | Message message = NOTE_JEB_IMPORT_MIGRATION_START.get( |
| | | "existing", String.valueOf(suffix.getBaseDN())); |
| | | logError(message); |
| | | Cursor cursor = |
| | | srcEntryContainer.getDN2ID().openCursor(null, |
| | | null); |
| | | try { |
| | | status = cursor.getFirst(key, data, lockMode); |
| | | while(status == OperationStatus.SUCCESS && |
| | | !config.isCancelled()) { |
| | | DN dn = DN.decode(ByteString.wrap(key.getData())); |
| | | if(!suffix.getIncludeBranches().contains(dn)) { |
| | | EntryID id = new EntryID(data); |
| | | Entry entry = |
| | | srcEntryContainer.getID2Entry().get(null, |
| | | id, LockMode.DEFAULT); |
| | | processEntry(entry, rootContainer.getNextEntryID(),suffix); |
| | | migratedCount++; |
| | | status = cursor.getNext(key, data, lockMode); |
| | | } else { |
| | | // This is the base entry for a branch that will be included |
| | | // in the import so we don't want to copy the branch to the |
| | | // new entry container. |
| | | |
| | | /** |
| | | * Advance the cursor to the next entry at the same level in the |
| | | * DIT, skipping all the entries in this branch. |
| | | * Set the next starting value to a value of equal length but |
| | | * slightly greater than the previous DN. Since keys are |
| | | * compared in reverse order we must set the first byte |
| | | * (the comma). |
| | | * No possibility of overflow here. |
| | | */ |
| | | byte[] begin = |
| | | StaticUtils.getBytes("," + dn.toNormalizedString()); |
| | | begin[0] = (byte) (begin[0] + 1); |
| | | key.setData(begin); |
| | | status = cursor.getSearchKeyRange(key, data, lockMode); |
| | | } |
| | | } |
| | | } finally { |
| | | cursor.close(); |
| | | flushIndexBuffers(); |
| | | closeCursors(); |
| | | } |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | } |
| | | |
| | | /** |
| | | * Task to handle append/replace combination. |
| | | */ |
| | | private class AppendReplaceTask extends ImportTask |
| | | { |
| | | |
| | | private final |
| | | Map<IndexKey, IndexBuffer> indexBufferMap = |
| | | new HashMap<IndexKey, IndexBuffer>(); |
| | | private final Set<byte[]> insertKeySet = new HashSet<byte[]>(); |
| | | private final Set<byte[]> deleteKeySet = new HashSet<byte[]>(); |
| | | private final EntryInformation entryInfo = new EntryInformation(); |
| | | private Entry oldEntry; |
| | | private EntryID entryID; |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | */ |
| | | public Void call() throws Exception |
| | | { |
| | | Suffix suffix = null; |
| | | while (true) |
| | | { |
| | | if (config.isCancelled()) |
| | |
| | | freeBufQue.add(idxBuffer); |
| | | return null; |
| | | } |
| | | Entry entry = reader.readEntry(dnSuffixMap); |
| | | |
| | | oldEntry = null; |
| | | Entry entry = reader.readEntry(dnSuffixMap, entryInfo); |
| | | if (entry == null) |
| | | { |
| | | break; |
| | | } |
| | | DN entryDN = entry.getDN(); |
| | | EntryID entryID = (EntryID) entry.getAttachment(); |
| | | //Temporary until multiple suffixes supported. |
| | | if(suffix == null) |
| | | { |
| | | suffix = getMatchSuffix(entryDN, dnSuffixMap); |
| | | } |
| | | if(!suffixMap.containsKey(suffix)) |
| | | { |
| | | suffixMap.put(suffix, new HashMap<DatabaseContainer, IndexBuffer>()); |
| | | } |
| | | if(!dn2idPhase2) |
| | | entryID = entryInfo.getEntryID(); |
| | | Suffix suffix = entryInfo.getSuffix(); |
| | | processEntry(entry, suffix); |
| | | } |
| | | flushIndexBuffers(); |
| | | closeCursors(); |
| | | return null; |
| | | } |
| | | |
| | | |
| | | void processEntry(Entry entry, Suffix suffix) |
| | | throws DatabaseException, ConfigException, DirectoryException, |
| | | JebException |
| | | |
| | | { |
| | | DN entryDN = entry.getDN(); |
| | | DN2ID dn2id = suffix.getDN2ID(); |
| | | EntryID oldID = dn2id.get(null, entryDN, LockMode.DEFAULT); |
| | | if(oldID != null) |
| | | { |
| | | oldEntry = suffix.getID2Entry().get(null, oldID, LockMode.DEFAULT); |
| | | } |
| | | if(oldEntry == null) |
| | | { |
| | | if(!skipDNValidation) |
| | | { |
| | | if(!processParent(entryDN, entryID, entry, suffix)) |
| | | { |
| | | suffix.removePending(entryDN); |
| | | continue; |
| | | return; |
| | | } |
| | | if(!suffix.getDN2ID().insert(null, entryDN, entryID)) |
| | | { |
| | | suffix.removePending(entryDN); |
| | | Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get(); |
| | | reader.rejectEntry(entry, msg); |
| | | continue; |
| | | Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get(); |
| | | reader.rejectEntry(entry, msg); |
| | | return; |
| | | } |
| | | suffix.removePending(entryDN); |
| | | processID2SC(entryID, entry, suffix); |
| | |
| | | processDN2ID(suffix, entryDN, entryID); |
| | | suffix.removePending(entryDN); |
| | | } |
| | | suffix.getID2Entry().put(null, entryID, entry); |
| | | } |
| | | else |
| | | { |
| | | suffix.removePending(entryDN); |
| | | entryID = oldID; |
| | | } |
| | | suffix.getID2Entry().put(null, entryID, entry); |
| | | if(oldEntry == null) |
| | | { |
| | | processIndexes(suffix, entry, entryID); |
| | | } |
| | | flushIndexBuffers(); |
| | | if(!dn2idPhase2) |
| | | else |
| | | { |
| | | suffix.getEntryContainer().getID2Children().closeCursor(); |
| | | suffix.getEntryContainer().getID2Subtree().closeCursor(); |
| | | processAllIndexes(suffix, entry, entryID); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Feeds an entry to every index configured on the suffix's attribute |
| | | * indexes. |
| | | * <p> |
| | | * For each attribute index, indexAttr is called for whichever of the |
| | | * equality, presence, substring, ordering and approximate indexes are |
| | | * non-null, and for the extensible indexes stored under the substring and |
| | | * shared indexer IDs. The substring IndexKey also carries the substring |
| | | * length taken from the index's SubstringIndexer. |
| | | * <p> |
| | | * NOTE(review): the VLV loop sits inside the per-attribute loop, so |
| | | * addEntry runs once per attribute index for each VLV index and not at all |
| | | * when the attribute index map is empty - confirm that this is intended. |
| | | * |
| | | * @param ctx The suffix whose indexes are updated. |
| | | * @param entry The entry to index. |
| | | * @param entryID The ID of the entry. |
| | | * @throws DatabaseException If a database error occurs. |
| | | * @throws DirectoryException If a directory error occurs. |
| | | * @throws JebException If a JEB error occurs. |
| | | * @throws ConfigException If a configuration error occurs. |
| | | */ void |
| | | processAllIndexes(Suffix ctx, Entry entry, EntryID entryID) throws |
| | | DatabaseException, DirectoryException, JebException, ConfigException |
| | | { |
| | | Transaction txn = null; |
| | | Map<AttributeType, AttributeIndex> attrMap = ctx.getAttrIndexMap(); |
| | | for(Map.Entry<AttributeType, AttributeIndex> mapEntry : |
| | | attrMap.entrySet()) { |
| | | AttributeType attrType = mapEntry.getKey(); |
| | | AttributeIndex attributeIndex = mapEntry.getValue(); |
| | | Index index; |
| | | if((index=attributeIndex.getEqualityIndex()) != null) { |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType,IndexType.EQUALITY)); |
| | | } |
| | | if((index=attributeIndex.getPresenceIndex()) != null) { |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType, IndexType.PRESENCE)); |
| | | } |
| | | if((index=attributeIndex.getSubstringIndex()) != null) { |
| | | int subLen = ((SubstringIndexer)index.indexer).getSubStringLen(); |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType, IndexType.SUBSTRING, subLen)); |
| | | } |
| | | if((index=attributeIndex.getOrderingIndex()) != null) { |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType, IndexType.ORDERING)); |
| | | } |
| | | if((index=attributeIndex.getApproximateIndex()) != null) { |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType,IndexType.APPROXIMATE)); |
| | | } |
| | | for(VLVIndex vlvIdx : ctx.getEntryContainer().getVLVIndexes()) { |
| | | vlvIdx.addEntry(txn, entryID, entry); |
| | | } |
| | | Map<String,Collection<Index>> extensibleMap = |
| | | attributeIndex.getExtensibleIndexes(); |
| | | if(!extensibleMap.isEmpty()) { |
| | | Collection<Index> subIndexes = |
| | | attributeIndex.getExtensibleIndexes().get( |
| | | EXTENSIBLE_INDEXER_ID_SUBSTRING); |
| | | if(subIndexes != null) { |
| | | for(Index subIndex: subIndexes) { |
| | | indexAttr(subIndex, entry, entryID, |
| | | new IndexKey(attrType, IndexType.EX_SUBSTRING)); |
| | | } |
| | | } |
| | | Collection<Index> sharedIndexes = |
| | | attributeIndex.getExtensibleIndexes().get( |
| | | EXTENSIBLE_INDEXER_ID_SHARED); |
| | | if(sharedIndexes !=null) { |
| | | for(Index sharedIndex:sharedIndexes) { |
| | | indexAttr(sharedIndex, entry, entryID, |
| | | new IndexKey(attrType, IndexType.EX_SHARED)); |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * Generates the keys of one index for an entry and hands each key to |
| | | * processKey. |
| | | * <p> |
| | | * When oldEntry is set, the keys generated from the old entry are passed |
| | | * first with the final flag false; the keys generated from the new entry |
| | | * are then passed with the flag true. The flag presumably distinguishes |
| | | * deletes from inserts - confirm against processKey. |
| | | * |
| | | * @param index The index whose indexer generates the keys. |
| | | * @param entry The new entry. |
| | | * @param entryID The entry ID passed along with every key. |
| | | * @param indexKey The index key passed along with every key. |
| | | * @throws DatabaseException If a database error occurs. |
| | | * @throws ConfigException If a configuration error occurs. |
| | | */ void indexAttr(Index index, Entry entry, EntryID entryID, |
| | | IndexKey indexKey) throws DatabaseException, |
| | | ConfigException |
| | | { |
| | | |
| | | if(oldEntry != null) |
| | | { |
| | | deleteKeySet.clear(); |
| | | index.indexer.indexEntry(oldEntry, deleteKeySet); |
| | | for(byte[] delKey : deleteKeySet) |
| | | { |
| | | processKey(index, delKey, entryID, indexComparator, indexKey, false); |
| | | } |
| | | } |
| | | insertKeySet.clear(); |
| | | index.indexer.indexEntry(entry, insertKeySet); |
| | | for(byte[] key : insertKeySet) |
| | | { |
| | | processKey(index, key, entryID, indexComparator, indexKey, true); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * This task processes the LDIF file during phase 1. |
| | | */ |
| | | private class ImportTask implements Callable<Void> |
| | | { |
| | | |
| | | private final |
| | | Map<IndexKey, IndexBuffer> indexBufferMap = |
| | | new HashMap<IndexKey, IndexBuffer>(); |
| | | private final Set<byte[]> insertKeySet = new HashSet<byte[]>(); |
| | | private final EntryInformation entryInfo = new EntryInformation(); |
| | | |
| | | /** |
| | | * {@inheritDoc} |
| | | * <p> |
| | | * Reads entries from the LDIF reader until it returns null, passing each |
| | | * entry to processEntry together with the entry ID and suffix that the |
| | | * reader recorded in entryInfo. After the last entry the index buffers are |
| | | * flushed and the cursors closed. |
| | | * <p> |
| | | * If the import config reports cancellation, a zero-size index buffer is |
| | | * added to the free buffer queue and the task returns at once, without |
| | | * flushing. The empty buffer is presumably a stop signal for whoever |
| | | * takes buffers from that queue - confirm. |
| | | */ |
| | | public Void call() throws Exception |
| | | { |
| | | while (true) |
| | | { |
| | | if (config.isCancelled()) |
| | | { |
| | | IndexBuffer idxBuffer = IndexBuffer.createIndexBuffer(0); |
| | | freeBufQue.add(idxBuffer); |
| | | return null; |
| | | } |
| | | Entry entry = reader.readEntry(dnSuffixMap, entryInfo); |
| | | |
| | | if (entry == null) |
| | | { |
| | | break; |
| | | } |
| | | EntryID entryID = entryInfo.getEntryID(); |
| | | Suffix suffix = entryInfo.getSuffix(); |
| | | processEntry(entry, entryID, suffix); |
| | | } |
| | | flushIndexBuffers(); |
| | | closeCursors(); |
| | | return null; |
| | | } |
| | | |
| | | |
| | | private boolean processParent(DN entryDN, EntryID entryID, Entry entry, |
| | | /** |
| | | * Closes the ID2Children and ID2Subtree cursors of every suffix's entry |
| | | * container. Does nothing when DN validation is skipped. |
| | | * |
| | | * @throws DatabaseException If a database error occurs. |
| | | */ void closeCursors() throws DatabaseException |
| | | { |
| | | if(!skipDNValidation) |
| | | { |
| | | for(Suffix suffix : dnSuffixMap.values()) |
| | | { |
| | | suffix.getEntryContainer().getID2Children().closeCursor(); |
| | | suffix.getEntryContainer().getID2Subtree().closeCursor(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Stores one entry in the suffix and indexes it. |
| | | * <p> |
| | | * With DN validation enabled, processParent must succeed and the DN must |
| | | * insert into DN2ID. If processParent returns false the entry is dropped |
| | | * silently. If the DN2ID insert returns false the entry is rejected |
| | | * through the reader with WARN_JEB_IMPORT_ENTRY_EXISTS. On success |
| | | * processID2SC is called. With DN validation skipped, processDN2ID is |
| | | * called instead. In every path the DN is removed from the suffix's |
| | | * pending set. Entries that were not dropped are written to ID2Entry and |
| | | * passed to processIndexes. |
| | | * |
| | | * @param entry The entry to store. |
| | | * @param entryID The ID to store the entry under. |
| | | * @param suffix The suffix the entry belongs to. |
| | | * @throws DatabaseException If a database error occurs. |
| | | * @throws ConfigException If a configuration error occurs. |
| | | * @throws DirectoryException If a directory error occurs. |
| | | * @throws JebException If a JEB error occurs. |
| | | */ void processEntry(Entry entry, EntryID entryID, Suffix suffix) |
| | | throws DatabaseException, ConfigException, DirectoryException, |
| | | JebException |
| | | |
| | | { |
| | | DN entryDN = entry.getDN(); |
| | | if(!skipDNValidation) |
| | | { |
| | | if(!processParent(entryDN, entryID, entry, suffix)) |
| | | { |
| | | suffix.removePending(entryDN); |
| | | return; |
| | | } |
| | | if(!suffix.getDN2ID().insert(null, entryDN, entryID)) |
| | | { |
| | | suffix.removePending(entryDN); |
| | | Message msg = WARN_JEB_IMPORT_ENTRY_EXISTS.get(); |
| | | reader.rejectEntry(entry, msg); |
| | | return; |
| | | } |
| | | suffix.removePending(entryDN); |
| | | processID2SC(entryID, entry, suffix); |
| | | } |
| | | else |
| | | { |
| | | processDN2ID(suffix, entryDN, entryID); |
| | | suffix.removePending(entryDN); |
| | | } |
| | | suffix.getID2Entry().put(null, entryID, entry); |
| | | processIndexes(suffix, entry, entryID); |
| | | return; |
| | | } |
| | | |
| | | boolean processParent(DN entryDN, EntryID entryID, Entry entry, |
| | | Suffix suffix) throws DatabaseException |
| | | { |
| | | EntryID parentID = null; |
| | |
| | | return true; |
| | | } |
| | | |
| | | private void processID2SC(EntryID entryID, Entry entry, Suffix suffix) |
| | | void processID2SC(EntryID entryID, Entry entry, Suffix suffix) |
| | | throws DatabaseException |
| | | { |
| | | Set<byte[]> childKeySet = new HashSet<byte[]>(); |
| | |
| | | |
| | | DatabaseEntry dbKey = new DatabaseEntry(); |
| | | DatabaseEntry dbVal = new DatabaseEntry(); |
| | | ImportIDSet idSet = new ImportIDSet(); |
| | | idSet.addEntryID(entryID, id2children.getIndexEntryLimit(), |
| | | id2children.getMaintainCount()); |
| | | ImportIDSet idSet = new ImportIDSet(1, id2children.getIndexEntryLimit(), |
| | | id2children.getMaintainCount()); |
| | | idSet.addEntryID(entryID); |
| | | id2children.insert(idSet, childKeySet, dbKey, dbVal); |
| | | |
| | | DatabaseEntry dbSubKey = new DatabaseEntry(); |
| | | DatabaseEntry dbSubVal = new DatabaseEntry(); |
| | | ImportIDSet idSubSet = new ImportIDSet(); |
| | | idSubSet.addEntryID(entryID, id2subtree.getIndexEntryLimit(), |
| | | id2subtree.getMaintainCount()); |
| | | ImportIDSet idSubSet = new ImportIDSet(1, id2subtree.getIndexEntryLimit(), |
| | | id2subtree.getMaintainCount()); |
| | | idSubSet.addEntryID(entryID); |
| | | id2subtree.insert(idSubSet, subtreeKeySet, dbSubKey, dbSubVal); |
| | | } |
| | | |
| | | private EntryID getAncestorID(DN2ID dn2id, DN dn) |
| | | EntryID getAncestorID(DN2ID dn2id, DN dn) |
| | | throws DatabaseException |
| | | { |
| | | int i=0; |
| | |
| | | |
| | | |
| | | |
| | | private void |
| | | void |
| | | processIndexes(Suffix ctx, Entry entry, EntryID entryID) throws |
| | | DatabaseException, DirectoryException, JebException, ConfigException |
| | | { |
| | |
| | | AttributeIndex attributeIndex = mapEntry.getValue(); |
| | | Index index; |
| | | if((index=attributeIndex.getEqualityIndex()) != null) { |
| | | indexAttr(ctx, index, entry, entryID); |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType,IndexType.EQUALITY)); |
| | | } |
| | | if((index=attributeIndex.getPresenceIndex()) != null) { |
| | | indexAttr(ctx, index, entry, entryID); |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType, IndexType.PRESENCE)); |
| | | } |
| | | if((index=attributeIndex.getSubstringIndex()) != null) { |
| | | indexAttr(ctx, index, entry, entryID); |
| | | int subLen = ((SubstringIndexer)index.indexer).getSubStringLen(); |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType, IndexType.SUBSTRING, subLen)); |
| | | } |
| | | if((index=attributeIndex.getOrderingIndex()) != null) { |
| | | indexAttr(ctx, index, entry, entryID); |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType, IndexType.ORDERING)); |
| | | } |
| | | if((index=attributeIndex.getApproximateIndex()) != null) { |
| | | indexAttr(ctx, index, entry, entryID); |
| | | indexAttr(index, entry, entryID, |
| | | new IndexKey(attrType,IndexType.APPROXIMATE)); |
| | | } |
| | | for(VLVIndex vlvIdx : ctx.getEntryContainer().getVLVIndexes()) { |
| | | vlvIdx.addEntry(txn, entryID, entry); |
| | |
| | | EXTENSIBLE_INDEXER_ID_SUBSTRING); |
| | | if(subIndexes != null) { |
| | | for(Index subIndex: subIndexes) { |
| | | indexAttr(ctx, subIndex, entry, entryID); |
| | | indexAttr(subIndex, entry, entryID, |
| | | new IndexKey(attrType, IndexType.EX_SUBSTRING)); |
| | | } |
| | | } |
| | | Collection<Index> sharedIndexes = |
| | |
| | | EXTENSIBLE_INDEXER_ID_SHARED); |
| | | if(sharedIndexes !=null) { |
| | | for(Index sharedIndex:sharedIndexes) { |
| | | indexAttr(ctx, sharedIndex, entry, entryID); |
| | | indexAttr(sharedIndex, entry, entryID, |
| | | new IndexKey(attrType, IndexType.EX_SHARED)); |
| | | } |
| | | } |
| | | } |
| | |
| | | |
| | | |
| | | |
| | | /** |
| | | * Generates the keys of one entry for one index and hands each key to |
| | | * processKey: insertKeySet is cleared, filled by index.indexer.indexEntry, |
| | | * and then iterated. |
| | | * |
| | | * NOTE(review): two signatures and two processKey calls are shown together |
| | | * here (one taking a Suffix ctx, one taking an IndexKey). They look like two |
| | | * revisions of the same method - confirm which rows are current. |
| | | * |
| | | * @param index the index whose indexer produces the keys |
| | | * @param entry the entry to index |
| | | * @param entryID the entry ID passed along with every key |
| | | * @param indexKey the index key passed to processKey (IndexKey variant) |
| | | * @throws DatabaseException declared; not thrown directly in this body |
| | | * @throws ConfigException declared; processKey declares it |
| | | */ private void indexAttr(Suffix ctx, Index index, Entry entry, |
| | | EntryID entryID) |
| | | throws DatabaseException, ConfigException |
| | | void indexAttr(Index index, Entry entry, EntryID entryID, |
| | | IndexKey indexKey) throws DatabaseException, |
| | | ConfigException |
| | | { |
| | | insertKeySet.clear(); /* insertKeySet is declared outside this method and reused on every call */ |
| | | index.indexer.indexEntry(entry, insertKeySet); |
| | | for(byte[] key : insertKeySet) |
| | | { |
| | | processKey(ctx, index, key, entryID, indexComparator, null); |
| | | processKey(index, key, entryID, indexComparator, indexKey, true); |
| | | } |
| | | } |
| | | |
| | | |
| | | private void flushIndexBuffers() throws InterruptedException, |
| | | void flushIndexBuffers() throws InterruptedException, |
| | | ExecutionException |
| | | { |
| | | Iterator<Suffix> i = dnSuffixMap.values().iterator(); |
| | | Suffix suffix = i.next(); |
| | | for(Map<DatabaseContainer, IndexBuffer> map : suffixMap.values()) |
| | | { |
| | | for(Map.Entry<DatabaseContainer, IndexBuffer> e : map.entrySet()) |
| | | Set<Map.Entry<IndexKey, IndexBuffer>> set = indexBufferMap.entrySet(); |
| | | for(Map.Entry<IndexKey, IndexBuffer> e : set) |
| | | { |
| | | DatabaseContainer container = e.getKey(); |
| | | IndexKey indexKey = e.getKey(); |
| | | IndexBuffer indexBuffer = e.getValue(); |
| | | if(container instanceof DN2ID) |
| | | IndexType indexType = indexKey.getIndexType(); |
| | | if(indexType.equals(IndexType.DN)) |
| | | { |
| | | indexBuffer.setComparator(dnComparator); |
| | | } |
| | |
| | | { |
| | | indexBuffer.setComparator(indexComparator); |
| | | } |
| | | indexBuffer.setContainer(container); |
| | | indexBuffer.setEntryContainer(suffix.getEntryContainer()); |
| | | indexBuffer.setIndexKey(indexKey); |
| | | Future<Void> future = sortService.submit(new SortTask(indexBuffer)); |
| | | future.get(); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Adds a key / entry ID pair to the index buffer currently mapped for the |
| | | * key's index. If no buffer is mapped yet, one is obtained from |
| | | * getNewIndexBuffer and stored in the map. If the mapped buffer reports that |
| | | * no space is available for the key, the buffer is tagged (container, |
| | | * comparator, entry container / index key), submitted to sortService as a |
| | | * SortTask and replaced in the map by a fresh buffer before the key is added. |
| | | * In the int-returning variant the container is also registered in |
| | | * idContainerMap under System.identityHashCode(container) and that id is |
| | | * returned. |
| | | * |
| | | * NOTE(review): rows of two revisions are shown together here - a variant |
| | | * keyed by DatabaseContainer (conMap / suffixMap) and a variant keyed by |
| | | * IndexKey (indexBufferMap). Confirm which rows are current. |
| | | * NOTE(review): System.identityHashCode is not guaranteed to be unique per |
| | | * object, so two containers could in principle share an id in |
| | | * idContainerMap - confirm this cannot matter here. |
| | | * |
| | | * @param container the database container the key belongs to |
| | | * @param key the key bytes to add |
| | | * @param entryID the entry ID to add with the key |
| | | * @param comparator the comparator set on a buffer before it is sorted |
| | | * @param indexKey the index key used to look up the buffer |
| | | * @param insert flag passed through to indexBuffer.add |
| | | * @return the identity hash code used as the container's id |
| | | * @throws ConfigException declared; getNewIndexBuffer declares it |
| | | */ private void |
| | | processKey(Suffix ctx, DatabaseContainer container, byte[] key, |
| | | EntryID entryID,IndexBuffer.ComparatorBuffer<byte[]> comparator, |
| | | EntryContainer entryContainer) throws ConfigException |
| | | int |
| | | processKey(DatabaseContainer container, byte[] key, EntryID entryID, |
| | | IndexBuffer.ComparatorBuffer<byte[]> comparator, IndexKey indexKey, |
| | | boolean insert) |
| | | throws ConfigException |
| | | { |
| | | IndexBuffer indexBuffer; |
| | | Map<DatabaseContainer, IndexBuffer> conMap = suffixMap.get(ctx); |
| | | if(!conMap.containsKey(container)) |
| | | if(!indexBufferMap.containsKey(indexKey)) |
| | | { |
| | | indexBuffer = getNewIndexBuffer(); |
| | | conMap.put(container, indexBuffer); |
| | | indexBufferMap.put(indexKey, indexBuffer); |
| | | } |
| | | else |
| | | { |
| | | indexBuffer = conMap.get(container); |
| | | indexBuffer = indexBufferMap.get(indexKey); |
| | | } |
| | | if(!indexBuffer.isSpaceAvailable(key)) /* buffer cannot take this key: hand it off for sorting and start a new one */ |
| | | { |
| | | indexBuffer.setContainer(container); |
| | | indexBuffer.setComparator(comparator); |
| | | indexBuffer.setEntryContainer(entryContainer); |
| | | indexBuffer.setIndexKey(indexKey); |
| | | sortService.submit(new SortTask(indexBuffer)); /* the returned Future is not kept here */ |
| | | indexBuffer = getNewIndexBuffer(); |
| | | conMap.remove(container); |
| | | conMap.put(container, indexBuffer); |
| | | indexBufferMap.remove(indexKey); |
| | | indexBufferMap.put(indexKey, indexBuffer); |
| | | } |
| | | indexBuffer.add(key, entryID); |
| | | int id = System.identityHashCode(container); |
| | | idContainerMap.putIfAbsent(id, container); |
| | | indexBuffer.add(key, entryID, id, insert); |
| | | return id; |
| | | } |
| | | |
| | | |
| | | private IndexBuffer getNewIndexBuffer() throws ConfigException |
| | | IndexBuffer getNewIndexBuffer() throws ConfigException |
| | | { |
| | | IndexBuffer indexBuffer = freeBufQue.poll(); |
| | | if(indexBuffer.isPoison()) |
| | |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Hands the DN of an entry to processKey for the suffix's DN2ID container. |
| | | * The key is the byte form of dn.toNormalizedString() and dnComparator is |
| | | * used as the comparator. In the IndexKey variant the key is tagged with |
| | | * new IndexKey(dnType, IndexType.DN) and the suffix's entry container is |
| | | * recorded in idECMap under the id that processKey returns. |
| | | * |
| | | * NOTE(review): two signatures and two processKey calls are shown together |
| | | * here; they look like two revisions - confirm which rows are current. |
| | | * |
| | | * @param suffix the suffix supplying the DN2ID and entry container |
| | | * @param dn the DN to add |
| | | * @param entryID the entry ID to add with the DN |
| | | * @throws ConfigException declared; processKey declares it |
| | | */ private void processDN2ID(Suffix suffix, DN dn, EntryID entryID) |
| | | void processDN2ID(Suffix suffix, DN dn, EntryID entryID) |
| | | throws ConfigException |
| | | { |
| | | DatabaseContainer dn2id = suffix.getDN2ID(); |
| | | byte[] dnBytes = StaticUtils.getBytes(dn.toNormalizedString()); |
| | | processKey(suffix, dn2id, dnBytes, entryID, dnComparator, |
| | | suffix.getEntryContainer()); |
| | | |
| | | int id = processKey(dn2id, dnBytes, entryID, dnComparator, |
| | | new IndexKey(dnType, IndexType.DN), true); |
| | | idECMap.putIfAbsent(id, suffix.getEntryContainer()); /* keeps an existing mapping for this id, if any */ |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * The task reads the temporary index files and writes their results to the |
| | | * index database. |
| | | */ |
| | | private final class IndexWriteDBTask implements Callable<Void> { |
| | | |
| | | private final class IndexWriteDBTask implements Callable<Void> |
| | | { |
| | | private final IndexManager indexMgr; |
| | | private final boolean isDN2ID; |
| | | private final DatabaseEntry dbKey, dbValue; |
| | | private final DN2ID dn2id; |
| | | private final Index index; |
| | | |
| | | private final EntryContainer entryContainer; |
| | | private final int id2ChildLimit; |
| | | private final boolean id2ChildMCount; |
| | | |
| | | private TreeMap<DN,EntryID> parentIDMap = new TreeMap<DN,EntryID>(); |
| | | private DN parentDN, lastDN; |
| | | private EntryID parentID, lastID; |
| | | private final Map<byte[], ImportIDSet> id2childTree; |
| | | private final Map<byte[], ImportIDSet> id2subtreeTree; |
| | | private final int cacheSize; |
| | | private ByteBuffer directBuffer = null; |
| | | private final Map<Integer, DNState> dnStateMap = |
| | | new HashMap<Integer, DNState>(); |
| | | private final Map<Integer, Index> indexMap = new HashMap<Integer, Index>(); |
| | | |
| | | /** |
| | | * Creates a task by delegating to the (indexMgr, isDN2ID, cacheSize) |
| | | * constructor and then storing the given buffer in directBuffer. |
| | | * |
| | | * NOTE(review): this overload takes isDN2ID, while a constructor signature |
| | | * shown further down in this class takes (indexMgr, b, cacheSize). The two |
| | | * look like different revisions - confirm whether this overload still exists. |
| | | * |
| | | * @param indexMgr the index manager passed to the delegated constructor |
| | | * @param isDN2ID flag passed to the delegated constructor |
| | | * @param b the buffer stored in directBuffer |
| | | * @param cacheSize the cache size passed to the delegated constructor |
| | | */ public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID, |
| | | ByteBuffer b, int cacheSize) |
| | | { |
| | | this(indexMgr, isDN2ID, cacheSize); |
| | | directBuffer = b; |
| | | } |
| | | |
| | | /** |
| | | * Initializes the task's fields. The index manager, its entry container, |
| | | * fresh DatabaseEntry objects for key and value, and the cache size are |
| | | * always stored. In the isDN2ID branch the manager's dn2id is used, index is |
| | | * null, and two TreeMaps are created with the comparators of the entry |
| | | * container's ID2Children and ID2Subtree; in the other branch dn2id and both |
| | | * trees are null and index comes from indexMgr.getIndex(). |
| | | * |
| | | * NOTE(review): two constructor signatures are shown together here, one |
| | | * with isDN2ID and one with a ByteBuffer; the body mixes rows that need |
| | | * isDN2ID with a row that needs b. Confirm which rows are current. |
| | | * NOTE(review): id2ChildMCount is read from getID2Subtree() while |
| | | * id2ChildLimit is read from getID2Children() - confirm that this is |
| | | * intended and not a copy/paste slip. |
| | | * |
| | | * @param indexMgr the index manager this task works for |
| | | * @param isDN2ID selects the DN2ID or the plain index branch |
| | | * @param b the buffer stored in directBuffer (ByteBuffer variant) |
| | | * @param cacheSize the cache size stored in this.cacheSize |
| | | */ public IndexWriteDBTask(IndexManager indexMgr, boolean isDN2ID, |
| | | int cacheSize) |
| | | public IndexWriteDBTask(IndexManager indexMgr, ByteBuffer b, int cacheSize) |
| | | { |
| | | this.indexMgr = indexMgr; |
| | | this.entryContainer = indexMgr.entryContainer; |
| | | this.isDN2ID = isDN2ID; |
| | | directBuffer = b; |
| | | this.dbKey = new DatabaseEntry(); |
| | | this.dbValue = new DatabaseEntry(); |
| | | this.cacheSize = cacheSize; |
| | | if(isDN2ID) |
| | | { |
| | | this.dn2id = indexMgr.dn2id; |
| | | this.index = null; |
| | | id2ChildLimit = entryContainer.getID2Children().getIndexEntryLimit(); |
| | | id2ChildMCount = entryContainer.getID2Subtree().getMaintainCount(); |
| | | Comparator<byte[]> id2ChildComparator = |
| | | entryContainer.getID2Children().getComparator(); |
| | | Comparator<byte[]> id2SubtreeComparator = |
| | | entryContainer.getID2Subtree().getComparator(); |
| | | id2childTree = |
| | | new TreeMap<byte[], ImportIDSet>(id2ChildComparator); |
| | | id2subtreeTree = |
| | | new TreeMap<byte[], ImportIDSet>(id2SubtreeComparator); |
| | | } |
| | | else |
| | | { |
| | | this.dn2id = null; |
| | | this.index = indexMgr.getIndex(); |
| | | id2subtreeTree = null; |
| | | id2childTree = null; |
| | | id2ChildLimit = 0; |
| | | id2ChildMCount = false; |
| | | } |
| | | } |
| | | |
| | | |
| | | public Void call() throws Exception |
| | | private SortedSet<Buffer> initBuffers() throws IOException |
| | | { |
| | | |
| | | Comparator<byte[]> comparator = indexMgr.getComparator(); |
| | | int limit = indexMgr.getLimit(); |
| | | boolean maintainCount = indexMgr.getMaintainCount(); |
| | | byte[] cKey = null; |
| | | ImportIDSet cIDSet = null; |
| | | indexMgr.init(); |
| | | List<Buffer> bufferList = indexMgr.getBufferList(); |
| | | SortedSet<Buffer> bufferSet = new TreeSet<Buffer>(); |
| | | int p = 0; |
| | | int offSet = cacheSize; |
| | | for(Buffer b : bufferList) |
| | | SortedSet<Buffer> bufferSet = new TreeSet<Buffer>(); |
| | | for(Buffer b : indexMgr.getBufferList()) |
| | | { |
| | | if(directBuffer != null) |
| | | { |
| | |
| | | } |
| | | bufferSet.add(b); |
| | | } |
| | | return bufferSet; |
| | | } |
| | | |
| | | public Void call() throws Exception |
| | | { |
| | | byte[] cKey = null; |
| | | ImportIDSet cInsertIDSet = null, cDeleteIDSet = null; |
| | | Integer cIndexID = null; |
| | | |
| | | indexMgr.init(); |
| | | SortedSet<Buffer> bufferSet = initBuffers(); |
| | | while(!bufferSet.isEmpty()) |
| | | { |
| | | Buffer b; |
| | | b = bufferSet.first(); |
| | | if(b == null) { |
| | | System.out.println("null b"); |
| | | } |
| | | bufferSet.remove(b); |
| | | byte[] key = b.getKey(); |
| | | ImportIDSet idSet = b.getIDSet(); |
| | | if(cKey == null) |
| | | { |
| | | cKey = key; |
| | | cIDSet = idSet; |
| | | cIndexID = b.getIndexID(); |
| | | cKey = b.getKey(); |
| | | cInsertIDSet = b.getInsertIDSet(); |
| | | cDeleteIDSet = b.getDeleteIDSet(); |
| | | cInsertIDSet.setKey(cKey); |
| | | cDeleteIDSet.setKey(cKey); |
| | | } |
| | | else |
| | | { |
| | | if(comparator.compare(key, cKey) != 0) |
| | | if(b.compare(cKey, cIndexID) != 0) |
| | | { |
| | | addToDB(cKey, cIDSet); |
| | | addToDB(cInsertIDSet, cDeleteIDSet, cIndexID); |
| | | indexMgr.incrKeyCount(); |
| | | cKey = key; |
| | | cIDSet = idSet; |
| | | cIndexID = b.getIndexID(); |
| | | cKey = b.getKey(); |
| | | cInsertIDSet = b.getInsertIDSet(); |
| | | cDeleteIDSet = b.getDeleteIDSet(); |
| | | cInsertIDSet.setKey(cKey); |
| | | cDeleteIDSet.setKey(cKey); |
| | | } |
| | | else |
| | | { |
| | | cIDSet.setKey(cKey); |
| | | cIDSet.merge(idSet, limit, maintainCount); |
| | | cInsertIDSet.merge(b.getInsertIDSet()); |
| | | cDeleteIDSet.merge(b.getDeleteIDSet()); |
| | | } |
| | | } |
| | | if(b.hasMoreData()) |
| | |
| | | } |
| | | if(cKey != null) |
| | | { |
| | | addToDB(cKey, cIDSet); |
| | | addToDB(cInsertIDSet, cDeleteIDSet, cIndexID); |
| | | } |
| | | cleanUP(); |
| | | return null; |
| | |
| | | /** |
| | | * Finishes the task. In the branch guarded by |
| | | * indexMgr.isDN2ID() && skipDNValidation every DNState in dnStateMap is |
| | | * flushed and NOTE_JEB_IMPORT_LDIF_DN_CLOSE is logged with the manager's DN |
| | | * count; in the other branch the cursor of every index in indexMap is closed |
| | | * and NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE is logged with the manager's name. |
| | | * Afterwards the index manager is marked done, closed, and its index file |
| | | * is deleted. |
| | | * |
| | | * NOTE(review): rows of an earlier revision are shown together with the |
| | | * current ones (the isDN2ID field test, index.closeCursor() on the single |
| | | * index field, the dn2idPhase2 test and the flushSubTreeChildIndexes call). |
| | | * Confirm which rows are current. |
| | | * |
| | | * @throws DatabaseException declared; not thrown directly in this body |
| | | * @throws DirectoryException declared; DNState.flush declares it |
| | | * @throws IOException declared; not thrown directly in this body |
| | | */ private void cleanUP() throws DatabaseException, DirectoryException, |
| | | IOException |
| | | { |
| | | if(!isDN2ID) { |
| | | index.closeCursor(); |
| | | Message msg = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(index.getName()); |
| | | if(indexMgr.isDN2ID() && skipDNValidation) |
| | | { |
| | | for(DNState dnState : dnStateMap.values()) |
| | | { |
| | | dnState.flush(); |
| | | } |
| | | Message msg = NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getDNCount()); |
| | | logError(msg); |
| | | |
| | | } |
| | | else |
| | | { |
| | | if(dn2idPhase2) |
| | | for(Index index : indexMap.values()) |
| | | { |
| | | flushSubTreeChildIndexes(); |
| | | index.closeCursor(); |
| | | } |
| | | Message msg = NOTE_JEB_IMPORT_LDIF_INDEX_CLOSE.get(indexMgr.getName()); |
| | | logError(msg); |
| | | } |
| | | indexMgr.setDone(); |
| | | indexMgr.close(); |
| | | indexMgr.deleteIndexFile(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Writes the accumulated child and subtree ID sets to the entry container. |
| | | * Every entry of id2childTree is inserted into ID2Children and that index's |
| | | * cursor is closed; then every entry of id2subtreeTree is inserted into |
| | | * ID2Subtree and its cursor is closed. Finally |
| | | * NOTE_JEB_IMPORT_LDIF_DN_CLOSE is logged with indexMgr.getTotDNCount(). |
| | | * |
| | | * NOTE(review): the shared dbKey / dbValue fields are reused for every |
| | | * insert; this presumably relies on the task running on a single thread - |
| | | * confirm. |
| | | * |
| | | * @throws DatabaseException declared; not thrown directly in this body |
| | | * @throws DirectoryException declared; not thrown directly in this body |
| | | */ private void flushSubTreeChildIndexes() |
| | | throws DatabaseException, DirectoryException |
| | | { |
| | | Index id2child = entryContainer.getID2Children(); |
| | | Set<Map.Entry<byte[], ImportIDSet>> id2childSet = |
| | | id2childTree.entrySet(); |
| | | for(Map.Entry<byte[], ImportIDSet> e : id2childSet) |
| | | { |
| | | byte[] key = e.getKey(); |
| | | ImportIDSet idSet = e.getValue(); |
| | | dbKey.setData(key); |
| | | id2child.insert(dbKey, idSet, dbValue); |
| | | } |
| | | id2child.closeCursor(); |
| | | Index id2subtree = entryContainer.getID2Subtree(); |
| | | Set<Map.Entry<byte[], ImportIDSet>> subtreeSet = |
| | | id2subtreeTree.entrySet(); |
| | | for(Map.Entry<byte[], ImportIDSet> e : subtreeSet) |
| | | { |
| | | byte[] key = e.getKey(); |
| | | ImportIDSet idSet = e.getValue(); |
| | | dbKey.setData(key); |
| | | id2subtree.insert(dbKey, idSet, dbValue); |
| | | } |
| | | id2subtree.closeCursor(); |
| | | Message msg = |
| | | NOTE_JEB_IMPORT_LDIF_DN_CLOSE.get(indexMgr.getTotDNCount()); |
| | | logError(msg); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Writes one merged record to the database. When the index manager is not |
| | | * a DN2ID manager, the index is looked up in idContainerMap by indexID; |
| | | * the delete set is applied with index.delete and the insert set with |
| | | * index.insert, each only if the set has a size above zero or is not |
| | | * defined, and the index is remembered in indexMap. Otherwise, when |
| | | * skipDNValidation is set, the insert set is passed to addDN2ID. |
| | | * |
| | | * NOTE(review): two signatures are shown together here, and rows that use |
| | | * record, this.isDN2ID, addIndex and dn2idPhase2 appear next to rows that |
| | | * use insRec / delRec. They look like two revisions - confirm which rows |
| | | * are current. |
| | | * NOTE(review): the value from idContainerMap is cast to Index without a |
| | | * type check - confirm only Index instances are registered for these ids. |
| | | * |
| | | * @param insRec the set of IDs to insert for the record's key |
| | | * @param delRec the set of IDs to delete for the record's key |
| | | * @param indexID the id used to look up the target in idContainerMap |
| | | * @throws InterruptedException declared; not thrown directly in this body |
| | | * @throws DatabaseException declared; not thrown directly in this body |
| | | * @throws DirectoryException declared; addDN2ID declares it |
| | | */ private void addToDB(byte[] key, ImportIDSet record) |
| | | private void addToDB(ImportIDSet insRec, ImportIDSet delRec, int indexID) |
| | | throws InterruptedException, DatabaseException, DirectoryException |
| | | { |
| | | record.setKey(key); |
| | | if(!this.isDN2ID) |
| | | if(!indexMgr.isDN2ID()) |
| | | { |
| | | addIndex(record); |
| | | } |
| | | else |
| | | { |
| | | if(dn2idPhase2) |
| | | Index index; |
| | | if((delRec.size() > 0) || (!delRec.isDefined())) |
| | | { |
| | | addDN2ID(record); |
| | | dbKey.setData(delRec.getKey()); |
| | | index = (Index)idContainerMap.get(indexID); |
| | | index.delete(dbKey, delRec, dbValue); |
| | | if(!indexMap.containsKey(indexID)) |
| | | { |
| | | indexMap.put(indexID, index); |
| | | } |
| | | } |
| | | |
| | | |
| | | if((insRec.size() > 0) || (!insRec.isDefined())) |
| | | { |
| | | dbKey.setData(insRec.getKey()); |
| | | index = (Index)idContainerMap.get(indexID); |
| | | index.insert(dbKey, insRec, dbValue); |
| | | if(!indexMap.containsKey(indexID)) |
| | | { |
| | | indexMap.put(indexID, index); |
| | | } |
| | | } |
| | | } |
| | | else if(skipDNValidation) |
| | | { |
| | | addDN2ID(insRec, indexID); |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Processes one DN record for the DN2ID database. The DNState for indexID |
| | | * is taken from dnStateMap, or created from idECMap.get(indexID) and stored |
| | | * there if absent. If dnState.checkParent(record) returns false the method |
| | | * returns without writing; otherwise dnState.writeToDB() is called. |
| | | * |
| | | * NOTE(review): rows of a removed id2Subtree(ec, childID, limit, mCount) |
| | | * method (its signature, the idSet / id2subtreeTree / parentID rows and the |
| | | * getParentWithinBase loop header) are shown together with the addDN2ID |
| | | * rows. Confirm which rows are current. |
| | | * |
| | | * @param record the ID set carrying the DN key and entry ID |
| | | * @param indexID the id used to look up the DNState / entry container |
| | | * @throws DatabaseException declared; DNState.writeToDB declares it |
| | | * @throws DirectoryException declared; DNState.checkParent declares it |
| | | */ private void id2Subtree(EntryContainer ec, EntryID childID, |
| | | int limit, boolean mCount) throws DatabaseException |
| | | private void addDN2ID(ImportIDSet record, Integer indexID) |
| | | throws DatabaseException, DirectoryException |
| | | { |
| | | ImportIDSet idSet; |
| | | if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData())) |
| | | DNState dnState; |
| | | if(!dnStateMap.containsKey(indexID)) |
| | | { |
| | | idSet = new ImportIDSet(); |
| | | id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet); |
| | | dnState = new DNState(idECMap.get(indexID)); |
| | | dnStateMap.put(indexID, dnState); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData()); |
| | | dnState = dnStateMap.get(indexID); |
| | | } |
| | | idSet.addEntryID(childID, limit, mCount); |
| | | for (DN dn = ec.getParentWithinBase(parentDN); dn != null; |
| | | dn = ec.getParentWithinBase(dn)) |
| | | |
| | | if(!dnState.checkParent(record)) |
| | | { |
| | | EntryID nodeID = parentIDMap.get(dn); |
| | | if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData())) |
| | | return; |
| | | } |
| | | dnState.writeToDB(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * This class is used by an index DB merge thread performing DN processing |
| | | * to keep track of the state of individual DN2ID index processing. |
| | | */ |
| | | class DNState |
| | | { |
| | | //DN related stuff per suffix |
| | | private final DatabaseEntry dbKey1, dbValue1; |
| | | private final TreeMap<DN, EntryID> parentIDMap = |
| | | new TreeMap<DN, EntryID>(); |
| | | private DN parentDN, lastDN; |
| | | private EntryID parentID, lastID, entryID; |
| | | private final EntryContainer entryContainer; |
| | | private final Map<byte[], ImportIDSet> id2childTree; |
| | | private final Map<byte[], ImportIDSet> id2subtreeTree; |
| | | private final Index childIndex, subIndex; |
| | | private final DN2ID dn2id; |
| | | |
| | | /** |
| | | * Creates the state for one entry container. The container's DN2ID, |
| | | * ID2Children and ID2Subtree are stored, two TreeMaps are created with the |
| | | * comparators of ID2Children and ID2Subtree, and fresh DatabaseEntry |
| | | * objects are created for dbKey1 and dbValue1. |
| | | * |
| | | * @param entryContainer the entry container this state works against |
| | | */ DNState(EntryContainer entryContainer) |
| | | { |
| | | this.entryContainer = entryContainer; |
| | | dn2id = entryContainer.getDN2ID(); |
| | | childIndex = entryContainer.getID2Children(); |
| | | subIndex = entryContainer.getID2Subtree(); |
| | | Comparator<byte[]> childComparator = childIndex.getComparator(); |
| | | Comparator<byte[]> subComparator = subIndex.getComparator(); |
| | | id2childTree = new TreeMap<byte[], ImportIDSet>(childComparator); /* byte[] keys need an explicit comparator */ |
| | | id2subtreeTree = new TreeMap<byte[], ImportIDSet>(subComparator); |
| | | this.dbKey1 = new DatabaseEntry(); |
| | | this.dbValue1 = new DatabaseEntry(); |
| | | } |
| | | |
| | | |
| | | /** |
| | | * Decodes the DN and entry ID of a record, stores them in dbKey1 / dbValue1 |
| | | * / entryID, and updates parentDN, parentID, lastDN and lastID. |
| | | * Cases, in order: |
| | | * - parentIDMap is empty: the DN is stored in parentIDMap and true is |
| | | * returned (lastDN / lastID are not set in this case); |
| | | * - lastDN is an ancestor of the DN: lastDN becomes the parent and is added |
| | | * to parentIDMap; |
| | | * - the last key of parentIDMap is an ancestor of the DN: that key becomes |
| | | * the parent; |
| | | * - otherwise the parent from getParentWithinBase is looked up in |
| | | * parentIDMap; if absent the DN is rejected through the reader with |
| | | * NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT and false is returned. |
| | | * |
| | | * NOTE(review): rows that use idSet, nodeID, childID, limit and mCount are |
| | | * not declared in this method; they look like rows of a removed method |
| | | * shown together with this one - confirm which rows are current. |
| | | * NOTE(review): in the last case entries are removed from the subMap view |
| | | * with subMap.remove(...) while its entrySet is being iterated. TreeMap |
| | | * iterators are fail-fast, so this can throw |
| | | * ConcurrentModificationException when the view holds more than one entry. |
| | | * NOTE(review): subMap(pDN, lastKey) includes pDN and excludes lastKey, so |
| | | * the loop also removes pDN itself; parentIDMap.get(pDN) right after it |
| | | * would then return null. Presumably the intent was to drop the entries |
| | | * after pDN - confirm and fix. |
| | | * |
| | | * @param record the ID set carrying the DN key and the entry ID value |
| | | * @return true if the DN was accepted, false if it was rejected because no |
| | | * parent was found in parentIDMap |
| | | * @throws DirectoryException declared; not thrown directly in this body |
| | | */ private boolean checkParent(ImportIDSet record) throws DirectoryException |
| | | { |
| | | dbKey1.setData(record.getKey()); |
| | | byte[] v = record.toDatabase(); |
| | | long v1 = JebFormat.entryIDFromDatabase(v); |
| | | dbValue1.setData(v); |
| | | DN dn = DN.decode(ByteString.wrap(dbKey1.getData())); |
| | | |
| | | |
| | | entryID = new EntryID(v1); |
| | | if(parentIDMap.isEmpty()) |
| | | { |
| | | idSet = new ImportIDSet(); |
| | | id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet); |
| | | parentIDMap.put(dn, entryID); |
| | | return true; |
| | | } |
| | | else if(lastDN != null && lastDN.isAncestorOf(dn)) |
| | | { |
| | | parentIDMap.put(lastDN, lastID); |
| | | parentDN = lastDN; |
| | | parentID = lastID; |
| | | lastDN = dn; |
| | | lastID = entryID; |
| | | return true; |
| | | } |
| | | else if(parentIDMap.lastKey().isAncestorOf(dn)) |
| | | { |
| | | parentDN = parentIDMap.lastKey(); |
| | | parentID = parentIDMap.get(parentDN); |
| | | lastDN = dn; |
| | | lastID = entryID; |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData()); |
| | | DN pDN = entryContainer.getParentWithinBase(dn); |
| | | if(parentIDMap.containsKey(pDN)) { |
| | | DN lastKey = parentIDMap.lastKey(); |
| | | Map<DN, EntryID> subMap = parentIDMap.subMap(pDN, lastKey); /* view: pDN inclusive, lastKey exclusive */ |
| | | for(Map.Entry<DN, EntryID> e : subMap.entrySet()) |
| | | { |
| | | subMap.remove(e.getKey()); /* structural change during iteration - see NOTE(review) above */ |
| | | } |
| | | parentDN = pDN; |
| | | parentID = parentIDMap.get(pDN); |
| | | lastDN = dn; |
| | | lastID = entryID; |
| | | } |
| | | else |
| | | { |
| | | Message msg = NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString()); |
| | | Entry e = new Entry(dn, null, null, null); /* placeholder entry holding only the DN, used for the rejection */ |
| | | reader.rejectEntry(e, msg); |
| | | return false; |
| | | } |
| | | } |
| | | idSet.addEntryID(childID, limit, mCount); |
| | | return true; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Adds a child entry ID to the ID set kept in id2childTree for the current |
| | | * parentID. The set is looked up by the bytes of |
| | | * parentID.getDatabaseEntry(); if none exists a new ImportIDSet is created |
| | | * and stored under that key before the ID is added. |
| | | * |
| | | * NOTE(review): two signatures, two ImportIDSet constructions and two |
| | | * addEntryID calls are shown together here; they look like two revisions - |
| | | * confirm which rows are current. |
| | | * NOTE(review): parentID is dereferenced without a null check - confirm |
| | | * callers only invoke this when a parent has been set. |
| | | * |
| | | * @param childID the entry ID to add under the current parent |
| | | */ private void id2child(EntryID childID, int limit, boolean mCount) |
| | | |
| | | private void id2child(EntryID childID) |
| | | { |
| | | ImportIDSet idSet; |
| | | if(!id2childTree.containsKey(parentID.getDatabaseEntry().getData())) |
| | | { |
| | | idSet = new ImportIDSet(); |
| | | idSet = new ImportIDSet(1,childIndex.getIndexEntryLimit(), |
| | | childIndex.getMaintainCount()); |
| | | id2childTree.put(parentID.getDatabaseEntry().getData(), idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2childTree.get(parentID.getDatabaseEntry().getData()); |
| | | } |
| | | idSet.addEntryID(childID, limit, mCount); |
| | | idSet.addEntryID(childID); |
| | | } |
| | | |
| | | private boolean checkParent(DN dn, EntryID id, EntryContainer ec) |
| | | { |
| | | if(parentIDMap.isEmpty()) |
| | | |
| | | private void id2Subtree(EntryID childID) throws DatabaseException |
| | | { |
| | | parentIDMap.put(dn, id); |
| | | return true; |
| | | } |
| | | else if(lastDN != null && lastDN.isAncestorOf(dn)) |
| | | { |
| | | parentIDMap.put(lastDN, lastID); |
| | | parentDN = lastDN; |
| | | parentID = lastID; |
| | | lastDN = dn; |
| | | lastID = id; |
| | | return true; |
| | | } |
| | | else if(parentIDMap.lastKey().isAncestorOf(dn)) |
| | | { |
| | | parentDN = parentIDMap.lastKey(); |
| | | parentID = parentIDMap.get(parentDN); |
| | | lastDN = dn; |
| | | lastID = id; |
| | | return true; |
| | | } |
| | | else |
| | | { |
| | | DN pDN = ec.getParentWithinBase(dn); |
| | | if(parentIDMap.containsKey(pDN)) { |
| | | DN lastKey = parentIDMap.lastKey(); |
| | | Map<DN, EntryID> subMap = parentIDMap.subMap(pDN, lastKey); |
| | | for(Map.Entry<DN, EntryID> e : subMap.entrySet()) |
| | | { |
| | | subMap.remove(e.getKey()); |
| | | } |
| | | parentDN = pDN; |
| | | parentID = parentIDMap.get(pDN); |
| | | lastDN = dn; |
| | | lastID = id; |
| | | ImportIDSet idSet; |
| | | if(!id2subtreeTree.containsKey(parentID.getDatabaseEntry().getData())) |
| | | { |
| | | idSet = new ImportIDSet(1, subIndex.getIndexEntryLimit(), |
| | | subIndex.getMaintainCount()); |
| | | id2subtreeTree.put(parentID.getDatabaseEntry().getData(), idSet); |
| | | } |
| | | else |
| | | { |
| | | Message msg = NOTE_JEB_IMPORT_LDIF_DN_NO_PARENT.get(dn.toString()); |
| | | Entry e = new Entry(dn, null, null, null); |
| | | reader.rejectEntry(e, msg); |
| | | return false; |
| | | idSet = id2subtreeTree.get(parentID.getDatabaseEntry().getData()); |
| | | } |
| | | idSet.addEntryID(childID); |
| | | for (DN dn = entryContainer.getParentWithinBase(parentDN); dn != null; |
| | | dn = entryContainer.getParentWithinBase(dn)) |
| | | { |
| | | EntryID nodeID = parentIDMap.get(dn); |
| | | if(!id2subtreeTree.containsKey(nodeID.getDatabaseEntry().getData())) |
| | | { |
| | | idSet = new ImportIDSet(1, subIndex.getIndexEntryLimit(), |
| | | subIndex.getMaintainCount()); |
| | | id2subtreeTree.put(nodeID.getDatabaseEntry().getData(), idSet); |
| | | } |
| | | else |
| | | { |
| | | idSet = id2subtreeTree.get(nodeID.getDatabaseEntry().getData()); |
| | | } |
| | | idSet.addEntryID(childID); |
| | | } |
| | | } |
| | | return true; |
| | | } |
| | | |
| | | private void addDN2ID(ImportIDSet record) |
| | | throws DatabaseException, DirectoryException |
| | | { |
| | | DatabaseEntry idVal = new DatabaseEntry(); |
| | | dbKey.setData(record.getKey()); |
| | | idVal.setData(record.toDatabase()); |
| | | DN dn = DN.decode(ByteString.wrap(dbKey.getData())); |
| | | EntryID entryID = new EntryID(idVal); |
| | | if(!checkParent(dn, entryID, entryContainer)) |
| | | { |
| | | return; |
| | | } |
| | | dn2id.putRaw(null, dbKey, idVal); |
| | | |
| | | public void writeToDB() throws DatabaseException |
| | | { |
| | | dn2id.putRaw(null, dbKey1, dbValue1); |
| | | indexMgr.addTotDNCount(1); |
| | | if(parentDN != null) |
| | | { |
| | | id2child(entryID, id2ChildLimit, id2ChildMCount); |
| | | id2Subtree(entryContainer, |
| | | entryID, id2ChildLimit, id2ChildMCount); |
| | | id2child(entryID); |
| | | id2Subtree(entryID); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | private void addIndex(ImportIDSet record) throws DatabaseException |
| | | { |
| | | dbKey.setData(record.getKey()); |
| | | index.insert(dbKey, record, dbValue); |
| | | public void flush() throws DatabaseException, DirectoryException |
| | | { |
| | | Set<Map.Entry<byte[], ImportIDSet>> id2childSet = |
| | | id2childTree.entrySet(); |
| | | for(Map.Entry<byte[], ImportIDSet> e : id2childSet) |
| | | { |
| | | byte[] key = e.getKey(); |
| | | ImportIDSet idSet = e.getValue(); |
| | | dbKey1.setData(key); |
| | | childIndex.insert(dbKey1, idSet, dbValue1); |
| | | } |
| | | childIndex.closeCursor(); |
| | | //Do subtree. |
| | | Set<Map.Entry<byte[], ImportIDSet>> subtreeSet = |
| | | id2subtreeTree.entrySet(); |
| | | for(Map.Entry<byte[], ImportIDSet> e : subtreeSet) |
| | | { |
| | | byte[] key = e.getKey(); |
| | | ImportIDSet idSet = e.getValue(); |
| | | dbKey1.setData(key); |
| | | subIndex.insert(dbKey1, idSet, dbValue1); |
| | | } |
| | | subIndex.closeCursor(); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | { |
| | | private final IndexManager indexMgr; |
| | | private final BlockingQueue<IndexBuffer> que; |
| | | private final ByteArrayOutputStream byteStream = |
| | | private final ByteArrayOutputStream insetByteStream = |
| | | new ByteArrayOutputStream(2 * bufferSize); |
| | | private final ByteArrayOutputStream deleteByteStream = |
| | | new ByteArrayOutputStream(2 * bufferSize); |
| | | private final DataOutputStream dataStream; |
| | | private long bufCount = 0; |
| | |
| | | { |
| | | long beginOffset = offset; |
| | | long bufLen; |
| | | /* |
| | | if(!que.isEmpty()) |
| | | { |
| | | que.drainTo(l, DRAIN_TO); |
| | |
| | | } |
| | | freeBufQue.addAll(l); |
| | | l.clear(); |
| | | if(poisonSeen) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | */ |
| | | if(indexBuffer.isPoison()) |
| | | { |
| | | break; |
| | |
| | | bufLen = writeIndexBuffer(indexBuffer); |
| | | indexBuffer.reset(); |
| | | freeBufQue.add(indexBuffer); |
| | | } |
| | | // } |
| | | offset += bufLen; |
| | | indexMgr.addBuffer(new Buffer(beginOffset, offset, bufCount)); |
| | | bufCount++; |
| | | bufferCount.incrementAndGet(); |
| | | if(poisonSeen) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | dataStream.close(); |
| | |
| | | catch (IOException e) { |
| | | Message msg = |
| | | ERR_JEB_IMPORT_LDIF_INDEX_FILEWRITER_ERR.get(file.getName(), |
| | | e.getMessage()); |
| | | e.getMessage()); |
| | | logError(msg); |
| | | } |
| | | } |
| | |
| | | int numKeys = indexBuffer.getNumberKeys(); |
| | | indexBuffer.setPos(-1); |
| | | long bufLen = 0; |
| | | byteStream.reset(); |
| | | insetByteStream.reset(); |
| | | deleteByteStream.reset(); |
| | | for(int i = 0; i < numKeys; i++) |
| | | { |
| | | if(indexBuffer.getPos() == -1) |
| | | { |
| | | indexBuffer.setPos(i); |
| | | byteStream.write(indexBuffer.getID(i)); |
| | | if(indexBuffer.isInsert(i)) |
| | | { |
| | | insetByteStream.write(indexBuffer.getIDBytes(i)); |
| | | } |
| | | else |
| | | { |
| | | deleteByteStream.write(indexBuffer.getIDBytes(i)); |
| | | } |
| | | continue; |
| | | } |
| | | |
| | | if(!indexBuffer.compare(i)) |
| | | { |
| | | int recLen = indexBuffer.getKeySize(); |
| | | recLen += byteStream.size(); |
| | | recLen += 8; |
| | | bufLen += recLen; |
| | | indexBuffer.writeKey(dataStream); |
| | | dataStream.writeInt(byteStream.size()); |
| | | byteStream.writeTo(dataStream); |
| | | bufLen += indexBuffer.writeRecord(insetByteStream, deleteByteStream, |
| | | dataStream); |
| | | indexBuffer.setPos(i); |
| | | byteStream.reset(); |
| | | insetByteStream.reset(); |
| | | deleteByteStream.reset(); |
| | | } |
| | | byteStream.write(indexBuffer.getID(i)); |
| | | if(indexBuffer.isInsert(i)) |
| | | { |
| | | insetByteStream.write(indexBuffer.getIDBytes(i)); |
| | | } |
| | | else |
| | | { |
| | | deleteByteStream.write(indexBuffer.getIDBytes(i)); |
| | | } |
| | | } |
| | | |
| | | if(indexBuffer.getPos() != -1) |
| | | { |
| | | int recLen = indexBuffer.getKeySize(); |
| | | recLen += byteStream.size(); |
| | | recLen += 8; |
| | | bufLen += recLen; |
| | | indexBuffer.writeKey(dataStream); |
| | | dataStream.writeInt(byteStream.size()); |
| | | byteStream.writeTo(dataStream); |
| | | bufLen += indexBuffer.writeRecord(insetByteStream, deleteByteStream, |
| | | dataStream); |
| | | } |
| | | return bufLen; |
| | | } |
| | |
| | | { |
| | | long id = 0; |
| | | long bufLen = 0; |
| | | byteStream.reset(); |
| | | insetByteStream.reset(); |
| | | deleteByteStream.reset(); |
| | | for(IndexBuffer b : buffers) |
| | | { |
| | | if(b.isPoison()) |
| | |
| | | } |
| | | } |
| | | byte[] saveKey = null; |
| | | int saveIndexID = 0; |
| | | while(!indexSortedSet.isEmpty()) |
| | | { |
| | | IndexBuffer b = indexSortedSet.first(); |
| | | indexSortedSet.remove(b); |
| | | byte[] key = b.getKeyBytes(b.getPos()); |
| | | if(saveKey == null) |
| | | { |
| | | saveKey = key; |
| | | byteStream.write(b.getID(b.getPos())); |
| | | } |
| | | else |
| | | { |
| | | if(!b.compare(saveKey)) |
| | | saveKey = b.getKeyBytes(); |
| | | saveIndexID = b.getIndexID(); |
| | | if(b.isInsert(b.getPos())) |
| | | { |
| | | int recLen = saveKey.length; |
| | | recLen += byteStream.size(); |
| | | recLen += 8; |
| | | bufLen += recLen; |
| | | dataStream.writeInt(saveKey.length); |
| | | dataStream.write(saveKey); |
| | | dataStream.writeInt(byteStream.size()); |
| | | byteStream.writeTo(dataStream); |
| | | byteStream.reset(); |
| | | saveKey = key; |
| | | byteStream.write(b.getID(b.getPos())); |
| | | insetByteStream.write(b.getIDBytes(b.getPos())); |
| | | } |
| | | else |
| | | { |
| | | byteStream.write(b.getID(b.getPos())); |
| | | deleteByteStream.write(b.getIDBytes(b.getPos())); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if(!b.compare(saveKey, saveIndexID)) |
| | | { |
| | | bufLen += IndexBuffer.writeRecord(saveKey, saveIndexID, |
| | | insetByteStream, deleteByteStream, dataStream); |
| | | insetByteStream.reset(); |
| | | deleteByteStream.reset(); |
| | | saveKey = b.getKeyBytes(); |
| | | saveIndexID = b.getIndexID(); |
| | | if(b.isInsert(b.getPos())) |
| | | { |
| | | insetByteStream.write(b.getIDBytes(b.getPos())); |
| | | } |
| | | else |
| | | { |
| | | deleteByteStream.write(b.getIDBytes(b.getPos())); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | if(b.isInsert(b.getPos())) |
| | | { |
| | | insetByteStream.write(b.getIDBytes(b.getPos())); |
| | | } |
| | | else |
| | | { |
| | | deleteByteStream.write(b.getIDBytes(b.getPos())); |
| | | } |
| | | } |
| | | } |
| | | if(b.hasMoreData()) |
| | |
| | | } |
| | | if(saveKey != null) |
| | | { |
| | | int recLen = saveKey.length; |
| | | recLen += byteStream.size(); |
| | | recLen += 8; |
| | | bufLen += recLen; |
| | | dataStream.writeInt(saveKey.length); |
| | | dataStream.write(saveKey); |
| | | dataStream.writeInt(byteStream.size()); |
| | | byteStream.writeTo(dataStream); |
| | | bufLen += IndexBuffer.writeRecord(saveKey, saveIndexID, |
| | | insetByteStream, deleteByteStream, dataStream); |
| | | } |
| | | return bufLen; |
| | | } |
| | |
| | | { |
| | | return null; |
| | | } |
| | | /* |
| | | if(!indexBuffer.getIndexKey().getName().equals("mail.SUBSTRING")) |
| | | { |
| | | freeBufQue.add(indexBuffer); |
| | | return null; |
| | | } |
| | | */ |
| | | indexBuffer.sort(); |
| | | if(containerQueMap.containsKey(indexBuffer.getContainer())) { |
| | | if(indexKeyQueMap.containsKey(indexBuffer.getIndexKey())) { |
| | | BlockingQueue<IndexBuffer> q = |
| | | containerQueMap.get(indexBuffer.getContainer()); |
| | | indexKeyQueMap.get(indexBuffer.getIndexKey()); |
| | | q.add(indexBuffer); |
| | | } |
| | | else |
| | | { |
| | | DatabaseContainer container = indexBuffer.getContainer(); |
| | | EntryContainer entryContainer = indexBuffer.getEntryContainer(); |
| | | createIndexWriterTask(container, entryContainer); |
| | | BlockingQueue<IndexBuffer> q = containerQueMap.get(container); |
| | | createIndexWriterTask(indexBuffer.getIndexKey()); |
| | | BlockingQueue<IndexBuffer> q = |
| | | indexKeyQueMap.get(indexBuffer.getIndexKey()); |
| | | q.add(indexBuffer); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | private void createIndexWriterTask(DatabaseContainer container, |
| | | EntryContainer entryContainer) |
| | | throws FileNotFoundException |
| | | private void createIndexWriterTask(IndexKey indexKey) |
| | | throws FileNotFoundException |
| | | { |
| | | synchronized(container) { |
| | | if(containerQueMap.containsKey(container)) |
| | | boolean dn2id = false; |
| | | synchronized(synObj) |
| | | { |
| | | if(indexKeyQueMap.containsKey(indexKey)) |
| | | { |
| | | return; |
| | | } |
| | | IndexManager indexMgr; |
| | | if(container instanceof Index) |
| | | if(indexKey.getIndexType().equals(IndexType.DN)) |
| | | { |
| | | Index index = (Index) container; |
| | | indexMgr = new IndexManager(index); |
| | | dn2id = true; |
| | | } |
| | | else |
| | | { |
| | | DN2ID dn2id = (DN2ID) container; |
| | | indexMgr = new IndexManager(dn2id, entryContainer); |
| | | } |
| | | containerIndexMgrMap.put(container, indexMgr); |
| | | IndexManager indexMgr = new IndexManager(indexKey.getName(), dn2id); |
| | | indexMgrList.add(indexMgr); |
| | | BlockingQueue<IndexBuffer> newQue = |
| | | new ArrayBlockingQueue<IndexBuffer>(threadCount + 5); |
| | | new ArrayBlockingQueue<IndexBuffer>(indexBufferCount); |
| | | IndexFileWriterTask indexWriter = |
| | | new IndexFileWriterTask(newQue, indexMgr); |
| | | indexWriterList.add(indexWriter); |
| | | indexWriterFutures.add(indexProcessService.submit(indexWriter)); |
| | | containerQueMap.put(container, newQue); |
| | | indexKeyQueMap.put(indexKey, newQue); |
| | | } |
| | | } |
| | | } |
| | |
| | | private final long begin, end, id; |
| | | private long offset; |
| | | private ByteBuffer cache; |
| | | private int keyLen, idLen; |
| | | private int keyLen, idLen, limit; |
| | | private byte[] key; |
| | | private ImportIDSet idSet; |
| | | private ImportIDSet insertIDSet, deleteIDSet; |
| | | private Integer indexID = null; |
| | | private boolean doCount; |
| | | private Comparator<byte[]> comparator; |
| | | |
| | | |
| | | public Buffer(long begin, long end, long id) |
| | |
| | | } |
| | | loadCache(); |
| | | cache.flip(); |
| | | getNextRecord(); |
| | | } |
| | | |
| | | |
| | |
| | | return key; |
| | | } |
| | | |
| | | public ImportIDSet getIDSet() |
| | | public ImportIDSet getInsertIDSet() |
| | | { |
| | | return idSet; |
| | | return insertIDSet; |
| | | } |
| | | |
| | | public ImportIDSet getDeleteIDSet() |
| | | { |
| | | return deleteIDSet; |
| | | } |
| | | |
| | | public long getBufID() |
| | |
| | | return id; |
| | | } |
| | | |
| | | public Integer getIndexID() |
| | | { |
| | | if(indexID == null) |
| | | { |
| | | try { |
| | | getNextRecord(); |
| | | } catch(IOException ex) { |
| | | System.out.println("MPD need some error message"); |
| | | } |
| | | } |
| | | return indexID; |
| | | } |
| | | |
| | | public void getNextRecord() throws IOException |
| | | { |
| | | getNextIndexID(); |
| | | getContainerParams(); |
| | | getNextKey(); |
| | | getNextIDSet(); |
| | | getNextIDSet(true); //get insert ids |
| | | getNextIDSet(false); //get delete ids |
| | | } |
| | | |
| | | private void getContainerParams() |
| | | { |
| | | limit = 1; |
| | | doCount = false; |
| | | if(!indexMgr.isDN2ID()) |
| | | { |
| | | Index index = (Index) idContainerMap.get(indexID); |
| | | limit = index.getIndexEntryLimit(); |
| | | doCount = index.getMaintainCount(); |
| | | comparator = index.getComparator(); |
| | | } |
| | | else |
| | | { |
| | | DN2ID dn2id = (DN2ID) idContainerMap.get(indexID); |
| | | comparator = dn2id.getComparator(); |
| | | } |
| | | } |
| | | |
| | | private int getInt() throws IOException |
| | |
| | | cache.get(b); |
| | | } |
| | | |
| | | private void getNextIndexID() throws IOException, BufferUnderflowException |
| | | { |
| | | indexID = new Integer(getInt()); |
| | | } |
| | | |
| | | private void getNextKey() throws IOException, BufferUnderflowException |
| | | { |
| | | keyLen = getInt(); |
| | | key = new byte[keyLen]; |
| | | getBytes(key); |
| | | getBytes(key); |
| | | } |
| | | |
| | | |
| | | private void getNextIDSet() throws IOException, BufferUnderflowException |
| | | private void getNextIDSet(boolean insert) |
| | | throws IOException, BufferUnderflowException |
| | | { |
| | | idLen = getInt(); |
| | | int idCount = idLen/8; |
| | | idSet = new ImportIDSet(idCount); |
| | | |
| | | if(insert) |
| | | { |
| | | insertIDSet = new ImportIDSet(idCount, limit, doCount); |
| | | } |
| | | else |
| | | { |
| | | deleteIDSet = new ImportIDSet(idCount, limit, doCount); |
| | | } |
| | | for(int i = 0; i < idCount; i++) |
| | | { |
| | | long l = getLong(); |
| | | idSet.addEntryID(l, indexMgr.getLimit(), indexMgr.getMaintainCount()); |
| | | if(insert) |
| | | { |
| | | insertIDSet.addEntryID(l); |
| | | } |
| | | else |
| | | { |
| | | deleteIDSet.addEntryID(l); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | public int compareTo(Buffer o) { |
| | | if(key == null) { |
| | | if(id == o.getBufID()) |
| | | { |
| | | return 0; |
| | | } |
| | | else |
| | | { |
| | | return id > o.getBufID() ? 1 : -1; |
| | | } |
| | | |
| | | private int compare(byte[] cKey, Integer cIndexID) |
| | | { |
| | | |
| | | int rc; |
| | | if(key == null) |
| | | { |
| | | getIndexID(); |
| | | } |
| | | if(comparator.compare(key, cKey) != 0) { |
| | | rc = 1; |
| | | } |
| | | else |
| | | { |
| | | rc = (indexID.intValue() == cIndexID.intValue()) ? 0 : 1; |
| | | } |
| | | return rc; |
| | | } |
| | | |
| | | |
| | | |
| | | public int compareTo(Buffer o) { |
| | | //used in remove. |
| | | if(this.equals(o)) |
| | | { |
| | | return 0; |
| | | } |
| | | int rc = indexMgr.getComparator().compare(key, o.getKey()); |
| | | if(key == null) { |
| | | getIndexID(); |
| | | } |
| | | if(o.getKey() == null) |
| | | { |
| | | o.getIndexID(); |
| | | } |
| | | int rc = comparator.compare(key, o.getKey()); |
| | | if(rc == 0) |
| | | { |
| | | if(idSet.isDefined()) |
| | | if(indexID.intValue() == o.getIndexID().intValue()) |
| | | { |
| | | return -1; |
| | | if(insertIDSet.isDefined()) |
| | | { |
| | | rc = -1; |
| | | } |
| | | else if(o.getInsertIDSet().isDefined()) |
| | | { |
| | | rc = 1; |
| | | } |
| | | else if(insertIDSet.size() == o.getInsertIDSet().size()) |
| | | { |
| | | rc = id > o.getBufID() ? 1 : -1; |
| | | } |
| | | else |
| | | { |
| | | rc = insertIDSet.size() - o.getInsertIDSet().size(); |
| | | } |
| | | } |
| | | else if(o.getIDSet().isDefined()) |
| | | else if(indexID.intValue() > o.getIndexID().intValue()) |
| | | { |
| | | return 1; |
| | | } |
| | | else if(idSet.size() == o.getIDSet().size()) |
| | | { |
| | | rc = id > o.getBufID() ? 1 : -1; |
| | | rc = 1; |
| | | } |
| | | else |
| | | { |
| | | rc = idSet.size() - o.getIDSet().size(); |
| | | rc = -1; |
| | | } |
| | | } |
| | | return rc; |
| | |
| | | */ |
| | | private final class IndexManager |
| | | { |
| | | private final Index index; |
| | | private final DN2ID dn2id; |
| | | private final EntryContainer entryContainer; |
| | | private final File file; |
| | | |
| | | |
| | | private RandomAccessFile raf = null; |
| | | private final List<Buffer> bufferList = new LinkedList<Buffer>(); |
| | | private final int limit; |
| | | private long fileLength, bytesRead = 0; |
| | | private final boolean maintainCount; |
| | | private final Comparator<byte[]> comparator; |
| | | private boolean done = false; |
| | | private long totalDNS; |
| | | private AtomicInteger keyCount = new AtomicInteger(0); |
| | | private final String name; |
| | | private final boolean dn2id; |
| | | |
/**
 * Creates an index manager for the index with the specified name. The
 * manager's file is located in the import temporary directory and is
 * named after the index.
 *
 * @param name The index name, also used as the file name.
 * @param dn2id true if this manager handles the DN index.
 */
public IndexManager(String name, boolean dn2id)
{
  file = new File(tempDir, name);
  this.name = name;
  this.dn2id = dn2id;
}
| | | |
| | | public void init() throws FileNotFoundException |
| | |
| | | raf.close(); |
| | | } |
| | | |
| | | public int getLimit() |
| | | { |
| | | return limit; |
| | | } |
| | | |
| | | public boolean getMaintainCount() |
| | | { |
| | | return maintainCount; |
| | | } |
| | | |
| | | public Comparator<byte[]> getComparator() |
| | | { |
| | | return comparator; |
| | | } |
| | | |
| | | public Index getIndex() |
| | | { |
| | | return index; |
| | | } |
| | | |
| | | public void setFileLength() |
| | | { |
| | | this.fileLength = file.length(); |
| | |
| | | } |
| | | |
| | | |
| | | public long getTotDNCount() |
| | | public long getDNCount() |
| | | { |
| | | return totalDNS; |
| | | } |
| | | |
| | | public boolean isDN2ID() |
| | | { |
| | | return dn2id; |
| | | } |
| | | |
| | | public void printStats(long deltaTime) |
| | | { |
| | |
| | | { |
| | | keyCount.incrementAndGet(); |
| | | } |
| | | |
| | | public String getName() |
| | | { |
| | | return name; |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | // Suspend output. |
| | | private boolean pause = false; |
| | | |
| | | private final Map<DatabaseContainer, IndexManager> containerIndexMgrMap; |
| | | private final List<IndexManager> indexMgrList; |
| | | |
| | | |
| | | /** |
| | | * Create a new import progress task. |
| | | * @param containerIndexMgrMap Map of database container objects to |
| | | * index manager objects. |
| | | * @param indexMgrList List of index managers. |
| | | */ |
| | | public SecondPhaseProgressTask(Map<DatabaseContainer, |
| | | IndexManager> containerIndexMgrMap) |
| | | public SecondPhaseProgressTask (List<IndexManager> indexMgrList) |
| | | { |
| | | previousTime = System.currentTimeMillis(); |
| | | this.containerIndexMgrMap = containerIndexMgrMap; |
| | | this.indexMgrList = indexMgrList; |
| | | try |
| | | { |
| | | prevEnvStats = |
| | |
| | | previousCount = latestCount; |
| | | previousTime = latestTime; |
| | | |
| | | for(Map.Entry<DatabaseContainer, IndexManager> e : |
| | | containerIndexMgrMap.entrySet()) |
| | | for(IndexManager indexMgr : indexMgrList) |
| | | { |
| | | IndexManager indexMgr = e.getValue(); |
| | | indexMgr.printStats(deltaTime); |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | /** |
| | | * A class to hold information about the entry determined by the LDIF reader. |
| | | * |
| | | */ |
| | | public class EntryInformation |
| | | { |
| | | private EntryID entryID; |
| | | private Suffix suffix; |
| | | |
| | | |
| | | /** |
| | | * Return the suffix associated with the entry. |
| | | * |
| | | * @return Entry's suffix instance; |
| | | */ |
| | | public Suffix getSuffix() |
| | | { |
| | | return suffix; |
| | | } |
| | | |
| | | /** |
| | | * Set the suffix instance associated with the entry. |
| | | * |
| | | * @param suffix The suffix associated with the entry. |
| | | */ |
| | | public void setSuffix(Suffix suffix) |
| | | { |
| | | this.suffix = suffix; |
| | | } |
| | | |
| | | /** |
| | | * Set the entry's ID. |
| | | * |
| | | * @param entryID The entry ID to set the entry ID to. |
| | | */ |
| | | public void setEntryID(EntryID entryID) |
| | | { |
| | | this.entryID = entryID; |
| | | } |
| | | |
| | | /** |
| | | * Return the entry ID associated with the entry. |
| | | * |
| | | * @return The entry ID associated with the entry. |
| | | */ |
| | | public EntryID getEntryID() |
| | | { |
| | | return entryID; |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * This class defines the individual index type available. |
| | | * |
| | | */ |
public enum IndexType
{
  /** The DN index type. */
  DN,

  /** The equality index type. */
  EQUALITY,

  /** The presence index type. */
  PRESENCE,

  /** The substring index type. */
  SUBSTRING,

  /** The ordering index type. */
  ORDERING,

  /** The approximate index type. */
  APPROXIMATE,

  /** The extensible substring index type. */
  EX_SUBSTRING,

  /** The extensible shared index type. */
  EX_SHARED
}
| | | |
| | | |
| | | /** |
| | | * This class is used as and index key for several hash maps that need to |
| | | * process multiple suffix index elements into a single que or map based on |
| | | * both attribute type and index type (ie., cn.equality, sn.equality,...). |
| | | * |
| | | * It tries to perform some optimization if the index is a substring index. |
| | | */ |
| | | public class IndexKey { |
| | | |
| | | private final AttributeType type; |
| | | private final IndexType indexType; |
| | | private byte[] keyBytes = null; |
| | | |
| | | /** |
| | | * Create index key instance using the specified attribute type, index type |
| | | * and substring length. Used only for substring indexes. |
| | | * |
| | | * @param type The attribute type. |
| | | * @param indexType The index type. |
| | | * @param subLen The substring length. |
| | | */ |
| | | IndexKey(AttributeType type, IndexType indexType, int subLen) |
| | | { |
| | | this(type, indexType); |
| | | keyBytes = new byte[subLen]; |
| | | } |
| | | |
| | | /** |
| | | * Create index key instance using the specified attribute type, index type. |
| | | * |
| | | * @param type The attribute type. |
| | | * @param indexType The index type. |
| | | */ |
| | | IndexKey(AttributeType type, IndexType indexType) |
| | | { |
| | | this.type = type; |
| | | this.indexType = indexType; |
| | | } |
| | | |
| | | /** |
| | | * An equals method that uses both the attribute type and the index type. |
| | | * |
| | | * @param obj the object to compare. |
| | | * @return <CODE>true</CODE> if the objects are equal. |
| | | */ |
| | | public boolean equals(Object obj) |
| | | { |
| | | IndexKey oKey = (IndexKey) obj; |
| | | boolean rc = false; |
| | | if(type.equals(oKey.getType()) && indexType.equals(oKey.getIndexType())) |
| | | { |
| | | rc = true; |
| | | } |
| | | return rc; |
| | | } |
| | | |
| | | /** |
| | | * An hashcode method that adds the hashcodes of the attribute type and |
| | | * index type and returns that value. |
| | | * |
| | | * @return The combined hash values. |
| | | */ |
| | | public int hashCode() |
| | | { |
| | | return type.hashCode() + indexType.hashCode(); |
| | | } |
| | | |
| | | /** |
| | | * Return the attribute type. |
| | | * |
| | | * @return The attribute type. |
| | | */ |
| | | public AttributeType getType() |
| | | { |
| | | return type; |
| | | } |
| | | |
| | | /** |
| | | * Return the index type. |
| | | * @return The index type. |
| | | */ |
| | | public IndexType getIndexType() |
| | | { |
| | | return indexType; |
| | | } |
| | | |
| | | /** |
| | | * Return the index key name, which is the attribute type primary name, |
| | | * a period, and the index type name. Used for building file names and |
| | | * output. |
| | | * |
| | | * @return The index key name. |
| | | */ |
| | | public String getName() |
| | | { |
| | | return type.getPrimaryName() + "." + |
| | | StaticUtils.toLowerCase(indexType.name()); |
| | | } |
| | | |
| | | /** |
| | | * Returns a preallocated byte array having substring len size if the |
| | | * index key is a substring index and the desired size is equal to substring |
| | | * len size. This is a performance hack for substring indexes only. |
| | | * |
| | | * @param size The size of byte array desired. |
| | | * @return Either a preallocated byte array, or a freshly created one using |
| | | * the size parameter. |
| | | */ |
| | | public byte[] getKeyBytes(int size) |
| | | { |
| | | if(keyBytes != null) |
| | | { |
| | | if(size == keyBytes.length) |
| | | { |
| | | return this.keyBytes; |
| | | } |
| | | else |
| | | { |
| | | return new byte[size]; |
| | | } |
| | | } |
| | | else |
| | | { |
| | | return new byte[size]; |
| | | } |
| | | } |
| | | } |
| | | } |