| | |
| | | import java.nio.MappedByteBuffer; |
| | | import java.nio.channels.FileChannel; |
| | | import java.nio.channels.FileChannel.MapMode; |
| | | import java.nio.charset.Charset; |
| | | import java.nio.file.Files; |
| | | import java.util.ArrayList; |
| | | import java.util.Collection; |
| | | import java.util.Comparator; |
| | | import java.util.HashMap; |
| | | import java.util.HashSet; |
| | | import java.util.Iterator; |
| | | import java.util.LinkedHashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.NavigableSet; |
| | | import java.util.NoSuchElementException; |
| | | import java.util.Set; |
| | | import java.util.SortedSet; |
| | | import java.util.TimerTask; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.Callable; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ConcurrentMap; |
| | | import java.util.concurrent.ConcurrentNavigableMap; |
| | | import java.util.concurrent.ConcurrentSkipListMap; |
| | | import java.util.concurrent.ConcurrentSkipListSet; |
| | | import java.util.concurrent.ExecutorService; |
| | | import java.util.concurrent.Executors; |
| | |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.Utils; |
| | | import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType; |
| | | import org.opends.server.admin.std.server.BackendIndexCfg; |
| | |
| | | import org.opends.server.backends.pluggable.spi.ReadOperation; |
| | | import org.opends.server.backends.pluggable.spi.ReadableTransaction; |
| | | import org.opends.server.backends.pluggable.spi.Storage; |
| | | import org.opends.server.backends.pluggable.spi.Storage.AccessMode; |
| | | import org.opends.server.backends.pluggable.spi.StorageRuntimeException; |
| | | import org.opends.server.backends.pluggable.spi.StorageStatus; |
| | | import org.opends.server.backends.pluggable.spi.TreeName; |
| | | import org.opends.server.backends.pluggable.spi.WriteOperation; |
| | | import org.opends.server.backends.pluggable.spi.WriteableTransaction; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ServerContext; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.BackupConfig; |
| | | import org.opends.server.types.BackupDirectory; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.types.LDIFImportConfig; |
| | | import org.opends.server.types.LDIFImportResult; |
| | | import org.opends.server.types.RestoreConfig; |
| | | import org.opends.server.util.Platform; |
| | | |
| | | /** |
| | |
| | | */ |
| | | final class OnDiskMergeStorageImporter |
| | | { |
| | | private static UnsupportedOperationException notImplemented() |
| | | { |
| | | return new UnsupportedOperationException("Not implemented"); |
| | | } |
| | | |
| | | /** Data to put into id2entry tree. */ |
| | | private static final class Id2EntryData |
| | | { |
| | |
| | | private final File file; |
| | | private final FileChannel fileChannel; |
| | | private final List<Integer> bufferPositions = new ArrayList<>(); |
| | | private int bufferSize = 1024; // TODO JNR use MAX_BUFFER_SIZE? |
| | | /** TODO JNR offer configuration for this. */ |
| | | private int bufferSize = 1024; |
| | | |
| | | // FIXME this is not thread safe yet!!! |
| | | /** |
| | |
| | | * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the |
| | | * {@link #bufferSize}. |
| | | */ |
| | | private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>(); |
| | | private ConcurrentNavigableMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentSkipListMap<>(); |
| | | /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */ |
| | | private int maximumExpectedSizeOnDisk; |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public String toString() |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** A cursor implementation aggregating several cursors and ordering them by their key value. */ |
| | | private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V> |
| | | { |
| | | private static final byte UNINITIALIZED = 0; |
| | | private static final byte READY = 1; |
| | | private static final byte CLOSED = 2; |
| | | |
| | | /** |
| | | * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED} |
| | | */ |
| | | private byte state = UNINITIALIZED; |
| | | |
| | | /** |
| | | * The cursors are sorted based on the key change of each cursor to consider the next change |
| | | * across all cursors. |
| | | */ |
| | | private final NavigableSet<Cursor<K, V>> cursors = new TreeSet<>(new Comparator<Cursor<K, V>>() |
| | | { |
| | | @Override |
| | | public int compare(Cursor<K, V> c1, Cursor<K, V> c2) |
| | | { |
| | | final int cmp = c1.getKey().compareTo(c2.getKey()); |
| | | if (cmp == 0) |
| | | { |
| | | // Never return 0. Otherwise both cursors are considered equal |
| | | // and only one of them is kept by this set |
| | | return System.identityHashCode(c1) - System.identityHashCode(c2); |
| | | } |
| | | return cmp; |
| | | } |
| | | }); |
| | | |
| | | private CompositeCursor(Collection<Cursor<K, V>> cursors) |
| | | { |
| | | Reject.ifNull(cursors); |
| | | |
| | | for (Iterator<Cursor<K, V>> it = cursors.iterator(); it.hasNext();) |
| | | { |
| | | Cursor<K, V> cursor = it.next(); |
| | | if (!cursor.isDefined() && !cursor.next()) |
| | | { |
| | | it.remove(); |
| | | } |
| | | } |
| | | |
| | | this.cursors.addAll(cursors); |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | if (state == CLOSED) |
| | | { |
| | | return false; |
| | | } |
| | | |
| | | // If previous state was ready, then we must advance the first cursor |
| | | // To keep consistent the cursors' order in the SortedSet, it is necessary |
| | | // to remove the first cursor, then add it again after moving it forward. |
| | | if (state == UNINITIALIZED) |
| | | { |
| | | state = READY; |
| | | } |
| | | else if (state == READY) |
| | | { |
| | | final Cursor<K, V> cursorToAdvance = cursors.pollFirst(); |
| | | if (cursorToAdvance != null && cursorToAdvance.next()) |
| | | { |
| | | this.cursors.add(cursorToAdvance); |
| | | } |
| | | } |
| | | return isDefined(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean isDefined() |
| | | { |
| | | return state == READY && !cursors.isEmpty(); |
| | | } |
| | | |
| | | @Override |
| | | public K getKey() throws NoSuchElementException |
| | | { |
| | | throwIfNotDefined(); |
| | | return cursors.first().getKey(); |
| | | } |
| | | |
| | | @Override |
| | | public V getValue() throws NoSuchElementException |
| | | { |
| | | throwIfNotDefined(); |
| | | return cursors.first().getValue(); |
| | | } |
| | | |
| | | private void throwIfNotDefined() |
| | | { |
| | | if (!isDefined()) |
| | | { |
| | | throw new NoSuchElementException(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | state = CLOSED; |
| | | Utils.closeSilently(cursors); |
| | | cursors.clear(); |
| | | } |
| | | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | if (isDefined()) |
| | | { |
| | | return cursors.first().toString(); |
| | | } |
| | | return "not defined"; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionToKey(ByteSequence key) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionToKeyOrNext(ByteSequence key) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionToLastKey() |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public boolean positionToIndex(int index) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | } |
| | | |
| | | /** A cursor implementation reading key/value pairs from memory mapped files, a.k.a {@link MappedByteBuffer}. */ |
| | | private static final class ByteBufferCursor implements Cursor<ByteString, ByteString> |
| | | { |
| | | private final ByteBuffer byteBuffer; |
| | | private final int startPos; |
| | | private final int endPos; |
| | | private final ByteStringBuilder keyBuffer = new ByteStringBuilder();//FIXME JNR bad: do zero copy? |
| | | private final ByteStringBuilder valueBuffer = new ByteStringBuilder();//FIXME JNR bad: do zero copy? |
| | | private int currentPos; |
| | | private boolean isDefined; |
| | | |
| | | private ByteBufferCursor(ByteBuffer byteBuffer, int startPos, int endPos) |
| | | { |
| | | this.byteBuffer = byteBuffer; |
| | | this.startPos = startPos; |
| | | this.endPos = endPos; |
| | | this.currentPos = startPos; |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | isDefined = false; |
| | | if (currentPos >= endPos) |
| | | { |
| | | return isDefined = false; |
| | | } |
| | | read(keyBuffer); |
| | | read(valueBuffer); |
| | | return isDefined = true; |
| | | } |
| | | |
| | | private void read(ByteStringBuilder buffer) |
| | | { |
| | | int length = byteBuffer.getInt(currentPos); |
| | | currentPos += INT_SIZE; |
| | | byteBuffer.position(currentPos); |
| | | |
| | | buffer.clear(); |
| | | buffer.setLength(length); |
| | | byteBuffer.get(buffer.getBackingArray(), 0, length); |
| | | currentPos += length; |
| | | } |
| | | |
| | | @Override |
| | | public boolean isDefined() |
| | | { |
| | | return isDefined; |
| | | } |
| | | |
| | | @Override |
| | | public ByteString getKey() throws NoSuchElementException |
| | | { |
| | | throwIfNotDefined(); |
| | | return keyBuffer.toByteString(); |
| | | } |
| | | |
| | | @Override |
| | | public ByteString getValue() throws NoSuchElementException |
| | | { |
| | | throwIfNotDefined(); |
| | | return valueBuffer.toByteString(); |
| | | } |
| | | |
| | | private void throwIfNotDefined() |
| | | { |
| | | if (!isDefined()) |
| | | { |
| | | throw new NoSuchElementException(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public String toString() |
| | | { |
| | | if (isDefined()) |
| | | { |
| | | final ByteString key = getKey(); |
| | | final ByteString value = getValue(); |
| | | return "<key=" + key + "(" + key.toHexString() + "), value=" + value + "(" + value.toHexString() + ")>"; |
| | | } |
| | | return "not defined"; |
| | | } |
| | | |
| | | @Override |
| | | public boolean positionToKey(ByteSequence key) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean positionToKeyOrNext(ByteSequence key) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean positionToLastKey() |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean positionToIndex(int index) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | } |
| | | |
| | | /** A storage using memory mapped files, a.k.a {@link MappedByteBuffer}. */ |
| | | private static final class MemoryMappedStorage implements Storage |
| | | { |
| | | private final File bufferDir; |
| | | |
| | | private MemoryMappedStorage(File bufferDir) |
| | | { |
| | | this.bufferDir = bufferDir; |
| | | } |
| | | |
| | | @Override |
| | | public Importer startImport() throws ConfigException, StorageRuntimeException |
| | | { |
| | | return new MemoryMappedBufferImporter(bufferDir); |
| | | } |
| | | |
| | | @Override |
| | | public void open(AccessMode accessMode) throws Exception |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public <T> T read(ReadOperation<T> readOperation) throws Exception |
| | | { |
| | | return readOperation.run(new ReadableTransaction() |
| | | { |
| | | @Override |
| | | public Cursor<ByteString, ByteString> openCursor(TreeName treeName) |
| | | { |
| | | try |
| | | { |
| | | List<Integer> bufferPositions = readBufferPositions(treeName); |
| | | |
| | | // TODO JNR build ByteSequence implementation reading from memory mapped files? |
| | | return getCursors(treeName, bufferPositions); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | private List<Integer> readBufferPositions(TreeName treeName) throws IOException |
| | | { |
| | | // TODO JNR move to Buffer class? |
| | | File indexFile = new File(bufferDir, treeName + ".index"); |
| | | List<String> indexLines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset()); |
| | | if (indexLines.size() != 1) |
| | | { |
| | | throw new IllegalStateException("Not implemented");// TODO JNR |
| | | } |
| | | |
| | | final String[] bufferPositions = indexLines.get(0).split(" "); |
| | | final List<Integer> results = new ArrayList<>(bufferPositions.length); |
| | | for (String bufferPos : bufferPositions) |
| | | { |
| | | results.add(Integer.valueOf(bufferPos)); |
| | | } |
| | | return results; |
| | | } |
| | | |
| | | private Cursor<ByteString, ByteString> getCursors(TreeName treeName, List<Integer> bufferPositions) |
| | | throws IOException |
| | | { |
| | | // TODO JNR move to Buffer class? |
| | | File bufferFile = new File(bufferDir, treeName.toString()); |
| | | FileChannel fileChannel = new RandomAccessFile(bufferFile, "r").getChannel(); |
| | | long fileSize = Files.size(bufferFile.toPath()); |
| | | final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, fileSize); |
| | | |
| | | final List<Cursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1); |
| | | Iterator<Integer> it = bufferPositions.iterator(); |
| | | if (it.hasNext()) |
| | | { |
| | | int lastPos = it.next(); |
| | | while (it.hasNext()) |
| | | { |
| | | final int bufferPos = it.next(); |
| | | cursors.add(new ByteBufferCursor(byteBuffer, lastPos, bufferPos)); |
| | | lastPos = bufferPos; |
| | | } |
| | | } |
| | | return new CompositeCursor<ByteString, ByteString>(cursors); |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(TreeName treeName, ByteSequence key) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public long getRecordCount(TreeName treeName) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | @Override |
| | | public void write(WriteOperation writeOperation) throws Exception |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public void removeStorageFiles() throws StorageRuntimeException |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public StorageStatus getStorageStatus() |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean supportsBackupAndRestore() |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public void createBackup(BackupConfig backupConfig) throws DirectoryException |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public Set<TreeName> listTrees() |
| | | { |
| | | final Set<TreeName> results = new HashSet<>(); |
| | | for (File baseDN : this.bufferDir.listFiles()) |
| | | { |
| | | for (File index : baseDN.listFiles()) |
| | | { |
| | | if (!index.getName().endsWith(".index")) |
| | | { |
| | | results.add(new TreeName(baseDN.getName(), index.getName())); |
| | | } |
| | | } |
| | | } |
| | | return results; |
| | | } |
| | | } |
| | | |
| | | /** An importer using memory mapped files, a.k.a {@link MappedByteBuffer}. */ |
| | | private static final class MemoryMappedBufferImporter implements Importer |
| | | { |
| | |
| | | Buffer buffer = treeNameToBufferMap.get(treeName); |
| | | if (buffer == null) |
| | | { |
| | | // TODO JNR that would be great if it was creating sub directories :) |
| | | // Creates sub directories for each suffix |
| | | // FIXME JNR cannot directly use DN names as directory + file names |
| | | buffer = new Buffer(new File(bufferDir, treeName.toString())); |
| | | treeNameToBufferMap.put(treeName, buffer); |
| | | } |
| | |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(TreeName treeName, ByteSequence key) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | @Override |
| | | public boolean delete(TreeName treeName, ByteSequence key) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | @Override |
| | | public void createTree(TreeName name) |
| | | { |
| | | throw new RuntimeException("Not implemented"); |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | for (Buffer buffer : treeNameToBufferMap.values()) |
| | |
| | | buffer.flush(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(TreeName treeName, ByteSequence key) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public boolean delete(TreeName treeName, ByteSequence key) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | |
| | | @Override |
| | | public void createTree(TreeName name) |
| | | { |
| | | throw notImplemented(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | }); |
| | | |
| | | final Importer tmpImporter = new MemoryMappedBufferImporter(tempDir); |
| | | final MemoryMappedStorage tmpStorage = new MemoryMappedStorage(tempDir); |
| | | |
| | | final long startTime = System.currentTimeMillis(); |
| | | importPhaseOne(backendStorage, tmpImporter); |
| | | importPhaseOne(backendStorage, tmpStorage); |
| | | final long phaseOneFinishTime = System.currentTimeMillis(); |
| | | |
| | | if (isCanceled()) |
| | |
| | | throw new InterruptedException("Import processing canceled."); |
| | | } |
| | | |
| | | backendStorage.close(); |
| | | |
| | | final long phaseTwoTime = System.currentTimeMillis(); |
| | | importPhaseTwo(); |
| | | importPhaseTwo(backendStorage, tmpStorage); |
| | | if (isCanceled()) |
| | | { |
| | | throw new InterruptedException("Import processing canceled."); |
| | | } |
| | | final long phaseTwoFinishTime = System.currentTimeMillis(); |
| | | |
| | | backendStorage.open(AccessMode.READ_WRITE); |
| | | backendStorage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | |
| | | * </ol> |
| | | * TODO JNR fix all javadocs |
| | | */ |
| | | private void importPhaseOne(Storage storage, Importer importer) throws Exception |
| | | private void importPhaseOne(Storage backendStorage, Storage tmpStorage) throws Exception |
| | | { |
| | | final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); |
| | | scheduleAtFixedRate(timerService, new FirstPhaseProgressTask()); |
| | | threadCount = 2; // FIXME JNR id2entry + another task |
| | | final ExecutorService execService = Executors.newFixedThreadPool(threadCount); |
| | | |
| | | final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(storage); |
| | | final Future<?> dn2IdPutFuture = execService.submit(id2EntryPutTask); |
| | | execService.submit(new MigrateExistingEntriesTask(storage, importer, id2EntryPutTask)).get(); |
| | | |
| | | final List<Callable<Void>> tasks = new ArrayList<>(threadCount); |
| | | if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries()) |
| | | try (Importer tmpImporter = tmpStorage.startImport()) |
| | | { |
| | | for (int i = 0; i < threadCount - 1; i++) |
| | | { |
| | | tasks.add(new ImportTask(importer, id2EntryPutTask)); |
| | | } |
| | | } |
| | | execService.invokeAll(tasks); |
| | | tasks.clear(); |
| | | final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(backendStorage); |
| | | final Future<?> dn2IdPutFuture = execService.submit(id2EntryPutTask); |
| | | execService.submit(new MigrateExistingEntriesTask(backendStorage, tmpImporter, id2EntryPutTask)).get(); |
| | | |
| | | execService.submit(new MigrateExcludedTask(storage, importer, id2EntryPutTask)).get(); |
| | | id2EntryPutTask.finishedWrites(); |
| | | dn2IdPutFuture.get(); |
| | | final List<Callable<Void>> tasks = new ArrayList<>(threadCount); |
| | | if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries()) |
| | | { |
| | | for (int i = 0; i < threadCount - 1; i++) |
| | | { |
| | | tasks.add(new ImportTask(tmpImporter, id2EntryPutTask)); |
| | | } |
| | | } |
| | | execService.invokeAll(tasks); |
| | | tasks.clear(); |
| | | |
| | | execService.submit(new MigrateExcludedTask(backendStorage, tmpImporter, id2EntryPutTask)).get(); |
| | | id2EntryPutTask.finishedWrites(); |
| | | dn2IdPutFuture.get(); |
| | | } |
| | | |
| | | shutdownAll(timerService, execService); |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | private void importPhaseTwo() throws Exception |
| | | private void importPhaseTwo(final Storage outStorage, Storage inStorage) throws Exception |
| | | { |
| | | ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); |
| | | scheduleAtFixedRate(timerService, new SecondPhaseProgressTask()); |
| | | try |
| | | |
| | | final Set<TreeName> treeNames = inStorage.listTrees(); // FIXME JNR rename to listTreeNames()? |
| | | ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size()); |
| | | try (Importer importer = outStorage.startImport()) |
| | | { |
| | | // TODO JNR |
| | | for (final TreeName treeName : treeNames) |
| | | { |
| | | copyTo(treeName, inStorage, importer);// FIXME JNR use dbService |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | shutdownAll(timerService); |
| | | shutdownAll(timerService, dbService); |
| | | } |
| | | } |
| | | |
| | | private void copyTo(final TreeName treeName, Storage input, final Importer output) throws Exception |
| | | { |
| | | input.read(new ReadOperation<Void>() |
| | | { |
| | | @Override |
| | | public Void run(ReadableTransaction txn) throws Exception |
| | | { |
| | | try (Cursor<ByteString, ByteString> cursor = txn.openCursor(treeName)) |
| | | { |
| | | while (cursor.next()) |
| | | {// FIXME JNR add merge phase |
| | | output.put(treeName, cursor.getKey(), cursor.getValue()); |
| | | } |
| | | } |
| | | return null; |
| | | } |
| | | }); |
| | | } |
| | | |
| | | /** Task used to migrate excluded branch. */ |
| | | private final class MigrateExcludedTask extends ImportTask |
| | | { |
| | |
| | | this.storage = storage; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Void call() throws Exception |
| | | { |
| | |
| | | this.id2EntryPutTask = id2EntryPutTask; |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |
| | | @Override |
| | | public Void call() throws Exception |
| | | { |