| | |
| | | import java.io.IOException; |
| | | import java.lang.reflect.InvocationHandler; |
| | | import java.lang.reflect.Method; |
| | | import java.lang.reflect.Proxy; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | | import java.util.HashMap; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.List; |
| | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteSequenceReader; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType; |
| | | import org.opends.server.admin.std.server.BackendIndexCfg; |
| | | import org.opends.server.admin.std.server.PersistitBackendCfg; |
| | | import org.opends.server.admin.std.server.PluggableBackendCfg; |
| | | import org.opends.server.backends.persistit.PersistItStorage; |
| | | import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex; |
| | | import org.opends.server.backends.pluggable.ImportLDIFReader.EntryInformation; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeBufferImporter.DNCache; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeBufferImporter.IndexKey; |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | | import org.opends.server.backends.pluggable.spi.Importer; |
| | | import org.opends.server.backends.pluggable.spi.ReadOperation; |
| | | import org.opends.server.backends.pluggable.spi.ReadableTransaction; |
| | | import org.opends.server.backends.pluggable.spi.Storage; |
| | | import org.opends.server.backends.pluggable.spi.StorageRuntimeException; |
| | | import org.opends.server.backends.pluggable.spi.TreeName; |
| | | import org.opends.server.backends.pluggable.spi.UpdateFunction; |
| | | import org.opends.server.backends.pluggable.spi.WriteOperation; |
| | | import org.opends.server.backends.pluggable.spi.WriteableTransaction; |
| | | import org.opends.server.core.DirectoryServer; |
| | |
| | | |
| | | private static final int TIMER_INTERVAL = 10000; |
| | | private static final String DEFAULT_TMP_DIR = "import-tmp"; |
| | | private static final String DN_CACHE_DIR = "dn-cache"; |
| | | |
| | | /** Defaults for DB cache. */ |
| | | private static final int MAX_DB_CACHE_SIZE = 8 * MB; |
| | |
| | | |
| | | /** Temp scratch directory. */ |
| | | private final File tempDir; |
| | | /** DN cache used when DN validation is done in first phase. */ |
| | | private final DNCache dnCache; |
| | | /** Size in bytes of DN cache. */ |
| | | private long dnCacheSize; |
| | | /** Available memory at the start of the import. */ |
| | |
| | | this.tempDir = prepareTempDir(backendCfg, importCfg.getTmpDirectory()); |
| | | // Be careful: this requires that some data has already been set |
| | | computeMemoryRequirements(); |
| | | |
| | | if (validateDNs) |
| | | { |
| | | final File dnCachePath = new File(tempDir, DN_CACHE_DIR); |
| | | dnCachePath.mkdirs(); |
| | | this.dnCache = new DNCacheImpl(dnCachePath); |
| | | } |
| | | else |
| | | { |
| | | this.dnCache = null; |
| | | } |
| | | } |
| | | |
| | | private File prepareTempDir(PluggableBackendCfg backendCfg, String tmpDirectory) throws InitializationException |
| | |
| | | final long startTime = System.currentTimeMillis(); |
| | | importPhaseOne(); |
| | | final long phaseOneFinishTime = System.currentTimeMillis(); |
| | | if (validateDNs) |
| | | { |
| | | dnCache.close(); |
| | | } |
| | | |
| | | if (isCanceled) |
| | | { |
| | |
| | | finally |
| | | { |
| | | close(reader); |
| | | if (validateDNs) |
| | | { |
| | | close(dnCache); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | final ExecutorService execService = Executors.newFixedThreadPool(threadCount); |
| | | |
| | | final Storage storage = rootContainer.getStorage(); |
| | | execService.submit(new MigrateExistingTask(storage)).get(); |
| | | final Importer importer = storage.startImport(); |
| | | execService.submit(new MigrateExistingTask(storage, importer)).get(); |
| | | |
| | | final List<Callable<Void>> tasks = new ArrayList<>(threadCount); |
| | | if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries()) |
| | | { |
| | | for (int i = 0; i < threadCount; i++) |
| | | { |
| | | tasks.add(new ImportTask(storage)); |
| | | tasks.add(new ImportTask(importer)); |
| | | } |
| | | } |
| | | execService.invokeAll(tasks); |
| | | tasks.clear(); |
| | | |
| | | execService.submit(new MigrateExcludedTask(storage)).get(); |
| | | execService.submit(new MigrateExcludedTask(storage, importer)).get(); |
| | | |
| | | shutdownAll(timerService, execService); |
| | | } |
| | |
| | | /** Task used to migrate excluded branch. */ |
| | | private final class MigrateExcludedTask extends ImportTask |
| | | { |
| | | private MigrateExcludedTask(final Storage storage) |
| | | private final Storage storage; |
| | | |
| | | private MigrateExcludedTask(final Storage storage, final Importer importer) |
| | | { |
| | | super(storage); |
| | | super(importer); |
| | | this.storage = storage; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | void call0(WriteableTransaction txn) throws Exception |
| | | public Void call() throws Exception |
| | | { |
| | | storage.read(new ReadOperation<Void>() |
| | | { |
| | | @Override |
| | | public Void run(ReadableTransaction txn) throws Exception |
| | | { |
| | | call0(txn); |
| | | return null; |
| | | } |
| | | }); |
| | | return null; |
| | | } |
| | | |
| | | private void call0(ReadableTransaction txn) throws Exception |
| | | { |
| | | for (Suffix suffix : dnSuffixMap.values()) |
| | | { |
| | |
| | | { |
| | | EntryID id = new EntryID(cursor.getValue()); |
| | | Entry entry = entryContainer.getID2Entry().get(txn, id); |
| | | processEntry(txn, entry, rootContainer.getNextEntryID(), suffix); |
| | | processEntry(entry, rootContainer.getNextEntryID(), suffix); |
| | | migratedCount++; |
| | | success = cursor.next(); |
| | | } |
| | |
| | | /** Task to migrate existing entries. */ |
| | | private final class MigrateExistingTask extends ImportTask |
| | | { |
| | | private MigrateExistingTask(final Storage storage) |
| | | private final Storage storage; |
| | | |
| | | private MigrateExistingTask(final Storage storage, final Importer importer) |
| | | { |
| | | super(storage); |
| | | super(importer); |
| | | this.storage = storage; |
| | | } |
| | | |
| | | @Override |
| | | void call0(WriteableTransaction txn) throws Exception |
| | | public Void call() throws Exception |
| | | { |
| | | storage.read(new ReadOperation<Void>() |
| | | { |
| | | @Override |
| | | public Void run(ReadableTransaction txn) throws Exception |
| | | { |
| | | call0(txn); |
| | | return null; |
| | | } |
| | | }); |
| | | return null; |
| | | } |
| | | |
| | | private void call0(ReadableTransaction txn) throws Exception |
| | | { |
| | | for (Suffix suffix : dnSuffixMap.values()) |
| | | { |
| | |
| | | { |
| | | EntryID id = new EntryID(key); |
| | | Entry entry = entryContainer.getID2Entry().get(txn, id); |
| | | processEntry(txn, entry, rootContainer.getNextEntryID(), suffix); |
| | | processEntry(entry, rootContainer.getNextEntryID(), suffix); |
| | | migratedCount++; |
| | | success = cursor.next(); |
| | | } |
| | |
| | | */ |
| | | private class ImportTask implements Callable<Void> |
| | | { |
| | | private final Storage storage; |
| | | private final Importer importer; |
| | | |
| | | public ImportTask(final Storage storage) |
| | | public ImportTask(final Importer importer) |
| | | { |
| | | this.storage = storage; |
| | | this.importer = importer; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public final Void call() throws Exception |
| | | public Void call() throws Exception |
| | | { |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | call0(txn); |
| | | } |
| | | }); |
| | | call0(); |
| | | return null; |
| | | } |
| | | |
| | | void call0(WriteableTransaction txn) throws Exception |
| | | void call0() throws Exception |
| | | { |
| | | try |
| | | { |
| | |
| | | { |
| | | return; |
| | | } |
| | | processEntry(txn, entryInfo.getEntry(), entryInfo.getEntryID(), entryInfo.getSuffix()); |
| | | processEntry(entryInfo.getEntry(), entryInfo.getEntryID(), entryInfo.getSuffix()); |
| | | } |
| | | } |
| | | catch (Exception e) |
| | |
| | | } |
| | | } |
| | | |
| | | void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix) |
| | | void processEntry(Entry entry, EntryID entryID, Suffix suffix) |
| | | throws DirectoryException, StorageRuntimeException, InterruptedException |
| | | { |
| | | DN entryDN = entry.getName(); |
| | | if (validateDNs && !dnSanityCheck(txn, entry, suffix)) |
| | | try |
| | | { |
| | | suffix.removePending(entryDN); |
| | | if (validateDNs && !dnSanityCheck(entry, entryID, suffix)) |
| | | { |
| | | return; |
| | | } |
| | | suffix.removePending(entryDN); |
| | | processDN2ID(suffix, entryDN, entryID); |
| | | } |
| | | finally |
| | | { |
| | | suffix.removePending(entry.getName()); |
| | | } |
| | | |
| | | if (!validateDNs) |
| | | { |
| | | processDN2ID(suffix, entry.getName(), entryID); |
| | | } |
| | | processDN2URI(suffix, entry); |
| | | processIndexes(suffix, entry, entryID, false); |
| | | processVLVIndexes(txn, suffix, entry, entryID); |
| | | suffix.getID2Entry().put(txn, entryID, entry); |
| | | processVLVIndexes(suffix, entry, entryID); |
| | | // FIXME JNR run a dedicated thread to do the puts ordered by entryID |
| | | // suffix.getID2Entry().put(importer, entryID, entry); |
| | | importCount.getAndIncrement(); |
| | | } |
| | | |
| | |
| | | * @return true if the import operation can proceed with the provided entry, false otherwise |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | boolean dnSanityCheck(WriteableTransaction txn, Entry entry, Suffix suffix) |
| | | boolean dnSanityCheck(Entry entry, EntryID entryID, Suffix suffix) |
| | | throws StorageRuntimeException, InterruptedException |
| | | { |
| | | //Perform parent checking. |
| | | DN entryDN = entry.getName(); |
| | | DN parentDN = suffix.getEntryContainer().getParentWithinBase(entryDN); |
| | | if (parentDN != null && !suffix.isParentProcessed(txn, parentDN, dnCache, clearedBackend)) |
| | | DNCache dnCache = new Dn2IdDnCache(suffix, rootContainer.getStorage()); |
| | | if (parentDN != null && !suffix.isParentProcessed(parentDN, dnCache)) |
| | | { |
| | | reader.rejectEntry(entry, ERR_IMPORT_PARENT_NOT_FOUND.get(parentDN)); |
| | | return false; |
| | | } |
| | | if (!insert(txn, entryDN, suffix, dnCache)) |
| | | if (!dnCache.insert(entryDN, entryID)) |
| | | { |
| | | reader.rejectEntry(entry, WARN_IMPORT_ENTRY_EXISTS.get()); |
| | | return false; |
| | |
| | | return true; |
| | | } |
| | | |
| | | private boolean insert(WriteableTransaction txn, DN entryDN, Suffix suffix, DNCache dnCache) |
| | | { |
| | | // If the backend was not cleared, consult dn2id first: a DN already stored there makes this |
| | | // method return false, since such a DN might not exist in the DN cache. |
| | | if (!clearedBackend && suffix.getDN2ID().get(txn, entryDN) != null) |
| | | { |
| | | return false; |
| | | } |
| | | return dnCache.insert(entryDN); |
| | | } |
| | | |
| | | void processIndexes(Suffix suffix, Entry entry, EntryID entryID, boolean allIndexes) |
| | | throws StorageRuntimeException, InterruptedException |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID) |
| | | void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID) |
| | | throws DirectoryException |
| | | { |
| | | final EntryContainer entryContainer = suffix.getEntryContainer(); |
| | |
| | | { |
| | | vlvIdx.addEntry(buffer, entryID, entry); |
| | | } |
| | | buffer.flush(txn); |
| | | // buffer.flush(txn); // TODO JNR do something about it |
| | | } |
| | | |
| | | void processAttribute(MatchingRuleIndex index, Entry entry, EntryID entryID, IndexKey indexKey) |
| | |
| | | * Used to check DNs when DN validation is performed during phase one processing. |
| | | * It is deleted after phase one processing. |
| | | */ |
| | | private final class DNCacheImpl implements DNCache |
| | | private final class Dn2IdDnCache implements DNCache |
| | | { |
| | | private static final String DB_NAME = "dn_cache"; |
| | | private final TreeName dnCache = new TreeName("", DB_NAME); |
| | | private final Storage storage; |
| | | private Suffix suffix; |
| | | private Storage storage; |
| | | |
| | | private DNCacheImpl(File dnCachePath) throws StorageRuntimeException |
| | | private Dn2IdDnCache(Suffix suffix, Storage storage) |
| | | { |
| | | final Map<String, Object> returnValues = new HashMap<>(); |
| | | returnValues.put("getDBDirectory", dnCachePath.getAbsolutePath()); |
| | | returnValues.put("getBackendId", DB_NAME); |
| | | returnValues.put("getDBCacheSize", 0L); |
| | | returnValues.put("getDBCachePercent", 10); |
| | | returnValues.put("isDBTxnNoSync", true); |
| | | returnValues.put("getDBDirectoryPermissions", "700"); |
| | | returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB)); |
| | | returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB)); |
| | | try |
| | | { |
| | | returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importDNCache,cn=Backends,cn=config")); |
| | | storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues), |
| | | DirectoryServer.getInstance().getServerContext()); |
| | | storage.open(); |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | txn.openTree(dnCache); |
| | | } |
| | | }); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues) |
| | | { |
| | | return (PersistitBackendCfg) Proxy.newProxyInstance( |
| | | getClass().getClassLoader(), |
| | | new Class<?>[] { PersistitBackendCfg.class }, |
| | | new BackendCfgHandler(returnValues)); |
| | | } |
| | | |
| | | private static final long FNV_INIT = 0xcbf29ce484222325L; |
| | | private static final long FNV_PRIME = 0x100000001b3L; |
| | | |
| | | /** Hash the DN bytes. Uses the FNV-1a hash. */ |
| | | private ByteString fnv1AHashCode(DN dn) |
| | | { |
| | | final ByteString b = dn.toNormalizedByteString(); |
| | | |
| | | long hash = FNV_INIT; |
| | | for (int i = 0; i < b.length(); i++) |
| | | { |
| | | hash ^= b.byteAt(i); |
| | | hash *= FNV_PRIME; |
| | | } |
| | | return ByteString.valueOf(hash); |
| | | this.suffix = suffix; |
| | | this.storage = storage; |
| | | } |
| | | |
| | | @Override |
| | | public void close() throws StorageRuntimeException |
| | | public boolean insert(final DN dn, final EntryID entryID) |
| | | { |
| | | try |
| | | { |
| | | storage.close(); |
| | | } |
| | | finally |
| | | { |
| | | storage.removeStorageFiles(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean insert(DN dn) throws StorageRuntimeException |
| | | { |
| | | // Use a compact representation for key |
| | | // and a reversible representation for value |
| | | final ByteString key = fnv1AHashCode(dn); |
| | | final ByteString dnValue = ByteString.valueOf(dn); |
| | | |
| | | return insert(key, dnValue); |
| | | } |
| | | |
| | | private boolean insert(final ByteString key, final ByteString dn) throws StorageRuntimeException |
| | | { |
| | | final AtomicBoolean updateResult = new AtomicBoolean(); |
| | | final AtomicBoolean result = new AtomicBoolean(); |
| | | try |
| | | { |
| | | storage.write(new WriteOperation() |
| | |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | updateResult.set(txn.update(dnCache, key, new UpdateFunction() |
| | | { |
| | | @Override |
| | | public ByteSequence computeNewValue(ByteSequence existingDns) |
| | | { |
| | | if (containsDN(existingDns, dn)) |
| | | { |
| | | // no change |
| | | return existingDns; |
| | | } |
| | | else if (existingDns != null) |
| | | { |
| | | return addDN(existingDns, dn); |
| | | } |
| | | else |
| | | { |
| | | return singletonList(dn); |
| | | } |
| | | } |
| | | |
| | | /** Append the DN (length-prefixed) to the DNs already stored under this key because of a hash collision. */ |
| | | private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd) |
| | | { |
| | | final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length()); |
| | | builder.append(dnList); |
| | | builder.append(dntoAdd.length()); |
| | | builder.append(dntoAdd); |
| | | return builder; |
| | | } |
| | | |
| | | /** Create a DN list containing a single, length-prefixed element. */ |
| | | private ByteSequence singletonList(final ByteSequence dntoAdd) |
| | | { |
| | | final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE); |
| | | singleton.append(dntoAdd.length()); |
| | | singleton.append(dntoAdd); |
| | | return singleton; |
| | | } |
| | | })); |
| | | result.set(suffix.getDN2ID().insert(txn, dn, entryID)); |
| | | } |
| | | }); |
| | | return updateResult.get(); |
| | | } |
| | | catch (StorageRuntimeException e) |
| | | { |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */ |
| | | private boolean containsDN(ByteSequence existingDns, ByteString dnToFind) |
| | | { |
| | | if (existingDns != null && existingDns.length() > 0) |
| | | { |
| | | final ByteSequenceReader reader = existingDns.asReader(); |
| | | int pos = 0; |
| | | while (reader.remaining() != 0) |
| | | { |
| | | int dnLength = reader.getInt(); |
| | | int dnStart = pos + INT_SIZE; |
| | | ByteSequence existingDn = existingDns.subSequence(dnStart, dnStart + dnLength); |
| | | if (dnToFind.equals(existingDn)) |
| | | { |
| | | return true; |
| | | } |
| | | reader.skip(dnLength); |
| | | pos = reader.position(); |
| | | } |
| | | } |
| | | return false; |
| | | return result.get(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean contains(final DN dn) |
| | | public boolean contains(final DN dn) throws StorageRuntimeException |
| | | { |
| | | try |
| | | { |
| | |
| | | @Override |
| | | public Boolean run(ReadableTransaction txn) throws Exception |
| | | { |
| | | final ByteString key = fnv1AHashCode(dn); |
| | | final ByteString existingDns = txn.read(dnCache, key); |
| | | |
| | | return containsDN(existingDns, ByteString.valueOf(dn)); |
| | | return suffix.getDN2ID().get(txn, dn) != null; |
| | | } |
| | | }); |
| | | } |
| | | catch (StorageRuntimeException e) |
| | | { |
| | | throw e; |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | // Nothing to do |
| | | } |
| | | } |
| | | } |