From 1d01cf7ddda87acd88dc372b81265e68200ea5a1 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 22 May 2015 14:12:11 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on-disk merge import strategy based on the storage engine

---
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java |  520 ++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 402 insertions(+), 118 deletions(-)

diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index 584e865..d297582 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -34,21 +34,32 @@
 import static org.opends.server.util.StaticUtils.*;
 
 import java.io.File;
+import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Method;
+import java.io.PrintWriter;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileChannel.MapMode;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableSet;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TimerTask;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +71,7 @@
 import org.forgerock.opendj.ldap.ByteSequence;
 import org.forgerock.opendj.ldap.ByteString;
 import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Utils;
 import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
 import org.opends.server.admin.std.server.BackendIndexCfg;
 import org.opends.server.admin.std.server.PluggableBackendCfg;
@@ -72,6 +84,7 @@
 import org.opends.server.backends.pluggable.spi.ReadableTransaction;
 import org.opends.server.backends.pluggable.spi.Storage;
 import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
+import org.opends.server.backends.pluggable.spi.TreeName;
 import org.opends.server.backends.pluggable.spi.WriteOperation;
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 import org.opends.server.core.DirectoryServer;
@@ -91,6 +104,353 @@
  */
 final class OnDiskMergeStorageImporter
 {
+  /** Data to put into the id2entry tree. */
+  private static final class Id2EntryData
+  {
+    private final Suffix suffix;
+    private final EntryID entryID;
+    private final Entry entry;
+
+    private Id2EntryData(Suffix suffix, EntryID entryID, Entry entry)
+    {
+      this.suffix = suffix;
+      this.entryID = entryID;
+      this.entry = entry;
+    }
+
+    private void put(WriteableTransaction txn) throws DirectoryException
+    {
+      suffix.getID2Entry().put(txn, entryID, entry);
+    }
+
+    @Override
+    public String toString()
+    {
+      return getClass().getSimpleName()
+          + "(suffix=" + suffix
+          + ", entryID=" + entryID
+          + ", entry=" + entry + ")";
+    }
+  }
+
+  /** Runnable that puts data into the id2entry tree in batches. */
+  private final class Id2EntryPutTask implements Runnable
+  {
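+    /** Maximum number of puts performed in a single write transaction. */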
+    private static final int BATCH_SIZE = 100;
+
+    private volatile boolean moreData = true;
+    private final Storage storage;
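+    /** Queue of pending puts, ordered by entryID so writes reach the id2entry tree in ascending key order. */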
+    private final NavigableSet<Id2EntryData> dataToPut = new ConcurrentSkipListSet<>(new Comparator<Id2EntryData>()
+    {
+      @Override
+      public int compare(Id2EntryData o1, Id2EntryData o2)
+      {
+        return o1.entryID.compareTo(o2.entryID);
+      }
+    });
+
+    private Id2EntryPutTask(Storage storage)
+    {
+      this.storage = storage;
+    }
+
+    private void put(Suffix suffix, EntryID entryID, Entry entry)
+    {
+      dataToPut.add(new Id2EntryData(suffix, entryID, entry));
+
+      if (enoughDataToPut())
+      {
+        synchronized (dataToPut)
+        {
+          dataToPut.notify();
+        }
+      }
+    }
+
+    @Override
+    public void run()
+    {
+      try
+      {
+        while (!isCanceled() && moreData)
+        {
+          if (enoughDataToPut())
+          {
+            put(BATCH_SIZE);
+          }
+          else
+          {
+            synchronized (dataToPut)
+            {
+              if (moreData)
+              {
+                dataToPut.wait();
+              }
+            }
+          }
+        }
+        while (!isCanceled() && !dataToPut.isEmpty())
+        {
+          put(BATCH_SIZE);
+        }
+      }
+      catch (Exception e)
+      {
+        logger.traceException(e);
+      }
+    }
+
+    private boolean enoughDataToPut()
+    {
+      return dataToPut.size() > BATCH_SIZE;
+    }
+
+    private void put(final int batchSize) throws Exception
+    {
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableTransaction txn) throws Exception
+        {
+          int count = 0;
+          while (!dataToPut.isEmpty() && count < batchSize)
+          {
+            dataToPut.pollFirst().put(txn);
+            count++;
+          }
+        }
+      });
+    }
+
+    private void finishedWrites()
+    {
+      moreData = false;
+
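+      // Wake the run() loop so it drains any remaining queued data and exits.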
+      synchronized (dataToPut)
+      {
+        dataToPut.notify();
+      }
+    }
+
+    @Override
+    public String toString()
+    {
+      final StringBuilder sb = new StringBuilder("[");
+      Iterator<Id2EntryData> it = dataToPut.iterator();
+      if (it.hasNext())
+      {
+        sb.append(it.next().entryID.longValue());
+      }
+      while (it.hasNext())
+      {
+        sb.append(",");
+        sb.append(it.next().entryID.longValue());
+      }
+      sb.append("]");
+      return sb.toString();
+    }
+  }
+
+  /**
+   * Represents an on-disk buffer file, accessed via memory-mapped files.
+   * <p>
+   * Data to write is accumulated in memory before being dumped into a {@link MappedByteBuffer}
+   * and written to disk.
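+   * Each record is written as [key length (int)][key bytes][value length (int)][value bytes].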
+   */
+  private static final class Buffer
+  {
+    private final File file;
+    private final FileChannel fileChannel;
+    private final List<Integer> bufferPositions = new ArrayList<>();
+    private int bufferSize = 1024; // TODO JNR use MAX_BUFFER_SIZE?
+
+    // FIXME this is not thread-safe yet!
+    /**
+     * Maps {@link ByteSequence} keys to (conflicting) values.
+     * <p>
+     * The in-memory store is flushed to disk whenever adding a record would make
+     * {@link #maximumExpectedSizeOnDisk} exceed {@link #bufferSize}.
+     */
+    private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
+    /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
+    private int maximumExpectedSizeOnDisk;
+
+    private Buffer(File file) throws FileNotFoundException
+    {
+      file.getParentFile().mkdirs();
+      this.file = file;
+      this.fileChannel = new RandomAccessFile(file, "rw").getChannel();
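+      // The first mapped region starts at file offset 0.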
+      this.bufferPositions.add(0);
+    }
+
+    void putKeyValue(ByteSequence key, ByteSequence value) throws IOException
+    {
+      int recordSize = INT_SIZE + key.length() + INT_SIZE + value.length();
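+      // Flush the in-memory store to disk before this record would overflow the mapped buffer.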
+      if (bufferSize < maximumExpectedSizeOnDisk + recordSize)
+      {
+        copyToDisk();
+        inMemoryStore.clear();
+        maximumExpectedSizeOnDisk = 0;
+      }
+
+      Set<ByteSequence> values = inMemoryStore.get(key);
+      if (values == null)
+      {
+        values = new ConcurrentSkipListSet<>();
+        Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
+        if (existingValues != null)
+        {
+          values = existingValues;
+        }
+      }
+      values.add(value);
+      maximumExpectedSizeOnDisk += recordSize;
+    }
+
+    private void copyToDisk() throws IOException
+    {
+      MappedByteBuffer byteBuffer = nextBuffer();
+      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
+      {
+        ByteSequence key = mapEntry.getKey();
+        for (ByteSequence value : mapEntry.getValue())
+        {
+          put(byteBuffer, key);
+          put(byteBuffer, value);
+        }
+      }
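+      // The actual size can be smaller than the estimate: duplicate values are counted
+      // in maximumExpectedSizeOnDisk but collapsed by the value set.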
+      if (byteBuffer.position() != maximumExpectedSizeOnDisk)
+      {
+        logger.trace("Expected to write %d bytes, but actually wrote %d bytes",
+            maximumExpectedSizeOnDisk, byteBuffer.position());
+      }
+
+      byteBuffer.force();
+
+      addPosition(bufferPositions, byteBuffer);
+    }
+
+    private MappedByteBuffer nextBuffer() throws IOException
+    {
+      // FIXME JNR bufferSize is an acceptable over-approximation
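+      // Map a new region of the file, starting where the previous dump ended.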
+      return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), bufferSize);
+    }
+
+    private int getLastPosition(List<Integer> l)
+    {
+      return l.get(l.size() - 1);
+    }
+
+    private void addPosition(List<Integer> l, MappedByteBuffer byteBuffer)
+    {
+      l.add(getLastPosition(l) + byteBuffer.position());
+    }
+
+    private void put(ByteBuffer byteBuffer, ByteSequence b)
+    {
+      byteBuffer.putInt(b.length());
+      // b.copyTo(byteBuffer) calls ByteBuffer.flip(), so save the position now and
+      // restore the limit and position afterwards. Why does copyTo() flip the buffer?
+      final int posBeforeFlip = byteBuffer.position();
+      b.copyTo(byteBuffer);
+      byteBuffer.limit(bufferSize);
+      byteBuffer.position(posBeforeFlip + b.length());
+    }
+
+    void flush() throws IOException
+    {
+      if (!inMemoryStore.isEmpty())
+      {
+        copyToDisk(); // persist any remaining in-memory records before writing the index file
+      }
+      writeBufferIndexFile();
+    }
+
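+    /** Writes the positions delimiting each dumped buffer to a companion ".index" file. */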
+    private void writeBufferIndexFile()
+    {
+      final File bufferIndexFile = new File(file.getParent(), file.getName() + ".index");
+      try (PrintWriter writer = new PrintWriter(bufferIndexFile))
+      {
+        writer.print(Utils.joinAsString(" ", this.bufferPositions));
+      }
+      catch (FileNotFoundException e)
+      {
+        logger.traceException(e);
+      }
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString()
+    {
+      String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
+      return getClass().getSimpleName()
+          + "(treeName=\"" + treeName + "\""
+          + ", currentBuffer has " + inMemoryStore.size() + " record(s)"
+          + " and " + (bufferSize - maximumExpectedSizeOnDisk) + " byte(s) remaining)";
+    }
+  }
+
+  /** An importer using memory-mapped files, a.k.a. {@link MappedByteBuffer}. */
+  private static final class MemoryMappedBufferImporter implements Importer
+  {
+    private final File bufferDir;
+    private final Map<TreeName, Buffer> treeNameToBufferMap = new HashMap<>();
+
+    private MemoryMappedBufferImporter(File bufferDir)
+    {
+      this.bufferDir = bufferDir;
+    }
+
+    @Override
+    public void put(TreeName treeName, ByteSequence key, ByteSequence value)
+    {
+      try
+      {
+        getBuffer(treeName).putKeyValue(key, value);
+      }
+      catch (IOException e)
+      {
+        logger.traceException(e);
+      }
+    }
+
+    private Buffer getBuffer(TreeName treeName) throws IOException
+    {
+      Buffer buffer = treeNameToBufferMap.get(treeName);
+      if (buffer == null)
+      {
+        // TODO JNR it would be great if this created sub-directories :)
+        buffer = new Buffer(new File(bufferDir, treeName.toString()));
+        treeNameToBufferMap.put(treeName, buffer);
+      }
+      return buffer;
+    }
+
+    @Override
+    public ByteString read(TreeName treeName, ByteSequence key)
+    {
+      throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public boolean delete(TreeName treeName, ByteSequence key)
+    {
+      throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void createTree(TreeName name)
+    {
+      throw new UnsupportedOperationException("Not implemented");
+    }
+
+    @Override
+    public void close()
+    {
+      for (Buffer buffer : treeNameToBufferMap.values())
+      {
+        try
+        {
+          buffer.flush();
+        }
+        catch (IOException e)
+        {
+          logger.traceException(e);
+        }
+      }
+    }
+  }
+
   /**
    * Shim that allows properly constructing an {@link OnDiskMergeStorageImporter} without polluting
    * {@link ImportStrategy} and {@link RootContainer} with this importer inner workings.
@@ -181,8 +541,6 @@
 
   /** Temp scratch directory. */
   private final File tempDir;
-  /** Size in bytes of DN cache. */
-  private long dnCacheSize;
   /** Available memory at the start of the import. */
   private long availableMemory;
   /** Size in bytes of DB cache. */
@@ -295,57 +653,21 @@
 
     final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
 
-    // We need caching when doing DN validation
-    if (validateDNs)
+    if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
     {
-      // DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers.
-      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
-      {
-        dbCacheSize = 500 * KB;
-        dnCacheSize = 500 * KB;
-      }
-      else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
-      {
-        dbCacheSize = MIN_DB_CACHE_SIZE;
-        dnCacheSize = MIN_DB_CACHE_SIZE;
-      }
-      else if (!clearedBackend)
-      {
-        // Appending to existing data so reserve extra memory for the DB cache
-        // since it will be needed for dn2id queries.
-        dbCacheSize = usableMemory * 33 / 100;
-        dnCacheSize = usableMemory * 33 / 100;
-      }
-      else
-      {
-        dbCacheSize = MAX_DB_CACHE_SIZE;
-        dnCacheSize = usableMemory * 66 / 100;
-      }
+      dbCacheSize = 500 * KB;
+    }
+    // We need caching when doing DN validation
+    else if (usableMemory < MIN_DB_CACHE_MEMORY + (validateDNs ? MIN_DB_CACHE_SIZE : 0))
+    {
+      dbCacheSize = MIN_DB_CACHE_SIZE;
     }
     else
     {
-      // No DN validation: calculate memory for DB cache and buffers.
-
-      // No need for DN2ID cache.
-      dnCacheSize = 0;
-
-      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
-      {
-        dbCacheSize = 500 * KB;
-      }
-      else if (usableMemory < MIN_DB_CACHE_MEMORY)
-      {
-        dbCacheSize = MIN_DB_CACHE_SIZE;
-      }
-      else
-      {
-        // No need to differentiate between append/clear backend, since dn2id is
-        // not being queried.
-        dbCacheSize = MAX_DB_CACHE_SIZE;
-      }
+      dbCacheSize = MAX_DB_CACHE_SIZE;
     }
 
-    final long phaseOneBufferMemory = usableMemory - dbCacheSize - dnCacheSize;
+    final long phaseOneBufferMemory = usableMemory - dbCacheSize;
     final int oldThreadCount = threadCount;
     if (indexCount != 0) // Avoid / by zero
     {
@@ -370,12 +692,7 @@
             final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
             if (!clearedBackend)
             {
-              dbCacheSize += extraMemory / 2;
-              dnCacheSize += extraMemory / 2;
-            }
-            else
-            {
-              dnCacheSize += extraMemory;
+              dbCacheSize += extraMemory;
             }
           }
 
@@ -396,7 +713,7 @@
           // Not enough memory.
           final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
           throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(
-              usableMemory, minimumPhaseOneBufferMemory + dbCacheSize + dnCacheSize));
+              usableMemory, minimumPhaseOneBufferMemory + dbCacheSize));
         }
       }
     }
@@ -407,10 +724,6 @@
     }
 
     logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount);
-    if (dnCacheSize > 0)
-    {
-      logger.info(NOTE_IMPORT_LDIF_TMP_ENV_MEM, dnCacheSize);
-    }
     logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize);
   }
 
@@ -594,8 +907,8 @@
       logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER);
       logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
 
-      final Storage storage = rootContainer.getStorage();
-      storage.write(new WriteOperation()
+      final Storage backendStorage = rootContainer.getStorage();
+      backendStorage.write(new WriteOperation()
       {
         @Override
         public void run(WriteableTransaction txn) throws Exception
@@ -605,8 +918,10 @@
         }
       });
 
+      final Importer tmpImporter = new MemoryMappedBufferImporter(tempDir);
+
       final long startTime = System.currentTimeMillis();
-      importPhaseOne();
+      importPhaseOne(backendStorage, tmpImporter);
       final long phaseOneFinishTime = System.currentTimeMillis();
 
       if (isCanceled())
@@ -622,7 +937,7 @@
       }
       final long phaseTwoFinishTime = System.currentTimeMillis();
 
-      storage.write(new WriteOperation()
+      backendStorage.write(new WriteOperation()
       {
         @Override
         public void run(WriteableTransaction txn) throws Exception
@@ -700,29 +1015,33 @@
    * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
    * The scratch files will be read by phaseTwo to perform on-disk merge</li>
    * </ol>
+   * TODO JNR fix all javadocs
    */
-  private void importPhaseOne() throws Exception
+  private void importPhaseOne(Storage storage, Importer importer) throws Exception
   {
     final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
     scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
+    threadCount = 2; // FIXME JNR id2entry + another task
     final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
 
-    final Storage storage = rootContainer.getStorage();
-    final Importer importer = storage.startImport();
-    execService.submit(new MigrateExistingTask(storage, importer)).get();
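+    // Start the dedicated task performing id2entry puts in entryID order.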
+    final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(storage);
+    final Future<?> id2EntryPutFuture = execService.submit(id2EntryPutTask);
+    execService.submit(new MigrateExistingEntriesTask(storage, importer, id2EntryPutTask)).get();
 
     final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
     if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
     {
-      for (int i = 0; i < threadCount; i++)
+      for (int i = 0; i < threadCount - 1; i++)
       {
-        tasks.add(new ImportTask(importer));
+        tasks.add(new ImportTask(importer, id2EntryPutTask));
       }
     }
     execService.invokeAll(tasks);
     tasks.clear();
 
-    execService.submit(new MigrateExcludedTask(storage, importer)).get();
+    execService.submit(new MigrateExcludedTask(storage, importer, id2EntryPutTask)).get();
+    id2EntryPutTask.finishedWrites();
+    id2EntryPutFuture.get();
 
     shutdownAll(timerService, execService);
   }
@@ -763,9 +1082,9 @@
   {
     private final Storage storage;
 
-    private MigrateExcludedTask(final Storage storage, final Importer importer)
+    private MigrateExcludedTask(Storage storage, Importer importer, Id2EntryPutTask id2EntryPutTask)
     {
-      super(importer);
+      super(importer, id2EntryPutTask);
       this.storage = storage;
     }
 
@@ -838,13 +1157,13 @@
   }
 
   /** Task to migrate existing entries. */
-  private final class MigrateExistingTask extends ImportTask
+  private final class MigrateExistingEntriesTask extends ImportTask
   {
     private final Storage storage;
 
-    private MigrateExistingTask(final Storage storage, final Importer importer)
+    private MigrateExistingEntriesTask(final Storage storage, Importer importer, Id2EntryPutTask id2EntryPutTask)
     {
-      super(importer);
+      super(importer, id2EntryPutTask);
       this.storage = storage;
     }
 
@@ -939,10 +1258,12 @@
   private class ImportTask implements Callable<Void>
   {
     private final Importer importer;
+    private final Id2EntryPutTask id2EntryPutTask;
 
-    public ImportTask(final Importer importer)
+    public ImportTask(final Importer importer, Id2EntryPutTask id2EntryPutTask)
     {
       this.importer = importer;
+      this.id2EntryPutTask = id2EntryPutTask;
     }
 
     /** {@inheritDoc} */
@@ -997,8 +1318,7 @@
       processDN2URI(suffix, entry);
       processIndexes(suffix, entry, entryID);
       processVLVIndexes(suffix, entry, entryID);
-      // FIXME JNR run a dedicated thread to do the puts ordered by entryID
-      // suffix.getID2Entry().put(importer, entryID, entry);
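+      // Queue the entry for the dedicated Id2EntryPutTask, which performs puts ordered by entryID.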
+      id2EntryPutTask.put(suffix, entryID, entry);
       importCount.getAndIncrement();
     }
 
@@ -1066,8 +1386,7 @@
       }
     }
 
-    void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID)
-        throws DirectoryException
+    void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID) throws DirectoryException
     {
       for (VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes())
       {
@@ -1151,42 +1470,7 @@
     }
   }
 
-  /** Invocation handler for the {@link PluggableBackendCfg} proxy. */
-  private static final class BackendCfgHandler implements InvocationHandler
-  {
-    private final Map<String, Object> returnValues;
-
-    private BackendCfgHandler(final Map<String, Object> returnValues)
-    {
-      this.returnValues = returnValues;
-    }
-
-    @Override
-    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
-    {
-      final String methodName = method.getName();
-      if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
-      {
-        // ignore calls to (add|remove)*ChangeListener() methods
-        return null;
-      }
-
-      final Object returnValue = returnValues.get(methodName);
-      if (returnValue != null)
-      {
-        return returnValue;
-      }
-      throw new IllegalArgumentException("Unhandled method call on proxy ("
-          + BackendCfgHandler.class.getSimpleName()
-          + ") for method (" + method
-          + ") with arguments (" + Arrays.toString(args) + ")");
-    }
-  }
-
-  /**
-   * Used to check DN's when DN validation is performed during phase one processing.
-   * It is deleted after phase one processing.
-   */
+  /** Used to check DNs when DN validation is performed during phase one processing. */
   private final class Dn2IdDnCache implements DNCache
   {
     private Suffix suffix;

--
Gitblit v1.10.0