From e3a3030cd14ba12631b8c50d955ec800b247fb72 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 10 Mar 2016 13:24:01 +0000
Subject: [PATCH] OPENDJ-2727: Low performance during import with large index-entry-limit

Phase one now deduplicates index records with a dedicated collector which stops accumulating entry IDs once the
index entry limit is reached, instead of allocating and merging limit-sized ID arrays for every key. In-memory sort
buffers use fixed-size int length headers rather than compact longs, the off-heap memory dedicated to phase one
buffers can be sized explicitly, and oversized records bypass the in-heap sort queue.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 365 ++++++++++++++++++++++++++++-----------------------
1 file changed, 201 insertions(+), 164 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index c03316b..b3c6c31 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -176,8 +176,23 @@
final int indexCount = getIndexCount();
final int nbBuffer = threadCount * indexCount * 2;
- final int bufferSize = computeBufferSize(nbBuffer);
- logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
+ final int bufferSize;
+ if (BufferPool.SUPPORTS_OFF_HEAP && importConfig.getOffHeapSize() > 0)
+ {
+ final long offHeapSize = importConfig.getOffHeapSize();
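+ // Share the configured off-heap memory (expressed in megabytes) evenly among all phase one buffers.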
+ bufferSize = (int) ((offHeapSize * MB) / nbBuffer);
+ if (bufferSize < MIN_BUFFER_SIZE)
+ {
+ // Not enough memory.
+ throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(offHeapSize * MB, nbBuffer * MIN_BUFFER_SIZE));
+ }
+ logger.info(NOTE_IMPORT_LDIF_OFFHEAP_MEM_BUF_INFO, DB_CACHE_SIZE, offHeapSize, nbBuffer, bufferSize / KB);
+ }
+ else
+ {
+ bufferSize = computeBufferSize(nbBuffer);
+ logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, DB_CACHE_SIZE, bufferSize);
+ }
logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION);
logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
@@ -466,11 +481,6 @@
private int computeBufferSize(int nbBuffer) throws InitializationException
{
- if (BufferPool.SUPPORTS_OFF_HEAP)
- {
- return MAX_BUFFER_SIZE;
- }
-
final long availableMemory = calculateAvailableMemory();
logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, nbBuffer);
@@ -809,12 +819,14 @@
}
}
+ /** Default size, in megabytes, of the off-heap memory dedicated to the phase one buffers. */
+ private static final int DEFAULT_OFFHEAP_SIZE = 700;
/** Max size of phase one buffer. */
private static final int MAX_BUFFER_SIZE = 2 * MB;
/** Min size of phase one buffer. */
private static final int MIN_BUFFER_SIZE = 4 * KB;
/** DB cache size to use during import. */
- private static final int DB_CACHE_SIZE = 4 * MB;
+ private static final int DB_CACHE_SIZE = 32 * MB;
/** Required free memory for this importer. */
private static final int REQUIRED_FREE_MEMORY = 50 * MB;
/** LDIF reader. */
@@ -855,7 +867,7 @@
{
try
{
- importStrategy.beforeImport(container);
+ importStrategy.beforePhaseOne(container);
}
finally
{
@@ -878,6 +890,8 @@
throw new InterruptedException("Import processing canceled.");
}
+ importStrategy.afterPhaseOne();
+
// Start phase two
final long phaseTwoStartTime = System.currentTimeMillis();
try (final PhaseTwoProgressReporter progressReporter = new PhaseTwoProgressReporter())
@@ -895,7 +909,7 @@
// Finish import
for(EntryContainer entryContainer : importedContainers.keySet())
{
- importStrategy.afterImport(entryContainer);
+ importStrategy.afterPhaseTwo(entryContainer);
}
phaseTwoTimeMs = System.currentTimeMillis() - phaseTwoStartTime;
}
@@ -951,15 +965,20 @@
abstract void validate(EntryContainer entryContainer, EntryID entryID, Entry entry) throws DirectoryException;
- void beforeImport(EntryContainer entryContainer)
+ void beforePhaseOne(EntryContainer entryContainer)
{
entryContainer.delete(asWriteableTransaction(importer));
visitIndexes(entryContainer, setTrust(false, importer));
}
+ void afterPhaseOne()
+ {
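+ // Phase one buffers (possibly off-heap) are no longer needed; release them before phase two starts.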
+ closeSilently(bufferPool);
+ }
+
abstract Callable<Void> newPhaseTwoTask(TreeName treeName, Chunk source, PhaseTwoProgressReporter progressReporter);
- void afterImport(EntryContainer entryContainer)
+ void afterPhaseTwo(EntryContainer entryContainer)
{
visitIndexes(entryContainer, setTrust(true, importer));
}
@@ -967,7 +986,8 @@
final Chunk newExternalSortChunk(TreeName treeName) throws Exception
{
return new ExternalSortChunk(tempDir, treeName.toString(), bufferPool,
- newCollector(entryContainers.get(treeName.getBaseDN()), treeName), sorter);
+ newPhaseOneCollector(entryContainers.get(treeName.getBaseDN()), treeName),
+ newPhaseTwoCollector(entryContainers.get(treeName.getBaseDN()), treeName), sorter);
}
final Callable<Void> newChunkCopierTask(TreeName treeName, final Chunk source,
@@ -983,7 +1003,7 @@
final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount();
return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), source,
- id2count, newCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
+ id2count, newPhaseTwoCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
}
final Callable<Void> newVLVIndexImporterTask(VLVIndex vlvIndex, final Chunk source,
@@ -1167,14 +1187,14 @@
}
@Override
- void beforeImport(EntryContainer entryContainer)
+ void beforePhaseOne(EntryContainer entryContainer)
{
visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, setTrust(false, importer)));
visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, deleteDatabase(importer)));
}
@Override
- void afterImport(EntryContainer entryContainer)
+ void afterPhaseTwo(EntryContainer entryContainer)
{
visitIndexes(entryContainer, visitOnlyIndexes(indexesToRebuild, setTrust(true, importer)));
}
@@ -1395,12 +1415,12 @@
/** Provides buffers used to store and sort chunks of data. */
private final BufferPool bufferPool;
- /** File containing the regions used to store the data. */
+ /** Channel to the file containing the regions used to store the data. */
- private final File file;
private final FileChannel channel;
/** Pointer to the next available region in the file, typically at end of file. */
private final AtomicLong filePosition = new AtomicLong();
- /** Collector used to reduces the number of duplicate keys during sort. */
+ /** Collectors used to reduce the number of duplicate keys during sort. */
- private final Collector<?, ByteString> deduplicator;
+ private final Collector<?, ByteString> phaseOneDeduplicator;
+ private final Collector<?, ByteString> phaseTwoDeduplicator;
/** Keep track of pending sorting tasks. */
private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
/** Keep track of currently opened chunks. */
@@ -1419,14 +1439,15 @@
}
};
- ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> collector,
- Executor sortExecutor) throws IOException
+ ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> phaseOneDeduplicator,
+ Collector<?, ByteString> phaseTwoDeduplicator, Executor sortExecutor) throws IOException
{
this.name = name;
this.bufferPool = bufferPool;
- this.deduplicator = collector;
- this.file = new File(tempDir, name.replaceAll("\\W+", "_") + "_" + UUID.randomUUID().toString());
- this.channel = open(this.file.toPath(), CREATE, TRUNCATE_EXISTING, READ, WRITE, DELETE_ON_CLOSE);
+ this.phaseOneDeduplicator = phaseOneDeduplicator;
+ this.phaseTwoDeduplicator = phaseTwoDeduplicator;
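+ // Each sort chunk writes to its own newly created file; the SPARSE option hints that unwritten regions need not consume disk blocks.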
+ final File file = new File(tempDir, name.replaceAll("\\W+", "_") + "_" + UUID.randomUUID().toString());
+ this.channel = open(file.toPath(), CREATE_NEW, SPARSE, READ, WRITE);
this.sorter = new ExecutorCompletionService<>(sortExecutor);
}
@@ -1478,7 +1499,7 @@
}
closeSilently(channel);
}
- }, (Collector<?, ByteString>) deduplicator);
+ }, (Collector<?, ByteString>) phaseTwoDeduplicator);
}
catch (ExecutionException | InterruptedException e)
{
@@ -1520,7 +1541,7 @@
*/
final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
try (final SequentialCursor<ByteString, ByteString> source =
- new CollectorCursor<>(chunk.flip(), deduplicator))
+ new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
{
copyIntoChunk(source, persistentChunk);
}
@@ -1553,8 +1574,6 @@
*/
static final class InMemorySortedChunk implements Chunk, Comparator<Integer>
{
- private static final int INT_SIZE = Integer.SIZE / Byte.SIZE;
-
private final String metricName;
private final BufferPool bufferPool;
private final Buffer buffer;
@@ -1574,13 +1593,11 @@
@Override
public boolean put(ByteSequence key, ByteSequence value)
{
- final int keyHeaderSize = PackedLong.getEncodedSize(key.length());
- final int valueHeaderSize = PackedLong.getEncodedSize(value.length());
- final int keyRecordSize = keyHeaderSize + key.length();
- final int recordSize = keyRecordSize + valueHeaderSize + value.length();
+ final int keyRecordSize = INT_SIZE + key.length();
+ final int recordSize = keyRecordSize + INT_SIZE + value.length();
dataPos -= recordSize;
- final int recordDataPos = dataPos;
+ int recordDataPos = dataPos;
final int recordIndexPos = indexPos;
indexPos += INT_SIZE;
@@ -1596,19 +1613,18 @@
// Write record offset
buffer.writeInt(recordIndexPos, recordDataPos);
- final int valuePos = writeDataAt(recordDataPos, key);
- writeDataAt(valuePos, value);
+
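+ // Record layout: keyLength (int), valueLength (int), key bytes, value bytes.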
+ buffer.writeInt(recordDataPos, key.length());
+ recordDataPos += INT_SIZE;
+ buffer.writeInt(recordDataPos, value.length());
+ recordDataPos += INT_SIZE;
+ buffer.writeByteSequence(recordDataPos, key);
+ recordDataPos += key.length();
+ buffer.writeByteSequence(recordDataPos, value);
return true;
}
- private int writeDataAt(int offset, ByteSequence data)
- {
- final int headerSize = buffer.writeCompactUnsignedLong(offset, data.length());
- buffer.writeByteSequence(offset + headerSize, data);
- return offset + headerSize + data.length();
- }
-
@Override
public long size()
{
@@ -1660,11 +1676,10 @@
return 0;
}
// Compare Keys
- final int keyLengthA = (int) buffer.readCompactUnsignedLong(iOffsetA);
- final int keyOffsetA = iOffsetA + PackedLong.getEncodedSize(keyLengthA);
-
- final int keyLengthB = (int) buffer.readCompactUnsignedLong(iOffsetB);
- final int keyOffsetB = iOffsetB + PackedLong.getEncodedSize(keyLengthB);
+ final int keyLengthA = buffer.readInt(iOffsetA);
+ final int keyLengthB = buffer.readInt(iOffsetB);
+ final int keyOffsetA = iOffsetA + 2 * INT_SIZE;
+ final int keyOffsetB = iOffsetB + 2 * INT_SIZE;
return buffer.compare(keyOffsetA, keyLengthA, keyOffsetB, keyLengthB);
}
@@ -1685,19 +1700,19 @@
key = value = null;
return false;
}
- final int recordOffset = buffer.readInt(indexOffset);
+ int recordOffset = buffer.readInt(indexOffset);
- final int keyLength = (int) buffer.readCompactUnsignedLong(recordOffset);
- final int keyHeaderSize = PackedLong.getEncodedSize(keyLength);
- key = buffer.readByteString(recordOffset + keyHeaderSize, keyLength);
+ final int keyLength = buffer.readInt(recordOffset);
+ recordOffset += INT_SIZE;
+ final int valueLength = buffer.readInt(recordOffset);
+ recordOffset += INT_SIZE;
- final int valueOffset = recordOffset + keyHeaderSize + keyLength;
- final int valueLength = (int) buffer.readCompactUnsignedLong(valueOffset);
- final int valueHeaderSize = PackedLong.getEncodedSize(valueLength);
- value = buffer.readByteString(valueOffset + valueHeaderSize, valueLength);
+ key = buffer.readByteString(recordOffset, keyLength);
+ recordOffset += keyLength;
+ value = buffer.readByteString(recordOffset, valueLength);
indexOffset += INT_SIZE;
- bytesRead += keyHeaderSize + keyLength + valueHeaderSize + valueLength;
+ bytesRead += (2 * INT_SIZE) + keyLength + valueLength;
return true;
}
@@ -1833,11 +1848,6 @@
public MeteredCursor<ByteString, ByteString> flip()
{
size = mmapBuffer.position();
- /*
- * We force OS to write dirty pages now so that they don't accumulate. Indeed, huge number of dirty pages might
- * cause the OS to freeze the producer of those dirty pages (this importer) while it is swapping-out the pages.
- */
- mmapBuffer.force();
mmapBuffer = null;
return new FileRegionChunkCursor(startOffset, size);
}
@@ -2297,7 +2307,7 @@
{
final Chunk id2CountChunk =
new ExternalSortChunk(tempDir, id2count.getName().toString(), bufferPool, id2countCollector,
- sameThreadExecutor());
+ id2countCollector, sameThreadExecutor());
long totalNumberOfEntries = 0;
final TreeVisitor<ChildrenCount> visitor = new ID2CountTreeVisitorImporter(asImporter(id2CountChunk));
@@ -2471,7 +2481,13 @@
* threads which will access the put() method. If underestimated, {@link #put(ByteSequence, ByteSequence)} might
* lead to unordered copy. If overestimated, extra memory is wasted.
*/
- private static final int QUEUE_SIZE = 1024;
+ private static final int QUEUE_SIZE = 128;
+
+ /**
+ * Maximum queued entry size. Beyond this size, entry will not be queued but written directly to the storage in
+ * order to limit the heap size requirement for import.
+ */
+ private static final int ENTRY_MAX_SIZE = 32 * KB;
private final NavigableMap<ByteSequence, ByteSequence> pendingRecords = new TreeMap<>();
private final int queueSize;
@@ -2486,6 +2502,11 @@
@Override
public synchronized boolean put(ByteSequence key, ByteSequence value)
{
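+ // Oversized records would dominate the in-heap queue: write them directly to the delegate instead.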
+ if ((key.length() + value.length()) >= ENTRY_MAX_SIZE)
+ {
+ return delegate.put(key, value);
+ }
+
pendingRecords.put(key, value);
if (pendingRecords.size() == queueSize)
{
@@ -2700,12 +2721,8 @@
int readInt(int position);
- long readCompactUnsignedLong(int position);
-
ByteString readByteString(int position, int length);
- int writeCompactUnsignedLong(int position, long value);
-
void writeByteSequence(int position, ByteSequence data);
int length();
@@ -2808,14 +2825,6 @@
private final long address;
private final int size;
private int position;
- private final InputStream asInputStream = new InputStream()
- {
- @Override
- public int read() throws IOException
- {
- return UNSAFE.getByte(address + position++) & 0xFF;
- }
- };
private final OutputStream asOutputStream = new OutputStream()
{
@Override
@@ -2823,6 +2832,18 @@
{
UNSAFE.putByte(address + position++, (byte) (value & 0xFF));
}
+
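+ // Bulk writes copy the whole array with a single native memory copy instead of one byte at a time.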
+ @Override
+ public void write(byte[] b) throws IOException
+ {
+ UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET, null, address + position, b.length);
+ position += b.length;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ UNSAFE.copyMemory(b, BYTE_ARRAY_OFFSET + off, null, address + position, len);
+ position += len;
+ }
};
private boolean closed;
@@ -2845,42 +2866,17 @@
}
@Override
- public int writeCompactUnsignedLong(final int position, long value)
+ public void writeByteSequence(final int position, ByteSequence data)
{
- try
- {
- this.position = position;
- return PackedLong.writeCompactUnsigned(asOutputStream, value);
- }
- catch (IOException e)
- {
- throw new StorageRuntimeException(e);
- }
- }
-
- @Override
- public long readCompactUnsignedLong(final int position)
- {
+ Reject.ifFalse(position + data.length() <= size);
this.position = position;
try
{
- return PackedLong.readCompactUnsignedLong(asInputStream);
+ data.copyTo(asOutputStream);
}
- catch (IOException e)
+ catch (IOException e)
{
- throw new IllegalStateException(e);
- }
- }
-
- @Override
- public void writeByteSequence(int position, ByteSequence data)
- {
- Reject.ifFalse(position + data.length() <= size);
-
- long offset = address + position;
- for(int i = 0 ; i < data.length() ; i++)
- {
- UNSAFE.putByte(offset++, data.byteAt(i));
+ throw new StorageRuntimeException(e);
}
}
@@ -2931,23 +2927,6 @@
static final class HeapBuffer implements Buffer
{
private final ByteBuffer buffer;
- private final OutputStream asOutputStream = new OutputStream()
- {
-
- @Override
- public void write(int b) throws IOException
- {
- buffer.put((byte) (b & 0xFF));
- }
- };
- private final InputStream asInputStream = new InputStream()
- {
- @Override
- public int read() throws IOException
- {
- return buffer.get() & 0xFF;
- }
- };
HeapBuffer(int size)
{
@@ -2967,34 +2946,6 @@
}
@Override
- public int writeCompactUnsignedLong(final int position, long value)
- {
- buffer.position(position);
- try
- {
- return PackedLong.writeCompactUnsigned(asOutputStream, value);
- }
- catch (IOException e)
- {
- throw new StorageRuntimeException(e);
- }
- }
-
- @Override
- public long readCompactUnsignedLong(final int position)
- {
- buffer.position(position);
- try
- {
- return PackedLong.readCompactUnsignedLong(asInputStream);
- }
- catch (IOException e)
- {
- throw new IllegalArgumentException(e);
- }
- }
-
- @Override
public void writeByteSequence(int position, ByteSequence data)
{
buffer.position(position);
@@ -3065,7 +3016,8 @@
- * Get a new {@link Collector} which can be used to merge encoded values. The types of values to merged is deduced
- * from the {@link TreeName}
+ * Get a new {@link Collector} which can be used during phase two to merge encoded values. The type of the values
+ * to be merged is deduced from the {@link TreeName}.
*/
- private static Collector<?, ByteString> newCollector(final EntryContainer entryContainer, final TreeName treeName)
+ private static Collector<?, ByteString> newPhaseTwoCollector(final EntryContainer entryContainer,
+ final TreeName treeName)
{
final DefaultIndex index = getIndex(entryContainer, treeName);
if (index != null)
@@ -3086,6 +3038,18 @@
throw new IllegalArgumentException("Unknown tree: " + treeName);
}
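+ /**
+ * Get a new {@link Collector} used during phase one to merge values. For default indexes, individual entry IDs
+ * are accumulated up to the index entry limit; other trees fall back to the phase two collector.
+ */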
+ private static Collector<?, ByteString> newPhaseOneCollector(final EntryContainer entryContainer,
+ final TreeName treeName)
+ {
+ final DefaultIndex index = getIndex(entryContainer, treeName);
+ if (index != null)
+ {
+ // key conflicts == merge EntryIDSets
+ return new EntryIDsCollector(index);
+ }
+ return newPhaseTwoCollector(entryContainer, treeName);
+ }
+
private static boolean isDN2ID(TreeName treeName)
{
return SuffixContainer.DN2ID_INDEX_NAME.equals(treeName.getIndexId());
@@ -3212,6 +3176,80 @@
}
/**
+ * {@link Collector} that accepts individual encoded entry IDs and
+ * produces a {@link ByteString} representing the merged {@link EntryIDSet}.
+ */
+ static final class EntryIDsCollector implements Collector<LongArray, ByteString>
+ {
+ private final DefaultIndex index;
+ private final int indexLimit;
+
+ EntryIDsCollector(DefaultIndex index)
+ {
+ this.index = index;
+ this.indexLimit = index.getIndexEntryLimit();
+ }
+
+ @Override
+ public LongArray get()
+ {
+ return new LongArray();
+ }
+
+ @Override
+ public LongArray accept(LongArray resultContainer, ByteString value)
+ {
+ if (resultContainer.size() < indexLimit)
+ {
+ resultContainer.add(value.toLong());
+ }
+ /*
+ * else the EntryIDSet is above the index entry limit: discard additional values
+ * to avoid blowing up memory now; merge() will then return an undefined set
+ */
+ return resultContainer;
+ }
+
+ @Override
+ public ByteString merge(LongArray resultContainer)
+ {
+ if (resultContainer.size() >= indexLimit)
+ {
+ return index.toValue(EntryIDSet.newUndefinedSet());
+ }
+ return index.toValue(EntryIDSet.newDefinedSet(resultContainer.get()));
+ }
+ }
+
+ /** Simple long array primitive wrapper. */
+ private static final class LongArray
+ {
+ private long[] values = new long[16];
+ private int size;
+
+ void add(long value)
+ {
+ if (size == values.length)
+ {
+ values = Arrays.copyOf(values, values.length * 2);
+ }
+ values[size++] = value;
+ }
+
+ int size()
+ {
+ return size;
+ }
+
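+ /** Returns the accumulated values, trimmed to size and sorted in ascending order. */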
+ long[] get()
+ {
+ values = Arrays.copyOf(values, size);
+ Arrays.sort(values);
+ return values;
+ }
+ }
+
+ /**
* {@link Collector} that accepts encoded {@link EntryIDSet} objects and produces a {@link ByteString} representing
* the merged {@link EntryIDSet}.
*/
@@ -3264,27 +3302,26 @@
private EntryIDSet buildEntryIDSet(Collection<ByteString> encodedIDSets)
{
- final long[] entryIDs = new long[indexLimit];
-
- // accumulate in array
- int i = 0;
- for (ByteString encodedIDSet : encodedIDSets)
- {
+ final List<EntryIDSet> idSets = new ArrayList<>(encodedIDSets.size());
+ int mergedSize = 0;
+ for (ByteString encodedIDSet : encodedIDSets)
+ {
final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), encodedIDSet);
- if (!entryIDSet.isDefined() || i + entryIDSet.size() >= indexLimit)
+ mergedSize += entryIDSet.size();
+ if (!entryIDSet.isDefined() || mergedSize >= indexLimit)
{
// above index entry limit
return EntryIDSet.newUndefinedSet();
}
-
- for (EntryID entryID : entryIDSet)
- {
- entryIDs[i++] = entryID.longValue();
- }
+ idSets.add(entryIDSet);
}
- Arrays.sort(entryIDs, 0, i);
- return EntryIDSet.newDefinedSet(Arrays.copyOf(entryIDs, i));
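+ // All sets are defined and within the limit: concatenate them into a single array and sort once.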
+ final long[] entryIDs = new long[mergedSize];
+ int offset = 0;
+ for (EntryIDSet idSet : idSets)
+ {
+ offset += idSet.copyTo(entryIDs, offset);
+ }
+ Arrays.sort(entryIDs);
+ return EntryIDSet.newDefinedSet(entryIDs);
}
}
--
Gitblit v1.10.0