From 82840bfe0d65a0715357a001e3224ba0d6a9c8df Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Wed, 03 Jun 2015 12:26:19 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java |  457 ++++++++++++++++++++++++++++++--------------------------
 1 files changed, 244 insertions(+), 213 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index 13200de..85df3ff 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -64,8 +64,6 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -126,118 +124,6 @@
     return new UnsupportedOperationException("Not implemented");
   }
 
-  /** Concurrent {@link Set} implementation backed by a {@link ConcurrentHashMap}. */
-  private static final class ConcurrentHashSet<E> implements Set<E>
-  {
-    private final ConcurrentHashMap<E, E> delegate = new ConcurrentHashMap<>();
-
-    @Override
-    public boolean add(E e)
-    {
-      return delegate.put(e, e) == null;
-    }
-
-    @Override
-    public boolean addAll(Collection<? extends E> c)
-    {
-      boolean changed = false;
-      for (E e : c)
-      {
-        changed &= add(e);
-      }
-      return changed;
-    }
-
-    @Override
-    public void clear()
-    {
-      delegate.clear();
-    }
-
-    @Override
-    public boolean contains(Object o)
-    {
-      return delegate.containsKey(o);
-    }
-
-    @Override
-    public boolean containsAll(Collection<?> c)
-    {
-      return delegateSet().containsAll(c);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-      return delegateSet().equals(o);
-    }
-
-    @Override
-    public int hashCode()
-    {
-      return delegateSet().hashCode();
-    }
-
-    @Override
-    public boolean isEmpty()
-    {
-      return delegate.isEmpty();
-    }
-
-    @Override
-    public Iterator<E> iterator()
-    {
-      return delegateSet().iterator();
-    }
-
-    @Override
-    public boolean remove(Object o)
-    {
-      return delegateSet().remove(o);
-    }
-
-    @Override
-    public boolean removeAll(Collection<?> c)
-    {
-      return delegateSet().removeAll(c);
-    }
-
-    @Override
-    public boolean retainAll(Collection<?> c)
-    {
-      return delegateSet().retainAll(c);
-    }
-
-    @Override
-    public int size()
-    {
-      return delegate.size();
-    }
-
-    @Override
-    public Object[] toArray()
-    {
-      return delegateSet().toArray();
-    }
-
-    @Override
-    public <T> T[] toArray(T[] a)
-    {
-      return delegateSet().toArray(a);
-    }
-
-    @Override
-    public String toString()
-    {
-      return delegateSet().toString();
-    }
-
-    private Set<E> delegateSet()
-    {
-      return delegate.keySet();
-    }
-  }
-
   /** Data to put into id2entry tree. */
   private static final class Id2EntryData
   {
@@ -393,37 +279,49 @@
    */
   private static final class Buffer
   {
-    private final File file;
+    private final TreeName treeName;
+    private final File indexFile;
+    private final File bufferFile;
     private final FileChannel fileChannel;
     private final List<Integer> bufferPositions = new ArrayList<>();
     /** TODO JNR offer configuration for this. */
-    private int bufferSize = 10 * MB;
+    private final int bufferSize = 10 * MB;
 
-    // FIXME this is not thread safe yet!!!
     /**
      * Maps {@link ByteSequence} keys to (conflicting) values.
      * <p>
      * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
      * {@link #bufferSize}.
-     * <p>
-     * This code uses a {@link ConcurrentHashMap} instead of a {@code ConcurrentSkipListMap} because
-     * during performance testing it was found this code spent a lot of time in
-     * {@link ByteString#compareTo(ByteSequence)} when putting entries to the map. However, at this
-     * point, we only need to put very quickly data in the map, we do not need keys to be sorted.
-     * <p>
-     * Note: using {@link Set} here will be a problem with id2childrencount where values deduplication
-     * is not required. How to solve this problem?
      */
-    private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
+    private Map<ByteSequence, List<ByteSequence>> inMemoryStore = new HashMap<>();
     /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
     private int maximumExpectedSizeOnDisk;
+    private long totalBytes;
 
-    private Buffer(File file) throws FileNotFoundException
+    public Buffer(TreeName treeName, File bufferDir, MapMode mapMode) throws IOException
     {
-      file.getParentFile().mkdirs();
-      this.file = file;
-      this.fileChannel = new RandomAccessFile(file, "rw").getChannel();
-      this.bufferPositions.add(0);
+      this.treeName = treeName;
+      bufferFile = new File(bufferDir, treeName.toString());
+      bufferFile.getParentFile().mkdirs(); // NOTE(review): the boolean result of mkdirs() is ignored
+      indexFile = new File(bufferDir, treeName + ".index"); // holds the buffer positions, see writeBufferIndexFile()
+      this.fileChannel = new RandomAccessFile(bufferFile, getMode(mapMode)).getChannel();
+      if (MapMode.READ_WRITE.equals(mapMode))
+      {
+        this.bufferPositions.add(0); // writing starts at offset 0; readers get positions from readBufferPositions()
+      }
+    }
+
+    /** Returns the {@link RandomAccessFile} open mode matching the provided map mode. */
+    private String getMode(MapMode mapMode)
+    {
+      final boolean readOnly = MapMode.READ_ONLY.equals(mapMode);
+      final boolean readWrite = MapMode.READ_WRITE.equals(mapMode);
+      if (!readOnly && !readWrite)
+      {
+        throw new IllegalArgumentException("Unhandled map mode: " + mapMode);
+      }
+      // Exactly one of the two flags is set at this point.
+      return readOnly ? "r" : "rw";
     }
 
     void putKeyValue(ByteSequence key, ByteSequence value) throws IOException
@@ -436,15 +334,11 @@
         maximumExpectedSizeOnDisk = 0;
       }
 
-      Set<ByteSequence> values = inMemoryStore.get(key);
+      List<ByteSequence> values = inMemoryStore.get(key);
       if (values == null)
       {
-        values = new ConcurrentHashSet<>();
-        Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
-        if (existingValues != null)
-        {
-          values = existingValues;
-        }
+        values = new ArrayList<>();
+        inMemoryStore.put(key, values);
       }
       values.add(value);
       maximumExpectedSizeOnDisk += recordSize;
@@ -452,13 +346,15 @@
 
     private void flushToMappedByteBuffer() throws IOException
     {
-      final SortedMap<ByteSequence, Set<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
+      final SortedMap<ByteSequence, List<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
 
       MappedByteBuffer byteBuffer = nextBuffer();
-      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : sortedStore.entrySet())
+      for (Map.Entry<ByteSequence, List<ByteSequence>> mapEntry : sortedStore.entrySet())
       {
         ByteSequence key = mapEntry.getKey();
         // FIXME JNR merge values before put
+        // Edit: Merging during phase one is slower than not merging at all,
+        // perhaps due to merging importIDSets for keys that will exceed index entry limits anyway?
         for (ByteSequence value : mapEntry.getValue())
         {
           put(byteBuffer, key);
@@ -513,8 +409,7 @@
 
     private void writeBufferIndexFile()
     {
-      final File bufferIndexFile = new File(file.getParent(), file.getName() + ".index");
-      try (PrintWriter writer = new PrintWriter(bufferIndexFile))
+      try (PrintWriter writer = new PrintWriter(indexFile))
       {
         writer.print(Utils.joinAsString(" ", this.bufferPositions));
       }
@@ -524,10 +419,47 @@
       }
     }
 
+    /** Memory maps the whole buffer file and returns a cursor aggregating all the regions it contains. */
+    private Cursor<ByteString, ByteString> openCursor() throws IOException
+    {
+      readBufferPositions();
+
+      totalBytes = Files.size(bufferFile.toPath());
+      final MappedByteBuffer mappedFile = fileChannel.map(MapMode.READ_ONLY, 0, totalBytes);
+
+      // Two consecutive positions delimit one region: N positions describe N - 1 regions.
+      final int nbRegions = bufferPositions.size() - 1;
+      final List<ByteBufferCursor> regionCursors = new ArrayList<>(nbRegions);
+      for (int i = 0; i < nbRegions; i++)
+      {
+        final int regionStart = bufferPositions.get(i);
+        final int regionEnd = bufferPositions.get(i + 1);
+        regionCursors.add(new ByteBufferCursor(mappedFile, regionStart, regionEnd));
+      }
+
+      // The region cursors are also handed to the progress cursor, which sums the bytes they have read.
+      return new ProgressCursor<ByteString, ByteString>(
+          new CompositeCursor<ByteString, ByteString>(regionCursors), this, regionCursors);
+    }
+
+    /** Appends to {@link #bufferPositions} the space separated positions stored in the one-line index file. */
+    private void readBufferPositions() throws IOException
+    {
+      final List<String> lines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset());
+      if (lines.size() == 1)
+      {
+        for (String position : lines.get(0).split(" "))
+        {
+          bufferPositions.add(Integer.valueOf(position));
+        }
+        return;
+      }
+      throw new IllegalStateException("Not implemented");// TODO JNR
+    }
+
     @Override
     public String toString()
     {
-      String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
       return getClass().getSimpleName()
           + "(treeName=\"" + treeName + "\""
           + ", current buffer holds " + inMemoryStore.size() + " record(s)"
@@ -623,6 +555,95 @@
     }
   }
 
+  /** A cursor forwarding every call to its delegate, while exposing how far the backing buffer file was read. */
+  private static final class ProgressCursor<K, V> implements Cursor<K, V>
+  {
+    private final Cursor<K, V> delegate;
+    private final List<ByteBufferCursor> cursors; // cursors whose read bytes are summed up by getBytesRead()
+    private final Buffer buffer; // source of the tree name and of the total number of bytes
+
+    public ProgressCursor(Cursor<K, V> delegateCursor, Buffer buffer, List<ByteBufferCursor> cursors)
+    {
+      this.delegate = delegateCursor;
+      this.buffer = buffer;
+      this.cursors = new ArrayList<>(cursors); // copy: later changes to the caller's list are not seen here
+    }
+
+    public String getBufferFileName()
+    {
+      return buffer.treeName.toString(); // the tree name, which the buffer file is named after
+    }
+
+    private long getTotalBytes()
+    {
+      return buffer.totalBytes; // assigned by Buffer.openCursor() from the size of the buffer file
+    }
+
+    private int getBytesRead()
+    {
+      int count = 0;
+      for (ByteBufferCursor cursor : cursors)
+      {
+        count += cursor.getBytesRead(); // NOTE(review): int sum; fine as long as buffer positions fit in an int
+      }
+      return count;
+    }
+
+    @Override
+    public boolean next()
+    {
+      return delegate.next();
+    }
+
+    @Override
+    public boolean isDefined()
+    {
+      return delegate.isDefined();
+    }
+
+    @Override
+    public K getKey() throws NoSuchElementException
+    {
+      return delegate.getKey();
+    }
+
+    @Override
+    public V getValue() throws NoSuchElementException
+    {
+      return delegate.getValue();
+    }
+
+    @Override
+    public void close()
+    {
+      delegate.close();
+    }
+
+    @Override
+    public boolean positionToKey(ByteSequence key)
+    {
+      return delegate.positionToKey(key);
+    }
+
+    @Override
+    public boolean positionToKeyOrNext(ByteSequence key)
+    {
+      return delegate.positionToKeyOrNext(key);
+    }
+
+    @Override
+    public boolean positionToLastKey()
+    {
+      return delegate.positionToLastKey();
+    }
+
+    @Override
+    public boolean positionToIndex(int index)
+    {
+      return delegate.positionToIndex(index);
+    }
+  }
+
   /** A cursor implementation aggregating several cursors and ordering them by their key value. */
   private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
   {
@@ -655,11 +676,12 @@
           }
         });
 
-    private CompositeCursor(Collection<SequentialCursor<K, V>> cursors)
+    private CompositeCursor(Collection<? extends SequentialCursor<K, V>> cursors)
     {
       Reject.ifNull(cursors);
 
-      for (Iterator<SequentialCursor<K, V>> it = cursors.iterator(); it.hasNext();)
+      final List<SequentialCursor<K, V>> tmpCursors = new ArrayList<>(cursors);
+      for (Iterator<SequentialCursor<K, V>> it = tmpCursors.iterator(); it.hasNext();)
       {
         SequentialCursor<K, V> cursor = it.next();
         if (!cursor.isDefined() && !cursor.next())
@@ -668,7 +690,7 @@
         }
       }
 
-      this.cursors.addAll(cursors);
+      this.cursors.addAll(tmpCursors);
     }
 
     @Override
@@ -774,6 +796,7 @@
     private final ByteBuffer byteBuffer;
     private final int startPos;
     private final int endPos;
+    // TODO JNR build ByteSequence implementation reading from memory mapped files?
     private final ByteStringBuilder keyBuffer = new ByteStringBuilder();//FIXME JNR  bad: do zero copy?
     private final ByteStringBuilder valueBuffer = new ByteStringBuilder();//FIXME JNR  bad: do zero copy?
     private int currentPos;
@@ -843,7 +866,12 @@
     @Override
     public void close()
     {
-      throw notImplemented();
+      // Nothing to release: the byte buffer is handed in by the creator of this cursor.
+    }
+
+    public int getBytesRead()
+    {
+      return currentPos - startPos; // NOTE(review): assumes currentPos starts at startPos — confirm in constructor
     }
 
     @Override
@@ -891,10 +919,8 @@
         {
           try
           {
-            List<Integer> bufferPositions = readBufferPositions(treeName);
-
-            // TODO JNR build ByteSequence implementation reading from memory mapped files?
-            return getCursors(treeName, bufferPositions);
+            Buffer buffer = new Buffer(treeName, bufferDir, MapMode.READ_ONLY);
+            return buffer.openCursor();
           }
           catch (IOException e)
           {
@@ -902,49 +928,6 @@
           }
         }
 
-        private List<Integer> readBufferPositions(TreeName treeName) throws IOException
-        {
-          // TODO JNR move to Buffer class?
-          File indexFile = new File(bufferDir, treeName + ".index");
-          List<String> indexLines = Files.readAllLines(indexFile.toPath(), Charset.defaultCharset());
-          if (indexLines.size() != 1)
-          {
-            throw new IllegalStateException("Not implemented");// TODO JNR
-          }
-
-          final String[] bufferPositions = indexLines.get(0).split(" ");
-          final List<Integer> results = new ArrayList<>(bufferPositions.length);
-          for (String bufferPos : bufferPositions)
-          {
-            results.add(Integer.valueOf(bufferPos));
-          }
-          return results;
-        }
-
-        private Cursor<ByteString, ByteString> getCursors(TreeName treeName, List<Integer> bufferPositions)
-            throws IOException
-        {
-          // TODO JNR move to Buffer class?
-          File bufferFile = new File(bufferDir, treeName.toString());
-          FileChannel fileChannel = new RandomAccessFile(bufferFile, "r").getChannel();
-          long fileSize = Files.size(bufferFile.toPath());
-          final MappedByteBuffer byteBuffer = fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
-
-          final List<SequentialCursor<ByteString, ByteString>> cursors = new ArrayList<>(bufferPositions.size() - 1);
-          Iterator<Integer> it = bufferPositions.iterator();
-          if (it.hasNext())
-          {
-            int lastPos = it.next();
-            while (it.hasNext())
-            {
-              final int bufferPos = it.next();
-              cursors.add(new ByteBufferCursor(byteBuffer, lastPos, bufferPos));
-              lastPos = bufferPos;
-            }
-          }
-          return new CompositeCursor<ByteString, ByteString>(cursors);
-        }
-
         @Override
         public ByteString read(TreeName treeName, ByteSequence key)
         {
@@ -1056,7 +1039,7 @@
       {
         // Creates sub directories for each suffix
         // FIXME JNR cannot directly use DN names as directory + file names
-        buffer = new Buffer(new File(bufferDir, treeName.toString()));
+        buffer = new Buffer(treeName, bufferDir, MapMode.READ_WRITE);
         treeNameToBufferMap.put(treeName, buffer);
       }
       return buffer;
@@ -1668,8 +1651,9 @@
    */
   private void importPhaseOne(Storage backendStorage, Storage tmpStorage) throws Exception
   {
+    final FirstPhaseProgressTask progressTask = new FirstPhaseProgressTask();
     final ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
-    scheduleAtFixedRate(timerService, new FirstPhaseProgressTask());
+    scheduleAtFixedRate(timerService, progressTask);
     threadCount = 2; // FIXME JNR id2entry + another task
     final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
 
@@ -1694,8 +1678,11 @@
       id2EntryPutTask.finishedWrites();
       dn2IdPutFuture.get();
     }
-
-    shutdownAll(timerService, execService);
+    finally
+    {
+      shutdownAll(timerService, execService);
+      progressTask.run();
+    }
   }
 
   private static void scheduleAtFixedRate(ScheduledThreadPoolExecutor timerService, Runnable task)
@@ -1715,36 +1702,41 @@
     }
   }
 
-  private void importPhaseTwo(final Storage outStorage, Storage inStorage) throws Exception
+  private void importPhaseTwo(final Storage outputStorage, Storage inputStorage) throws Exception
   {
     ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
-    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
+    final SecondPhaseProgressTask progressTask = new SecondPhaseProgressTask();
+    scheduleAtFixedRate(timerService, progressTask);
 
-    final Set<TreeName> treeNames = inStorage.listTrees();
+    final Set<TreeName> treeNames = inputStorage.listTrees();
-    ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
+    ExecutorService dbService = Executors.newFixedThreadPool(Math.max(1, treeNames.size())); // 0 threads is rejected
-    try (Importer importer = outStorage.startImport())
+    try (Importer outputImporter = outputStorage.startImport())
     {
       for (final TreeName treeName : treeNames)
       {
-        copyTo(treeName, inStorage, importer);// FIXME JNR use dbService
+        copyTo(treeName, inputStorage, outputImporter, progressTask);// FIXME JNR use dbService
       }
     }
     finally
     {
       shutdownAll(timerService, dbService);
+      progressTask.run(); // one last report, issued from this thread after shutdownAll()
     }
   }
 
-  private void copyTo(final TreeName treeName, Storage input, final Importer output) throws Exception
+  private void copyTo(final TreeName treeName, Storage input, final Importer output,
+      final SecondPhaseProgressTask progressTask) throws Exception
   {
     input.read(new ReadOperation<Void>()
     {
       @Override
       public Void run(ReadableTransaction txn) throws Exception
       {
-        try (SequentialCursor<ByteString, ByteString> cursor =
-            new MergingCursor<ByteString, ByteString>(txn.openCursor(treeName), getMerger(treeName)))
+        try (Cursor<ByteString, ByteString> cursor0 = txn.openCursor(treeName);
+            SequentialCursor<ByteString, ByteString> cursor =
+                new MergingCursor<ByteString, ByteString>(cursor0, getMerger(treeName)))
         {
+          progressTask.addCursor(cursor0);
           while (cursor.next())
           {
             output.put(treeName, cursor.getKey(), cursor.getValue());
@@ -1966,6 +1958,8 @@
         }
       }
 
+      // trim the array to the actual size
+      entryIDs = Arrays.copyOf(entryIDs, i);
       // due to how the entryIDSets are built, there should not be any duplicate entryIDs
       Arrays.sort(entryIDs);
       return EntryIDSet.newDefinedSet(entryIDs);
@@ -2360,15 +2354,26 @@
   /** This class reports progress of the second phase of import processing at fixed intervals. */
   private class SecondPhaseProgressTask extends TimerTask
   {
+    private final Map<ProgressCursor<?, ?>, Integer> cursors = new LinkedHashMap<>();
     /** The time in milliseconds of the previous progress report. */
     private long previousTime;
 
     /** Create a new import progress task. */
-    public SecondPhaseProgressTask()
+    private SecondPhaseProgressTask()
     {
       previousTime = System.currentTimeMillis();
     }
 
+    private void addCursor(Cursor<ByteString, ByteString> cursor)
+    {
+      if (cursor instanceof ProgressCursor) // any other kind of cursor is silently left untracked
+      {
+        final ProgressCursor<?, ?> c = (ProgressCursor<?, ?>) cursor;
+        cursors.put(c, 0); // NOTE(review): unsynchronized, while run() iterates this map on the timer thread — confirm
+        logger.info(NOTE_IMPORT_LDIF_INDEX_STARTED, c.getBufferFileName(), 1, 1);
+      }
+    }
+
     /** The action to be performed by this timer task. */
     @Override
     public void run()
@@ -2382,15 +2387,41 @@
 
       previousTime = latestTime;
 
-      // DN index managers first.
-      printStats(deltaTime, true);
-      // non-DN index managers second
-      printStats(deltaTime, false);
+      for (Iterator<Map.Entry<ProgressCursor<?, ?>, Integer>> it = cursors.entrySet().iterator(); it.hasNext();)
+      {
+        final Map.Entry<ProgressCursor<?, ?>, Integer> mapEntry = it.next();
+        ProgressCursor<?, ?> cursor = mapEntry.getKey();
+        int lastBytesRead = mapEntry.getValue();
+        printStats(deltaTime, cursor, lastBytesRead);
+
+        if (!cursor.isDefined())
+        {
+          logger.info(NOTE_IMPORT_LDIF_INDEX_CLOSE, cursor.getBufferFileName());
+          it.remove();
+        }
+      }
     }
 
-    private void printStats(long deltaTime, boolean dn2id)
+    /** Logs how much of a buffer file was read since the previous report, then remembers the new amount. */
+    private void printStats(long deltaTime, final ProgressCursor<?, ?> cursor, int lastBytesRead)
     {
-      // TODO JNR
+      final long bufferFileSize = cursor.getTotalBytes();
+      final int tmpBytesRead = cursor.getBytesRead();
+      if (lastBytesRead == tmpBytesRead)
+      {
+        return;
+      }
+
+      final long bytesReadInterval = tmpBytesRead - lastBytesRead;
+      final int bytesReadPercent = Math.round((100f * tmpBytesRead) / bufferFileSize);
+      // Kilo and milli approximately cancel out.
+      final long kiloBytesRate = bytesReadInterval / deltaTime;
+      final long kiloBytesRemaining = (bufferFileSize - tmpBytesRead) / 1024;
+      logger.info(NOTE_IMPORT_LDIF_PHASE_TWO_REPORT, cursor.getBufferFileName(), bytesReadPercent, kiloBytesRemaining,
+          kiloBytesRate, 1, 1);
+
+      // The parameter is a copy: update the map so the next report measures from here.
+      cursors.put(cursor, tmpBytesRead);
     }
   }
 

--
Gitblit v1.10.0