From 5a06735032d3c0155548b77c9e627674c400b4ec Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 28 May 2015 10:50:36 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java | 213 ++++++++++++++++++++++++++++++++++++++++++++++------
1 file changed, 187 insertions(+), 26 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index 3f6170c..62ad490 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -46,6 +46,7 @@
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -57,12 +58,14 @@
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TimerTask;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -122,6 +125,118 @@
return new UnsupportedOperationException("Not implemented");
}
+ /** Concurrent {@link Set} implementation backed by a {@link ConcurrentHashMap}. */
+ private static final class ConcurrentHashSet<E> implements Set<E>
+ {
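+ // Note: roughly equivalent to Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>()),
+ // kept as a dedicated class that simply maps each element to itself.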
+ private final ConcurrentHashMap<E, E> delegate = new ConcurrentHashMap<>();
+
+ @Override
+ public boolean add(E e)
+ {
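+ // ConcurrentHashMap.put() returns the previous value, so null means the element was newly added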
+ return delegate.put(e, e) == null;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends E> c)
+ {
+ boolean changed = false;
+ for (E e : c)
+ {
+ // |= so that a single successful add marks the set as changed
+ changed |= add(e);
+ }
+ return changed;
+ }
+
+ @Override
+ public void clear()
+ {
+ delegate.clear();
+ }
+
+ @Override
+ public boolean contains(Object o)
+ {
+ return delegate.containsKey(o);
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c)
+ {
+ return delegateSet().containsAll(c);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return delegateSet().equals(o);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return delegateSet().hashCode();
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return delegate.isEmpty();
+ }
+
+ @Override
+ public Iterator<E> iterator()
+ {
+ return delegateSet().iterator();
+ }
+
+ @Override
+ public boolean remove(Object o)
+ {
+ return delegateSet().remove(o);
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c)
+ {
+ return delegateSet().removeAll(c);
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c)
+ {
+ return delegateSet().retainAll(c);
+ }
+
+ @Override
+ public int size()
+ {
+ return delegate.size();
+ }
+
+ @Override
+ public Object[] toArray()
+ {
+ return delegateSet().toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a)
+ {
+ return delegateSet().toArray(a);
+ }
+
+ @Override
+ public String toString()
+ {
+ return delegateSet().toString();
+ }
+
+ private Set<E> delegateSet()
+ {
+ return delegate.keySet();
+ }
+ }
+
/** Data to put into id2entry tree. */
private static final class Id2EntryData
{
@@ -281,7 +396,7 @@
private final FileChannel fileChannel;
private final List<Integer> bufferPositions = new ArrayList<>();
/** TODO JNR offer configuration for this. */
- private int bufferSize = 1024;
+ private int bufferSize = 10 * MB;
// FIXME this is not thread safe yet!!!
/**
@@ -289,8 +404,16 @@
* <p>
* This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
* {@link #bufferSize}.
+ * <p>
+ * This code uses a {@link ConcurrentHashMap} instead of a {@link ConcurrentSkipListMap} because
+ * performance testing showed a lot of time spent in {@link ByteString#compareTo(ByteSequence)}
+ * when putting entries into the map. At this point we only need to put data into the map as
+ * quickly as possible; keys do not need to be sorted until the map is flushed to disk.
+ * <p>
+ * Note: using a {@link Set} here is a problem for id2childrencount, where values deduplication
+ * is not wanted. How to solve this problem?
*/
- private ConcurrentNavigableMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentSkipListMap<>();
+ private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
/** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
private int maximumExpectedSizeOnDisk;
@@ -307,7 +430,7 @@
int recordSize = INT_SIZE + key.length() + INT_SIZE + value.length();
if (bufferSize < maximumExpectedSizeOnDisk + recordSize)
{
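+ // adding this record would overflow the configured buffer size:
+ // persist the current in-memory content and start a new buffer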
- copyToDisk();
+ flushToMappedByteBuffer();
inMemoryStore.clear();
maximumExpectedSizeOnDisk = 0;
}
@@ -315,7 +438,7 @@
Set<ByteSequence> values = inMemoryStore.get(key);
if (values == null)
{
- values = new ConcurrentSkipListSet<>();
+ values = new ConcurrentHashSet<>();
Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
if (existingValues != null)
{
@@ -326,10 +449,12 @@
maximumExpectedSizeOnDisk += recordSize;
}
- private void copyToDisk() throws IOException
+ private void flushToMappedByteBuffer() throws IOException
{
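+ // Keys are kept unsorted in memory for put speed (see the inMemoryStore javadoc);
+ // sort them only now, at flush time, so each buffer is written to disk in key order.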
+ final SortedMap<ByteSequence, Set<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
+
MappedByteBuffer byteBuffer = nextBuffer();
- for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
+ for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : sortedStore.entrySet())
{
ByteSequence key = mapEntry.getKey();
// FIXME JNR merge values before put
@@ -352,8 +477,9 @@
private MappedByteBuffer nextBuffer() throws IOException
{
- // FIXME JNR bufferSize is an acceptable over approximation
- return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), bufferSize);
+ // FIXME JNR when merging duplicate keys during phase one,
+ // maximumExpectedSizeOnDisk is an acceptable over approximation
+ return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), maximumExpectedSizeOnDisk);
}
private int getLastPosition(List<Integer> l)
@@ -371,14 +497,16 @@
byteBuffer.putInt(b.length());
// Need to do all of this because b.copyTo(byteBuffer) calls ByteBuffer.flip().
// Why does it do that?
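+ // For the record: flip() sets the limit to the current position and rewinds the position
+ // to zero, hence both the limit and the position must be restored after the copy.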
+ final int limitBeforeFlip = byteBuffer.limit();
final int posBeforeFlip = byteBuffer.position();
b.copyTo(byteBuffer);
- byteBuffer.limit(bufferSize);
+ byteBuffer.limit(limitBeforeFlip);
byteBuffer.position(posBeforeFlip + b.length());
}
- void flush()
+ void flush() throws IOException
{
+ flushToMappedByteBuffer();
writeBufferIndexFile();
}
@@ -401,7 +529,7 @@
String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
return getClass().getSimpleName()
+ "(treeName=\"" + treeName + "\""
- + ", currentBuffer has " + inMemoryStore.size() + " record(s)"
+ + ", current buffer holds " + inMemoryStore.size() + " record(s)"
+ " and " + (bufferSize - maximumExpectedSizeOnDisk) + " byte(s) remaining)";
}
}
@@ -984,9 +1112,16 @@
@Override
public void close()
{
- for (Buffer buffer : treeNameToBufferMap.values())
+ try
{
- buffer.flush();
+ for (Buffer buffer : treeNameToBufferMap.values())
+ {
+ buffer.flush();
+ }
+ }
+ catch (IOException e)
+ {
+ throw new StorageRuntimeException(e);
}
}
@@ -1632,7 +1767,7 @@
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
- final Set<TreeName> treeNames = inStorage.listTrees(); // FIXME JNR rename to listTreeNames()?
+ final Set<TreeName> treeNames = inStorage.listTrees();
ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
try (Importer importer = outStorage.startImport())
{
@@ -1674,6 +1809,11 @@
// key conflicts == merge EntryIDSets
return new ImportIDSetsMerger(index);
}
+ else if (treeName.getIndexId().equals(ID2CHILDREN_COUNT_NAME))
+ {
+ // key conflicts == sum values
+ // TODO JNR
+ }
else if (treeName.getIndexId().equals(DN2ID_INDEX_NAME)
|| treeName.getIndexId().equals(DN2URI_INDEX_NAME)
|| isVLVIndex(entryContainer, treeName))
@@ -1836,17 +1976,11 @@
}
else if (values.size() == 1)
{
+ // Avoids unnecessary decoding + encoding
return values.iterator().next();
}
- ImportIDSet idSet = new ImportIDSet(ByteString.empty(), EntryIDSet.newDefinedSet(), index.getIndexEntryLimit());
- for (ByteString value : values)
- {
- // FIXME JNR Can we make this more efficient?
- // go through long[] + sort in the end?
- idSet.merge(index.decodeValue(ByteString.empty(), value));
- }
- return index.toValue(idSet);
+ return index.toValue(buildEntryIDSet(values));
}
finally
{
@@ -1855,6 +1989,31 @@
values.clear();
}
}
+
+ private EntryIDSet buildEntryIDSet(Set<ByteString> values)
+ {
+ // accumulate in array
+ int i = 0;
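+ // the index entry limit bounds the array size: reaching that limit makes the merged set
+ // undefined (see below), so the defined case never needs more slots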
+ long[] entryIDs = new long[index.getIndexEntryLimit()];
+ for (ByteString value : values)
+ {
+ final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), value);
+ if (!entryIDSet.isDefined() || i + entryIDSet.size() >= index.getIndexEntryLimit())
+ {
+ // above index entry limit
+ return EntryIDSet.newUndefinedSet();
+ }
+
+ for (EntryID entryID : entryIDSet)
+ {
+ entryIDs[i++] = entryID.longValue();
+ }
+ }
+
+ // due to how the entryIDSets are built, there should not be any duplicate entryIDs.
+ // Only the first i slots of the array are filled, so sort and keep just that range
+ // rather than passing trailing zeroes to the defined set.
+ Arrays.sort(entryIDs, 0, i);
+ return EntryIDSet.newDefinedSet(Arrays.copyOf(entryIDs, i));
+ }
}
/** Task used to migrate excluded branch. */
@@ -2093,10 +2252,12 @@
{
processDN2ID(suffix, entry.getName(), entryID);
}
+
processDN2URI(suffix, entry);
processIndexes(suffix, entry, entryID);
processVLVIndexes(suffix, entry, entryID);
id2EntryPutTask.put(suffix, entryID, entry);
+
importCount.getAndIncrement();
}
@@ -2267,9 +2428,9 @@
@Override
public boolean insert(final DN dn, final EntryID entryID)
{
- final AtomicBoolean result = new AtomicBoolean();
try
{
+ final AtomicBoolean result = new AtomicBoolean();
storage.write(new WriteOperation()
{
@Override
@@ -2278,12 +2439,12 @@
result.set(suffix.getDN2ID().insert(txn, dn, entryID));
}
});
+ return result.get();
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
- return result.get();
}
@Override
--
Gitblit v1.10.0