From 571c3b4cb6142f4ef5eb41c3b8cf50dd9d6c9c81 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Tue, 12 Apr 2016 14:43:12 +0000
Subject: [PATCH] OPENDJ-2631: OOME error while importing 100M entries.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 211 +++++++++++++++++++++++++---------------------------
1 files changed, 102 insertions(+), 109 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 40023b0..2d08f3d 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -16,12 +16,11 @@
package org.opends.server.backends.pluggable;
import static java.nio.channels.FileChannel.*;
-
+import static java.nio.file.StandardOpenOption.*;
import static org.forgerock.util.Utils.*;
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.StaticUtils.*;
-import static java.nio.file.StandardOpenOption.*;
import java.io.Closeable;
import java.io.File;
@@ -80,6 +79,7 @@
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.DN;
import org.forgerock.opendj.ldap.ResultCode;
import org.forgerock.opendj.ldap.spi.Indexer;
import org.forgerock.util.Reject;
@@ -107,7 +107,6 @@
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
import org.opends.server.core.ServerContext;
-import org.forgerock.opendj.ldap.DN;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
@@ -817,8 +816,6 @@
}
}
- /** Default size for the off-heap memory dedicated to the phase one's buffer. */
- private static final int DEFAULT_OFFHEAP_SIZE = 700;
/** Max size of phase one buffer. */
private static final int MAX_BUFFER_SIZE = 2 * MB;
/** Min size of phase one buffer. */
@@ -1401,7 +1398,7 @@
* Store and sort data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an
* unlimited amount of data. This class uses double-buffering: data are firstly stored in a
* {@link InMemorySortedChunk} which, once full, will be asynchronously sorted and copied into a
- * {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}.
+ * {@link FileRegion}. Duplicate keys are reduced by a {@link Collector}.
* {@link #put(ByteSequence, ByteSequence))} is thread-safe.
* This class is used in phase-one. There is one {@link ExternalSortChunk} per
* database tree, shared across all phase-one importer threads, in charge of storing/sorting records.
@@ -1420,7 +1417,7 @@
private final Collector<?, ByteString> phaseOneDeduplicator;
private final Collector<?, ByteString> phaseTwoDeduplicator;
/** Keep track of pending sorting tasks. */
- private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter;
+ private final CompletionService<Region> sorter;
/** Keep track of currently opened chunks. */
private final Set<Chunk> activeChunks = Collections.synchronizedSet(new HashSet<Chunk>());
/** Keep track of the number of chunks created. */
@@ -1473,36 +1470,57 @@
{
sortAndAppendChunkAsync(chunk);
}
+
+ final List<MeteredCursor<ByteString, ByteString>> cursors = new ArrayList<>();
try
{
- return new CollectorCursor<>(
- new CompositeCursor<ByteString, ByteString>(name,
- waitTasksTermination(sorter, nbSortedChunks.get()))
- {
- public void close()
- {
- super.close();
- if (OperatingSystem.isWindows())
- {
- // Windows might not be able to delete the file (see http://bugs.java.com/view_bug.do?bug_id=4715154)
- // To prevent these not deleted files to waste space, we empty it.
- try
- {
- channel.truncate(0);
- }
- catch (IOException e)
- {
- // This is best effort, it's safe to ignore the exception here.
- }
- }
- closeSilently(channel);
- }
- }, (Collector<?, ByteString>) phaseTwoDeduplicator);
+ final List<Region> regions = waitTasksTermination(sorter, nbSortedChunks.get());
+ Collections.sort(regions); // Sort regions by their starting offsets.
+ long mmapPosition = 0;
+ // Create as big as possible memory are (handling 2Gb limit) and create as many cursors as regions from
+ // these area.
+ MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get(), Integer.MAX_VALUE));
+ for (Region region : regions)
+ {
+ if ((region.offset + region.size) > (mmapPosition + Integer.MAX_VALUE))
+ {
+ // Handle the 2Gb ByteBuffer limit
+ mmapPosition = region.offset;
+ mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get() - mmapPosition, Integer.MAX_VALUE));
+ }
+ final ByteBuffer regionBuffer = mmap.duplicate();
+ final int relativeRegionOffset = (int) (region.offset - mmapPosition);
+ regionBuffer.position(relativeRegionOffset).limit(regionBuffer.position() + region.size);
+ cursors.add(new FileRegion.Cursor(name, regionBuffer.slice()));
+ }
}
- catch (ExecutionException | InterruptedException e)
+ catch (ExecutionException | InterruptedException | IOException e)
{
throw new StorageRuntimeException(e);
}
+
+ return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors)
+ {
+ @Override
+ public void close()
+ {
+ super.close();
+ if (OperatingSystem.isWindows())
+ {
+ // Windows might not be able to delete the file (see http://bugs.java.com/view_bug.do?bug_id=4715154)
+ // To prevent these not deleted files to waste space, we empty it.
+ try
+ {
+ channel.truncate(0);
+ }
+ catch (IOException e)
+ {
+ // This is best effort, it's safe to ignore the exception here.
+ }
+ }
+ closeSilently(channel);
+ }
+ }, (Collector<?, ByteString>) phaseTwoDeduplicator);
}
@Override
@@ -1528,26 +1546,46 @@
final long startOffset = filePosition.getAndAdd(chunk.size());
nbSortedChunks.incrementAndGet();
- sorter.submit(new Callable<MeteredCursor<ByteString, ByteString>>()
+ sorter.submit(new Callable<Region>()
{
@Override
- public MeteredCursor<ByteString, ByteString> call() throws Exception
+ public Region call() throws Exception
{
/*
- * NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of key
- * de-duplication performed by the CollectorCursor.
+ * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication
+ * performed by the CollectorCursor.
*/
- final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size());
+ final FileRegion region = new FileRegion(channel, startOffset, chunk.size());
+ final int regionSize;
try (final SequentialCursor<ByteString, ByteString> source =
new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator))
{
- copyIntoChunk(source, persistentChunk);
+ regionSize = region.write(source);
}
- return persistentChunk.flip();
+ return new Region(startOffset, regionSize);
}
});
}
+ /** Define a region inside a file. */
+ private static final class Region implements Comparable<Region>
+ {
+ private final long offset;
+ private final int size;
+
+ Region(long offset, int size)
+ {
+ this.offset = offset;
+ this.size = size;
+ }
+
+ @Override
+ public int compareTo(Region o)
+ {
+ return Long.compare(offset, o.offset);
+ }
+ }
+
/**
* Store data inside fixed-size byte arrays. Data stored in this chunk are sorted by key during the flip() so that
* they can be cursored ascendantly. Byte arrays are supplied through a {@link BufferPool}. To allow sort operation,
@@ -1780,13 +1818,9 @@
* +------------+--------------+--------------+----------------+
* </pre>
*/
- static final class FileRegionChunk implements Chunk
+ static final class FileRegion
{
- private final String metricName;
- private final FileChannel channel;
- private final long startOffset;
- private long size;
- private MappedByteBuffer mmapBuffer;
+ private final MappedByteBuffer mmapBuffer;
private final OutputStream mmapBufferOS = new OutputStream()
{
@Override
@@ -1796,65 +1830,33 @@
}
};
- FileRegionChunk(String name, FileChannel channel, long startOffset, long size) throws IOException
+ FileRegion(FileChannel channel, long startOffset, long size) throws IOException
{
- this.metricName = name;
- this.channel = channel;
- this.startOffset = startOffset;
if (size > 0)
{
// Make sure that the file is big-enough to encapsulate this memory-mapped region.
channel.write(ByteBuffer.wrap(new byte[] { 0 }), (startOffset + size) - 1);
}
- this.mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
+ mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size);
}
- @Override
- public boolean put(ByteSequence key, ByteSequence value)
+ public int write(SequentialCursor<ByteString, ByteString> source) throws IOException
{
- final int recordSize =
- PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value
- .length();
- if (mmapBuffer.remaining() < recordSize)
+ while (source.next())
{
- // The regions is full
- return false;
- }
-
- try
- {
+ final ByteSequence key = source.getKey();
+ final ByteSequence value = source.getValue();
PackedLong.writeCompactUnsigned(mmapBufferOS, key.length());
PackedLong.writeCompactUnsigned(mmapBufferOS, value.length());
+ key.copyTo(mmapBuffer);
+ value.copyTo(mmapBuffer);
}
- catch (IOException e)
- {
- throw new StorageRuntimeException(e);
- }
- key.copyTo(mmapBuffer);
- value.copyTo(mmapBuffer);
-
- return true;
- }
-
- @Override
- public long size()
- {
- return mmapBuffer == null ? size : mmapBuffer.position();
- }
-
- @Override
- public MeteredCursor<ByteString, ByteString> flip()
- {
- size = mmapBuffer.position();
- mmapBuffer = null;
- return new FileRegionChunkCursor(startOffset, size);
+ return mmapBuffer.position();
}
/** Cursor through the specific memory-mapped file's region. */
- private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString>
+ static final class Cursor implements MeteredCursor<ByteString, ByteString>
{
- private final long regionOffset;
- private final long regionSize;
private final InputStream asInputStream = new InputStream()
{
@Override
@@ -1863,19 +1865,19 @@
return region.get() & 0xFF;
}
};
+ private final String metricName;
private ByteBuffer region;
private ByteString key, value;
- FileRegionChunkCursor(long regionOffset, long regionSize)
+ Cursor(String metricName, ByteBuffer region)
{
- this.regionOffset = regionOffset;
- this.regionSize = regionSize;
+ this.metricName = metricName;
+ this.region = region;
}
@Override
public boolean next()
{
- ensureRegionIsMemoryMapped();
if (!region.hasRemaining())
{
key = value = null;
@@ -1904,22 +1906,6 @@
return true;
}
- private void ensureRegionIsMemoryMapped()
- {
- if (region == null)
- {
- // Because mmap() regions are a counted and limited resources, we create them lazily.
- try
- {
- region = channel.map(MapMode.READ_ONLY, regionOffset, regionSize);
- }
- catch (IOException e)
- {
- throw new IllegalStateException(e);
- }
- }
- }
-
@Override
public boolean isDefined()
{
@@ -1962,13 +1948,13 @@
@Override
public long getNbBytesRead()
{
- return region == null ? 0 : region.position();
+ return region.position();
}
@Override
public long getNbBytesTotal()
{
- return regionSize;
+ return region.limit();
}
}
}
@@ -2263,7 +2249,10 @@
long nbRecords = 0;
while (source.next())
{
- destination.put(source.getKey(), source.getValue());
+ if (!destination.put(source.getKey(), source.getValue()))
+ {
+ throw new IllegalStateException("Destination chunk is full");
+ }
nbRecords++;
}
return nbRecords;
@@ -2815,6 +2804,7 @@
}
/** Off-heap buffer using Unsafe memory access. */
+ @SuppressWarnings("restriction")
static final class OffHeapBuffer implements Buffer
{
private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT;
@@ -3140,6 +3130,7 @@
{
private static final Collector<Object, Object> INSTANCE = new UniqueValueCollector<>();
+ @SuppressWarnings("unchecked")
static <V> Collector<V, V> getInstance()
{
return (Collector<V, V>) INSTANCE;
@@ -3685,6 +3676,8 @@
// +1 because newly added entry is added before the least recently one is removed.
this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true)
{
+ private static final long serialVersionUID = 1L;
+
@Override
protected boolean removeEldestEntry(Map.Entry<T, Object> eldest)
{
--
Gitblit v1.10.0