OPENDJ-1801 (CR-6815) Revise usage of storage.open() and startImport()
This change brings back to life the usage of Storage.startImport() for the import.
pluggable/spi/Importer.java:
Added read() and delete().
Storage.java:
Changed exception thrown from startImport().
PersistItStorage.java, TracedStorage.java:
Implemented the changes in Storage and Importer.
Index.java, DefaultIndex.java, Importer.java:
Used Importer instead of WriteableTransaction.
Importer.java:
For import, used Importer instead of WriteableTransaction.
In processIndexFiles(), closed the storage before starting the import.
In IndexDBWriteTask.endWriteTask(), called DNState.finalFlush() instead of DNState.flush().
In DNState, added finalFlush().
ID2Count.java:
Added importPut(), importPutTotalCount() and importPut0() methods for import. It is sad but they duplicate a bit the WriteableTransaction ones.
| | |
| | | import java.util.ListIterator; |
| | | import java.util.Map; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.Queue; |
| | | import java.util.concurrent.ConcurrentLinkedDeque; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | |
| | | import com.persistit.Persistit; |
| | | import com.persistit.Transaction; |
| | | import com.persistit.Tree; |
| | | import com.persistit.TreeBuilder; |
| | | import com.persistit.Value; |
| | | import com.persistit.Volume; |
| | | import com.persistit.VolumeSpecification; |
| | |
| | | /** PersistIt implementation of the {@link Importer} interface. */ |
| | | private final class ImporterImpl implements Importer |
| | | { |
| | | private final TreeBuilder importer = new TreeBuilder(db); |
| | | private final Key importKey = new Key(db); |
| | | private final Value importValue = new Value(db); |
| | | private final Map<TreeName, Tree> trees = new HashMap<TreeName, Tree>(); |
| | | private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>(); |
| | | private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>() |
| | | { |
| | | @Override |
| | | protected Map<TreeName, Exchange> initialValue() |
| | | { |
| | | final Map<TreeName, Exchange> value = new HashMap<>(); |
| | | allExchanges.add(value); |
| | | return value; |
| | | } |
| | | }; |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | try |
| | | for (Map<TreeName, Exchange> map : allExchanges) |
| | | { |
| | | importer.merge(); |
| | | for (Exchange exchange : map.values()) |
| | | { |
| | | db.releaseExchange(exchange); |
| | | } |
| | | map.clear(); |
| | | } |
| | | catch (final Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | finally |
| | | { |
| | | PersistItStorage.this.close(); |
| | | } |
| | | PersistItStorage.this.close(); |
| | | } |
| | | |
| | | @Override |
| | |
| | | { |
| | | try |
| | | { |
| | | final Tree tree = trees.get(treeName); |
| | | importer.store(tree, |
| | | bytesToKey(importKey, key), |
| | | bytesToValue(importValue, value)); |
| | | final Exchange ex = getExchangeFromCache(treeName); |
| | | bytesToKey(ex.getKey(), key); |
| | | bytesToValue(ex.getValue(), value); |
| | | ex.store(); |
| | | } |
| | | catch (final Exception e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean delete(final TreeName treeName, final ByteSequence key) |
| | | { |
| | | try |
| | | { |
| | | final Exchange ex = getExchangeFromCache(treeName); |
| | | bytesToKey(ex.getKey(), key); |
| | | return ex.remove(); |
| | | } |
| | | catch (final PersistitException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(final TreeName treeName, final ByteSequence key) |
| | | { |
| | | try |
| | | { |
| | | final Exchange ex = getExchangeFromCache(treeName); |
| | | bytesToKey(ex.getKey(), key); |
| | | ex.fetch(); |
| | | return valueToBytes(ex.getValue()); |
| | | } |
| | | catch (final PersistitException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException |
| | | { |
| | | Map<TreeName, Exchange> threadExchanges = exchanges.get(); |
| | | Exchange exchange = threadExchanges.get(treeName); |
| | | if (exchange == null) |
| | | { |
| | | exchange = getNewExchange(treeName, false); |
| | | threadExchanges.put(treeName, exchange); |
| | | } |
| | | return exchange; |
| | | } |
| | | } |
| | | |
| | | /** PersistIt implementation of the {@link WriteableTransaction} interface. */ |
| | |
| | | private final Map<TreeName, Exchange> exchanges = new HashMap<TreeName, Exchange>(); |
| | | |
| | | @Override |
| | | public void put(final TreeName treeName, final ByteSequence key, |
| | | final ByteSequence value) |
| | | public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value) |
| | | { |
| | | try |
| | | { |
| | |
| | | return b1.equals(b2); |
| | | } |
| | | |
| | | private Exchange getExchangeFromCache(final TreeName treeName) |
| | | throws PersistitException |
| | | private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException |
| | | { |
| | | Exchange exchange = exchanges.get(treeName); |
| | | if (exchange == null) |
| | |
| | | } |
| | | exchanges.clear(); |
| | | } |
| | | |
| | | private Exchange getNewExchange(final TreeName treeName, final boolean create) |
| | | throws PersistitException |
| | | { |
| | | return db.getExchange(volume, mangleTreeName(treeName), create); |
| | | } |
| | | } |
| | | |
| | | private static void clearAndCreateDbDir(final File dbDir) |
| | | private Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException |
| | | { |
| | | if (dbDir.exists()) |
| | | { |
| | | for (final File child : dbDir.listFiles()) |
| | | { |
| | | child.delete(); |
| | | } |
| | | } |
| | | else |
| | | { |
| | | dbDir.mkdirs(); |
| | | } |
| | | return db.getExchange(volume, mangleTreeName(treeName), create); |
| | | } |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | private final ServerContext serverContext; |
| | | private File backendDirectory; |
| | | private Persistit db; |
| | | private Volume volume; |
| | | private Configuration dbCfg; |
| | | private PersistitBackendCfg config; |
| | | private DiskSpaceMonitor diskMonitor; |
| | | private MemoryQuota memQuota; |
| | |
| | | // FIXME: should be package private once importer is decoupled. |
| | | public PersistItStorage(final PersistitBackendCfg cfg, ServerContext serverContext) throws ConfigException |
| | | { |
| | | this.serverContext = serverContext; |
| | | backendDirectory = new File(getFileForPath(cfg.getDBDirectory()), cfg.getBackendId()); |
| | | config = cfg; |
| | | dbCfg = new Configuration(); |
| | | cfg.addPersistitChangeListener(this); |
| | | } |
| | | |
| | | private Configuration buildConfiguration() |
| | | { |
| | | final Configuration dbCfg = new Configuration(); |
| | | dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath()); |
| | | dbCfg.setJournalPath(new File(backendDirectory, VOLUME_NAME + "_journal").getPath()); |
| | | dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null, |
| | | BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false))); |
| | | final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(); |
| | | final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg); |
| | | bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE); |
| | | |
| | | diskMonitor = serverContext.getDiskSpaceMonitor(); |
| | | memQuota = serverContext.getMemoryQuota(); |
| | | if (cfg.getDBCacheSize() > 0) |
| | | if (config.getDBCacheSize() > 0) |
| | | { |
| | | bufferPoolCfg.setMaximumMemory(cfg.getDBCacheSize()); |
| | | memQuota.acquireMemory(cfg.getDBCacheSize()); |
| | | bufferPoolCfg.setMaximumMemory(config.getDBCacheSize()); |
| | | memQuota.acquireMemory(config.getDBCacheSize()); |
| | | } |
| | | else |
| | | { |
| | | bufferPoolCfg.setFraction(cfg.getDBCachePercent() / 100.0f); |
| | | memQuota.acquireMemory(memQuota.memPercentToBytes(cfg.getDBCachePercent())); |
| | | bufferPoolCfg.setFraction(config.getDBCachePercent() / 100.0f); |
| | | memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent())); |
| | | } |
| | | dbCfg.setCommitPolicy(cfg.isDBTxnNoSync() ? SOFT : GROUP); |
| | | cfg.addPersistitChangeListener(this); |
| | | dbCfg.setCommitPolicy(config.isDBTxnNoSync() ? SOFT : GROUP); |
| | | return dbCfg; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | |
| | | diskMonitor.deregisterMonitoredDirectory(getDirectory(), this); |
| | | } |
| | | |
| | | private BufferPoolConfiguration getBufferPoolCfg() |
| | | private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg) |
| | | { |
| | | return dbCfg.getBufferPoolMap().get(BUFFER_SIZE); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public void open() throws Exception |
| | | public void open() throws ConfigException, StorageRuntimeException |
| | | { |
| | | open0(buildConfiguration()); |
| | | } |
| | | |
| | | private void open0(final Configuration dbCfg) throws ConfigException |
| | | { |
| | | setupStorageFiles(); |
| | | try |
| | | { |
| | | if (db != null) |
| | | { |
| | | throw new IllegalStateException( |
| | | "Database is already open, either the backend is enabled or an import is currently running."); |
| | | } |
| | | db = new Persistit(dbCfg); |
| | | |
| | | final long bufferCount = getBufferPoolCfg().computeBufferCount(db.getAvailableHeap()); |
| | | final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap()); |
| | | final long totalSize = bufferCount * BUFFER_SIZE / 1024; |
| | | logger.info(NOTE_PERSISTIT_MEMORY_CFG, config.getBackendId(), |
| | | bufferCount, BUFFER_SIZE, totalSize); |
| | |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Importer startImport() throws Exception |
| | | public Importer startImport() throws ConfigException, StorageRuntimeException |
| | | { |
| | | clearAndCreateDbDir(backendDirectory); |
| | | open(); |
| | | open0(buildConfiguration()); |
| | | return new ImporterImpl(); |
| | | } |
| | | |
| | | private String mangleTreeName(final TreeName treeName) |
| | | private static String mangleTreeName(final TreeName treeName) |
| | | { |
| | | StringBuilder mangled = new StringBuilder(); |
| | | String name = treeName.toString(); |
| | |
| | | * TODO: it would be nice to use the low-level key/value APIs. They seem quite |
| | | * inefficient at the moment for simple byte arrays. |
| | | */ |
| | | private Key bytesToKey(final Key key, final ByteSequence bytes) |
| | | private static Key bytesToKey(final Key key, final ByteSequence bytes) |
| | | { |
| | | final byte[] tmp = bytes.toByteArray(); |
| | | return key.clear().appendByteArray(tmp, 0, tmp.length); |
| | | } |
| | | |
| | | private Value bytesToValue(final Value value, final ByteSequence bytes) |
| | | private static Value bytesToValue(final Value value, final ByteSequence bytes) |
| | | { |
| | | value.clear().putByteArray(bytes.toByteArray()); |
| | | return value; |
| | | } |
| | | |
| | | private ByteString valueToBytes(final Value value) |
| | | private static ByteString valueToBytes(final Value value) |
| | | { |
| | | if (value.isDefined()) |
| | | { |
| | |
| | | import org.opends.server.backends.pluggable.EntryIDSet.EntryIDSetCodec; |
| | | import org.opends.server.backends.pluggable.State.IndexFlag; |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | | import org.opends.server.backends.pluggable.spi.Importer; |
| | | import org.opends.server.backends.pluggable.spi.ReadableTransaction; |
| | | import org.opends.server.backends.pluggable.spi.StorageRuntimeException; |
| | | import org.opends.server.backends.pluggable.spi.TreeName; |
| | |
| | | } |
| | | |
| | | @Override |
| | | public final void importPut(WriteableTransaction txn, ImportIDSet idsToBeAdded) throws StorageRuntimeException |
| | | public final void importPut(Importer importer, ImportIDSet idsToBeAdded) throws StorageRuntimeException |
| | | { |
| | | Reject.ifNull(txn, "txn must not be null"); |
| | | Reject.ifNull(importer, "importer must not be null"); |
| | | Reject.ifNull(idsToBeAdded, "idsToBeAdded must not be null"); |
| | | ByteSequence key = idsToBeAdded.getKey(); |
| | | ByteString value = txn.read(getName(), key); |
| | | ByteString value = importer.read(getName(), key); |
| | | if (value != null) |
| | | { |
| | | final EntryIDSet entryIDSet = codec.decode(key, value); |
| | | final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit); |
| | | importIDSet.merge(idsToBeAdded); |
| | | txn.put(getName(), key, importIDSet.valueToByteString(codec)); |
| | | importer.put(getName(), key, importIDSet.valueToByteString(codec)); |
| | | } |
| | | else |
| | | { |
| | | txn.put(getName(), key, idsToBeAdded.valueToByteString(codec)); |
| | | importer.put(getName(), key, idsToBeAdded.valueToByteString(codec)); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public final void importRemove(WriteableTransaction txn, ImportIDSet idsToBeRemoved) throws StorageRuntimeException |
| | | public final void importRemove(Importer importer, ImportIDSet idsToBeRemoved) throws StorageRuntimeException |
| | | { |
| | | Reject.ifNull(txn, "txn must not be null"); |
| | | Reject.ifNull(importer, "importer must not be null"); |
| | | Reject.ifNull(idsToBeRemoved, "idsToBeRemoved must not be null"); |
| | | ByteSequence key = idsToBeRemoved.getKey(); |
| | | ByteString value = txn.read(getName(), key); |
| | | ByteString value = importer.read(getName(), key); |
| | | if (value == null) |
| | | { |
| | | // Should never happen -- the keys should always be there. |
| | |
| | | importIDSet.remove(idsToBeRemoved); |
| | | if (importIDSet.isDefined() && importIDSet.size() == 0) |
| | | { |
| | | txn.delete(getName(), key); |
| | | importer.delete(getName(), key); |
| | | } |
| | | else |
| | | { |
| | | txn.put(getName(), key, importIDSet.valueToByteString(codec)); |
| | | importer.put(getName(), key, importIDSet.valueToByteString(codec)); |
| | | } |
| | | } |
| | | |
| | |
| | | import org.forgerock.util.promise.Function; |
| | | import org.forgerock.util.promise.NeverThrowsException; |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | | import org.opends.server.backends.pluggable.spi.Importer; |
| | | import org.opends.server.backends.pluggable.spi.ReadableTransaction; |
| | | import org.opends.server.backends.pluggable.spi.TreeName; |
| | | import org.opends.server.backends.pluggable.spi.UpdateFunction; |
| | |
| | | |
| | | private void addToCounter(WriteableTransaction txn, EntryID entryID, final long delta) |
| | | { |
| | | final long bucket = (Thread.currentThread().getId() & (SHARD_COUNT - 1)); |
| | | final ByteSequence shardedKey = getKeyFromEntryIDAndBucket(entryID, bucket); |
| | | final ByteSequence shardedKey = getShardedKey(entryID); |
| | | txn.update(getName(), shardedKey, new UpdateFunction() |
| | | { |
| | | @Override |
| | |
| | | }); |
| | | } |
| | | |
| | | void importPut(Importer importer, EntryID entryID, long total) |
| | | { |
| | | Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow."); |
| | | importPut0(importer, entryID, total); |
| | | } |
| | | |
| | | void importPutTotalCount(Importer importer, long total) |
| | | { |
| | | importPut0(importer, TOTAL_COUNT_ENTRY_ID, total); |
| | | } |
| | | |
| | | private void importPut0(Importer importer, EntryID entryID, final long delta) |
| | | { |
| | | Reject.ifNull(importer, "importer must not be null"); |
| | | final ByteSequence shardedKey = getShardedKey(entryID); |
| | | importer.put(getName(), shardedKey, ByteString.valueOf(delta)); |
| | | } |
| | | |
| | | private ByteSequence getShardedKey(EntryID entryID) |
| | | { |
| | | final long bucket = (Thread.currentThread().getId() & (SHARD_COUNT - 1)); |
| | | return getKeyFromEntryIDAndBucket(entryID, bucket); |
| | | } |
| | | |
| | | /** |
| | | * Get the counter value for the specified key |
| | | * @param txn The database transaction |
| | |
| | | |
| | | return counterValue; |
| | | } |
| | | |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private void importPhaseTwo() throws InterruptedException, ExecutionException |
| | | private void importPhaseTwo() throws Exception |
| | | { |
| | | ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); |
| | | scheduleAtFixedRate(timerService, new SecondPhaseProgressTask()); |
| | |
| | | * Performs on-disk merge by reading several scratch files at once |
| | | * and write their ordered content into the target indexes. |
| | | */ |
| | | private void processIndexFiles() throws InterruptedException, ExecutionException |
| | | private void processIndexFiles() throws Exception |
| | | { |
| | | if (bufferCount.get() == 0) |
| | | { |
| | |
| | | Semaphore permits = new Semaphore(buffers); |
| | | |
| | | // Start DN processing first. |
| | | List<Future<Void>> futures = new LinkedList<>(); |
| | | submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures); |
| | | submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures); |
| | | getAll(futures); |
| | | Storage storage = rootContainer.getStorage(); |
| | | storage.close(); |
| | | try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport()) |
| | | { |
| | | List<Future<Void>> futures = new LinkedList<>(); |
| | | submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures); |
| | | submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures); |
| | | getAll(futures); |
| | | } |
| | | finally |
| | | { |
| | | storage.open(); |
| | | } |
| | | |
| | | shutdownAll(dbService); |
| | | } |
| | | |
| | | private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService, |
| | | Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures) |
| | | private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, |
| | | org.opends.server.backends.pluggable.spi.Importer importer, |
| | | ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures) |
| | | { |
| | | for (IndexManager indexMgr : indexMgrs) |
| | | { |
| | | futures.add(dbService.submit( |
| | | new IndexDBWriteTask(rootContainer.getStorage(), indexMgr, permits, buffers, readAheadSize))); |
| | | futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize))); |
| | | } |
| | | } |
| | | |
| | |
| | | */ |
| | | private final class IndexDBWriteTask implements Callable<Void> |
| | | { |
| | | private final Storage storage; |
| | | private final org.opends.server.backends.pluggable.spi.Importer importer; |
| | | private final IndexManager indexMgr; |
| | | private final int cacheSize; |
| | | /** indexID => DNState map */ |
| | |
| | | /** |
| | | * Creates a new index DB writer. |
| | | * |
| | | * @param importer |
| | | * The importer |
| | | * @param indexMgr |
| | | * The index manager. |
| | | * @param storage |
| | | * Where to store data |
| | | * @param permits |
| | | * The semaphore used for restricting the number of buffer allocations. |
| | | * @param maxPermits |
| | |
| | | * @param cacheSize |
| | | * The buffer cache size. |
| | | */ |
| | | public IndexDBWriteTask(Storage storage, IndexManager indexMgr, Semaphore permits, int maxPermits, int cacheSize) |
| | | public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr, |
| | | Semaphore permits, int maxPermits, int cacheSize) |
| | | { |
| | | this.storage = storage; |
| | | this.importer = importer; |
| | | this.indexMgr = indexMgr; |
| | | this.permits = permits; |
| | | this.maxPermits = maxPermits; |
| | |
| | | } |
| | | |
| | | /** Finishes this task. */ |
| | | private void endWriteTask(WriteableTransaction txn) |
| | | private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer) |
| | | { |
| | | isRunning = false; |
| | | |
| | |
| | | { |
| | | for (DNState dnState : dnStateMap.values()) |
| | | { |
| | | dnState.flush(txn); |
| | | dnState.finalFlush(importer); |
| | | } |
| | | |
| | | if (!isCanceled) |
| | | { |
| | | logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount()); |
| | |
| | | @Override |
| | | public Void call() throws Exception |
| | | { |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | call0(txn); |
| | | } |
| | | }); |
| | | call0(importer); |
| | | return null; |
| | | } |
| | | |
| | | private void call0(WriteableTransaction txn) throws Exception |
| | | private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception |
| | | { |
| | | if (isCanceled) |
| | | { |
| | |
| | | { |
| | | if (previousRecord != null) |
| | | { |
| | | addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet); |
| | | addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet); |
| | | } |
| | | |
| | | // this is a new record |
| | |
| | | |
| | | if (previousRecord != null) |
| | | { |
| | | addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet); |
| | | addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | finally |
| | | { |
| | | endWriteTask(txn); |
| | | endWriteTask(importer); |
| | | } |
| | | } |
| | | |
| | |
| | | return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit()); |
| | | } |
| | | |
| | | private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet) |
| | | throws DirectoryException |
| | | private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet, |
| | | ImportIDSet deleteSet) throws DirectoryException |
| | | { |
| | | keyCount.incrementAndGet(); |
| | | if (indexMgr.isDN2ID()) |
| | | { |
| | | addDN2ID(txn, indexID, insertSet); |
| | | addDN2ID(importer, indexID, insertSet); |
| | | } |
| | | else |
| | | { |
| | | if (!deleteSet.isDefined() || deleteSet.size() > 0) |
| | | { |
| | | final Index index = indexIDToIndexMap.get(indexID); |
| | | index.importRemove(txn, deleteSet); |
| | | index.importRemove(importer, deleteSet); |
| | | } |
| | | if (!insertSet.isDefined() || insertSet.size() > 0) |
| | | { |
| | | final Index index = indexIDToIndexMap.get(indexID); |
| | | index.importPut(txn, insertSet); |
| | | index.importPut(importer, insertSet); |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void addDN2ID(WriteableTransaction txn, int indexID, ImportIDSet idSet) throws DirectoryException |
| | | private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet) |
| | | throws DirectoryException |
| | | { |
| | | DNState dnState = dnStateMap.get(indexID); |
| | | if (dnState == null) |
| | |
| | | dnState = new DNState(indexIDToECMap.get(indexID)); |
| | | dnStateMap.put(indexID, dnState); |
| | | } |
| | | if (dnState.checkParent(txn, idSet)) |
| | | if (dnState.checkParent(importer, idSet)) |
| | | { |
| | | dnState.writeToDN2ID(txn, idSet.getKey()); |
| | | dnState.writeToDN2ID(importer, idSet.getKey()); |
| | | } |
| | | } |
| | | |
| | |
| | | * This class is used to by a index DB merge thread performing DN processing |
| | | * to keep track of the state of individual DN2ID index processing. |
| | | */ |
| | | final class DNState |
| | | private final class DNState |
| | | { |
| | | private static final int DN_STATE_CACHE_SIZE = 64 * KB; |
| | | |
| | |
| | | private ByteSequence parentDN; |
| | | private final ByteStringBuilder lastDN = new ByteStringBuilder(); |
| | | private EntryID parentID, lastID, entryID; |
| | | private long totalNbEntries; |
| | | |
| | | DNState(EntryContainer entryContainer) |
| | | private DNState(EntryContainer entryContainer) |
| | | { |
| | | this.entryContainer = entryContainer; |
| | | dn2id = entryContainer.getDN2ID().getName(); |
| | |
| | | } |
| | | |
| | | /** Why do we still need this if we are checking parents in the first phase? */ |
| | | private boolean checkParent(ReadableTransaction txn, ImportIDSet idSet) throws StorageRuntimeException |
| | | boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet) |
| | | throws StorageRuntimeException |
| | | { |
| | | entryID = idSet.iterator().next(); |
| | | parentDN = getParent(idSet.getKey()); |
| | |
| | | // If null is returned then this is a suffix DN. |
| | | if (parentDN != null) |
| | | { |
| | | parentID = get(txn, dn2id, parentDN); |
| | | parentID = get(importer, dn2id, parentDN); |
| | | if (parentID == null) |
| | | { |
| | | // We have a missing parent. Maybe parent checking was turned off? |
| | |
| | | return importCfg != null && importCfg.appendToExistingData(); |
| | | } |
| | | |
| | | EntryID get(ReadableTransaction txn, TreeName dn2id, ByteSequence dn) throws StorageRuntimeException |
| | | private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn) |
| | | throws StorageRuntimeException |
| | | { |
| | | ByteString value = txn.read(dn2id, dn); |
| | | ByteString value = importer.read(dn2id, dn); |
| | | return value != null ? new EntryID(value) : null; |
| | | } |
| | | |
| | | public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException |
| | | void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key) |
| | | throws DirectoryException |
| | | { |
| | | txn.put(dn2id, key, entryID.toByteString()); |
| | | importer.put(dn2id, key, entryID.toByteString()); |
| | | indexMgr.addTotDNCount(1); |
| | | if (parentID != null) |
| | | { |
| | | incrementChildrenCounter(txn); |
| | | incrementChildrenCounter(importer); |
| | | } |
| | | } |
| | | |
| | | private void incrementChildrenCounter(WriteableTransaction txn) |
| | | private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer) |
| | | { |
| | | final AtomicLong counter = getId2childrenCounter(); |
| | | counter.incrementAndGet(); |
| | | if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE) |
| | | { |
| | | flush(txn); |
| | | flush(importer); |
| | | } |
| | | } |
| | | |
| | | private void flush(WriteableTransaction txn) |
| | | private void flush(org.opends.server.backends.pluggable.spi.Importer importer) |
| | | { |
| | | for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet()) |
| | | { |
| | | entryContainer.getID2ChildrenCount() |
| | | .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get()); |
| | | final EntryID entryID = childrenCounter.getKey(); |
| | | final long totalForEntryID = childrenCounter.getValue().get(); |
| | | totalNbEntries += totalForEntryID; |
| | | entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID); |
| | | } |
| | | id2childrenCountTree.clear(); |
| | | } |
| | | |
| | | |
| | | void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer) |
| | | { |
| | | flush(importer); |
| | | |
| | | entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries); |
| | | } |
| | | } |
| | | } |
| | | |
| | |
| | | indexKeyQueueMap.clear(); |
| | | } |
| | | |
| | | private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException |
| | | private void rebuildIndexesPhaseTwo() throws Exception |
| | | { |
| | | final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask()); |
| | | try |
| | |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | | import org.opends.server.backends.pluggable.spi.Importer; |
| | | import org.opends.server.backends.pluggable.spi.ReadableTransaction; |
| | | import org.opends.server.backends.pluggable.spi.WriteableTransaction; |
| | | |
| | |
| | | int getIndexEntryLimit(); |
| | | |
| | | // Ignores trusted state. |
| | | void importPut(WriteableTransaction txn, ImportIDSet idsToBeAdded); |
| | | void importPut(Importer importer, ImportIDSet idsToBeAdded); |
| | | |
| | | // Ignores trusted state. |
| | | void importRemove(WriteableTransaction txn, ImportIDSet idsToBeRemoved); |
| | | void importRemove(Importer importer, ImportIDSet idsToBeRemoved); |
| | | |
| | | boolean isTrusted(); |
| | | |
| | |
| | | package org.opends.server.backends.pluggable; |
| | | |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.RestoreConfig; |
| | | |
| | | /** |
| | | * Decorates a {@link Storage} with additional trace logging. |
| | | */ |
| | | /** Decorates a {@link Storage} with additional trace logging. */ |
| | | @SuppressWarnings("javadoc") |
| | | final class TracedStorage implements Storage |
| | | { |
| | | /** |
| | | * Decorates an {@link Importer} with additional trace logging. |
| | | */ |
| | | /** Decorates an {@link Importer} with additional trace logging. */ |
| | | private final class TracedImporter implements Importer |
| | | { |
| | | private final Importer importer; |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | importer.close(); |
| | | logger.trace("Storage@%s.Importer@%s.close(%s)", |
| | | storageId(), id(), backendId); |
| | | } |
| | | |
| | | @Override |
| | | public void createTree(final TreeName name) |
| | | { |
| | | importer.createTree(name); |
| | |
| | | storageId(), id(), backendId, name, hex(key), hex(value)); |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(TreeName name, ByteSequence key) |
| | | { |
| | | final ByteString value = importer.read(name, key); |
| | | logger.trace("Storage@%s.Importer@%s.read(%s, %s, %s) = %s", |
| | | storageId(), id(), backendId, name, hex(key), hex(value)); |
| | | return value; |
| | | } |
| | | |
| | | @Override |
| | | public boolean delete(TreeName name, ByteSequence key) |
| | | { |
| | | final boolean delete = importer.delete(name, key); |
| | | logger.trace("Storage@%s.Importer@%s.delete(%s, %s, %s) = %b", |
| | | storageId(), id(), backendId, name, hex(key), delete); |
| | | return delete; |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | importer.close(); |
| | | logger.trace("Storage@%s.Importer@%s.close(%s)", |
| | | storageId(), id(), backendId); |
| | | } |
| | | |
| | | private int id() |
| | | { |
| | | return System.identityHashCode(this); |
| | |
| | | } |
| | | |
| | | @Override |
| | | public Importer startImport() throws Exception |
| | | public Importer startImport() throws ConfigException, StorageRuntimeException |
| | | { |
| | | final Importer importer = storage.startImport(); |
| | | if (logger.isTraceEnabled()) |
| | |
| | | import java.io.Closeable; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | |
| | | /** |
| | | * Allows to run an import. For performance reasons, imports are run without transactions. |
| | |
| | | */ |
| | | void put(TreeName treeName, ByteSequence key, ByteSequence value); |
| | | |
| | | /** |
| | | * Deletes the record with the provided key, in the tree whose name is provided. |
| | | * |
| | | * @param treeName |
| | | * the tree name |
| | | * @param key |
| | | * the key of the record to delete |
| | | * @return {@code true} if the record could be deleted, {@code false} otherwise |
| | | */ |
| | | boolean delete(TreeName treeName, ByteSequence key); |
| | | |
| | | /** |
| | | * Reads the record's value associated to the provided key, in the tree whose name is provided. |
| | | * |
| | | * @param treeName |
| | | * the tree name |
| | | * @param key |
| | | * the record's key |
| | | * @return the record's value, or {@code null} if none exists |
| | | */ |
| | | ByteString read(TreeName treeName, ByteSequence key); |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | void close(); |
| | |
| | | |
| | | import java.io.Closeable; |
| | | |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.opends.server.types.BackupConfig; |
| | | import org.opends.server.types.BackupDirectory; |
| | | import org.opends.server.types.DirectoryException; |
| | |
| | | * Starts the import operation. |
| | | * |
| | | * @return a new Importer object which must be closed to release all resources |
| | | * @throws Exception |
| | | * @throws ConfigException |
| | | * if there is a problem with the configuration |
| | | * @throws StorageRuntimeException |
| | | * if a problem occurs with the underlying storage engine |
| | | * @see #close() to release all resources once import is finished |
| | | */ |
| | | Importer startImport() throws Exception; |
| | | Importer startImport() throws ConfigException, StorageRuntimeException; |
| | | |
| | | /** |
| | | * Opens the storage engine to allow executing operations on it. |