From 3156b73d71dbdf8786d141967db72f9c5cbbecdb Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 27 Jan 2016 10:12:13 +0000
Subject: [PATCH] Simplification by replacing optimistic chunk creation by a pessimistic one.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 94 +++++++++++------------------------------------
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java | 7 ---
2 files changed, 23 insertions(+), 78 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index ce9c259..fdca497 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -30,6 +30,7 @@
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.StaticUtils.*;
+import static java.nio.file.StandardOpenOption.*;
import java.io.Closeable;
import java.io.File;
@@ -41,8 +42,6 @@
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
-import java.nio.file.FileAlreadyExistsException;
-import java.nio.file.StandardOpenOption;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
@@ -64,7 +63,6 @@
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
-import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -1251,12 +1249,19 @@
*/
private static final class PhaseOneWriteableTransaction implements WriteableTransaction
{
+ /** Must be power of 2 because of fast-modulo computing. */
+ private static final int LOCKTABLE_SIZE = 64;
private final ConcurrentMap<TreeName, Chunk> chunks = new ConcurrentHashMap<>();
private final ChunkFactory chunkFactory;
+ private final Object[] lockTable = new Object[LOCKTABLE_SIZE];
PhaseOneWriteableTransaction(ChunkFactory chunkFactory)
{
this.chunkFactory = chunkFactory;
+ for (int i = 0; i < LOCKTABLE_SIZE; i++)
+ {
+ lockTable[i] = new Object();
+ }
}
Map<TreeName, Chunk> getChunks()
@@ -1289,15 +1294,19 @@
return alreadyExistingChunk;
}
- final Chunk newChunk = chunkFactory.newChunk(treeName);
- alreadyExistingChunk = chunks.putIfAbsent(treeName, newChunk);
- if (alreadyExistingChunk != null)
+ // Fast modulo computing.
+ final int lockIndex = treeName.hashCode() & (LOCKTABLE_SIZE - 1);
+ synchronized (lockTable[lockIndex])
{
- // Another thread was faster at creating a new chunk, close this one.
- newChunk.delete();
- return alreadyExistingChunk;
+ alreadyExistingChunk = chunks.get(treeName);
+ if (alreadyExistingChunk != null)
+ {
+ return alreadyExistingChunk;
+ }
+ final Chunk newChunk = chunkFactory.newChunk(treeName);
+ chunks.put(treeName, newChunk);
+ return newChunk;
}
- return newChunk;
}
@Override
@@ -1370,12 +1379,6 @@
* {@link #put(ByteSequence, ByteSequence)} operations.
*/
long size();
-
- /**
- * While chunk's memory and files are automatically garbage collected/deleted at exit, this method can be called to
- * clean things now.
- */
- void delete();
}
/**
@@ -1421,28 +1424,12 @@
ExternalSortChunk(File tempDir, String name, BufferPool bufferPool, Collector<?, ByteString> collector,
Executor sortExecutor) throws IOException
{
- FileChannel candidateChannel = null;
- File candidateFile = null;
- while (candidateChannel == null)
- {
- candidateFile = new File(tempDir, (name + UUID.randomUUID()).replaceAll("\\W+", "_"));
- try
- {
- candidateChannel =
- open(candidateFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE,
- StandardOpenOption.CREATE_NEW, StandardOpenOption.SPARSE);
- candidateFile.deleteOnExit();
- }
- catch (FileAlreadyExistsException ignore)
- {
- // someone else got it
- }
- }
this.name = name;
this.bufferPool = bufferPool;
this.deduplicator = collector;
- this.file = candidateFile;
- this.channel = candidateChannel;
+ this.file = new File(tempDir, name.replaceAll("\\W+", "_"));
+ this.file.deleteOnExit();
+ this.channel = open(this.file.toPath(), READ, WRITE, CREATE_NEW, SPARSE);
this.sorter = new ExecutorCompletionService<>(sortExecutor);
}
@@ -1492,13 +1479,6 @@
return size.get() + activeSize;
}
- @Override
- public void delete()
- {
- closeSilently(channel);
- file.delete();
- }
-
int getNbSortedChunks()
{
return nbSortedChunks.get();
@@ -1672,12 +1652,6 @@
return buffer.compare(keyOffsetA, keyLengthA, keyOffsetB, keyLengthB);
}
- @Override
- public void delete()
- {
- bufferPool.release(buffer);
- }
-
/** Cursor of the in-memory chunk. */
private final class InMemorySortedChunkCursor implements MeteredCursor<ByteString, ByteString>
{
@@ -1861,12 +1835,6 @@
}
}
- @Override
- public void delete()
- {
- // Nothing to do
- }
-
/** Cursor through the specific memory-mapped file's region. */
private final class FileRegionChunkCursor implements MeteredCursor<ByteString, ByteString>
{
@@ -2406,12 +2374,6 @@
{
return size.get();
}
-
- @Override
- public void delete()
- {
- // Nothing to do
- }
}
/**
@@ -2494,12 +2456,6 @@
}
@Override
- public void delete()
- {
- // Nothing to do
- }
-
- @Override
public synchronized boolean put(ByteSequence key, ByteSequence value)
{
pendingRecords.put(key, value);
@@ -2556,12 +2512,6 @@
}
@Override
- public void delete()
- {
- // Nothing to do
- }
-
- @Override
public MeteredCursor<ByteString, ByteString> flip()
{
return new MeteredCursor<ByteString, ByteString>()
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
index dfb3c0f..12ad1a4 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -20,7 +20,7 @@
*
* CDDL HEADER END
*
- * Copyright 2015 ForgeRock AS.
+ * Copyright 2015-2016 ForgeRock AS.
*/
package org.opends.server.backends.pluggable;
@@ -529,10 +529,5 @@
{
return size;
}
-
- @Override
- public void delete()
- {
- }
}
}
--
Gitblit v1.10.0