From 1d01cf7ddda87acd88dc372b81265e68200ea5a1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 22 May 2015 14:12:11 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine
---
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java | 520 ++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 402 insertions(+), 118 deletions(-)
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index 584e865..d297582 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -34,21 +34,32 @@
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TimerTask;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +71,7 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Utils;
import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
import org.opends.server.admin.std.server.BackendIndexCfg;
import org.opends.server.admin.std.server.PluggableBackendCfg;
@@ -72,6 +84,7 @@
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
+import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
@@ -91,6 +104,353 @@
*/
final class OnDiskMergeStorageImporter
{
+ /** Data to put into the id2entry tree. */
+ private static final class Id2EntryData
+ {
+ private final Suffix suffix;
+ private final EntryID entryID;
+ private final Entry entry;
+
+ public Id2EntryData(Suffix suffix, EntryID entryID, Entry entry)
+ {
+ this.suffix = suffix;
+ this.entryID = entryID;
+ this.entry = entry;
+ }
+
+ private void put(WriteableTransaction txn) throws DirectoryException
+ {
+ suffix.getID2Entry().put(txn, entryID, entry);
+ }
+
+ @Override
+ public String toString()
+ {
+ return getClass().getSimpleName()
+ + "(suffix=" + suffix
+ + ", entryID=" + entryID
+ + ", entry=" + entry + ")";
+ }
+ }
+
+ /** Runnable that puts data into the id2entry tree in batches. */
+ private final class Id2EntryPutTask implements Runnable
+ {
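+ /** Number of queued entries written per storage transaction; also the queue size at which the writer thread is woken up. */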
+ private static final int BATCH_SIZE = 100;
+
+ private volatile boolean moreData = true;
+ private final Storage storage;
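+ /** Entries queued for the id2entry tree, kept ordered by entry ID so they are written in key order. */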
+ private final NavigableSet<Id2EntryData> dataToPut = new ConcurrentSkipListSet<>(new Comparator<Id2EntryData>()
+ {
+ @Override
+ public int compare(Id2EntryData o1, Id2EntryData o2)
+ {
+ return o1.entryID.compareTo(o2.entryID);
+ }
+ });
+
+ private Id2EntryPutTask(Storage storage)
+ {
+ this.storage = storage;
+ }
+
+ private void put(Suffix suffix, EntryID entryID, Entry entry)
+ {
+ dataToPut.add(new Id2EntryData(suffix, entryID, entry));
+
+ if (enoughDataToPut())
+ {
+ synchronized (dataToPut)
+ {
+ dataToPut.notify();
+ }
+ }
+ }
+
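+ /**
+ * Consumer loop: waits until a full batch is queued, writes batches until
+ * {@link #finishedWrites()} is called, then drains whatever remains in the queue.
+ */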
+ @Override
+ public void run()
+ {
+ try
+ {
+ while (!isCanceled() && moreData)
+ {
+ if (enoughDataToPut())
+ {
+ put(BATCH_SIZE);
+ }
+ else
+ {
+ synchronized (dataToPut)
+ {
+ if (moreData)
+ {
+ dataToPut.wait();
+ }
+ }
+ }
+ }
+ while (!isCanceled() && !dataToPut.isEmpty())
+ {
+ put(BATCH_SIZE);
+ }
+ }
+ catch (Exception e)
+ {
+ logger.traceException(e);
+ }
+ }
+
+ private boolean enoughDataToPut()
+ {
+ return dataToPut.size() > BATCH_SIZE;
+ }
+
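+ /** Writes up to {@code batchSize} queued entries to the id2entry tree in a single write operation. */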
+ private void put(final int batchSize) throws Exception
+ {
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ int count = 0;
+ while (!dataToPut.isEmpty() && count < batchSize)
+ {
+ dataToPut.pollFirst().put(txn);
+ count++;
+ }
+ }
+ });
+ }
+
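+ /** Signals that no more entries will be queued and wakes up the writer thread so it can drain the queue. */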
+ private void finishedWrites()
+ {
+ moreData = false;
+
+ synchronized (dataToPut)
+ {
+ dataToPut.notify();
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ final StringBuilder sb = new StringBuilder("[");
+ Iterator<Id2EntryData> it = dataToPut.iterator();
+ if (it.hasNext())
+ {
+ sb.append(it.next().entryID.longValue());
+ }
+ while (it.hasNext())
+ {
+ sb.append(",");
+ sb.append(it.next().entryID.longValue());
+ }
+ sb.append("]");
+ return sb.toString();
+ }
+ }
+
+ /**
+ * Represents an on-disk buffer file, accessed via memory-mapped byte buffers.
+ * <p>
+ * Data to write is accumulated in memory before being dumped into a {@link MappedByteBuffer}
+ * and written to disk.
+ */
+ private static final class Buffer
+ {
+ private final File file;
+ private final FileChannel fileChannel;
+ private final List<Integer> bufferPositions = new ArrayList<>();
+ private int bufferSize = 1024; // TODO JNR use MAX_BUFFER_SIZE?
+
+ // FIXME this is not thread safe yet!!!
+ /**
+ * Maps {@link ByteSequence} keys to (conflicting) values.
+ * <p>
+ * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
+ * {@link #bufferSize}.
+ */
+ private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
+ /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
+ private int maximumExpectedSizeOnDisk;
+
+ private Buffer(File file) throws FileNotFoundException
+ {
+ file.getParentFile().mkdirs();
+ this.file = file;
+ this.fileChannel = new RandomAccessFile(file, "rw").getChannel();
+ this.bufferPositions.add(0);
+ }
+
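+ /**
+ * Records a key/value pair in the in-memory store. If the projected on-disk size of the store
+ * would exceed the mapped buffer size, the store is first flushed to disk and cleared.
+ */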
+ void putKeyValue(ByteSequence key, ByteSequence value) throws IOException
+ {
+ int recordSize = INT_SIZE + key.length() + INT_SIZE + value.length();
+ if (bufferSize < maximumExpectedSizeOnDisk + recordSize)
+ {
+ copyToDisk();
+ inMemoryStore.clear();
+ maximumExpectedSizeOnDisk = 0;
+ }
+
+ Set<ByteSequence> values = inMemoryStore.get(key);
+ if (values == null)
+ {
+ values = new ConcurrentSkipListSet<>();
+ Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
+ if (existingValues != null)
+ {
+ values = existingValues;
+ }
+ }
+ values.add(value);
+ maximumExpectedSizeOnDisk += recordSize;
+ }
+
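+ /**
+ * Dumps the in-memory store into a newly mapped region of the buffer file, writing each
+ * key/value pair, forcing the region to disk and recording its end position.
+ */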
+ private void copyToDisk() throws IOException
+ {
+ MappedByteBuffer byteBuffer = nextBuffer();
+ for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
+ {
+ ByteSequence key = mapEntry.getKey();
+ for (ByteSequence value : mapEntry.getValue())
+ {
+ put(byteBuffer, key);
+ put(byteBuffer, value);
+ }
+ }
+ if (byteBuffer.position() != maximumExpectedSizeOnDisk)
+ {
+ logger.trace("Expected to write %d bytes, but actually wrote %d bytes",
+ maximumExpectedSizeOnDisk, byteBuffer.position());
+ }
+
+ byteBuffer.force();
+
+ addPosition(bufferPositions, byteBuffer);
+ }
+
+ private MappedByteBuffer nextBuffer() throws IOException
+ {
+ // FIXME JNR mapping bufferSize is an acceptable over-approximation of the data actually written
+ return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), bufferSize);
+ }
+
+ private int getLastPosition(List<Integer> l)
+ {
+ return l.get(l.size() - 1);
+ }
+
+ private void addPosition(List<Integer> l, MappedByteBuffer byteBuffer)
+ {
+ l.add(getLastPosition(l) + byteBuffer.position());
+ }
+
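+ /** Writes a length-prefixed record: a 4-byte length followed by the raw bytes of the sequence. */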
+ private void put(ByteBuffer byteBuffer, ByteSequence b)
+ {
+ byteBuffer.putInt(b.length());
+ // b.copyTo(byteBuffer) flips the buffer as a side effect,
+ // so remember the position and restore the limit and position afterwards.
+ final int posBeforeFlip = byteBuffer.position();
+ b.copyTo(byteBuffer);
+ byteBuffer.limit(bufferSize);
+ byteBuffer.position(posBeforeFlip + b.length());
+ }
+
+ void flush()
+ {
+ writeBufferIndexFile();
+ }
+
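+ /**
+ * Persists the buffer region boundaries to a companion ".index" file so that phase two
+ * can locate each region when performing the on-disk merge.
+ */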
+ private void writeBufferIndexFile()
+ {
+ final File bufferIndexFile = new File(file.getParent(), file.getName() + ".index");
+ try (PrintWriter writer = new PrintWriter(bufferIndexFile))
+ {
+ writer.print(Utils.joinAsString(" ", this.bufferPositions));
+ }
+ catch (FileNotFoundException e)
+ {
+ logger.traceException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString()
+ {
+ String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
+ return getClass().getSimpleName()
+ + "(treeName=\"" + treeName + "\""
+ + ", currentBuffer has " + inMemoryStore.size() + " record(s)"
+ + " and " + (bufferSize - maximumExpectedSizeOnDisk) + " byte(s) remaining)";
+ }
+ }
+
+ /** An importer using memory-mapped files, i.e. {@link MappedByteBuffer}. */
+ private static final class MemoryMappedBufferImporter implements Importer
+ {
+ private final File bufferDir;
+ private final Map<TreeName, Buffer> treeNameToBufferMap = new HashMap<>();
+
+ private MemoryMappedBufferImporter(File bufferDir)
+ {
+ this.bufferDir = bufferDir;
+ }
+
+ @Override
+ public void put(TreeName treeName, ByteSequence key, ByteSequence value)
+ {
+ try
+ {
+ getBuffer(treeName).putKeyValue(key, value);
+ }
+ catch (IOException e)
+ {
+ logger.traceException(e);
+ }
+ }
+
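+ /** Lazily creates one buffer per tree, backed by a file named after the tree under the scratch directory. */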
+ private Buffer getBuffer(TreeName treeName) throws IOException
+ {
+ Buffer buffer = treeNameToBufferMap.get(treeName);
+ if (buffer == null)
+ {
+ // TODO JNR it would be great if this created subdirectories
+ buffer = new Buffer(new File(bufferDir, treeName.toString()));
+ treeNameToBufferMap.put(treeName, buffer);
+ }
+ return buffer;
+ }
+
+ @Override
+ public ByteString read(TreeName treeName, ByteSequence key)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public boolean delete(TreeName treeName, ByteSequence key)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void createTree(TreeName name)
+ {
+ throw new UnsupportedOperationException("Not implemented");
+ }
+
+ @Override
+ public void close()
+ {
+ for (Buffer buffer : treeNameToBufferMap.values())
+ {
+ buffer.flush();
+ }
+ }
+ }
+
/**
* Shim that allows properly constructing an {@link OnDiskMergeStorageImporter} without polluting
* {@link ImportStrategy} and {@link RootContainer} with this importer inner workings.
@@ -181,8 +541,6 @@
/** Temp scratch directory. */
private final File tempDir;
- /** Size in bytes of DN cache. */
- private long dnCacheSize;
/** Available memory at the start of the import. */
private long availableMemory;
/** Size in bytes of DB cache. */
@@ -295,57 +653,21 @@
final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
- // We need caching when doing DN validation
- if (validateDNs)
+ if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
{
- // DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers.
- if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
- {
- dbCacheSize = 500 * KB;
- dnCacheSize = 500 * KB;
- }
- else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
- {
- dbCacheSize = MIN_DB_CACHE_SIZE;
- dnCacheSize = MIN_DB_CACHE_SIZE;
- }
- else if (!clearedBackend)
- {
- // Appending to existing data so reserve extra memory for the DB cache
- // since it will be needed for dn2id queries.
- dbCacheSize = usableMemory * 33 / 100;
- dnCacheSize = usableMemory * 33 / 100;
- }
- else
- {
- dbCacheSize = MAX_DB_CACHE_SIZE;
- dnCacheSize = usableMemory * 66 / 100;
- }
+ dbCacheSize = 500 * KB;
+ }
+ // DN validation requires extra cache memory for dn2id lookups
+ else if (usableMemory < MIN_DB_CACHE_MEMORY + (validateDNs ? MIN_DB_CACHE_SIZE : 0))
+ {
+ dbCacheSize = MIN_DB_CACHE_SIZE;
}
else
{
- // No DN validation: calculate memory for DB cache and buffers.
-
- // No need for DN2ID cache.
- dnCacheSize = 0;
-
- if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
- {
- dbCacheSize = 500 * KB;
- }
- else if (usableMemory < MIN_DB_CACHE_MEMORY)
- {
- dbCacheSize = MIN_DB_CACHE_SIZE;
- }
- else
- {
- // No need to differentiate between append/clear backend, since dn2id is
- // not being queried.
- dbCacheSize = MAX_DB_CACHE_SIZE;
- }
+ dbCacheSize = MAX_DB_CACHE_SIZE;
}
- final long phaseOneBufferMemory = usableMemory - dbCacheSize - dnCacheSize;
+ final long phaseOneBufferMemory = usableMemory - dbCacheSize;
final int oldThreadCount = threadCount;
if (indexCount != 0) // Avoid / by zero
{
@@ -370,12 +692,7 @@
final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
if (!clearedBackend)
{
- dbCacheSize += extraMemory / 2;
- dnCacheSize += extraMemory / 2;
- }
- else
- {
- dnCacheSize += extraMemory;
+ dbCacheSize += extraMemory;
}
}
@@ -396,7 +713,7 @@
// Not enough memory.
final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(
- usableMemory, minimumPhaseOneBufferMemory + dbCacheSize + dnCacheSize));
+ usableMemory, minimumPhaseOneBufferMemory + dbCacheSize));
}
}
}
@@ -407,10 +724,6 @@
}
logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount);
- if (dnCacheSize > 0)
- {
- logger.info(NOTE_IMPORT_LDIF_TMP_ENV_MEM, dnCacheSize);
- }
logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize);
}
@@ -594,8 +907,8 @@
logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER);
logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
- final Storage storage = rootContainer.getStorage();
- storage.write(new WriteOperation()
+ final Storage backendStorage = rootContainer.getStorage();
+ backendStorage.write(new WriteOperation()
{
@Override
public void run(WriteableTransaction txn) throws Exception
@@ -605,8 +918,10 @@
}
});
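+ // Scratch data for phase one is written through a memory-mapped importer instead of the backend storage.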
+ final Importer tmpImporter = new MemoryMappedBufferImporter(tempDir);
+
final long startTime = System.currentTimeMillis();
- importPhaseOne();
+ importPhaseOne(backendStorage, tmpImporter);
final long phaseOneFinishTime = System.currentTimeMillis();
if (isCanceled())
@@ -622,7 +937,7 @@
}
final long phaseTwoFinishTime = System.currentTimeMillis();
- storage.write(new WriteOperation()
+ backendStorage.write(new WriteOperation()
{
@Override
public void run(WriteableTransaction txn) throws Exception
@@ -700,29 +1015,33 @@
* <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
* The scratch files will be read by phaseTwo to perform on-disk merge</li>
* </ol>
+ * TODO JNR fix all javadocs
*/
- private void importPhaseOne() throws Exception
+ private void importPhaseOne(Storage storage, Importer importer) throws Exception
{
final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
+ threadCount = 2; // FIXME JNR id2entry + another task
final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
- final Storage storage = rootContainer.getStorage();
- final Importer importer = storage.startImport();
- execService.submit(new MigrateExistingTask(storage, importer)).get();
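+ // Dedicated task writing id2entry records ordered by entry ID, fed by the import tasks below.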
+ final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(storage);
+ final Future<?> id2EntryPutFuture = execService.submit(id2EntryPutTask);
+ execService.submit(new MigrateExistingEntriesTask(storage, importer, id2EntryPutTask)).get();
final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
{
- for (int i = 0; i < threadCount; i++)
+ for (int i = 0; i < threadCount - 1; i++)
{
- tasks.add(new ImportTask(importer));
+ tasks.add(new ImportTask(importer, id2EntryPutTask));
}
}
execService.invokeAll(tasks);
tasks.clear();
- execService.submit(new MigrateExcludedTask(storage, importer)).get();
+ execService.submit(new MigrateExcludedTask(storage, importer, id2EntryPutTask)).get();
+ id2EntryPutTask.finishedWrites();
+ id2EntryPutFuture.get();
shutdownAll(timerService, execService);
}
@@ -763,9 +1082,9 @@
{
private final Storage storage;
- private MigrateExcludedTask(final Storage storage, final Importer importer)
+ private MigrateExcludedTask(Storage storage, Importer importer, Id2EntryPutTask id2EntryPutTask)
{
- super(importer);
+ super(importer, id2EntryPutTask);
this.storage = storage;
}
@@ -838,13 +1157,13 @@
}
/** Task to migrate existing entries. */
- private final class MigrateExistingTask extends ImportTask
+ private final class MigrateExistingEntriesTask extends ImportTask
{
private final Storage storage;
- private MigrateExistingTask(final Storage storage, final Importer importer)
+ private MigrateExistingEntriesTask(final Storage storage, Importer importer, Id2EntryPutTask id2EntryPutTask)
{
- super(importer);
+ super(importer, id2EntryPutTask);
this.storage = storage;
}
@@ -939,10 +1258,12 @@
private class ImportTask implements Callable<Void>
{
private final Importer importer;
+ private final Id2EntryPutTask id2EntryPutTask;
- public ImportTask(final Importer importer)
+ public ImportTask(final Importer importer, Id2EntryPutTask id2EntryPutTask)
{
this.importer = importer;
+ this.id2EntryPutTask = id2EntryPutTask;
}
/** {@inheritDoc} */
@@ -997,8 +1318,7 @@
processDN2URI(suffix, entry);
processIndexes(suffix, entry, entryID);
processVLVIndexes(suffix, entry, entryID);
- // FIXME JNR run a dedicated thread to do the puts ordered by entryID
- // suffix.getID2Entry().put(importer, entryID, entry);
+ id2EntryPutTask.put(suffix, entryID, entry);
importCount.getAndIncrement();
}
@@ -1066,8 +1386,7 @@
}
}
- void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID)
- throws DirectoryException
+ void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID) throws DirectoryException
{
for (VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes())
{
@@ -1151,42 +1470,7 @@
}
}
- /** Invocation handler for the {@link PluggableBackendCfg} proxy. */
- private static final class BackendCfgHandler implements InvocationHandler
- {
- private final Map<String, Object> returnValues;
-
- private BackendCfgHandler(final Map<String, Object> returnValues)
- {
- this.returnValues = returnValues;
- }
-
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
- {
- final String methodName = method.getName();
- if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
- {
- // ignore calls to (add|remove)*ChangeListener() methods
- return null;
- }
-
- final Object returnValue = returnValues.get(methodName);
- if (returnValue != null)
- {
- return returnValue;
- }
- throw new IllegalArgumentException("Unhandled method call on proxy ("
- + BackendCfgHandler.class.getSimpleName()
- + ") for method (" + method
- + ") with arguments (" + Arrays.toString(args) + ")");
- }
- }
-
- /**
- * Used to check DN's when DN validation is performed during phase one processing.
- * It is deleted after phase one processing.
- */
+ /** Used to check DNs when DN validation is performed during phase one processing. */
private final class Dn2IdDnCache implements DNCache
{
private Suffix suffix;
--
Gitblit v1.10.0