mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.05.2015 5a06735032d3c0155548b77c9e627674c400b4ec
OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

Various little performance improvements.
ID2ChildrenCount remains to be computed.


OnDiskMergeStorageImporter.java:
In Buffer:
- increased the size of memory-mapped buffers from 1024 bytes to 10 MiB (the mapped-write pattern is sketched after this list).
- used ConcurrentHashMaps instead of ConcurrentSkipListMaps because performance testing showed this code spent a lot of time in ByteString.compareTo() when putting entries into the map. At this point we only need to put() data into the map very quickly; we do not need keys to be sorted (the ingest-unsorted/sort-at-flush pattern is sketched after this list).
- added ConcurrentHashSet, a Set implementation backed by a ConcurrentHashMap, to avoid using ConcurrentSkipListSet, which is backed by a ConcurrentSkipListMap
- renamed copyToDisk() to flushToMappedByteBuffer()
In ImportIDSetsMerger:
- in merge(), extracted and rewrote buildEntryIDSet(Set<ByteString> values)
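
The map/set switch above follows an "ingest unsorted, sort once at flush" pattern: puts go through a hash-based map (no key comparisons), and keys are sorted exactly once, when a buffer is flushed to disk. Below is a minimal standalone sketch of that pattern with hypothetical names; it is not the importer's code. It uses the JDK's Collections.newSetFromMap(), which provides the same Set-view-over-a-ConcurrentHashMap behavior as the hand-rolled ConcurrentHashSet added by this patch.

    import java.util.Collections;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    /** Sketch: hash-based ingestion (no compareTo() on put), sorting deferred to flush time. */
    final class FastPutSortAtFlushSketch
    {
      private final ConcurrentMap<String, Set<String>> store = new ConcurrentHashMap<>();

      void put(String key, String value)
      {
        // computeIfAbsent atomically installs the value set on first use;
        // the Set view deduplicates values, as index trees require.
        store.computeIfAbsent(key,
            k -> Collections.newSetFromMap(new ConcurrentHashMap<String, Boolean>()))
            .add(value);
      }

      void flush()
      {
        // Sort keys only once, at flush time, by copying into a TreeMap.
        for (Map.Entry<String, Set<String>> entry : new TreeMap<>(store).entrySet())
        {
          System.out.println(entry.getKey() + " -> " + entry.getValue());
        }
        store.clear();
      }
    }

The patch itself uses the pre-Java-8 putIfAbsent() null-check idiom, as the hunks below show; the trade-off is the same either way: ConcurrentHashMap.put() costs a hash lookup, while ConcurrentSkipListMap.put() performs O(log n) compareTo() calls per insertion.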
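
The mapped buffers are written with the record layout the importer computes in recordSize: an int-prefixed key followed by an int-prefixed value. Here is a minimal standalone sketch of that mapped-write pattern; the file name, region size, and sample data are illustrative assumptions, not the importer's actual values.

    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileChannel.MapMode;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    /** Sketch: write [keyLength][key][valueLength][value] records through a mapped region. */
    final class MappedWriteSketch
    {
      public static void main(String[] args) throws IOException
      {
        try (FileChannel channel = FileChannel.open(Paths.get("mapped-write.demo"),
            StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE))
        {
          // Map a region sized like the projected on-disk size of the in-memory store.
          MappedByteBuffer buffer = channel.map(MapMode.READ_WRITE, 0, 10 * 1024 * 1024);
          byte[] key = "uid=user.0".getBytes(StandardCharsets.UTF_8);
          byte[] value = "42".getBytes(StandardCharsets.UTF_8);
          // Record layout: INT_SIZE + key + INT_SIZE + value, as in the importer's recordSize.
          buffer.putInt(key.length).put(key);
          buffer.putInt(value.length).put(value);
          // force() pushes the dirty mapped pages out to the storage device.
          buffer.force();
        }
      }
    }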
1 file modified
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java  213 lines changed
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -46,6 +46,7 @@
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -57,12 +58,14 @@
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Set;
+import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TimerTask;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -122,6 +125,118 @@
    return new UnsupportedOperationException("Not implemented");
  }
+  /** Concurrent {@link Set} implementation backed by a {@link ConcurrentHashMap}. */
+  private static final class ConcurrentHashSet<E> implements Set<E>
+  {
+    private final ConcurrentHashMap<E, E> delegate = new ConcurrentHashMap<>();
+    @Override
+    public boolean add(E e)
+    {
+      return delegate.put(e, e) == null;
+    }
+    @Override
+    public boolean addAll(Collection<? extends E> c)
+    {
+      boolean changed = false;
+      for (E e : c)
+      {
+        changed |= add(e);
+      }
+      return changed;
+    }
+    @Override
+    public void clear()
+    {
+      delegate.clear();
+    }
+    @Override
+    public boolean contains(Object o)
+    {
+      return delegate.containsKey(o);
+    }
+    @Override
+    public boolean containsAll(Collection<?> c)
+    {
+      return delegateSet().containsAll(c);
+    }
+    @Override
+    public boolean equals(Object o)
+    {
+      return delegateSet().equals(o);
+    }
+    @Override
+    public int hashCode()
+    {
+      return delegateSet().hashCode();
+    }
+    @Override
+    public boolean isEmpty()
+    {
+      return delegate.isEmpty();
+    }
+    @Override
+    public Iterator<E> iterator()
+    {
+      return delegateSet().iterator();
+    }
+    @Override
+    public boolean remove(Object o)
+    {
+      return delegateSet().remove(o);
+    }
+    @Override
+    public boolean removeAll(Collection<?> c)
+    {
+      return delegateSet().removeAll(c);
+    }
+    @Override
+    public boolean retainAll(Collection<?> c)
+    {
+      return delegateSet().retainAll(c);
+    }
+    @Override
+    public int size()
+    {
+      return delegate.size();
+    }
+    @Override
+    public Object[] toArray()
+    {
+      return delegateSet().toArray();
+    }
+    @Override
+    public <T> T[] toArray(T[] a)
+    {
+      return delegateSet().toArray(a);
+    }
+    @Override
+    public String toString()
+    {
+      return delegateSet().toString();
+    }
+    private Set<E> delegateSet()
+    {
+      return delegate.keySet();
+    }
+  }
  /** Data to put into id2entry tree. */
  private static final class Id2EntryData
  {
@@ -281,7 +396,7 @@
    private final FileChannel fileChannel;
    private final List<Integer> bufferPositions = new ArrayList<>();
    /** TODO JNR offer configuration for this. */
-    private int bufferSize = 1024;
+    private int bufferSize = 10 * MB;
    // FIXME this is not thread safe yet!!!
    /**
@@ -289,8 +404,16 @@
     * <p>
     * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
     * {@link #bufferSize}.
+     * <p>
+     * This code uses a {@link ConcurrentHashMap} instead of a {@link ConcurrentSkipListMap} because
+     * performance testing showed this code spent a lot of time in
+     * {@link ByteString#compareTo(ByteSequence)} when putting entries into the map. At this
+     * point, we only need to put data into the map very quickly; we do not need keys to be sorted.
+     * <p>
+     * Note: using {@link Set} here will be a problem with id2childrencount, where value deduplication
+     * is not required. How to solve this problem?
     */
-    private ConcurrentNavigableMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentSkipListMap<>();
+    private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
    /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
    private int maximumExpectedSizeOnDisk;
@@ -307,7 +430,7 @@
      int recordSize = INT_SIZE + key.length() + INT_SIZE + value.length();
      if (bufferSize < maximumExpectedSizeOnDisk + recordSize)
      {
-        copyToDisk();
+        flushToMappedByteBuffer();
        inMemoryStore.clear();
        maximumExpectedSizeOnDisk = 0;
      }
@@ -315,7 +438,7 @@
      Set<ByteSequence> values = inMemoryStore.get(key);
      if (values == null)
      {
-        values = new ConcurrentSkipListSet<>();
+        values = new ConcurrentHashSet<>();
        Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
        if (existingValues != null)
        {
@@ -326,10 +449,12 @@
      maximumExpectedSizeOnDisk += recordSize;
    }
-    private void copyToDisk() throws IOException
+    private void flushToMappedByteBuffer() throws IOException
    {
+      final SortedMap<ByteSequence, Set<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
      MappedByteBuffer byteBuffer = nextBuffer();
-      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
+      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : sortedStore.entrySet())
      {
        ByteSequence key = mapEntry.getKey();
        // FIXME JNR merge values before put
@@ -352,8 +477,9 @@
    private MappedByteBuffer nextBuffer() throws IOException
    {
-      // FIXME JNR bufferSize is an acceptable over approximation
-      return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), bufferSize);
+      // FIXME JNR when merging duplicate keys during phase one,
+      // maximumExpectedSizeOnDisk is an acceptable over approximation
+      return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), maximumExpectedSizeOnDisk);
    }
    private int getLastPosition(List<Integer> l)
@@ -371,14 +497,16 @@
      byteBuffer.putInt(b.length());
      // Need to do all of this because b.copyTo(byteBuffer) calls ByteBuffer.flip().
      // Why does it do that?
+      final int limitBeforeFlip = byteBuffer.limit();
      final int posBeforeFlip = byteBuffer.position();
      b.copyTo(byteBuffer);
-      byteBuffer.limit(bufferSize);
+      byteBuffer.limit(limitBeforeFlip);
      byteBuffer.position(posBeforeFlip + b.length());
    }
-    void flush()
+    void flush() throws IOException
    {
+      flushToMappedByteBuffer();
      writeBufferIndexFile();
    }
@@ -401,7 +529,7 @@
      String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
      return getClass().getSimpleName()
          + "(treeName=\"" + treeName + "\""
          + ", currentBuffer has " + inMemoryStore.size() + " record(s)"
          + ", current buffer holds " + inMemoryStore.size() + " record(s)"
          + " and " + (bufferSize - maximumExpectedSizeOnDisk) + " byte(s) remaining)";
    }
  }
@@ -984,9 +1112,16 @@
    @Override
    public void close()
    {
-      for (Buffer buffer : treeNameToBufferMap.values())
+      try
      {
-        buffer.flush();
+        for (Buffer buffer : treeNameToBufferMap.values())
+        {
+          buffer.flush();
+        }
      }
+      catch (IOException e)
+      {
+        throw new StorageRuntimeException(e);
+      }
    }
@@ -1632,7 +1767,7 @@
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
-    final Set<TreeName> treeNames = inStorage.listTrees(); // FIXME JNR rename to listTreeNames()?
+    final Set<TreeName> treeNames = inStorage.listTrees();
    ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
    try (Importer importer = outStorage.startImport())
    {
@@ -1674,6 +1809,11 @@
          // key conflicts == merge EntryIDSets
          return new ImportIDSetsMerger(index);
        }
+        else if (treeName.getIndexId().equals(ID2CHILDREN_COUNT_NAME))
+        {
+          // key conflicts == sum values
+          // TODO JNR
+        }
        else if (treeName.getIndexId().equals(DN2ID_INDEX_NAME)
            || treeName.getIndexId().equals(DN2URI_INDEX_NAME)
            || isVLVIndex(entryContainer, treeName))
@@ -1836,17 +1976,11 @@
        }
        else if (values.size() == 1)
        {
+          // Avoids unnecessary decoding + encoding
          return values.iterator().next();
        }
-        ImportIDSet idSet = new ImportIDSet(ByteString.empty(), EntryIDSet.newDefinedSet(), index.getIndexEntryLimit());
-        for (ByteString value : values)
-        {
-          // FIXME JNR Can we make this more efficient?
-          // go through long[] + sort in the end?
-          idSet.merge(index.decodeValue(ByteString.empty(), value));
-        }
-        return index.toValue(idSet);
+        return index.toValue(buildEntryIDSet(values));
      }
      finally
      {
@@ -1855,6 +1989,31 @@
        values.clear();
      }
    }
+    private EntryIDSet buildEntryIDSet(Set<ByteString> values)
+    {
+      // accumulate in array
+      int i = 0;
+      long[] entryIDs = new long[index.getIndexEntryLimit()];
+      for (ByteString value : values)
+      {
+        final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), value);
+        if (!entryIDSet.isDefined() || i + entryIDSet.size() >= index.getIndexEntryLimit())
+        {
+          // above index entry limit
+          return EntryIDSet.newUndefinedSet();
+        }
+        for (EntryID entryID : entryIDSet)
+        {
+          entryIDs[i++] = entryID.longValue();
+        }
+      }
+      // only the first i slots of the array are filled: trim it before sorting
+      entryIDs = Arrays.copyOf(entryIDs, i);
+      // due to how the entryIDSets are built, there should not be any duplicate entryIDs
+      Arrays.sort(entryIDs);
+      return EntryIDSet.newDefinedSet(entryIDs);
+    }
  }
  /** Task used to migrate excluded branch. */
@@ -2093,10 +2252,12 @@
      {
        processDN2ID(suffix, entry.getName(), entryID);
      }
      processDN2URI(suffix, entry);
+      processIndexes(suffix, entry, entryID);
+      processVLVIndexes(suffix, entry, entryID);
      id2EntryPutTask.put(suffix, entryID, entry);
      importCount.getAndIncrement();
    }
@@ -2267,9 +2428,9 @@
    @Override
    public boolean insert(final DN dn, final EntryID entryID)
    {
+      final AtomicBoolean result = new AtomicBoolean();
      try
      {
-        final AtomicBoolean result = new AtomicBoolean();
        storage.write(new WriteOperation()
        {
          @Override
@@ -2278,12 +2439,12 @@
            result.set(suffix.getDN2ID().insert(txn, dn, entryID));
          }
        });
-        return result.get();
      }
      catch (Exception e)
      {
        throw new StorageRuntimeException(e);
      }
+      return result.get();
    }
    @Override