| | |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.server.util.DynamicConstants.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | import static java.nio.file.StandardOpenOption.*; |
| | | |
| | | import java.io.Closeable; |
| | | import java.io.File; |
| | |
| | | import java.nio.MappedByteBuffer; |
| | | import java.nio.channels.FileChannel; |
| | | import java.nio.channels.FileChannel.MapMode; |
| | | import java.nio.file.FileAlreadyExistsException; |
| | | import java.nio.file.StandardOpenOption; |
| | | import java.util.AbstractList; |
| | | import java.util.ArrayList; |
| | | import java.util.Arrays; |
| | |
| | | import java.util.TimerTask; |
| | | import java.util.TreeMap; |
| | | import java.util.TreeSet; |
| | | import java.util.UUID; |
| | | import java.util.WeakHashMap; |
| | | import java.util.concurrent.ArrayBlockingQueue; |
| | | import java.util.concurrent.BlockingQueue; |
| | |
| | | */ |
| | | private static final class PhaseOneWriteableTransaction implements WriteableTransaction |
| | | { |
| | | /** Must be a power of 2 so that the lock index can be computed with a fast bitwise modulo. */ |
| | | private static final int LOCKTABLE_SIZE = 64; |
| | | private final ConcurrentMap<TreeName, Chunk> chunks = new ConcurrentHashMap<>(); |
| | | private final ChunkFactory chunkFactory; |
| | | private final Object[] lockTable = new Object[LOCKTABLE_SIZE]; |
| | | |
| | | PhaseOneWriteableTransaction(ChunkFactory chunkFactory) |
| | | { |
| | | this.chunkFactory = chunkFactory; |
| | | for (int i = 0; i < LOCKTABLE_SIZE; i++) |
| | | { |
| | | lockTable[i] = new Object(); |
| | | } |
| | | } |
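| | | |
| | | // Illustrative note: because LOCKTABLE_SIZE is a power of 2, the lock index computed from treeName.hashCode() |
| | | // below is a single bitwise AND instead of an integer division. For a hypothetical non-negative hash of |
| | | // 1000003, 1000003 & 63 == 1000003 % 64 == 3, so chunk creation for any tree name is guarded by one of only |
| | | // 64 monitors (lock striping), independent of the number of trees. |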
| | | |
| | | Map<TreeName, Chunk> getChunks() |
| | |
| | | return alreadyExistingChunk; |
| | | } |
| | | |
| | | // Fast modulo: LOCKTABLE_SIZE is a power of 2, so the AND below is equivalent to a modulo. |
| | | final int lockIndex = treeName.hashCode() & (LOCKTABLE_SIZE - 1); |
| | | synchronized (lockTable[lockIndex]) |
| | | { |
| | | alreadyExistingChunk = chunks.get(treeName); |
| | | if (alreadyExistingChunk != null) |
| | | { |
| | | // Another thread was faster at creating the chunk, reuse it. |
| | | return alreadyExistingChunk; |
| | | } |
| | | final Chunk newChunk = chunkFactory.newChunk(treeName); |
| | | chunks.put(treeName, newChunk); |
| | | return newChunk; |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public ByteString read(TreeName treeName, ByteSequence key) |
| | |
| | | * {@link #put(ByteSequence, ByteSequence)} operations. |
| | | */ |
| | | long size(); |
| | | |
| | | /** |
| | | * While a chunk's memory and files are automatically garbage collected/deleted at exit, this method can be |
| | | * called to release those resources immediately. |
| | | */ |
| | | void delete(); |
| | | } |
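| | | |
| | | // Minimal usage sketch for a Chunk (illustrative only), assuming the interface also declares put(key, value) |
| | | // and flip() as the implementations below do; only size() and delete() are shown above: |
| | | // |
| | | // final Chunk chunk = chunkFactory.newChunk(treeName); |
| | | // chunk.put(key, value); // accumulate records |
| | | // final long bytes = chunk.size(); // see the size() javadoc above |
| | | // final MeteredCursor<ByteString, ByteString> cursor = chunk.flip(); // switch to read-only cursor iteration |
| | | // chunk.delete(); // release memory/files eagerly instead of waiting for exit |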
| | | |
| | | /** |
| | |
| | | ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> collector, |
| | | Executor sortExecutor) throws IOException |
| | | { |
| | | FileChannel candidateChannel = null; |
| | | File candidateFile = null; |
| | | while (candidateChannel == null) |
| | | { |
| | | candidateFile = new File(tempDir, (name + UUID.randomUUID()).replaceAll("\\W+", "_")); |
| | | try |
| | | { |
| | | candidateChannel = |
| | | open(candidateFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE, |
| | | StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE); |
| | | candidateFile.deleteOnExit(); |
| | | } |
| | | catch (FileAlreadyExistsException ignore) |
| | | { |
| | | // Another writer already created a file with this name; loop and retry with a fresh UUID. |
| | | } |
| | | } |
| | | this.name = name; |
| | | this.bufferPool = bufferPool; |
| | | this.deduplicator = collector; |
| | | this.file = candidateFile; |
| | | this.channel = candidateChannel; |
| | | this.sorter = new ExecutorCompletionService<>(sortExecutor); |
| | | } |
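| | | |
| | | // Note on the creation loop above: CREATE_NEW fails atomically when the file already exists, so two |
| | | // concurrent writers can never end up sharing a temporary sort file; the loser simply retries with a |
| | | // fresh UUID-based name. SPARSE is only a hint that lets the file system avoid allocating blocks for |
| | | // regions that are never written. |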
| | | |
| | |
| | | return size.get() + activeSize; |
| | | } |
| | | |
| | | @Override |
| | | public void delete() |
| | | { |
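| | | // Close the channel before deleting: platforms that lock open files (e.g. Windows) could otherwise refuse the delete. |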
| | | closeSilently(channel); |
| | | file.delete(); |
| | | } |
| | | |
| | | int getNbSortedChunks() |
| | | { |
| | | return nbSortedChunks.get(); |
| | |
| | | return buffer.compare(keyOffsetA, keyLengthA, keyOffsetB, keyLengthB); |
| | | } |
| | | |
| | | @Override |
| | | public void delete() |
| | | { |
| | | bufferPool.release(buffer); |
| | | } |
| | | |
| | | /** Cursor of the in-memory chunk. */ |
| | | private final class InMemorySortedChunkCursor implements MeteredCursor<ByteString, ByteString> |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void delete() |
| | | { |
| | | // Nothing to do |
| | | } |
| | | |
| | | /** Cursor over a specific region of the memory-mapped file. */ |
| | | private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString> |
| | | { |
| | |
| | | { |
| | | return size.get(); |
| | | } |
| | | |
| | | @Override |
| | | public void delete() |
| | | { |
| | | // Nothing to do |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void delete() |
| | | { |
| | | // Nothing to do |
| | | } |
| | | |
| | | @Override |
| | | public synchronized boolean put(ByteSequence key, ByteSequence value) |
| | | { |
| | | pendingRecords.put(key, value); |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void delete() |
| | | { |
| | | // Nothing to do |
| | | } |
| | | |
| | | @Override |
| | | public MeteredCursor<ByteString, ByteString> flip() |
| | | { |
| | | return new MeteredCursor<ByteString, ByteString>() |