From cccb086edd542cbf8f4cafa9ee3923f5e9099ed6 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 26 May 2015 08:39:56 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine
---
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java | 8
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java | 29 ++
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java | 21 +
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java | 577 +++++++++++++++++++++++++++++++++++++++++++----
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java | 12 +
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java | 15 +
6 files changed, 610 insertions(+), 52 deletions(-)
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
index 647336e..0167f3b 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
@@ -41,11 +41,13 @@
import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Queue;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import org.forgerock.i18n.LocalizableMessage;
@@ -965,6 +967,25 @@
new BackupManager(config.getBackendId()).restoreBackup(this, restoreConfig);
}
+ @Override
+ public Set<TreeName> listTrees()
+ {
+ try
+ {
+ String[] treeNames = volume.getTreeNames();
+ final Set<TreeName> results = new HashSet<>(treeNames.length);
+ for (String treeName : treeNames)
+ {
+ results.add(TreeName.valueOf(treeName));
+ }
+ return results;
+ }
+ catch (PersistitException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+
/**
* TODO: it would be nice to use the low-level key/value APIs. They seem quite
* inefficient at the moment for simple byte arrays.
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
index 42e21d0..319dffb 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
@@ -44,8 +44,10 @@
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.forgerock.opendj.ldap.DecodeException;
+import org.forgerock.util.Reject;
import org.opends.server.api.CompressedSchema;
import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
import org.opends.server.backends.pluggable.spi.TreeName;
@@ -323,6 +325,7 @@
public void put(WriteableTransaction txn, EntryID id, Entry entry)
throws StorageRuntimeException, DirectoryException
{
+ Reject.ifNull(txn);
ByteString key = id.toByteString();
EntryCodec codec = acquireEntryCodec();
try
@@ -337,6 +340,32 @@
}
/**
+ * Write a record in the entry tree.
+ *
+ * @param importer a non null importer
+ * @param id The entry ID which forms the key.
+ * @param entry The LDAP entry.
+ * @throws StorageRuntimeException If an error occurs in the storage.
+ * @throws DirectoryException If a problem occurs while attempting to encode the entry.
+ */
+ public void importPut(Importer importer, EntryID id, Entry entry)
+ throws StorageRuntimeException, DirectoryException
+ {
+ Reject.ifNull(importer);
+ ByteString key = id.toByteString();
+ EntryCodec codec = acquireEntryCodec();
+ try
+ {
+ ByteString value = codec.encodeInternal(entry, dataConfig);
+ importer.put(getName(), key, value);
+ }
+ finally
+ {
+ codec.release();
+ }
+ }
+
+ /**
* Remove a record from the entry tree.
*
* @param txn a non null transaction
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index d297582..dea8c2c 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -42,20 +42,26 @@
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
+import java.nio.charset.Charset;
+import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedSet;
import java.util.TimerTask;
+import java.util.TreeSet;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -71,6 +77,7 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Reject;
import org.forgerock.util.Utils;
import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
import org.opends.server.admin.std.server.BackendIndexCfg;
@@ -83,19 +90,24 @@
import org.opends.server.backends.pluggable.spi.ReadOperation;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.Storage;
+import org.opends.server.backends.pluggable.spi.Storage.AccessMode;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
+import org.opends.server.backends.pluggable.spi.StorageStatus;
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ServerContext;
import org.opends.server.types.AttributeType;
+import org.opends.server.types.BackupConfig;
+import org.opends.server.types.BackupDirectory;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
+import org.opends.server.types.RestoreConfig;
import org.opends.server.util.Platform;
/**
@@ -104,6 +116,11 @@
*/
final class OnDiskMergeStorageImporter
{
+ private static UnsupportedOperationException notImplemented()
+ {
+ return new UnsupportedOperationException("Not implemented");
+ }
+
/** Data to put into id2entry tree. */
private static final class Id2EntryData
{
@@ -262,7 +279,8 @@
private final File file;
private final FileChannel fileChannel;
private final List<Integer> bufferPositions = new ArrayList<>();
- private int bufferSize = 1024; // TODO JNR use MAX_BUFFER_SIZE?
+ /** TODO JNR offer configuration for this. */
+ private int bufferSize = 1024;
// FIXME this is not thread safe yet!!!
/**
@@ -271,7 +289,7 @@
* This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
* {@link #bufferSize}.
*/
- private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
+ private ConcurrentNavigableMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentSkipListMap<>();
/** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
private int maximumExpectedSizeOnDisk;
@@ -375,7 +393,6 @@
}
}
- /** {@inheritDoc} */
@Override
public String toString()
{
@@ -387,6 +404,436 @@
}
}
+ /** A cursor implementation aggregating several cursors and ordering them by their key value. */
+ private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
+ {
+ private static final byte UNINITIALIZED = 0;
+ private static final byte READY = 1;
+ private static final byte CLOSED = 2;
+
+ /**
+ * The state of this cursor. One of {@link #UNINITIALIZED}, {@link #READY} or {@link #CLOSED}
+ */
+ private byte state = UNINITIALIZED;
+
+ /**
+ * The cursors are sorted based on the key change of each cursor to consider the next change
+ * across all cursors.
+ */
+ private final NavigableSet<Cursor<K, V>> cursors = new TreeSet<>(new Comparator<Cursor<K, V>>()
+ {
+ @Override
+ public int compare(Cursor<K, V> c1, Cursor<K, V> c2)
+ {
+ final int cmp = c1.getKey().compareTo(c2.getKey());
+ if (cmp == 0)
+ {
+ // Never return 0. Otherwise both cursors are considered equal
+ // and only one of them is kept by this set
+ return System.identityHashCode(c1) - System.identityHashCode(c2);
+ }
+ return cmp;
+ }
+ });
+
+ private CompositeCursor(Collection<Cursor<K, V>> cursors)
+ {
+ Reject.ifNull(cursors);
+
+ for (Iterator<Cursor<K, V>> it = cursors.iterator(); it.hasNext();)
+ {
+ Cursor<K, V> cursor = it.next();
+ if (!cursor.isDefined() && !cursor.next())
+ {
+ it.remove();
+ }
+ }
+
+ this.cursors.addAll(cursors);
+ }
+
+ @Override
+ public boolean next()
+ {
+ if (state == CLOSED)
+ {
+ return false;
+ }
+
+ // If previous state was ready, then we must advance the first cursor
+ // To keep consistent the cursors' order in the SortedSet, it is necessary
+ // to remove the first cursor, then add it again after moving it forward.
+ if (state == UNINITIALIZED)
+ {
+ state = READY;
+ }
+ else if (state == READY)
+ {
+ final Cursor<K, V> cursorToAdvance = cursors.pollFirst();
+ if (cursorToAdvance != null && cursorToAdvance.next())
+ {
+ this.cursors.add(cursorToAdvance);
+ }
+ }
+ return isDefined();
+ }
+
+ @Override
+ public boolean isDefined()
+ {
+ return state == READY && !cursors.isEmpty();
+ }
+
+ @Override
+ public K getKey() throws NoSuchElementException
+ {
+ throwIfNotDefined();
+ return cursors.first().getKey();
+ }
+
+ @Override
+ public V getValue() throws NoSuchElementException
+ {
+ throwIfNotDefined();
+ return cursors.first().getValue();
+ }
+
+ private void throwIfNotDefined()
+ {
+ if (!isDefined())
+ {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ state = CLOSED;
+ Utils.closeSilently(cursors);
+ cursors.clear();
+ }
+
+ @Override
+ public String toString()
+ {
+ if (isDefined())
+ {
+ return cursors.first().toString();
+ }
+ return "not defined";
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionToKey(ByteSequence key)
+ {
+ throw notImplemented();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionToKeyOrNext(ByteSequence key)
+ {
+ throw notImplemented();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionToLastKey()
+ {
+ throw notImplemented();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean positionToIndex(int index)
+ {
+ throw notImplemented();
+ }
+ }
+
+ /** A cursor implementation reading key/value pairs from memory mapped files, a.k.a {@link MappedByteBuffer}. */
+ private static final class ByteBufferCursor implements Cursor<ByteString, ByteString>
+ {
+ private final ByteBuffer byteBuffer;
+ private final int startPos;
+ private final int endPos;
+ private final ByteStringBuilder keyBuffer = new ByteStringBuilder();//FIXME JNR bad: do zero copy?
+ private final ByteStringBuilder valueBuffer = new ByteStringBuilder();//FIXME JNR bad: do zero copy?
+ private int currentPos;
+ private boolean isDefined;
+
+ private ByteBufferCursor(ByteBuffer byteBuffer, int startPos, int endPos)
+ {
+ this.byteBuffer = byteBuffer;
+ this.startPos = startPos;
+ this.endPos = endPos;
+ this.currentPos = startPos;
+ }
+
+ @Override
+ public boolean next()
+ {
+ isDefined = false;
+ if (currentPos >= endPos)
+ {
+ return isDefined = false;
+ }
+ read(keyBuffer);
+ read(valueBuffer);
+ return isDefined = true;
+ }
+
+ private void read(ByteStringBuilder buffer)
+ {
+ int length = byteBuffer.getInt(currentPos);
+ currentPos += INT_SIZE;
+ byteBuffer.position(currentPos);
+
+ buffer.clear();
+ buffer.setLength(length);
+ byteBuffer.get(buffer.getBackingArray(), 0, length);
+ currentPos += length;
+ }
+
+ @Override
+ public boolean isDefined()
+ {
+ return isDefined;
+ }
+
+ @Override
+ public ByteString getKey() throws NoSuchElementException
+ {
+ throwIfNotDefined();
+ return keyBuffer.toByteString();
+ }
+
+ @Override
+ public ByteString getValue() throws NoSuchElementException
+ {
+ throwIfNotDefined();
+ return valueBuffer.toByteString();
+ }
+
+ private void throwIfNotDefined()
+ {
+ if (!isDefined())
+ {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public String toString()
+ {
+ if (isDefined())
+ {
+ final ByteString key = getKey();
+ final ByteString value = getValue();
+ return "<key=" + key + "(" + key.toHexString() + "), value=" + value + "(" + value.toHexString() + ")>";
+ }
+ return "not defined";
+ }
+
+ @Override
+ public boolean positionToKey(ByteSequence key)
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public boolean positionToKeyOrNext(ByteSequence key)
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public boolean positionToLastKey()
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public boolean positionToIndex(int index)
+ {
+ throw notImplemented();
+ }
+ }
+
+ /** A storage using memory mapped files, a.k.a {@link MappedByteBuffer}. */
+ private static final class MemoryMappedStorage implements Storage
+ {
+ private final File bufferDir;
+
+ private MemoryMappedStorage(File bufferDir)
+ {
+ this.bufferDir = bufferDir;
+ }
+
+ @Override
+ public Importer startImport() throws ConfigException, StorageRuntimeException
+ {
+ return new MemoryMappedBufferImporter(bufferDir);
+ }
+
+ @Override
+ public void open(AccessMode accessMode) throws Exception
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public <T> T read(ReadOperation<T> readOperation) throws Exception
+ {
+ return readOperation.run(new ReadableTransaction()
+ {
+ @Override
+ public Cursor<ByteString, ByteString> openCursor(TreeName treeName)
+ {
+ try
+ {
+ List<Integer> bufferPositions = readBufferPositions(treeName);
+
+ // TODO JNR build ByteSequence implementation reading from memory mapped files?
+ return getCursors(treeName, bufferPositions);
+ }
+ catch (IOException e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ }
+
+ private List<Integer> readBufferPositions(TreeName treeName) throws IOException
+ {
+ // TODO JNR move to Buffer class?
+ File indexFile = new File(bufferDir, treeName + ".index");
+ List<String> indexLines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset());
+ if (indexLines.size() != 1)
+ {
+ throw new IllegalStateException("Not implemented");// TODO JNR
+ }
+
+ final String[] bufferPositions = indexLines.get(0).split(" ");
+ final List<Integer> results = new ArrayList<>(bufferPositions.length);
+ for (String bufferPos : bufferPositions)
+ {
+ results.add(Integer.valueOf(bufferPos));
+ }
+ return results;
+ }
+
+ private Cursor<ByteString, ByteString> getCursors(TreeName treeName, List<Integer> bufferPositions)
+ throws IOException
+ {
+ // TODO JNR move to Buffer class?
+ File bufferFile = new File(bufferDir, treeName.toString());
+ FileChannel fileChannel = new RandomAccessFile(bufferFile, "r").getChannel();
+ long fileSize = Files.size(bufferFile.toPath());
+ final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
+
+ final List<Cursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
+ Iterator<Integer> it = bufferPositions.iterator();
+ if (it.hasNext())
+ {
+ int lastPos = it.next();
+ while (it.hasNext())
+ {
+ final int bufferPos = it.next();
+ cursors.add(new ByteBufferCursor(byteBuffer, lastPos, bufferPos));
+ lastPos = bufferPos;
+ }
+ }
+ return new CompositeCursor<ByteString, ByteString>(cursors);
+ }
+
+ @Override
+ public ByteString read(TreeName treeName, ByteSequence key)
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public long getRecordCount(TreeName treeName)
+ {
+ throw notImplemented();
+ }
+ });
+ }
+
+ @Override
+ public void write(WriteOperation writeOperation) throws Exception
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public void removeStorageFiles() throws StorageRuntimeException
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public StorageStatus getStorageStatus()
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public boolean supportsBackupAndRestore()
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public void close()
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public void createBackup(BackupConfig backupConfig) throws DirectoryException
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public void removeBackup(BackupDirectory backupDirectory, String backupID) throws DirectoryException
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public Set<TreeName> listTrees()
+ {
+ final Set<TreeName> results = new HashSet<>();
+ for (File baseDN : this.bufferDir.listFiles())
+ {
+ for (File index : baseDN.listFiles())
+ {
+ if (!index.getName().endsWith(".index"))
+ {
+ results.add(new TreeName(baseDN.getName(), index.getName()));
+ }
+ }
+ }
+ return results;
+ }
+ }
+
/** An importer using memory mapped files, a.k.a {@link MappedByteBuffer}. */
private static final class MemoryMappedBufferImporter implements Importer
{
@@ -416,7 +863,8 @@
Buffer buffer = treeNameToBufferMap.get(treeName);
if (buffer == null)
{
- // TODO JNR that would be great if it was creating sub directories :)
+ // Creates sub directories for each suffix
+ // FIXME JNR cannot directly use DN names as directory + file names
buffer = new Buffer(new File(bufferDir, treeName.toString()));
treeNameToBufferMap.put(treeName, buffer);
}
@@ -424,24 +872,6 @@
}
@Override
- public ByteString read(TreeName treeName, ByteSequence key)
- {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public boolean delete(TreeName treeName, ByteSequence key)
- {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
- public void createTree(TreeName name)
- {
- throw new RuntimeException("Not implemented");
- }
-
- @Override
public void close()
{
for (Buffer buffer : treeNameToBufferMap.values())
@@ -449,6 +879,24 @@
buffer.flush();
}
}
+
+ @Override
+ public ByteString read(TreeName treeName, ByteSequence key)
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public boolean delete(TreeName treeName, ByteSequence key)
+ {
+ throw notImplemented();
+ }
+
+ @Override
+ public void createTree(TreeName name)
+ {
+ throw notImplemented();
+ }
}
/**
@@ -918,10 +1366,10 @@
}
});
- final Importer tmpImporter = new MemoryMappedBufferImporter(tempDir);
+ final MemoryMappedStorage tmpStorage = new MemoryMappedStorage(tempDir);
final long startTime = System.currentTimeMillis();
- importPhaseOne(backendStorage, tmpImporter);
+ importPhaseOne(backendStorage, tmpStorage);
final long phaseOneFinishTime = System.currentTimeMillis();
if (isCanceled())
@@ -929,14 +1377,17 @@
throw new InterruptedException("Import processing canceled.");
}
+ backendStorage.close();
+
final long phaseTwoTime = System.currentTimeMillis();
- importPhaseTwo();
+ importPhaseTwo(backendStorage, tmpStorage);
if (isCanceled())
{
throw new InterruptedException("Import processing canceled.");
}
final long phaseTwoFinishTime = System.currentTimeMillis();
+ backendStorage.open(AccessMode.READ_WRITE);
backendStorage.write(new WriteOperation()
{
@Override
@@ -1017,31 +1468,34 @@
* </ol>
* TODO JNR fix all javadocs
*/
- private void importPhaseOne(Storage storage, Importer importer) throws Exception
+ private void importPhaseOne(Storage backendStorage, Storage tmpStorage) throws Exception
{
final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
threadCount = 2; // FIXME JNR id2entry + another task
final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
- final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(storage);
- final Future<?> dn2IdPutFuture = execService.submit(id2EntryPutTask);
- execService.submit(new MigrateExistingEntriesTask(storage, importer, id2EntryPutTask)).get();
-
- final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
- if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
+ try (Importer tmpImporter = tmpStorage.startImport())
{
- for (int i = 0; i < threadCount - 1; i++)
- {
- tasks.add(new ImportTask(importer, id2EntryPutTask));
- }
- }
- execService.invokeAll(tasks);
- tasks.clear();
+ final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(backendStorage);
+ final Future<?> dn2IdPutFuture = execService.submit(id2EntryPutTask);
+ execService.submit(new MigrateExistingEntriesTask(backendStorage, tmpImporter, id2EntryPutTask)).get();
- execService.submit(new MigrateExcludedTask(storage, importer, id2EntryPutTask)).get();
- id2EntryPutTask.finishedWrites();
- dn2IdPutFuture.get();
+ final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
+ if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
+ {
+ for (int i = 0; i < threadCount - 1; i++)
+ {
+ tasks.add(new ImportTask(tmpImporter, id2EntryPutTask));
+ }
+ }
+ execService.invokeAll(tasks);
+ tasks.clear();
+
+ execService.submit(new MigrateExcludedTask(backendStorage, tmpImporter, id2EntryPutTask)).get();
+ id2EntryPutTask.finishedWrites();
+ dn2IdPutFuture.get();
+ }
shutdownAll(timerService, execService);
}
@@ -1063,20 +1517,45 @@
}
}
- private void importPhaseTwo() throws Exception
+ private void importPhaseTwo(final Storage outStorage, Storage inStorage) throws Exception
{
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
- try
+
+ final Set<TreeName> treeNames = inStorage.listTrees(); // FIXME JNR rename to listTreeNames()?
+ ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
+ try (Importer importer = outStorage.startImport())
{
- // TODO JNR
+ for (final TreeName treeName : treeNames)
+ {
+ copyTo(treeName, inStorage, importer);// FIXME JNR use dbService
+ }
}
finally
{
- shutdownAll(timerService);
+ shutdownAll(timerService, dbService);
}
}
+ private void copyTo(final TreeName treeName, Storage input, final Importer output) throws Exception
+ {
+ input.read(new ReadOperation<Void>()
+ {
+ @Override
+ public Void run(ReadableTransaction txn) throws Exception
+ {
+ try (Cursor<ByteString, ByteString> cursor = txn.openCursor(treeName))
+ {
+ while (cursor.next())
+ {// FIXME JNR add merge phase
+ output.put(treeName, cursor.getKey(), cursor.getValue());
+ }
+ }
+ return null;
+ }
+ });
+ }
+
/** Task used to migrate excluded branch. */
private final class MigrateExcludedTask extends ImportTask
{
@@ -1088,7 +1567,6 @@
this.storage = storage;
}
- /** {@inheritDoc} */
@Override
public Void call() throws Exception
{
@@ -1266,7 +1744,6 @@
this.id2EntryPutTask = id2EntryPutTask;
}
- /** {@inheritDoc} */
@Override
public Void call() throws Exception
{
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
index 5c097ea..ec2a586 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
@@ -25,6 +25,8 @@
*/
package org.opends.server.backends.pluggable;
+import java.util.Set;
+
import org.forgerock.i18n.slf4j.LocalizedLogger;
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequence;
@@ -375,6 +377,17 @@
}
}
+ @Override
+ public Set<TreeName> listTrees()
+ {
+ final Set<TreeName> results = storage.listTrees();
+ if (logger.isTraceEnabled())
+ {
+ logger.trace("Storage@%s.listTrees() = ", storageId(), results);
+ }
+ return results;
+ }
+
private String hex(final ByteSequence bytes)
{
return bytes != null ? bytes.toByteString().toHexString() : null;
@@ -384,6 +397,4 @@
{
return System.identityHashCode(this);
}
-
-
}
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
index ca97c08..d5d73ba 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
@@ -26,6 +26,7 @@
package org.opends.server.backends.pluggable.spi;
import java.io.Closeable;
+import java.util.Set;
import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.types.BackupConfig;
@@ -155,4 +156,11 @@
* If a Directory Server error occurs.
*/
void restoreBackup(RestoreConfig restoreConfig) throws DirectoryException;
+
+ /**
+ * TODO JNR.
+ *
+ * @return TODO JNR
+ */
+ Set<TreeName> listTrees();
}
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java
index d52866c..9e67d51 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/TreeName.java
@@ -53,6 +53,18 @@
}
/**
+ * Builds a new {@link TreeName} object based on the provided string representation.
+ *
+ * @param treeName the string representation of the tree name
+ * @return a new {@link TreeName} object constructed from the provided string
+ */
+ public static TreeName valueOf(String treeName)
+ {
+ final String[] split = treeName.split("/");
+ return new TreeName(split[0], split[1]);
+ }
+
+ /**
* Returns the base DN.
*
* @return a {@code String} representing the base DN
--
Gitblit v1.10.0