| | |
| | | package org.opends.server.backends.pluggable; |
| | | |
| | | import static java.nio.channels.FileChannel.*; |
| | | |
| | | import static java.nio.file.StandardOpenOption.*; |
| | | import static org.forgerock.util.Utils.*; |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.server.util.DynamicConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static java.nio.file.StandardOpenOption.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.File; |
| | |
| | | import org.forgerock.opendj.config.server.ConfigException; |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.DN; |
| | | import org.forgerock.opendj.ldap.ResultCode; |
| | | import org.forgerock.opendj.ldap.spi.Indexer; |
| | | import org.forgerock.util.Reject; |
| | |
| | | import org.opends.server.backends.pluggable.spi.WriteableTransaction; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.ServerContext; |
| | | import org.forgerock.opendj.ldap.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.InitializationException; |
| | |
| | | } |
| | | } |
| | | |
| | | /** Default size for the off-heap memory dedicated to the phase one's buffer. */ |
| | | private static final int DEFAULT_OFFHEAP_SIZE = 700; |
| | | /** Max size of phase one buffer. */ |
| | | private static final int MAX_BUFFER_SIZE = 2 * MB; |
| | | /** Min size of phase one buffer. */ |
| | |
| | | * Store and sort data into multiple chunks. Thanks to the chunk rolling mechanism, this chunk can sort and store an |
| | | * unlimited amount of data. This class uses double-buffering: data are first stored in a |
| | | * {@link InMemorySortedChunk} which, once full, will be asynchronously sorted and copied into a |
| | | * {@link FileRegionChunk}. Duplicate keys are reduced by a {@link Collector}. |
| | | * {@link FileRegion}. Duplicate keys are reduced by a {@link Collector}. |
| | | * {@link #put(ByteSequence, ByteSequence)} is thread-safe. |
| | | * This class is used in phase-one. There is one {@link ExternalSortChunk} per |
| | | * database tree, shared across all phase-one importer threads, in charge of storing/sorting records. |
| | |
| | | private final Collector<?, ByteString> phaseOneDeduplicator; |
| | | private final Collector<?, ByteString> phaseTwoDeduplicator; |
| | | /** Keep track of pending sorting tasks. */ |
| | | private final CompletionService<MeteredCursor<ByteString, ByteString>> sorter; |
| | | private final CompletionService<Region> sorter; |
| | | /** Keep track of currently opened chunks. */ |
| | | private final Set<Chunk> activeChunks = Collections.synchronizedSet(new HashSet<Chunk>()); |
| | | /** Keep track of the number of chunks created. */ |
| | |
| | | { |
| | | sortAndAppendChunkAsync(chunk); |
| | | } |
| | | |
| | | final List<MeteredCursor<ByteString, ByteString>> cursors = new ArrayList<>(); |
| | | try |
| | | { |
| | | return new CollectorCursor<>( |
| | | new CompositeCursor<ByteString, ByteString>(name, |
| | | waitTasksTermination(sorter, nbSortedChunks.get())) |
| | | final List<Region> regions = waitTasksTermination(sorter, nbSortedChunks.get()); |
| | | Collections.sort(regions); // Sort regions by their starting offsets. |
| | | long mmapPosition = 0; |
| | | // Create memory-mapped areas that are as big as possible (handling the 2 GB limit) and create as many cursors |
| | | // as there are regions within these areas. |
| | | MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get(), Integer.MAX_VALUE)); |
| | | for (Region region : regions) |
| | | { |
| | | if ((region.offset + region.size) > (mmapPosition + Integer.MAX_VALUE)) |
| | | { |
| | | // Handle the 2Gb ByteBuffer limit |
| | | mmapPosition = region.offset; |
| | | mmap = channel.map(MapMode.READ_ONLY, mmapPosition, Math.min(size.get() - mmapPosition, Integer.MAX_VALUE)); |
| | | } |
| | | final ByteBuffer regionBuffer = mmap.duplicate(); |
| | | final int relativeRegionOffset = (int) (region.offset - mmapPosition); |
| | | regionBuffer.position(relativeRegionOffset).limit(regionBuffer.position() + region.size); |
| | | cursors.add(new FileRegion.Cursor(name, regionBuffer.slice())); |
| | | } |
| | | } |
| | | catch (ExecutionException | InterruptedException | IOException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | |
| | | return new CollectorCursor<>(new CompositeCursor<ByteString, ByteString>(name, cursors) |
| | | { |
| | | @Override |
| | | public void close() |
| | | { |
| | | super.close(); |
| | |
| | | } |
| | | }, (Collector<?, ByteString>) phaseTwoDeduplicator); |
| | | } |
| | | catch (ExecutionException | InterruptedException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public long size() |
| | |
| | | final long startOffset = filePosition.getAndAdd(chunk.size()); |
| | | nbSortedChunks.incrementAndGet(); |
| | | |
| | | sorter.submit(new Callable<MeteredCursor<ByteString, ByteString>>() |
| | | sorter.submit(new Callable<Region>() |
| | | { |
| | | @Override |
| | | public MeteredCursor<ByteString, ByteString> call() throws Exception |
| | | public Region call() throws Exception |
| | | { |
| | | /* |
| | | * NOTE: The resulting size of the FileRegionChunk might be less than chunk.size() because of key |
| | | * de-duplication performed by the CollectorCursor. |
| | | * NOTE: The resulting size of the FileRegion might be less than chunk.size() because of key de-duplication |
| | | * performed by the CollectorCursor. |
| | | */ |
| | | final Chunk persistentChunk = new FileRegionChunk(name, channel, startOffset, chunk.size()); |
| | | final FileRegion region = new FileRegion(channel, startOffset, chunk.size()); |
| | | final int regionSize; |
| | | try (final SequentialCursor<ByteString, ByteString> source = |
| | | new CollectorCursor<>(chunk.flip(), phaseOneDeduplicator)) |
| | | { |
| | | copyIntoChunk(source, persistentChunk); |
| | | regionSize = region.write(source); |
| | | } |
| | | return persistentChunk.flip(); |
| | | return new Region(startOffset, regionSize); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | /** |
| | | * Describes a region inside a file by its starting offset and its size. Regions are ordered by ascending starting |
| | | * offset. |
| | | */ |
| | | private static final class Region implements Comparable<Region> |
| | | { |
| | | /** Position in the file at which the region starts. */ |
| | | private final long offset; |
| | | /** Size of the region. */ |
| | | private final int size; |
| | | |
| | | Region(final long regionOffset, final int regionSize) |
| | | { |
| | | offset = regionOffset; |
| | | size = regionSize; |
| | | } |
| | | |
| | | /** Compares the starting offsets only; the size does not take part in the ordering. */ |
| | | @Override |
| | | public int compareTo(final Region other) |
| | | { |
| | | return Long.compare(this.offset, other.offset); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Store data inside fixed-size byte arrays. Data stored in this chunk are sorted by key during the flip() so that |
| | | * they can be cursored in ascending order. Byte arrays are supplied through a {@link BufferPool}. To allow sort operation, |
| | |
| | | * +------------+--------------+--------------+----------------+ |
| | | * </pre> |
| | | */ |
| | | static final class FileRegionChunk implements Chunk |
| | | static final class FileRegion |
| | | { |
| | | private final String metricName; |
| | | private final FileChannel channel; |
| | | private final long startOffset; |
| | | private long size; |
| | | private MappedByteBuffer mmapBuffer; |
| | | private final MappedByteBuffer mmapBuffer; |
| | | private final OutputStream mmapBufferOS = new OutputStream() |
| | | { |
| | | @Override |
| | |
| | | } |
| | | }; |
| | | |
| | | FileRegionChunk(String name, FileChannel channel, long startOffset, long size) throws IOException |
| | | FileRegion(FileChannel channel, long startOffset, long size) throws IOException |
| | | { |
| | | this.metricName = name; |
| | | this.channel = channel; |
| | | this.startOffset = startOffset; |
| | | if (size > 0) |
| | | { |
| | | // Make sure that the file is big-enough to encapsulate this memory-mapped region. |
| | | channel.write(ByteBuffer.wrap(new byte[] { 0 }), (startOffset + size) - 1); |
| | | } |
| | | this.mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size); |
| | | mmapBuffer = channel.map(MapMode.READ_WRITE, startOffset, size); |
| | | } |
| | | |
| | | @Override |
| | | public boolean put(ByteSequence key, ByteSequence value) |
| | | public int write(SequentialCursor<ByteString, ByteString> source) throws IOException |
| | | { |
| | | final int recordSize = |
| | | PackedLong.getEncodedSize(key.length()) + key.length() + PackedLong.getEncodedSize(value.length()) + value |
| | | .length(); |
| | | if (mmapBuffer.remaining() < recordSize) |
| | | while (source.next()) |
| | | { |
| | | // The region is full |
| | | return false; |
| | | } |
| | | |
| | | try |
| | | { |
| | | final ByteSequence key = source.getKey(); |
| | | final ByteSequence value = source.getValue(); |
| | | PackedLong.writeCompactUnsigned(mmapBufferOS, key.length()); |
| | | PackedLong.writeCompactUnsigned(mmapBufferOS, value.length()); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | | key.copyTo(mmapBuffer); |
| | | value.copyTo(mmapBuffer); |
| | | |
| | | return true; |
| | | } |
| | | |
| | | @Override |
| | | public long size() |
| | | { |
| | | return mmapBuffer == null ? size : mmapBuffer.position(); |
| | | } |
| | | |
| | | @Override |
| | | public MeteredCursor<ByteString, ByteString> flip() |
| | | { |
| | | size = mmapBuffer.position(); |
| | | mmapBuffer = null; |
| | | return new FileRegionChunkCursor(startOffset, size); |
| | | return mmapBuffer.position(); |
| | | } |
| | | |
| | | /** Cursor through the specific memory-mapped file's region. */ |
| | | private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString> |
| | | static final class Cursor implements MeteredCursor<ByteString, ByteString> |
| | | { |
| | | private final long regionOffset; |
| | | private final long regionSize; |
| | | private final InputStream asInputStream = new InputStream() |
| | | { |
| | | @Override |
| | |
| | | return region.get() & 0xFF; |
| | | } |
| | | }; |
| | | private final String metricName; |
| | | private ByteBuffer region; |
| | | private ByteString key, value; |
| | | |
| | | FileRegionChunkCursor(long regionOffset, long regionSize) |
| | | Cursor(String metricName, ByteBuffer region) |
| | | { |
| | | this.regionOffset = regionOffset; |
| | | this.regionSize = regionSize; |
| | | this.metricName = metricName; |
| | | this.region = region; |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | ensureRegionIsMemoryMapped(); |
| | | if (!region.hasRemaining()) |
| | | { |
| | | key = value = null; |
| | |
| | | return true; |
| | | } |
| | | |
| | | private void ensureRegionIsMemoryMapped() |
| | | { |
| | | if (region == null) |
| | | { |
| | | // Because mmap() regions are counted and limited resources, we create them lazily. |
| | | try |
| | | { |
| | | region = channel.map(MapMode.READ_ONLY, regionOffset, regionSize); |
| | | } |
| | | catch (IOException e) |
| | | { |
| | | throw new IllegalStateException(e); |
| | | } |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean isDefined() |
| | | { |
| | |
| | | @Override |
| | | public long getNbBytesRead() |
| | | { |
| | | return region == null ? 0 : region.position(); |
| | | return region.position(); |
| | | } |
| | | |
| | | @Override |
| | | public long getNbBytesTotal() |
| | | { |
| | | return regionSize; |
| | | return region.limit(); |
| | | } |
| | | } |
| | | } |
| | |
| | | long nbRecords = 0; |
| | | while (source.next()) |
| | | { |
| | | destination.put(source.getKey(), source.getValue()); |
| | | if (!destination.put(source.getKey(), source.getValue())) |
| | | { |
| | | throw new IllegalStateException("Destination chunk is full"); |
| | | } |
| | | nbRecords++; |
| | | } |
| | | return nbRecords; |
| | |
| | | } |
| | | |
| | | /** Off-heap buffer using Unsafe memory access. */ |
| | | @SuppressWarnings("restriction") |
| | | static final class OffHeapBuffer implements Buffer |
| | | { |
| | | private static final Unsafe UNSAFE = (Unsafe) UNSAFE_OBJECT; |
| | |
| | | { |
| | | private static final Collector<Object, Object> INSTANCE = new UniqueValueCollector<>(); |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | static <V> Collector<V, V> getInstance() |
| | | { |
| | | return (Collector<V, V>) INSTANCE; |
| | |
| | | // +1 because the newly added entry is inserted before the least recently used one is removed. |
| | | this.cache = Collections.synchronizedMap(new LinkedHashMap<T, Object>(maxEntries + 1, 1.0f, true) |
| | | { |
| | | private static final long serialVersionUID = 1L; |
| | | |
| | | @Override |
| | | protected boolean removeEldestEntry(Map.Entry<T, Object> eldest) |
| | | { |