From 82840bfe0d65a0715357a001e3224ba0d6a9c8df Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 03 Jun 2015 12:26:19 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java | 457 ++++++++++++++++++++++++++++++--------------------------
1 files changed, 244 insertions(+), 213 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index 13200de..85df3ff 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -64,8 +64,6 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -126,118 +124,6 @@
return new UnsupportedOperationException("Not implemented");
}
- /** Concurrent {@link Set} implementation backed by a {@link ConcurrentHashMap}. */
- private static final class ConcurrentHashSet<E> implements Set<E>
- {
- private final ConcurrentHashMap<E, E> delegate = new ConcurrentHashMap<>();
-
- @Override
- public boolean add(E e)
- {
- return delegate.put(e, e) == null;
- }
-
- @Override
- public boolean addAll(Collection<? extends E> c)
- {
- boolean changed = false;
- for (E e : c)
- {
- changed &= add(e);
- }
- return changed;
- }
-
- @Override
- public void clear()
- {
- delegate.clear();
- }
-
- @Override
- public boolean contains(Object o)
- {
- return delegate.containsKey(o);
- }
-
- @Override
- public boolean containsAll(Collection<?> c)
- {
- return delegateSet().containsAll(c);
- }
-
- @Override
- public boolean equals(Object o)
- {
- return delegateSet().equals(o);
- }
-
- @Override
- public int hashCode()
- {
- return delegateSet().hashCode();
- }
-
- @Override
- public boolean isEmpty()
- {
- return delegate.isEmpty();
- }
-
- @Override
- public Iterator<E> iterator()
- {
- return delegateSet().iterator();
- }
-
- @Override
- public boolean remove(Object o)
- {
- return delegateSet().remove(o);
- }
-
- @Override
- public boolean removeAll(Collection<?> c)
- {
- return delegateSet().removeAll(c);
- }
-
- @Override
- public boolean retainAll(Collection<?> c)
- {
- return delegateSet().retainAll(c);
- }
-
- @Override
- public int size()
- {
- return delegate.size();
- }
-
- @Override
- public Object[] toArray()
- {
- return delegateSet().toArray();
- }
-
- @Override
- public <T> T[] toArray(T[] a)
- {
- return delegateSet().toArray(a);
- }
-
- @Override
- public String toString()
- {
- return delegateSet().toString();
- }
-
- private Set<E> delegateSet()
- {
- return delegate.keySet();
- }
- }
-
/** Data to put into id2entry tree. */
private static final class Id2EntryData
{
@@ -393,37 +279,49 @@
*/
private static final class Buffer
{
- private final File file;
+ private final TreeName treeName;
+ private final File indexFile;
+ private final File bufferFile;
private final FileChannel fileChannel;
private final List<Integer> bufferPositions = new ArrayList<>();
/** TODO JNR offer configuration for this. */
- private int bufferSize = 10 * MB;
+ private final int bufferSize = 10 * MB;
- // FIXME this is not thread safe yet!!!
/**
* Maps {@link ByteSequence} keys to (conflicting) values.
* <p>
* This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
* {@link #bufferSize}.
- * <p>
- * This code uses a {@link ConcurrentHashMap} instead of a {@code ConcurrentSkipListMap} because
- * during performance testing it was found this code spent a lot of time in
- * {@link ByteString#compareTo(ByteSequence)} when putting entries to the map. However, at this
- * point, we only need to put very quickly data in the map, we do not need keys to be sorted.
- * <p>
- * Note: using {@link Set} here will be a problem with id2childrencount where values deduplication
- * is not required. How to solve this problem?
*/
- private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
+ private Map<ByteSequence, List<ByteSequence>> inMemoryStore = new HashMap<>();
/** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
private int maximumExpectedSizeOnDisk;
+ private long totalBytes;
- private Buffer(File file) throws FileNotFoundException
+ public Buffer(TreeName treeName, File bufferDir, MapMode mapMode) throws IOException
{
- file.getParentFile().mkdirs();
- this.file = file;
- this.fileChannel = new RandomAccessFile(file, "rw").getChannel();
- this.bufferPositions.add(0);
+ this.treeName = treeName;
+ bufferFile = new File(bufferDir, treeName.toString());
+ bufferFile.getParentFile().mkdirs();
+ indexFile = new File(bufferDir, treeName + ".index");
+ this.fileChannel = new RandomAccessFile(bufferFile, getMode(mapMode)).getChannel();
+ if (MapMode.READ_WRITE.equals(mapMode))
+ {
+ this.bufferPositions.add(0);
+ }
+ }
+
+ /**
+  * Returns the {@link RandomAccessFile} access mode string matching the provided map mode.
+  *
+  * @param mapMode
+  *          the map mode requested for the buffer file
+  * @return {@code "r"} for {@link MapMode#READ_ONLY}, {@code "rw"} for {@link MapMode#READ_WRITE}
+  * @throws IllegalArgumentException
+  *           if the map mode is neither of the two modes above
+  */
+ private String getMode(MapMode mapMode)
+ {
+   final boolean readOnly = MapMode.READ_ONLY.equals(mapMode);
+   if (!readOnly && !MapMode.READ_WRITE.equals(mapMode))
+   {
+     throw new IllegalArgumentException("Unhandled map mode: " + mapMode);
+   }
+   return readOnly ? "r" : "rw";
}
void putKeyValue(ByteSequence key, ByteSequence value) throws IOException
@@ -436,15 +334,11 @@
maximumExpectedSizeOnDisk = 0;
}
- Set<ByteSequence> values = inMemoryStore.get(key);
+ List<ByteSequence> values = inMemoryStore.get(key);
if (values == null)
{
- values = new ConcurrentHashSet<>();
- Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
- if (existingValues != null)
- {
- values = existingValues;
- }
+ values = new ArrayList<>();
+ inMemoryStore.put(key, values);
}
values.add(value);
maximumExpectedSizeOnDisk += recordSize;
@@ -452,13 +346,15 @@
private void flushToMappedByteBuffer() throws IOException
{
- final SortedMap<ByteSequence, Set<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
+ final SortedMap<ByteSequence, List<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
MappedByteBuffer byteBuffer = nextBuffer();
- for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : sortedStore.entrySet())
+ for (Map.Entry<ByteSequence, List<ByteSequence>> mapEntry : sortedStore.entrySet())
{
ByteSequence key = mapEntry.getKey();
// FIXME JNR merge values before put
+ // Edit: Merging during phase one is slower than not merging at all,
+ // perhaps due to merging importIDSets for keys that will exceed index entry limits anyway?
for (ByteSequence value : mapEntry.getValue())
{
put(byteBuffer, key);
@@ -513,8 +409,7 @@
private void writeBufferIndexFile()
{
- final File bufferIndexFile = new File(file.getParent(), file.getName() + ".index");
- try (PrintWriter writer = new PrintWriter(bufferIndexFile))
+ try (PrintWriter writer = new PrintWriter(indexFile))
{
writer.print(Utils.joinAsString(" ", this.bufferPositions));
}
@@ -524,10 +419,47 @@
}
}
+ /**
+  * Opens a cursor over the whole content of this buffer's file.
+  * <p>
+  * The buffer positions are first loaded from the index file, then the buffer file is memory
+  * mapped read-only in a single region. Each pair of consecutive positions delimits one slice of
+  * that mapping which gets its own {@link ByteBufferCursor}. These cursors are aggregated by a
+  * {@link CompositeCursor}, itself wrapped by a {@link ProgressCursor}.
+  *
+  * @return a cursor over the records of the buffer file
+  * @throws IOException
+  *           if the index file cannot be read, or the buffer file cannot be sized or mapped
+  */
+ private Cursor<ByteString, ByteString> openCursor() throws IOException
+ {
+   readBufferPositions();
+
+   totalBytes = Files.size(bufferFile.toPath());
+   // NOTE(review): the file channel is not closed anywhere in the visible code - confirm who releases it
+   final MappedByteBuffer mappedFile = fileChannel.map(MapMode.READ_ONLY, 0, totalBytes);
+
+   final int nbPositions = bufferPositions.size();
+   final List<ByteBufferCursor> sliceCursors = new ArrayList<>(nbPositions - 1);
+   for (int i = 1; i < nbPositions; i++)
+   {
+     final int sliceStart = bufferPositions.get(i - 1);
+     final int sliceEnd = bufferPositions.get(i);
+     sliceCursors.add(new ByteBufferCursor(mappedFile, sliceStart, sliceEnd));
+   }
+   final Cursor<ByteString, ByteString> merged = new CompositeCursor<ByteString, ByteString>(sliceCursors);
+   return new ProgressCursor<ByteString, ByteString>(merged, this, sliceCursors);
+ }
+
+ /**
+  * Loads the buffer positions from the index file and appends them to {@link #bufferPositions}.
+  * <p>
+  * The index file is expected to hold exactly one line of space separated integer positions,
+  * which is the layout produced by {@code writeBufferIndexFile()}.
+  *
+  * @throws IOException
+  *           if the index file cannot be read
+  * @throws IllegalStateException
+  *           if the index file does not hold exactly one line
+  * @throws NumberFormatException
+  *           if one of the positions is not a valid integer
+  */
+ private void readBufferPositions() throws IOException
+ {
+   // The index file is written with a PrintWriter(File), i.e. with the platform default charset
+   final List<String> indexLines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset());
+   if (indexLines.size() != 1)
+   {
+     // TODO JNR
+     // Report what is actually wrong instead of a generic "Not implemented"
+     throw new IllegalStateException("Expected exactly one line in buffer index file \"" + indexFile
+         + "\" but found " + indexLines.size());
+   }
+
+   for (String bufferPos : indexLines.get(0).split(" "))
+   {
+     bufferPositions.add(Integer.valueOf(bufferPos));
+   }
+ }
+
@Override
public String toString()
{
- String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
return getClass().getSimpleName()
+ "(treeName=\"" + treeName + "\""
+ ", current buffer holds " + inMemoryStore.size() + " record(s)"
@@ -623,6 +555,95 @@
}
}
+ /**
+  * A cursor decorator keeping stats about the reading progress of a {@link Buffer} file.
+  * <p>
+  * Every {@link Cursor} operation is forwarded as is to the decorated cursor. The progress is
+  * computed from the {@link ByteBufferCursor}s provided at construction time.
+  *
+  * @param <K>
+  *          type of the keys
+  * @param <V>
+  *          type of the values
+  */
+ private static final class ProgressCursor<K, V> implements Cursor<K, V>
+ {
+   /** The buffer being read, source of the file name and of the total number of bytes. */
+   private final Buffer buffer;
+   /** Private copy of the cursors whose read bytes are summed up to compute the progress. */
+   private final List<ByteBufferCursor> cursors;
+   /** The cursor all the {@link Cursor} operations are forwarded to. */
+   private final Cursor<K, V> delegate;
+
+   public ProgressCursor(Cursor<K, V> delegateCursor, Buffer buffer, List<ByteBufferCursor> cursors)
+   {
+     this.buffer = buffer;
+     this.cursors = new ArrayList<>(cursors);
+     this.delegate = delegateCursor;
+   }
+
+   // ----- progress statistics -----
+
+   /** Returns the string form of the tree name of the buffer being read. */
+   public String getBufferFileName()
+   {
+     return buffer.treeName.toString();
+   }
+
+   /** Returns the {@code totalBytes} recorded by the buffer being read. */
+   private long getTotalBytes()
+   {
+     return buffer.totalBytes;
+   }
+
+   /** Returns the sum of the bytes read so far by all the underlying cursors. */
+   private int getBytesRead()
+   {
+     int bytesRead = 0;
+     for (int i = 0; i < cursors.size(); i++)
+     {
+       bytesRead += cursors.get(i).getBytesRead();
+     }
+     return bytesRead;
+   }
+
+   // ----- Cursor operations, all forwarded to the delegate -----
+
+   @Override
+   public boolean next()
+   {
+     return delegate.next();
+   }
+
+   @Override
+   public boolean isDefined()
+   {
+     return delegate.isDefined();
+   }
+
+   @Override
+   public K getKey() throws NoSuchElementException
+   {
+     return delegate.getKey();
+   }
+
+   @Override
+   public V getValue() throws NoSuchElementException
+   {
+     return delegate.getValue();
+   }
+
+   @Override
+   public boolean positionToKey(ByteSequence key)
+   {
+     return delegate.positionToKey(key);
+   }
+
+   @Override
+   public boolean positionToKeyOrNext(ByteSequence key)
+   {
+     return delegate.positionToKeyOrNext(key);
+   }
+
+   @Override
+   public boolean positionToLastKey()
+   {
+     return delegate.positionToLastKey();
+   }
+
+   @Override
+   public boolean positionToIndex(int index)
+   {
+     return delegate.positionToIndex(index);
+   }
+
+   @Override
+   public void close()
+   {
+     delegate.close();
+   }
+ }
+
/** A cursor implementation aggregating several cursors and ordering them by their key value. */
private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
{
@@ -655,11 +676,12 @@
}
});
- private CompositeCursor(Collection<SequentialCursor<K, V>> cursors)
+ private CompositeCursor(Collection<? extends SequentialCursor<K, V>> cursors)
{
Reject.ifNull(cursors);
- for (Iterator<SequentialCursor<K, V>> it = cursors.iterator(); it.hasNext();)
+ final List<SequentialCursor<K, V>> tmpCursors = new ArrayList<>(cursors);
+ for (Iterator<SequentialCursor<K, V>> it = tmpCursors.iterator(); it.hasNext();)
{
SequentialCursor<K, V> cursor = it.next();
if (!cursor.isDefined() && !cursor.next())
@@ -668,7 +690,7 @@
}
}
- this.cursors.addAll(cursors);
+ this.cursors.addAll(tmpCursors);
}
@Override
@@ -774,6 +796,7 @@
private final ByteBuffer byteBuffer;
private final int startPos;
private final int endPos;
+ // TODO JNR build ByteSequence implementation reading from memory mapped files?
private final ByteStringBuilder keyBuffer = new ByteStringBuilder();//FIXME JNR bad: do zero copy?
private final ByteStringBuilder valueBuffer = new ByteStringBuilder();//FIXME JNR bad: do zero copy?
private int currentPos;
@@ -843,7 +866,12 @@
@Override
public void close()
{
- throw notImplemented();
+ // nothing to do
+ }
+
+ public int getBytesRead()
+ {
+ return currentPos - startPos;
}
@Override
@@ -891,10 +919,8 @@
{
try
{
- List<Integer> bufferPositions = readBufferPositions(treeName);
-
- // TODO JNR build ByteSequence implementation reading from memory mapped files?
- return getCursors(treeName, bufferPositions);
+ Buffer buffer = new Buffer(treeName, bufferDir, MapMode.READ_ONLY);
+ return buffer.openCursor();
}
catch (IOException e)
{
@@ -902,49 +928,6 @@
}
}
- private List<Integer> readBufferPositions(TreeName treeName) throws IOException
- {
- // TODO JNR move to Buffer class?
- File indexFile = new File(bufferDir, treeName + ".index");
- List<String> indexLines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset());
- if (indexLines.size() != 1)
- {
- throw new IllegalStateException("Not implemented");// TODO JNR
- }
-
- final String[] bufferPositions = indexLines.get(0).split(" ");
- final List<Integer> results = new ArrayList<>(bufferPositions.length);
- for (String bufferPos : bufferPositions)
- {
- results.add(Integer.valueOf(bufferPos));
- }
- return results;
- }
-
- private Cursor<ByteString, ByteString> getCursors(TreeName treeName, List<Integer> bufferPositions)
- throws IOException
- {
- // TODO JNR move to Buffer class?
- File bufferFile = new File(bufferDir, treeName.toString());
- FileChannel fileChannel = new RandomAccessFile(bufferFile, "r").getChannel();
- long fileSize = Files.size(bufferFile.toPath());
- final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
-
- final List<SequentialCursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
- Iterator<Integer> it = bufferPositions.iterator();
- if (it.hasNext())
- {
- int lastPos = it.next();
- while (it.hasNext())
- {
- final int bufferPos = it.next();
- cursors.add(new ByteBufferCursor(byteBuffer, lastPos, bufferPos));
- lastPos = bufferPos;
- }
- }
- return new CompositeCursor<ByteString, ByteString>(cursors);
- }
-
@Override
public ByteString read(TreeName treeName, ByteSequence key)
{
@@ -1056,7 +1039,7 @@
{
// Creates sub directories for each suffix
// FIXME JNR cannot directly use DN names as directory + file names
- buffer = new Buffer(new File(bufferDir, treeName.toString()));
+ buffer = new Buffer(treeName, bufferDir, MapMode.READ_WRITE);
treeNameToBufferMap.put(treeName, buffer);
}
return buffer;
@@ -1668,8 +1651,9 @@
*/
private void importPhaseOne(Storage backendStorage, Storage tmpStorage) throws Exception
{
+ final FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
- scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
+ scheduleAtFixedRate(timerService, progressTask);
threadCount = 2; // FIXME JNR id2entry + another task
final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
@@ -1694,8 +1678,11 @@
id2EntryPutTask.finishedWrites();
dn2IdPutFuture.get();
}
-
- shutdownAll(timerService, execService);
+ finally
+ {
+ shutdownAll(timerService, execService);
+ progressTask.run();
+ }
}
private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
@@ -1715,36 +1702,41 @@
}
}
- private void importPhaseTwo(final Storage outStorage, Storage inStorage) throws Exception
+ /**
+  * Runs the second phase of the import: every tree listed by the input storage is copied, one
+  * after the other, to an importer started on the output storage.
+  * <p>
+  * A {@link SecondPhaseProgressTask} is scheduled on a timer for the duration of the copy and is
+  * run one last time once the executors have been shut down.
+  *
+  * @param outputStorage
+  *          the storage receiving the trees
+  * @param inputStorage
+  *          the storage the trees are read from
+  * @throws Exception
+  *           if listing, reading or writing the trees fails
+  */
+ private void importPhaseTwo(final Storage outputStorage, Storage inputStorage) throws Exception
{
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
- scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
+ final SecondPhaseProgressTask progressTask = new SecondPhaseProgressTask();
+ scheduleAtFixedRate(timerService, progressTask);
- final Set<TreeName> treeNames = inStorage.listTrees();
+ final Set<TreeName> treeNames = inputStorage.listTrees();
- ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
+ // newFixedThreadPool() throws IllegalArgumentException for a size of 0, which would happen here
+ // when there is no tree to copy, leaving the timer service scheduled above running forever.
+ ExecutorService dbService = Executors.newFixedThreadPool(Math.max(1, treeNames.size()));
- try (Importer importer = outStorage.startImport())
+ try (Importer outputImporter = outputStorage.startImport())
{
for (final TreeName treeName : treeNames)
{
- copyTo(treeName, inStorage, importer);// FIXME JNR use dbService
+ copyTo(treeName, inputStorage, outputImporter, progressTask);// FIXME JNR use dbService
}
}
finally
{
shutdownAll(timerService, dbService);
+ // print the final state of the progress once the periodic reporting is stopped
+ progressTask.run();
}
}
- private void copyTo(final TreeName treeName, Storage input, final Importer output) throws Exception
+ private void copyTo(final TreeName treeName, Storage input, final Importer output,
+ final SecondPhaseProgressTask progressTask) throws Exception
{
input.read(new ReadOperation<Void>()
{
@Override
public Void run(ReadableTransaction txn) throws Exception
{
- try (SequentialCursor<ByteString, ByteString> cursor =
- new MergingCursor<ByteString, ByteString>(txn.openCursor(treeName), getMerger(treeName)))
+ try (Cursor<ByteString, ByteString> cursor0 = txn.openCursor(treeName);
+ SequentialCursor<ByteString, ByteString> cursor =
+ new MergingCursor<ByteString, ByteString>(cursor0, getMerger(treeName)))
{
+ progressTask.addCursor(cursor0);
while (cursor.next())
{
output.put(treeName, cursor.getKey(), cursor.getValue());
@@ -1966,6 +1958,8 @@
}
}
+ // trim the array to the actual size
+ entryIDs = Arrays.copyOf(entryIDs, i);
// due to how the entryIDSets are built, there should not be any duplicate entryIDs
Arrays.sort(entryIDs);
return EntryIDSet.newDefinedSet(entryIDs);
@@ -2360,15 +2354,26 @@
/** This class reports progress of the second phase of import processing at fixed intervals. */
private class SecondPhaseProgressTask extends TimerTask
{
+ private final Map<ProgressCursor<?, ?>, Integer> cursors = new LinkedHashMap<>();
/** The time in milliseconds of the previous progress report. */
private long previousTime;
/** Create a new import progress task. */
- public SecondPhaseProgressTask()
+ private SecondPhaseProgressTask()
{
previousTime = System.currentTimeMillis();
}
+ /**
+  * Registers the provided cursor for progress reporting, with no byte read so far, and logs that
+  * its processing has started.
+  * <p>
+  * Only {@link ProgressCursor} instances are tracked, any other cursor is silently ignored.
+  *
+  * @param cursor
+  *          the cursor about to be read
+  */
+ private void addCursor(Cursor<ByteString, ByteString> cursor)
+ {
+   if (!(cursor instanceof ProgressCursor))
+   {
+     return;
+   }
+
+   final ProgressCursor<?, ?> progressCursor = (ProgressCursor<?, ?>) cursor;
+   cursors.put(progressCursor, 0);
+   logger.info(NOTE_IMPORT_LDIF_INDEX_STARTED, progressCursor.getBufferFileName(), 1, 1);
+ }
+
/** The action to be performed by this timer task. */
@Override
public void run()
@@ -2382,15 +2387,41 @@
previousTime = latestTime;
- // DN index managers first.
- printStats(deltaTime, true);
- // non-DN index managers second
- printStats(deltaTime, false);
+ for (Iterator<Map.Entry<ProgressCursor<?, ?>, Integer>> it = cursors.entrySet().iterator(); it.hasNext();)
+ {
+ final Map.Entry<ProgressCursor<?, ?>, Integer> mapEntry = it.next();
+ ProgressCursor<?, ?> cursor = mapEntry.getKey();
+ int lastBytesRead = mapEntry.getValue();
+ printStats(deltaTime, cursor, lastBytesRead);
+
+ if (!cursor.isDefined())
+ {
+ logger.info(NOTE_IMPORT_LDIF_INDEX_CLOSE, cursor.getBufferFileName());
+ it.remove();
+ }
+ }
}
- private void printStats(long deltaTime, boolean dn2id)
+ /**
+  * Logs the reading progress of the provided cursor since the previous report, then records the
+  * number of bytes read so that the next report only accounts for the next interval.
+  * <p>
+  * Nothing is logged when no byte was read since the previous report.
+  *
+  * @param deltaTime
+  *          the time in milliseconds elapsed since the previous report
+  * @param cursor
+  *          the cursor to report about, it must be registered in {@link #cursors}
+  * @param lastBytesRead
+  *          the number of bytes the cursor had read at the time of the previous report
+  */
+ private void printStats(long deltaTime, final ProgressCursor<?, ?> cursor, int lastBytesRead)
{
- // TODO JNR
+ final long bufferFileSize = cursor.getTotalBytes();
+ final int tmpBytesRead = cursor.getBytesRead();
+ if (lastBytesRead == tmpBytesRead)
+ {
+ return;
+ }
+
+ final long bytesReadInterval = tmpBytesRead - lastBytesRead;
+ final int bytesReadPercent = Math.round((100f * tmpBytesRead) / bufferFileSize);
+
+ // Kilo and milli approximately cancel out.
+ final long kiloBytesRate = bytesReadInterval / deltaTime;
+ final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
+
+ logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_REPORT, cursor.getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
+ kiloBytesRate, 1, 1);
+
+ // Assigning to the lastBytesRead parameter was a dead store: the map kept its initial 0, so each
+ // report computed the rate from the cumulative byte count. Store the value back in the map instead.
+ // Replacing the value of an already mapped key is not a structural modification of an
+ // insertion-ordered LinkedHashMap, hence this is safe while run() iterates over the entries.
+ cursors.put(cursor, tmpBytesRead);
}
}
--
Gitblit v1.10.0