| | |
| | | |
| | | import static org.opends.messages.JebMessages.*; |
| | | import static org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType.*; |
| | | import static org.opends.server.backends.pluggable.EntryIDSet.*; |
| | | import static org.opends.server.backends.pluggable.IndexOutputBuffer.*; |
| | | import static org.opends.server.util.DynamicConstants.*; |
| | | import static org.opends.server.util.ServerConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static org.opends.server.backends.pluggable.EntryIDSet.*; |
| | | |
| | | import java.io.BufferedInputStream; |
| | | import java.io.BufferedOutputStream; |
| | |
| | | import java.util.concurrent.ScheduledThreadPoolExecutor; |
| | | import java.util.concurrent.Semaphore; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicBoolean; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | |
| | | |
| | | /** Map of DNs to Suffix objects. */ |
| | | private final Map<DN, Suffix> dnSuffixMap = new LinkedHashMap<DN, Suffix>(); |
| | | /** Map of container ids to database containers. */ |
| | | private final ConcurrentHashMap<Integer, Index> idContainerMap = new ConcurrentHashMap<Integer, Index>(); |
| | | /** Map of container ids to entry containers. */ |
| | | private final ConcurrentHashMap<Integer, EntryContainer> idECMap = |
| | | /** Map of indexIDs to database containers. */ |
| | | private final ConcurrentHashMap<Integer, Index> indexIDToIndexMap = new ConcurrentHashMap<Integer, Index>(); |
| | | /** Map of indexIDs to entry containers. */ |
| | | private final ConcurrentHashMap<Integer, EntryContainer> indexIDToECMap = |
| | | new ConcurrentHashMap<Integer, EntryContainer>(); |
| | | |
| | | /** Used to synchronize when a scratch file index writer is first setup. */ |
| | |
| | | * @throws ConfigException |
| | | * If a problem occurs during initialization. |
| | | */ |
| | | public Importer(RebuildConfig rebuildConfig, PersistitBackendCfg cfg) throws InitializationException, |
| | | StorageRuntimeException, ConfigException |
| | | Importer(RebuildConfig rebuildConfig, PersistitBackendCfg cfg) |
| | | throws InitializationException, StorageRuntimeException, ConfigException |
| | | { |
| | | this.importConfiguration = null; |
| | | this.backendConfiguration = cfg; |
| | |
| | | * @throws StorageRuntimeException |
| | | * If an error occurred when opening the DB. |
| | | */ |
| | | public Importer(LDIFImportConfig importConfiguration, PersistitBackendCfg backendCfg) |
| | | Importer(LDIFImportConfig importConfiguration, PersistitBackendCfg backendCfg) |
| | | throws InitializationException, ConfigException, StorageRuntimeException |
| | | { |
| | | this.rebuildManager = null; |
| | |
| | | { |
| | | if (index != null) |
| | | { |
| | | idContainerMap.putIfAbsent(getIndexID(index), index); |
| | | indexIDToIndexMap.putIfAbsent(getIndexID(index), index); |
| | | } |
| | | } |
| | | |
| | |
| | | * @param txn |
| | | * The database transaction |
| | | * @return A LDIF result. |
| | | * @throws ConfigException |
| | | * If the import failed because of a configuration error. |
| | | * @throws InitializationException |
| | | * If the import failed because of an initialization error. |
| | | * @throws StorageRuntimeException |
| | | * If the import failed due to a database error. |
| | | * @throws InterruptedException |
| | | * If the import failed because it was interrupted. |
| | | * @throws ExecutionException |
| | | * If the import failed due to an execution error. |
| | | * @throws Exception |
| | | * If the import failed |
| | | */ |
| | | public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) |
| | | throws ConfigException, InitializationException, StorageRuntimeException, |
| | | InterruptedException, ExecutionException |
| | | public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) throws Exception |
| | | { |
| | | this.rootContainer = rootContainer; |
| | | try { |
| | |
| | | BUILD_ID, REVISION_NUMBER); |
| | | logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount); |
| | | initializeSuffixes(txn); |
| | | setIndexesTrusted(false); |
| | | setIndexesTrusted(txn, false); |
| | | |
| | | final long startTime = System.currentTimeMillis(); |
| | | importPhaseOne(txn); |
| | |
| | | } |
| | | final long phaseTwoFinishTime = System.currentTimeMillis(); |
| | | |
| | | setIndexesTrusted(true); |
| | | setIndexesTrusted(txn, true); |
| | | switchEntryContainers(txn); |
| | | recursiveDelete(tempDir); |
| | | final long finishTime = System.currentTimeMillis(); |
| | |
| | | } |
| | | } |
| | | |
| | | private void setIndexesTrusted(boolean trusted) throws StorageRuntimeException |
| | | private void setIndexesTrusted(WriteableStorage txn, boolean trusted) throws StorageRuntimeException |
| | | { |
| | | try |
| | | { |
| | | for (Suffix s : dnSuffixMap.values()) |
| | | { |
| | | s.setIndexesTrusted(trusted); |
| | | s.setIndexesTrusted(txn, trusted); |
| | | } |
| | | } |
| | | catch (StorageRuntimeException ex) |
| | |
| | | bufferSortService = Executors.newFixedThreadPool(threadCount); |
| | | final ExecutorService execService = Executors.newFixedThreadPool(threadCount); |
| | | |
| | | final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount); |
| | | tasks.add(new MigrateExistingTask(txn)); |
| | | getAll(execService.invokeAll(tasks)); |
| | | tasks.clear(); |
| | | execService.submit(new MigrateExistingTask(txn)).get(); |
| | | |
| | | final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount); |
| | | if (importConfiguration.appendToExistingData() |
| | | && importConfiguration.replaceExistingEntries()) |
| | | { |
| | |
| | | getAll(execService.invokeAll(tasks)); |
| | | tasks.clear(); |
| | | |
| | | tasks.add(new MigrateExcludedTask(txn)); |
| | | getAll(execService.invokeAll(tasks)); |
| | | execService.submit(new MigrateExcludedTask(txn)).get(); |
| | | |
| | | stopScratchFileWriters(); |
| | | getAll(scratchFileWriterFutures); |
| | |
| | | } |
| | | } |
| | | |
| | | private void processAttribute(Index index, ImportIndexType presence, Entry entry, |
| | | AttributeType attributeType, EntryID entryID, IndexingOptions options) throws InterruptedException |
| | | { |
| | | if (index != null) |
| | | { |
| | | IndexKey indexKey = new IndexKey(attributeType, presence, index.getIndexEntryLimit()); |
| | | processAttribute(index, entry, entryID, options, indexKey); |
| | | } |
| | | } |
| | | |
| | | private void processAttributes(Collection<Index> indexes, ImportIndexType indexType, Entry entry, |
| | | AttributeType attributeType, EntryID entryID, IndexingOptions options) throws InterruptedException |
| | | { |
| | |
| | | { |
| | | for (Index index : indexes) |
| | | { |
| | | IndexKey indexKey = new IndexKey(attributeType, indexType, index.getIndexEntryLimit()); |
| | | processAttribute(index, entry, entryID, options, indexKey); |
| | | processAttribute(index, indexType, entry, attributeType, entryID, options); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void processAttribute(Index index, ImportIndexType indexType, Entry entry, |
| | | AttributeType attributeType, EntryID entryID, IndexingOptions options) throws InterruptedException |
| | | { |
| | | if (index != null) |
| | | { |
| | | IndexKey indexKey = new IndexKey(attributeType, indexType, index.getIndexEntryLimit()); |
| | | processAttribute(index, entry, entryID, options, indexKey); |
| | | } |
| | | } |
| | | |
| | | void processAttribute(Index index, Entry entry, EntryID entryID, IndexingOptions options, |
| | | IndexKey indexKey) throws StorageRuntimeException, InterruptedException |
| | | { |
| | |
| | | { |
| | | DN2ID dn2id = suffix.getDN2ID(); |
| | | ByteString dnBytes = JebFormat.dnToDNKey(dn, suffix.getBaseDN().size()); |
| | | int id = processKey(dn2id, dnBytes, entryID, dnIndexKey, true); |
| | | idECMap.putIfAbsent(id, suffix.getEntryContainer()); |
| | | int indexID = processKey(dn2id, dnBytes, entryID, dnIndexKey, true); |
| | | indexIDToECMap.putIfAbsent(indexID, suffix.getEntryContainer()); |
| | | } |
| | | |
| | | void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry) throws StorageRuntimeException |
| | |
| | | return new ImportIDSet(key, newDefinedSet(), 1, false); |
| | | } |
| | | |
| | | final Index index = idContainerMap.get(indexID); |
| | | final Index index = indexIDToIndexMap.get(indexID); |
| | | return new ImportIDSet(key, newDefinedSet(), index.getIndexEntryLimit(), index.getMaintainCount()); |
| | | } |
| | | |
| | |
| | | { |
| | | if (deleteSet.size() > 0 || !deleteSet.isDefined()) |
| | | { |
| | | final Index index = idContainerMap.get(indexID); |
| | | final Index index = indexIDToIndexMap.get(indexID); |
| | | index.delete(txn, deleteSet); |
| | | } |
| | | if (insertSet.size() > 0 || !insertSet.isDefined()) |
| | | { |
| | | final Index index = idContainerMap.get(indexID); |
| | | final Index index = indexIDToIndexMap.get(indexID); |
| | | index.insert(txn, insertSet); |
| | | } |
| | | } |
| | |
| | | DNState dnState; |
| | | if (!dnStateMap.containsKey(indexID)) |
| | | { |
| | | dnState = new DNState(idECMap.get(indexID)); |
| | | dnState = new DNState(indexIDToECMap.get(indexID)); |
| | | dnStateMap.put(indexID, dnState); |
| | | } |
| | | else |
| | |
| | | */ |
| | | private final class TmpEnv implements DNCache |
| | | { |
| | | private Storage storage; |
| | | private final Storage storage; |
| | | private WriteableStorage txn; |
| | | private org.opends.server.backends.pluggable.spi.Importer importer; |
| | | private static final String DB_NAME = "dn_cache"; |
| | | private TreeName dnCache = new TreeName("", DB_NAME); |
| | | private final TreeName dnCache = new TreeName("", DB_NAME); |
| | | |
| | | /** |
| | | * Create a temporary DB environment and database to be used as a cache of |
| | |
| | | |
| | | private boolean insert(ByteString key, final ByteStringBuilder dn) throws StorageRuntimeException |
| | | { |
| | | Cursor cursor = null; |
| | | try |
| | | return txn.update(dnCache, key, new UpdateFunction() |
| | | { |
| | | final AtomicBoolean result = new AtomicBoolean(); |
| | | txn.update(dnCache, key, new UpdateFunction() |
| | | @Override |
| | | public ByteSequence computeNewValue(ByteSequence existingDns) |
| | | { |
| | | @Override |
| | | public ByteSequence computeNewValue(ByteSequence existingDns) |
| | | if (containsDN(existingDns, dn)) |
| | | { |
| | | if (existingDns != null) |
| | | { |
| | | if (isDNMatched(existingDns, dn)) |
| | | { |
| | | // dn is already present, no change |
| | | result.set(false); |
| | | return existingDns; |
| | | } |
| | | else |
| | | { |
| | | // dn is not present in the list, add it |
| | | result.set(true); |
| | | return addDN(existingDns, dn); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | // no previous data, create a new list |
| | | result.set(true); |
| | | return singletonList(dn); |
| | | } |
| | | // no change |
| | | return existingDns; |
| | | } |
| | | |
| | | /** Add the DN to the DNs because of a hash collision. */ |
| | | private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd) |
| | | else if (existingDns != null) |
| | | { |
| | | final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length()); |
| | | builder.append(dnList); |
| | | builder.append(dntoAdd.length()); |
| | | builder.append(dntoAdd); |
| | | return builder; |
| | | return addDN(existingDns, dn); |
| | | } |
| | | |
| | | /** Create a list of dn made of one element. */ |
| | | private ByteSequence singletonList(final ByteSequence dntoAdd) |
| | | else |
| | | { |
| | | final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE); |
| | | singleton.append(dntoAdd.length()); |
| | | singleton.append(dntoAdd); |
| | | return singleton; |
| | | return singletonList(dn); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | return result.get(); |
| | | } |
| | | finally |
| | | { |
| | | close(cursor); |
| | | } |
| | | /** Add the DN to the DNs because of a hash collision. */ |
| | | private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd) |
| | | { |
| | | final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length()); |
| | | builder.append(dnList); |
| | | builder.append(dntoAdd.length()); |
| | | builder.append(dntoAdd); |
| | | return builder; |
| | | } |
| | | |
| | | /** Create a list of dn made of one element. */ |
| | | private ByteSequence singletonList(final ByteSequence dntoAdd) |
| | | { |
| | | final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE); |
| | | singleton.append(dntoAdd.length()); |
| | | singleton.append(dntoAdd); |
| | | return singleton; |
| | | } |
| | | }); |
| | | } |
| | | |
| | | /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */ |
| | | private boolean isDNMatched(ByteSequence existingDns, ByteStringBuilder dn) |
| | | private boolean containsDN(ByteSequence existingDns, ByteStringBuilder dn) |
| | | { |
| | | final ByteSequenceReader reader = existingDns.asReader(); |
| | | int previousPos = 0; |
| | | while (reader.remaining() != 0) |
| | | if (existingDns != null && existingDns.length() > 0) |
| | | { |
| | | int pLen = INT_SIZE; |
| | | int len = reader.getInt(); |
| | | // TODO JNR remove call to toByteArray() on next line |
| | | // TODO JNR remove call to toByteArray() on next line? |
| | | final byte[] existingDnsBytes = existingDns.toByteArray(); |
| | | if (indexComparator.compare(existingDnsBytes, previousPos + pLen, len, dn.getBackingArray(), dn.length()) == 0) |
| | | final ByteSequenceReader reader = existingDns.asReader(); |
| | | int previousPos = 0; |
| | | while (reader.remaining() != 0) |
| | | { |
| | | return true; |
| | | int pLen = INT_SIZE; |
| | | int len = reader.getInt(); |
| | | if (indexComparator.compare(existingDnsBytes, previousPos+pLen, len, dn.getBackingArray(), dn.length()) == 0) |
| | | { |
| | | return true; |
| | | } |
| | | previousPos = reader.position(); |
| | | } |
| | | previousPos = reader.position(); |
| | | } |
| | | return false; |
| | | } |
| | |
| | | if (existingDns != null) |
| | | { |
| | | final ByteStringBuilder dnBytes = new ByteStringBuilder().append(dn.toString()); |
| | | return isDNMatched(existingDns, dnBytes); |
| | | return containsDN(existingDns, dnBytes); |
| | | } |
| | | return false; |
| | | } |