mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
28.50.2015 6145a17c281d881e8976e486c0d6b6a203dffd48
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -46,6 +46,7 @@
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
@@ -57,12 +58,14 @@
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -122,6 +125,118 @@
    return new UnsupportedOperationException("Not implemented");
  }
  /** Concurrent {@link Set} implementation backed by a {@link ConcurrentHashMap}. */
  private static final class ConcurrentHashSet<E> implements Set<E>
  {
    private final ConcurrentHashMap<E, E> delegate = new ConcurrentHashMap<>();
    @Override
    public boolean add(E e)
    {
      return delegate.put(e, e) == null;
    }
    @Override
    public boolean addAll(Collection<? extends E> c)
    {
      boolean changed = false;
      for (E e : c)
      {
        changed &= add(e);
      }
      return changed;
    }
    @Override
    public void clear()
    {
      delegate.clear();
    }
    @Override
    public boolean contains(Object o)
    {
      return delegate.containsKey(o);
    }
    @Override
    public boolean containsAll(Collection<?> c)
    {
      return delegateSet().containsAll(c);
    }
    @Override
    public boolean equals(Object o)
    {
      return delegateSet().equals(o);
    }
    @Override
    public int hashCode()
    {
      return delegateSet().hashCode();
    }
    @Override
    public boolean isEmpty()
    {
      return delegate.isEmpty();
    }
    @Override
    public Iterator<E> iterator()
    {
      return delegateSet().iterator();
    }
    @Override
    public boolean remove(Object o)
    {
      return delegateSet().remove(o);
    }
    @Override
    public boolean removeAll(Collection<?> c)
    {
      return delegateSet().removeAll(c);
    }
    @Override
    public boolean retainAll(Collection<?> c)
    {
      return delegateSet().retainAll(c);
    }
    @Override
    public int size()
    {
      return delegate.size();
    }
    @Override
    public Object[] toArray()
    {
      return delegateSet().toArray();
    }
    @Override
    public <T> T[] toArray(T[] a)
    {
      return delegateSet().toArray(a);
    }
    @Override
    public String toString()
    {
      return delegateSet().toString();
    }
    private Set<E> delegateSet()
    {
      return delegate.keySet();
    }
  }
  /** Data to put into id2entry tree. */
  private static final class Id2EntryData
  {
@@ -281,7 +396,7 @@
    private final FileChannel fileChannel;
    private final List<Integer> bufferPositions = new ArrayList<>();
    /** TODO JNR offer configuration for this. */
    private int bufferSize = 1024;
    private int bufferSize = 10 * MB;
    // FIXME this is not thread safe yet!!!
    /**
@@ -289,8 +404,16 @@
     * <p>
     * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
     * {@link #bufferSize}.
     * <p>
     * This code uses a {@link ConcurrentHashMap} instead of a {@link ConcurrentSkipListMap} because
     * during performance testing it was found this code spent a lot of time in
     * {@link ByteString#compareTo(ByteSequence)} when putting entries to the map. However, at this
     * point, we only need to put very quickly data in the map, we do not need keys to be sorted.
     * <p>
     * Note: using {@link Set} here will be a problem with id2childrencount where values deduplication
     * is not required. How to solve this problem?
     */
    private ConcurrentNavigableMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentSkipListMap<>();
    private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
    /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
    private int maximumExpectedSizeOnDisk;
@@ -307,7 +430,7 @@
      int recordSize = INT_SIZE + key.length() + INT_SIZE + value.length();
      if (bufferSize < maximumExpectedSizeOnDisk + recordSize)
      {
        copyToDisk();
        flushToMappedByteBuffer();
        inMemoryStore.clear();
        maximumExpectedSizeOnDisk = 0;
      }
@@ -315,7 +438,7 @@
      Set<ByteSequence> values = inMemoryStore.get(key);
      if (values == null)
      {
        values = new ConcurrentSkipListSet<>();
        values = new ConcurrentHashSet<>();
        Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
        if (existingValues != null)
        {
@@ -326,10 +449,12 @@
      maximumExpectedSizeOnDisk += recordSize;
    }
    private void copyToDisk() throws IOException
    private void flushToMappedByteBuffer() throws IOException
    {
      final SortedMap<ByteSequence, Set<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
      MappedByteBuffer byteBuffer = nextBuffer();
      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : sortedStore.entrySet())
      {
        ByteSequence key = mapEntry.getKey();
        // FIXME JNR merge values before put
@@ -352,8 +477,9 @@
    private MappedByteBuffer nextBuffer() throws IOException
    {
      // FIXME JNR bufferSize is an acceptable over approximation
      return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), bufferSize);
      // FIXME JNR when merging duplicate keys during phase one,
      // maximumExpectedSizeOnDisk is an acceptable over approximation
      return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), maximumExpectedSizeOnDisk);
    }
    private int getLastPosition(List<Integer> l)
@@ -371,14 +497,16 @@
      byteBuffer.putInt(b.length());
      // Need to do all of this because b.copyTo(byteBuffer) calls ByteBuffer.flip().
      // Why does it do that?
      final int limitBeforeFlip = byteBuffer.limit();
      final int posBeforeFlip = byteBuffer.position();
      b.copyTo(byteBuffer);
      byteBuffer.limit(bufferSize);
      byteBuffer.limit(limitBeforeFlip);
      byteBuffer.position(posBeforeFlip + b.length());
    }
    void flush()
    void flush() throws IOException
    {
      flushToMappedByteBuffer();
      writeBufferIndexFile();
    }
@@ -401,7 +529,7 @@
      String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
      return getClass().getSimpleName()
          + "(treeName=\"" + treeName + "\""
          + ", currentBuffer has " + inMemoryStore.size() + " record(s)"
          + ", current buffer holds " + inMemoryStore.size() + " record(s)"
          + " and " + (bufferSize - maximumExpectedSizeOnDisk) + " byte(s) remaining)";
    }
  }
@@ -984,9 +1112,16 @@
    @Override
    public void close()
    {
      for (Buffer buffer : treeNameToBufferMap.values())
      try
      {
        buffer.flush();
        for (Buffer buffer : treeNameToBufferMap.values())
        {
          buffer.flush();
        }
      }
      catch (IOException e)
      {
        throw new StorageRuntimeException(e);
      }
    }
@@ -1632,7 +1767,7 @@
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
    final Set<TreeName> treeNames = inStorage.listTrees(); // FIXME JNR rename to listTreeNames()?
    final Set<TreeName> treeNames = inStorage.listTrees();
    ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
    try (Importer importer = outStorage.startImport())
    {
@@ -1674,6 +1809,11 @@
          // key conflicts == merge EntryIDSets
          return new ImportIDSetsMerger(index);
        }
        else if (treeName.getIndexId().equals(ID2CHILDREN_COUNT_NAME))
        {
          // key conflicts == sum values
          // TODO JNR
        }
        else if (treeName.getIndexId().equals(DN2ID_INDEX_NAME)
            || treeName.getIndexId().equals(DN2URI_INDEX_NAME)
            || isVLVIndex(entryContainer, treeName))
@@ -1836,17 +1976,11 @@
        }
        else if (values.size() == 1)
        {
          // Avoids unnecessary decoding + encoding
          return values.iterator().next();
        }
        ImportIDSet idSet = new ImportIDSet(ByteString.empty(), EntryIDSet.newDefinedSet(), index.getIndexEntryLimit());
        for (ByteString value : values)
        {
          // FIXME JNR Can we make this more efficient?
          // go through long[] + sort in the end?
          idSet.merge(index.decodeValue(ByteString.empty(), value));
        }
        return index.toValue(idSet);
        return index.toValue(buildEntryIDSet(values));
      }
      finally
      {
@@ -1855,6 +1989,31 @@
        values.clear();
      }
    }
    private EntryIDSet buildEntryIDSet(Set<ByteString> values)
    {
      // accumulate in array
      int i = 0;
      long[] entryIDs = new long[index.getIndexEntryLimit()];
      for (ByteString value : values)
      {
        final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), value);
        if (!entryIDSet.isDefined() || i + entryIDSet.size() >= index.getIndexEntryLimit())
        {
          // above index entry limit
          return EntryIDSet.newUndefinedSet();
        }
        for (EntryID entryID : entryIDSet)
        {
          entryIDs[i++] = entryID.longValue();
        }
      }
      // due to how the entryIDSets are built, there should not be any duplicate entryIDs
      Arrays.sort(entryIDs);
      return EntryIDSet.newDefinedSet(entryIDs);
    }
  }
  /** Task used to migrate excluded branch. */
@@ -2093,10 +2252,12 @@
      {
        processDN2ID(suffix, entry.getName(), entryID);
      }
      processDN2URI(suffix, entry);
      processIndexes(suffix, entry, entryID);
      processVLVIndexes(suffix, entry, entryID);
      id2EntryPutTask.put(suffix, entryID, entry);
      importCount.getAndIncrement();
    }
@@ -2267,9 +2428,9 @@
    @Override
    public boolean insert(final DN dn, final EntryID entryID)
    {
      final AtomicBoolean result = new AtomicBoolean();
      try
      {
        final AtomicBoolean result = new AtomicBoolean();
        storage.write(new WriteOperation()
        {
          @Override
@@ -2278,12 +2439,12 @@
            result.set(suffix.getDN2ID().insert(txn, dn, entryID));
          }
        });
        return result.get();
      }
      catch (Exception e)
      {
        throw new StorageRuntimeException(e);
      }
      return result.get();
    }
    @Override