mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
22.12.2015 46631887a9d7a2a52b55f0a96f4b14540b1b6f91
OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

Implemented the memory mapped files buffer for import phase 1.
Remain to copy/stream data from phase 1 buffers to trees in backend's storage.
... minus bugs of course.


OnDiskMergeStorageImporter.java:
Implemented MemoryMappedBufferImporter for importPhaseOne().
Added Id2EntryPutTask and Id2EntryData inner classes for putting in id2entry.
Renamed MigrateExistingTask to MigrateExistingEntriesTask.
Removed more unused code.
1 files modified
520 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java 520 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -34,21 +34,32 @@
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,6 +71,7 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.forgerock.util.Utils;
import org.opends.server.admin.std.meta.BackendIndexCfgDefn.IndexType;
import org.opends.server.admin.std.server.BackendIndexCfg;
import org.opends.server.admin.std.server.PluggableBackendCfg;
@@ -72,6 +84,7 @@
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.core.DirectoryServer;
@@ -91,6 +104,353 @@
 */
final class OnDiskMergeStorageImporter
{
  /** Data to put into id2entry tree. */
  private static final class Id2EntryData
  {
    private final Suffix suffix;
    private final EntryID entryID;
    private final Entry entry;
    public Id2EntryData(Suffix suffix, EntryID entryID, Entry entry)
    {
      this.suffix = suffix;
      this.entryID = entryID;
      this.entry = entry;
    }
    private void put(WriteableTransaction txn) throws DirectoryException
    {
      suffix.getID2Entry().put(txn, entryID, entry);
    }
    @Override
    public String toString()
    {
      return getClass().getSimpleName()
          + "(suffix=" + suffix
          + ", entryID=" + entryID
          + ", entry=" + entry + ")";
    }
  }
  /** Runnable putting data into id2entry tree in batches. */
  private final class Id2EntryPutTask implements Runnable
  {
    private static final int BATCH_SIZE = 100;
    private volatile boolean moreData = true;
    private final Storage storage;
    private final NavigableSet<Id2EntryData> dataToPut = new ConcurrentSkipListSet<>(new Comparator<Id2EntryData>()
    {
      @Override
      public int compare(Id2EntryData o1, Id2EntryData o2)
      {
        return o1.entryID.compareTo(o2.entryID);
      }
    });
    private Id2EntryPutTask(Storage storage)
    {
      this.storage = storage;
    }
    private void put(Suffix suffix, EntryID entryID, Entry entry)
    {
      dataToPut.add(new Id2EntryData(suffix, entryID, entry));
      if (enoughDataToPut())
      {
        synchronized (dataToPut)
        {
          dataToPut.notify();
        }
      }
    }
    @Override
    public void run()
    {
      try
      {
        while (!isCanceled() && moreData)
        {
          if (enoughDataToPut())
          {
            put(BATCH_SIZE);
          }
          else
          {
            synchronized (dataToPut)
            {
              if (moreData)
              {
                dataToPut.wait();
              }
            }
          }
        }
        while (!isCanceled() && !dataToPut.isEmpty())
        {
          put(BATCH_SIZE);
        }
      }
      catch (Exception e)
      {
        logger.traceException(e);
      }
    }
    private boolean enoughDataToPut()
    {
      return dataToPut.size() > BATCH_SIZE;
    }
    private void put(final int batchSize) throws Exception
    {
      storage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableTransaction txn) throws Exception
        {
          int count = 0;
          while (!dataToPut.isEmpty() && count < batchSize)
          {
            dataToPut.pollFirst().put(txn);
            count++;
          }
        }
      });
    }
    private void finishedWrites()
    {
      moreData = false;
      synchronized (dataToPut)
      {
        dataToPut.notify();
      }
    }
    @Override
    public String toString()
    {
      final StringBuilder sb = new StringBuilder("[");
      Iterator<Id2EntryData> it = dataToPut.iterator();
      if (it.hasNext())
      {
        sb.append(it.next().entryID.longValue());
      }
      while (it.hasNext())
      {
        sb.append(",");
        sb.append(it.next().entryID.longValue());
      }
      sb.append("]");
      return super.toString();
    }
  }
  /**
   * Represents an on-disk buffer file, accessed via memory mapped files.
   * <p>
   * Data to write is appended in-memory before being dumped into a {@link MappedByteBuffer}
   * for writing to disk.
   */
  private static final class Buffer
  {
    private final File file;
    private final FileChannel fileChannel;
    private final List<Integer> bufferPositions = new ArrayList<>();
    private int bufferSize = 1024; // TODO JNR use MAX_BUFFER_SIZE?
    // FIXME this is not thread safe yet!!!
    /**
     * Maps {@link ByteSequence} keys to (conflicting) values.
     * <p>
     * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
     * {@link #bufferSize}.
     */
    private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
    /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
    private int maximumExpectedSizeOnDisk;
    private Buffer(File file) throws FileNotFoundException
    {
      file.getParentFile().mkdirs();
      this.file = file;
      this.fileChannel = new RandomAccessFile(file, "rw").getChannel();
      this.bufferPositions.add(0);
    }
    void putKeyValue(ByteSequence key, ByteSequence value) throws IOException
    {
      int recordSize = INT_SIZE + key.length() + INT_SIZE + value.length();
      if (bufferSize < maximumExpectedSizeOnDisk + recordSize)
      {
        copyToDisk();
        inMemoryStore.clear();
        maximumExpectedSizeOnDisk = 0;
      }
      Set<ByteSequence> values = inMemoryStore.get(key);
      if (values == null)
      {
        values = new ConcurrentSkipListSet<>();
        Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
        if (existingValues != null)
        {
          values = existingValues;
        }
      }
      values.add(value);
      maximumExpectedSizeOnDisk += recordSize;
    }
    private void copyToDisk() throws IOException
    {
      MappedByteBuffer byteBuffer = nextBuffer();
      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
      {
        ByteSequence key = mapEntry.getKey();
        for (ByteSequence value : mapEntry.getValue())
        {
          put(byteBuffer, key);
          put(byteBuffer, value);
        }
      }
      if (byteBuffer.position() != maximumExpectedSizeOnDisk)
      {
        logger.trace("Expected to write %d bytes, but actually wrote %d bytes",
            maximumExpectedSizeOnDisk, byteBuffer.position());
      }
      byteBuffer.force();
      addPosition(bufferPositions, byteBuffer);
    }
    private MappedByteBuffer nextBuffer() throws IOException
    {
      // FIXME JNR bufferSize is an acceptable over approximation
      return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), bufferSize);
    }
    private int getLastPosition(List<Integer> l)
    {
      return l.get(l.size() - 1);
    }
    private void addPosition(List<Integer> l, MappedByteBuffer byteBuffer)
    {
      l.add(getLastPosition(l) + byteBuffer.position());
    }
    private void put(ByteBuffer byteBuffer, ByteSequence b)
    {
      byteBuffer.putInt(b.length());
      // Need to do all of this because b.copyTo(byteBuffer) calls ByteBuffer.flip().
      // Why does it do that?
      final int posBeforeFlip = byteBuffer.position();
      b.copyTo(byteBuffer);
      byteBuffer.limit(bufferSize);
      byteBuffer.position(posBeforeFlip + b.length());
    }
    void flush()
    {
      writeBufferIndexFile();
    }
    private void writeBufferIndexFile()
    {
      final File bufferIndexFile = new File(file.getParent(), file.getName() + ".index");
      try (PrintWriter writer = new PrintWriter(bufferIndexFile))
      {
        writer.print(Utils.joinAsString(" ", this.bufferPositions));
      }
      catch (FileNotFoundException e)
      {
        logger.traceException(e);
      }
    }
    /** {@inheritDoc} */
    @Override
    public String toString()
    {
      String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
      return getClass().getSimpleName()
          + "(treeName=\"" + treeName + "\""
          + ", currentBuffer has " + inMemoryStore.size() + " record(s)"
          + " and " + (bufferSize - maximumExpectedSizeOnDisk) + " byte(s) remaining)";
    }
  }
  /** An importer using memory mapped files, a.k.a {@link MappedByteBuffer}. */
  private static final class MemoryMappedBufferImporter implements Importer
  {
    private final File bufferDir;
    private final Map<TreeName, Buffer> treeNameToBufferMap = new HashMap<>();
    private MemoryMappedBufferImporter(File bufferDir)
    {
      this.bufferDir = bufferDir;
    }
    @Override
    public void put(TreeName treeName, ByteSequence key, ByteSequence value)
    {
      try
      {
        getBuffer(treeName).putKeyValue(key, value);
      }
      catch (IOException e)
      {
        logger.traceException(e);
      }
    }
    private Buffer getBuffer(TreeName treeName) throws IOException
    {
      Buffer buffer = treeNameToBufferMap.get(treeName);
      if (buffer == null)
      {
        // TODO JNR that would be great if it was creating sub directories :)
        buffer = new Buffer(new File(bufferDir, treeName.toString()));
        treeNameToBufferMap.put(treeName, buffer);
      }
      return buffer;
    }
    @Override
    public ByteString read(TreeName treeName, ByteSequence key)
    {
      throw new RuntimeException("Not implemented");
    }
    @Override
    public boolean delete(TreeName treeName, ByteSequence key)
    {
      throw new RuntimeException("Not implemented");
    }
    @Override
    public void createTree(TreeName name)
    {
      throw new RuntimeException("Not implemented");
    }
    @Override
    public void close()
    {
      for (Buffer buffer : treeNameToBufferMap.values())
      {
        buffer.flush();
      }
    }
  }
  /**
   * Shim that allows properly constructing an {@link OnDiskMergeStorageImporter} without polluting
   * {@link ImportStrategy} and {@link RootContainer} with this importer inner workings.
@@ -181,8 +541,6 @@
  /** Temp scratch directory. */
  private final File tempDir;
  /** Size in bytes of DN cache. */
  private long dnCacheSize;
  /** Available memory at the start of the import. */
  private long availableMemory;
  /** Size in bytes of DB cache. */
@@ -295,57 +653,21 @@
    final long usableMemory = availableMemory - (indexCount * READER_WRITER_BUFFER_SIZE);
    // We need caching when doing DN validation
    if (validateDNs)
    if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
    {
      // DN validation: calculate memory for DB cache, DN2ID temporary cache, and buffers.
      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
      {
        dbCacheSize = 500 * KB;
        dnCacheSize = 500 * KB;
      }
      else if (usableMemory < (MIN_DB_CACHE_MEMORY + MIN_DB_CACHE_SIZE))
      {
        dbCacheSize = MIN_DB_CACHE_SIZE;
        dnCacheSize = MIN_DB_CACHE_SIZE;
      }
      else if (!clearedBackend)
      {
        // Appending to existing data so reserve extra memory for the DB cache
        // since it will be needed for dn2id queries.
        dbCacheSize = usableMemory * 33 / 100;
        dnCacheSize = usableMemory * 33 / 100;
      }
      else
      {
        dbCacheSize = MAX_DB_CACHE_SIZE;
        dnCacheSize = usableMemory * 66 / 100;
      }
      dbCacheSize = 500 * KB;
    }
    // We need caching when doing DN validation
    else if (usableMemory < MIN_DB_CACHE_MEMORY + (validateDNs ? MIN_DB_CACHE_SIZE : 0))
    {
      dbCacheSize = MIN_DB_CACHE_SIZE;
    }
    else
    {
      // No DN validation: calculate memory for DB cache and buffers.
      // No need for DN2ID cache.
      dnCacheSize = 0;
      if (System.getProperty(PROPERTY_RUNNING_UNIT_TESTS) != null)
      {
        dbCacheSize = 500 * KB;
      }
      else if (usableMemory < MIN_DB_CACHE_MEMORY)
      {
        dbCacheSize = MIN_DB_CACHE_SIZE;
      }
      else
      {
        // No need to differentiate between append/clear backend, since dn2id is
        // not being queried.
        dbCacheSize = MAX_DB_CACHE_SIZE;
      }
      dbCacheSize = MAX_DB_CACHE_SIZE;
    }
    final long phaseOneBufferMemory = usableMemory - dbCacheSize - dnCacheSize;
    final long phaseOneBufferMemory = usableMemory - dbCacheSize;
    final int oldThreadCount = threadCount;
    if (indexCount != 0) // Avoid / by zero
    {
@@ -370,12 +692,7 @@
            final long extraMemory = phaseOneBufferMemory - (totalPhaseOneBufferCount * bufferSize);
            if (!clearedBackend)
            {
              dbCacheSize += extraMemory / 2;
              dnCacheSize += extraMemory / 2;
            }
            else
            {
              dnCacheSize += extraMemory;
              dbCacheSize += extraMemory;
            }
          }
@@ -396,7 +713,7 @@
          // Not enough memory.
          final long minimumPhaseOneBufferMemory = totalPhaseOneBufferCount * MIN_BUFFER_SIZE;
          throw new InitializationException(ERR_IMPORT_LDIF_LACK_MEM.get(
              usableMemory, minimumPhaseOneBufferMemory + dbCacheSize + dnCacheSize));
              usableMemory, minimumPhaseOneBufferMemory + dbCacheSize));
        }
      }
    }
@@ -407,10 +724,6 @@
    }
    logger.info(NOTE_IMPORT_LDIF_TOT_MEM_BUF, availableMemory, phaseOneBufferCount);
    if (dnCacheSize > 0)
    {
      logger.info(NOTE_IMPORT_LDIF_TMP_ENV_MEM, dnCacheSize);
    }
    logger.info(NOTE_IMPORT_LDIF_DB_MEM_BUF_INFO, dbCacheSize, bufferSize);
  }
@@ -594,8 +907,8 @@
      logger.info(NOTE_IMPORT_STARTING, DirectoryServer.getVersionString(), BUILD_ID, REVISION_NUMBER);
      logger.info(NOTE_IMPORT_THREAD_COUNT, threadCount);
      final Storage storage = rootContainer.getStorage();
      storage.write(new WriteOperation()
      final Storage backendStorage = rootContainer.getStorage();
      backendStorage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableTransaction txn) throws Exception
@@ -605,8 +918,10 @@
        }
      });
      final Importer tmpImporter = new MemoryMappedBufferImporter(tempDir);
      final long startTime = System.currentTimeMillis();
      importPhaseOne();
      importPhaseOne(backendStorage, tmpImporter);
      final long phaseOneFinishTime = System.currentTimeMillis();
      if (isCanceled())
@@ -622,7 +937,7 @@
      }
      final long phaseTwoFinishTime = System.currentTimeMillis();
      storage.write(new WriteOperation()
      backendStorage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableTransaction txn) throws Exception
@@ -700,29 +1015,33 @@
   * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
   * The scratch files will be read by phaseTwo to perform on-disk merge</li>
   * </ol>
   * TODO JNR fix all javadocs
   */
  private void importPhaseOne() throws Exception
  private void importPhaseOne(Storage storage, Importer importer) throws Exception
  {
    final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
    threadCount = 2; // FIXME JNR id2entry + another task
    final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
    final Storage storage = rootContainer.getStorage();
    final Importer importer = storage.startImport();
    execService.submit(new MigrateExistingTask(storage, importer)).get();
    final Id2EntryPutTask id2EntryPutTask = new Id2EntryPutTask(storage);
    final Future<?> dn2IdPutFuture = execService.submit(id2EntryPutTask);
    execService.submit(new MigrateExistingEntriesTask(storage, importer, id2EntryPutTask)).get();
    final List<Callable<Void>> tasks = new ArrayList<>(threadCount);
    if (!importCfg.appendToExistingData() || !importCfg.replaceExistingEntries())
    {
      for (int i = 0; i < threadCount; i++)
      for (int i = 0; i < threadCount - 1; i++)
      {
        tasks.add(new ImportTask(importer));
        tasks.add(new ImportTask(importer, id2EntryPutTask));
      }
    }
    execService.invokeAll(tasks);
    tasks.clear();
    execService.submit(new MigrateExcludedTask(storage, importer)).get();
    execService.submit(new MigrateExcludedTask(storage, importer, id2EntryPutTask)).get();
    id2EntryPutTask.finishedWrites();
    dn2IdPutFuture.get();
    shutdownAll(timerService, execService);
  }
@@ -763,9 +1082,9 @@
  {
    private final Storage storage;
    private MigrateExcludedTask(final Storage storage, final Importer importer)
    private MigrateExcludedTask(Storage storage, Importer importer, Id2EntryPutTask id2EntryPutTask)
    {
      super(importer);
      super(importer, id2EntryPutTask);
      this.storage = storage;
    }
@@ -838,13 +1157,13 @@
  }
  /** Task to migrate existing entries. */
  private final class MigrateExistingTask extends ImportTask
  private final class MigrateExistingEntriesTask extends ImportTask
  {
    private final Storage storage;
    private MigrateExistingTask(final Storage storage, final Importer importer)
    private MigrateExistingEntriesTask(final Storage storage, Importer importer, Id2EntryPutTask id2EntryPutTask)
    {
      super(importer);
      super(importer, id2EntryPutTask);
      this.storage = storage;
    }
@@ -939,10 +1258,12 @@
  private class ImportTask implements Callable<Void>
  {
    private final Importer importer;
    private final Id2EntryPutTask id2EntryPutTask;
    public ImportTask(final Importer importer)
    public ImportTask(final Importer importer, Id2EntryPutTask id2EntryPutTask)
    {
      this.importer = importer;
      this.id2EntryPutTask = id2EntryPutTask;
    }
    /** {@inheritDoc} */
@@ -997,8 +1318,7 @@
      processDN2URI(suffix, entry);
      processIndexes(suffix, entry, entryID);
      processVLVIndexes(suffix, entry, entryID);
      // FIXME JNR run a dedicated thread to do the puts ordered by entryID
      // suffix.getID2Entry().put(importer, entryID, entry);
      id2EntryPutTask.put(suffix, entryID, entry);
      importCount.getAndIncrement();
    }
@@ -1066,8 +1386,7 @@
      }
    }
    void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID)
        throws DirectoryException
    void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID) throws DirectoryException
    {
      for (VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes())
      {
@@ -1151,42 +1470,7 @@
    }
  }
  /** Invocation handler for the {@link PluggableBackendCfg} proxy. */
  private static final class BackendCfgHandler implements InvocationHandler
  {
    private final Map<String, Object> returnValues;
    private BackendCfgHandler(final Map<String, Object> returnValues)
    {
      this.returnValues = returnValues;
    }
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
      final String methodName = method.getName();
      if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
      {
        // ignore calls to (add|remove)*ChangeListener() methods
        return null;
      }
      final Object returnValue = returnValues.get(methodName);
      if (returnValue != null)
      {
        return returnValue;
      }
      throw new IllegalArgumentException("Unhandled method call on proxy ("
          + BackendCfgHandler.class.getSimpleName()
          + ") for method (" + method
          + ") with arguments (" + Arrays.toString(args) + ")");
    }
  }
  /**
   * Used to check DN's when DN validation is performed during phase one processing.
   * It is deleted after phase one processing.
   */
  /** Used to check DN's when DN validation is performed during phase one processing. */
  private final class Dn2IdDnCache implements DNCache
  {
    private Suffix suffix;