mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
03.26.2015 82840bfe0d65a0715357a001e3224ba0d6a9c8df
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -64,8 +64,6 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -126,118 +124,6 @@
    return new UnsupportedOperationException("Not implemented");
  }
  /** Concurrent {@link Set} implementation backed by a {@link ConcurrentHashMap}. */
  private static final class ConcurrentHashSet<E> implements Set<E>
  {
    private final ConcurrentHashMap<E, E> delegate = new ConcurrentHashMap<>();
    @Override
    public boolean add(E e)
    {
      return delegate.put(e, e) == null;
    }
    @Override
    public boolean addAll(Collection<? extends E> c)
    {
      boolean changed = false;
      for (E e : c)
      {
        changed &= add(e);
      }
      return changed;
    }
    @Override
    public void clear()
    {
      delegate.clear();
    }
    @Override
    public boolean contains(Object o)
    {
      return delegate.containsKey(o);
    }
    @Override
    public boolean containsAll(Collection<?> c)
    {
      return delegateSet().containsAll(c);
    }
    @Override
    public boolean equals(Object o)
    {
      return delegateSet().equals(o);
    }
    @Override
    public int hashCode()
    {
      return delegateSet().hashCode();
    }
    @Override
    public boolean isEmpty()
    {
      return delegate.isEmpty();
    }
    @Override
    public Iterator<E> iterator()
    {
      return delegateSet().iterator();
    }
    @Override
    public boolean remove(Object o)
    {
      return delegateSet().remove(o);
    }
    @Override
    public boolean removeAll(Collection<?> c)
    {
      return delegateSet().removeAll(c);
    }
    @Override
    public boolean retainAll(Collection<?> c)
    {
      return delegateSet().retainAll(c);
    }
    @Override
    public int size()
    {
      return delegate.size();
    }
    @Override
    public Object[] toArray()
    {
      return delegateSet().toArray();
    }
    @Override
    public <T> T[] toArray(T[] a)
    {
      return delegateSet().toArray(a);
    }
    @Override
    public String toString()
    {
      return delegateSet().toString();
    }
    private Set<E> delegateSet()
    {
      return delegate.keySet();
    }
  }
  /** Data to put into id2entry tree. */
  private static final class Id2EntryData
  {
@@ -393,37 +279,49 @@
   */
  private static final class Buffer
  {
    private final File file;
    private final TreeName treeName;
    private final File indexFile;
    private final File bufferFile;
    private final FileChannel fileChannel;
    private final List<Integer> bufferPositions = new ArrayList<>();
    /** TODO JNR offer configuration for this. */
    private int bufferSize = 10 * MB;
    private final int bufferSize = 10 * MB;
    // FIXME this is not thread safe yet!!!
    /**
     * Maps {@link ByteSequence} keys to (conflicting) values.
     * <p>
     * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
     * {@link #bufferSize}.
     * <p>
     * This code uses a {@link ConcurrentHashMap} instead of a {@code ConcurrentSkipListMap} because
     * during performance testing it was found this code spent a lot of time in
     * {@link ByteString#compareTo(ByteSequence)} when putting entries to the map. However, at this
     * point, we only need to put very quickly data in the map, we do not need keys to be sorted.
     * <p>
     * Note: using {@link Set} here will be a problem with id2childrencount where values deduplication
     * is not required. How to solve this problem?
     */
    private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
    private Map<ByteSequence, List<ByteSequence>> inMemoryStore = new HashMap<>();
    /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
    private int maximumExpectedSizeOnDisk;
    private long totalBytes;
    private Buffer(File file) throws FileNotFoundException
    public Buffer(TreeName treeName, File bufferDir, MapMode mapMode) throws IOException
    {
      file.getParentFile().mkdirs();
      this.file = file;
      this.fileChannel = new RandomAccessFile(file, "rw").getChannel();
      this.bufferPositions.add(0);
      this.treeName = treeName;
      bufferFile = new File(bufferDir, treeName.toString());
      bufferFile.getParentFile().mkdirs();
      indexFile = new File(bufferDir, treeName + ".index");
      this.fileChannel = new RandomAccessFile(bufferFile, getMode(mapMode)).getChannel();
      if (MapMode.READ_WRITE.equals(mapMode))
      {
        this.bufferPositions.add(0);
      }
    }
    private String getMode(MapMode mapMode)
    {
      if (MapMode.READ_ONLY.equals(mapMode))
      {
        return "r";
      }
      else if (MapMode.READ_WRITE.equals(mapMode))
      {
        return "rw";
      }
      throw new IllegalArgumentException("Unhandled map mode: " + mapMode);
    }
    void putKeyValue(ByteSequence key, ByteSequence value) throws IOException
@@ -436,15 +334,11 @@
        maximumExpectedSizeOnDisk = 0;
      }
      Set<ByteSequence> values = inMemoryStore.get(key);
      List<ByteSequence> values = inMemoryStore.get(key);
      if (values == null)
      {
        values = new ConcurrentHashSet<>();
        Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
        if (existingValues != null)
        {
          values = existingValues;
        }
        values = new ArrayList<>();
        inMemoryStore.put(key, values);
      }
      values.add(value);
      maximumExpectedSizeOnDisk += recordSize;
@@ -452,13 +346,15 @@
    private void flushToMappedByteBuffer() throws IOException
    {
      final SortedMap<ByteSequence, Set<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
      final SortedMap<ByteSequence, List<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
      MappedByteBuffer byteBuffer = nextBuffer();
      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : sortedStore.entrySet())
      for (Map.Entry<ByteSequence, List<ByteSequence>> mapEntry : sortedStore.entrySet())
      {
        ByteSequence key = mapEntry.getKey();
        // FIXME JNR merge values before put
        // Edit: Merging during phase one is slower than not merging at all,
        // perhaps due to merging importIDSets for keys that will exceed index entry limits anyway?
        for (ByteSequence value : mapEntry.getValue())
        {
          put(byteBuffer, key);
@@ -513,8 +409,7 @@
    private void writeBufferIndexFile()
    {
      final File bufferIndexFile = new File(file.getParent(), file.getName() + ".index");
      try (PrintWriter writer = new PrintWriter(bufferIndexFile))
      try (PrintWriter writer = new PrintWriter(indexFile))
      {
        writer.print(Utils.joinAsString(" ", this.bufferPositions));
      }
@@ -524,10 +419,47 @@
      }
    }
    private Cursor<ByteString, ByteString> openCursor() throws IOException
    {
      readBufferPositions();
      totalBytes = Files.size(bufferFile.toPath());
      final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, totalBytes);
      final List<ByteBufferCursor> cursors = new ArrayList<>(bufferPositions.size() - 1);
      Iterator<Integer> it = bufferPositions.iterator();
      if (it.hasNext())
      {
        int lastPos = it.next();
        while (it.hasNext())
        {
          final int bufferPos = it.next();
          cursors.add(new ByteBufferCursor(byteBuffer, lastPos, bufferPos));
          lastPos = bufferPos;
        }
      }
      Cursor<ByteString, ByteString> composite = new CompositeCursor<ByteString, ByteString>(cursors);
      return new ProgressCursor<ByteString, ByteString>(composite, this, cursors);
    }
    private void readBufferPositions() throws IOException
    {
      List<String> indexLines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset());
      if (indexLines.size() != 1)
      {
        throw new IllegalStateException("Not implemented");// TODO JNR
      }
      final String[] bufferPositionsString = indexLines.get(0).split(" ");
      for (String bufferPos : bufferPositionsString)
      {
        bufferPositions.add(Integer.valueOf(bufferPos));
      }
    }
    @Override
    public String toString()
    {
      String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
      return getClass().getSimpleName()
          + "(treeName=\"" + treeName + "\""
          + ", current buffer holds " + inMemoryStore.size() + " record(s)"
@@ -623,6 +555,95 @@
    }
  }
  /** A cursor implementation keeping stats about reading progress. */
  private static final class ProgressCursor<K, V> implements Cursor<K, V>
  {
    private final Cursor<K, V> delegate;
    private final List<ByteBufferCursor> cursors;
    private final Buffer buffer;
    public ProgressCursor(Cursor<K, V> delegateCursor, Buffer buffer, List<ByteBufferCursor> cursors)
    {
      this.delegate = delegateCursor;
      this.buffer = buffer;
      this.cursors = new ArrayList<>(cursors);
    }
    public String getBufferFileName()
    {
      return buffer.treeName.toString();
    }
    private long getTotalBytes()
    {
      return buffer.totalBytes;
    }
    private int getBytesRead()
    {
      int count = 0;
      for (ByteBufferCursor cursor : cursors)
      {
        count += cursor.getBytesRead();
      }
      return count;
    }
    @Override
    public boolean next()
    {
      return delegate.next();
    }
    @Override
    public boolean isDefined()
    {
      return delegate.isDefined();
    }
    @Override
    public K getKey() throws NoSuchElementException
    {
      return delegate.getKey();
    }
    @Override
    public V getValue() throws NoSuchElementException
    {
      return delegate.getValue();
    }
    @Override
    public void close()
    {
      delegate.close();
    }
    @Override
    public boolean positionToKey(ByteSequence key)
    {
      return delegate.positionToKey(key);
    }
    @Override
    public boolean positionToKeyOrNext(ByteSequence key)
    {
      return delegate.positionToKeyOrNext(key);
    }
    @Override
    public boolean positionToLastKey()
    {
      return delegate.positionToLastKey();
    }
    @Override
    public boolean positionToIndex(int index)
    {
      return delegate.positionToIndex(index);
    }
  }
  /** A cursor implementation aggregating several cursors and ordering them by their key value. */
  private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
  {
@@ -655,11 +676,12 @@
          }
        });
    private CompositeCursor(Collection<SequentialCursor<K, V>> cursors)
    private CompositeCursor(Collection<? extends SequentialCursor<K, V>> cursors)
    {
      Reject.ifNull(cursors);
      for (Iterator<SequentialCursor<K, V>> it = cursors.iterator(); it.hasNext();)
      final List<SequentialCursor<K, V>> tmpCursors = new ArrayList<>(cursors);
      for (Iterator<SequentialCursor<K, V>> it = tmpCursors.iterator(); it.hasNext();)
      {
        SequentialCursor<K, V> cursor = it.next();
        if (!cursor.isDefined() && !cursor.next())
@@ -668,7 +690,7 @@
        }
      }
      this.cursors.addAll(cursors);
      this.cursors.addAll(tmpCursors);
    }
    @Override
@@ -774,6 +796,7 @@
    private final ByteBuffer byteBuffer;
    private final int startPos;
    private final int endPos;
    // TODO JNR build ByteSequence implementation reading from memory mapped files?
    private final ByteStringBuilder keyBuffer = new ByteStringBuilder();//FIXME JNR  bad: do zero copy?
    private final ByteStringBuilder valueBuffer = new ByteStringBuilder();//FIXME JNR  bad: do zero copy?
    private int currentPos;
@@ -843,7 +866,12 @@
    @Override
    public void close()
    {
      throw notImplemented();
      // nothing to do
    }
    public int getBytesRead()
    {
      return currentPos - startPos;
    }
    @Override
@@ -891,10 +919,8 @@
        {
          try
          {
            List<Integer> bufferPositions = readBufferPositions(treeName);
            // TODO JNR build ByteSequence implementation reading from memory mapped files?
            return getCursors(treeName, bufferPositions);
            Buffer buffer = new Buffer(treeName, bufferDir, MapMode.READ_ONLY);
            return buffer.openCursor();
          }
          catch (IOException e)
          {
@@ -902,49 +928,6 @@
          }
        }
        private List<Integer> readBufferPositions(TreeName treeName) throws IOException
        {
          // TODO JNR move to Buffer class?
          File indexFile = new File(bufferDir, treeName + ".index");
          List<String> indexLines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset());
          if (indexLines.size() != 1)
          {
            throw new IllegalStateException("Not implemented");// TODO JNR
          }
          final String[] bufferPositions = indexLines.get(0).split(" ");
          final List<Integer> results = new ArrayList<>(bufferPositions.length);
          for (String bufferPos : bufferPositions)
          {
            results.add(Integer.valueOf(bufferPos));
          }
          return results;
        }
        private Cursor<ByteString, ByteString> getCursors(TreeName treeName, List<Integer> bufferPositions)
            throws IOException
        {
          // TODO JNR move to Buffer class?
          File bufferFile = new File(bufferDir, treeName.toString());
          FileChannel fileChannel = new RandomAccessFile(bufferFile, "r").getChannel();
          long fileSize = Files.size(bufferFile.toPath());
          final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
          final List<SequentialCursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
          Iterator<Integer> it = bufferPositions.iterator();
          if (it.hasNext())
          {
            int lastPos = it.next();
            while (it.hasNext())
            {
              final int bufferPos = it.next();
              cursors.add(new ByteBufferCursor(byteBuffer, lastPos, bufferPos));
              lastPos = bufferPos;
            }
          }
          return new CompositeCursor<ByteString, ByteString>(cursors);
        }
        @Override
        public ByteString read(TreeName treeName, ByteSequence key)
        {
@@ -1056,7 +1039,7 @@
      {
        // Creates sub directories for each suffix
        // FIXME JNR cannot directly use DN names as directory + file names
        buffer = new Buffer(new File(bufferDir, treeName.toString()));
        buffer = new Buffer(treeName, bufferDir, MapMode.READ_WRITE);
        treeNameToBufferMap.put(treeName, buffer);
      }
      return buffer;
@@ -1668,8 +1651,9 @@
   */
  private void importPhaseOne(Storage backendStorage, Storage tmpStorage) throws Exception
  {
    final FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
    final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
    scheduleAtFixedRate(timerService, progressTask);
    threadCount = 2; // FIXME JNR id2entry + another task
    final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
@@ -1694,8 +1678,11 @@
      id2EntryPutTask.finishedWrites();
      dn2IdPutFuture.get();
    }
    shutdownAll(timerService, execService);
    finally
    {
      shutdownAll(timerService, execService);
      progressTask.run();
    }
  }
  private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
@@ -1715,36 +1702,41 @@
    }
  }
  private void importPhaseTwo(final Storage outStorage, Storage inStorage) throws Exception
  private void importPhaseTwo(final Storage outputStorage, Storage inputStorage) throws Exception
  {
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
    final SecondPhaseProgressTask progressTask = new SecondPhaseProgressTask();
    scheduleAtFixedRate(timerService, progressTask);
    final Set<TreeName> treeNames = inStorage.listTrees();
    final Set<TreeName> treeNames = inputStorage.listTrees();
    ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
    try (Importer importer = outStorage.startImport())
    try (Importer outputImporter = outputStorage.startImport())
    {
      for (final TreeName treeName : treeNames)
      {
        copyTo(treeName, inStorage, importer);// FIXME JNR use dbService
        copyTo(treeName, inputStorage, outputImporter, progressTask);// FIXME JNR use dbService
      }
    }
    finally
    {
      shutdownAll(timerService, dbService);
      progressTask.run();
    }
  }
  private void copyTo(final TreeName treeName, Storage input, final Importer output) throws Exception
  private void copyTo(final TreeName treeName, Storage input, final Importer output,
      final SecondPhaseProgressTask progressTask) throws Exception
  {
    input.read(new ReadOperation<Void>()
    {
      @Override
      public Void run(ReadableTransaction txn) throws Exception
      {
        try (SequentialCursor<ByteString, ByteString> cursor =
            new MergingCursor<ByteString, ByteString>(txn.openCursor(treeName), getMerger(treeName)))
        try (Cursor<ByteString, ByteString> cursor0 = txn.openCursor(treeName);
            SequentialCursor<ByteString, ByteString> cursor =
                new MergingCursor<ByteString, ByteString>(cursor0, getMerger(treeName)))
        {
          progressTask.addCursor(cursor0);
          while (cursor.next())
          {
            output.put(treeName, cursor.getKey(), cursor.getValue());
@@ -1966,6 +1958,8 @@
        }
      }
      // trim the array to the actual size
      entryIDs = Arrays.copyOf(entryIDs, i);
      // due to how the entryIDSets are built, there should not be any duplicate entryIDs
      Arrays.sort(entryIDs);
      return EntryIDSet.newDefinedSet(entryIDs);
@@ -2360,15 +2354,26 @@
  /** This class reports progress of the second phase of import processing at fixed intervals. */
  private class SecondPhaseProgressTask extends TimerTask
  {
    private final Map<ProgressCursor<?, ?>, Integer> cursors = new LinkedHashMap<>();
    /** The time in milliseconds of the previous progress report. */
    private long previousTime;
    /** Create a new import progress task. */
    public SecondPhaseProgressTask()
    private SecondPhaseProgressTask()
    {
      previousTime = System.currentTimeMillis();
    }
    private void addCursor(Cursor<ByteString, ByteString> cursor)
    {
      if (cursor instanceof ProgressCursor)
      {
        final ProgressCursor<?, ?> c = (ProgressCursor<?, ?>) cursor;
        cursors.put(c, 0);
        logger.info(NOTE_IMPORT_LDIF_INDEX_STARTED, c.getBufferFileName(), 1, 1);
      }
    }
    /** The action to be performed by this timer task. */
    @Override
    public void run()
@@ -2382,15 +2387,41 @@
      previousTime = latestTime;
      // DN index managers first.
      printStats(deltaTime, true);
      // non-DN index managers second
      printStats(deltaTime, false);
      for (Iterator<Map.Entry<ProgressCursor<?, ?>, Integer>> it = cursors.entrySet().iterator(); it.hasNext();)
      {
        final Map.Entry<ProgressCursor<?, ?>, Integer> mapEntry = it.next();
        ProgressCursor<?, ?> cursor = mapEntry.getKey();
        int lastBytesRead = mapEntry.getValue();
        printStats(deltaTime, cursor, lastBytesRead);
        if (!cursor.isDefined())
        {
          logger.info(NOTE_IMPORT_LDIF_INDEX_CLOSE, cursor.getBufferFileName());
          it.remove();
        }
      }
    }
    private void printStats(long deltaTime, boolean dn2id)
    private void printStats(long deltaTime, final ProgressCursor<?, ?> cursor, int lastBytesRead)
    {
      // TODO JNR
      final long bufferFileSize = cursor.getTotalBytes();
      final int tmpBytesRead = cursor.getBytesRead();
      if (lastBytesRead == tmpBytesRead)
      {
        return;
      }
      final long bytesReadInterval = tmpBytesRead - lastBytesRead;
      final int bytesReadPercent = Math.round((100f * tmpBytesRead) / bufferFileSize);
      // Kilo and milli approximately cancel out.
      final long kiloBytesRate = bytesReadInterval / deltaTime;
      final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
      logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_REPORT, cursor.getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
          kiloBytesRate, 1, 1);
      lastBytesRead = tmpBytesRead;
    }
  }