From 5a06735032d3c0155548b77c9e627674c400b4ec Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 28 May 2015 10:50:36 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java |  213 ++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 187 insertions(+), 26 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index 3f6170c..62ad490 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -46,6 +46,7 @@
 import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -57,12 +58,14 @@
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
 import java.util.Set;
+import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TimerTask;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -122,6 +125,118 @@
     return new UnsupportedOperationException("Not implemented");
   }
 
+  /** Concurrent {@link Set} implementation backed by a {@link ConcurrentHashMap}. */
+  private static final class ConcurrentHashSet<E> implements Set<E>
+  {
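+    // Each element is mapped to itself; the delegate map supplies the concurrency guarantees.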
+    private final ConcurrentHashMap<E, E> delegate = new ConcurrentHashMap<>();
+
+    @Override
+    public boolean add(E e)
+    {
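+      // put() returns the previous mapping: null means e was not already present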
+      return delegate.put(e, e) == null;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends E> c)
+    {
+      boolean changed = false;
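+      // |= : a single successful add is enough to mark the set as changed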
+      for (E e : c)
+      {
+        changed |= add(e);
+      }
+      return changed;
+    }
+
+    @Override
+    public void clear()
+    {
+      delegate.clear();
+    }
+
+    @Override
+    public boolean contains(Object o)
+    {
+      return delegate.containsKey(o);
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c)
+    {
+      return delegateSet().containsAll(c);
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      return delegateSet().equals(o);
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return delegateSet().hashCode();
+    }
+
+    @Override
+    public boolean isEmpty()
+    {
+      return delegate.isEmpty();
+    }
+
+    @Override
+    public Iterator<E> iterator()
+    {
+      return delegateSet().iterator();
+    }
+
+    @Override
+    public boolean remove(Object o)
+    {
+      return delegateSet().remove(o);
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c)
+    {
+      return delegateSet().removeAll(c);
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c)
+    {
+      return delegateSet().retainAll(c);
+    }
+
+    @Override
+    public int size()
+    {
+      return delegate.size();
+    }
+
+    @Override
+    public Object[] toArray()
+    {
+      return delegateSet().toArray();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a)
+    {
+      return delegateSet().toArray(a);
+    }
+
+    @Override
+    public String toString()
+    {
+      return delegateSet().toString();
+    }
+
+    private Set<E> delegateSet()
+    {
+      return delegate.keySet();
+    }
+  }
+
   /** Data to put into id2entry tree. */
   private static final class Id2EntryData
   {
@@ -281,7 +396,7 @@
     private final FileChannel fileChannel;
     private final List<Integer> bufferPositions = new ArrayList<>();
     /** TODO JNR offer configuration for this. */
-    private int bufferSize = 1024;
+    private int bufferSize = 10 * MB;
 
     // FIXME this is not thread safe yet!!!
     /**
@@ -289,8 +404,16 @@
      * <p>
      * This will be persisted once {@link #maximumExpectedSizeOnDisk} reaches the
      * {@link #bufferSize}.
+     * <p>
+     * This code uses a {@link ConcurrentHashMap} instead of a {@link ConcurrentSkipListMap} because
+     * performance testing showed that this code spent a lot of time in
+     * {@link ByteString#compareTo(ByteSequence)} when putting entries into the map. At this point
+     * we only need to put data into the map very quickly; the keys do not need to be sorted yet,
+     * so sorting is deferred until the map is flushed to disk.
+     * <p>
+     * Note: using a {@link Set} for the values will be a problem for id2childrencount, where values
+     * must not be deduplicated (key conflicts are resolved by summing). How to solve this problem?
      */
-    private ConcurrentNavigableMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentSkipListMap<>();
+    private ConcurrentMap<ByteSequence, Set<ByteSequence>> inMemoryStore = new ConcurrentHashMap<>();
     /** Projected occupied disk for the data stored in {@link #inMemoryStore}. */
     private int maximumExpectedSizeOnDisk;
 
@@ -307,7 +430,7 @@
       int recordSize = INT_SIZE + key.length() + INT_SIZE + value.length();
       if (bufferSize < maximumExpectedSizeOnDisk + recordSize)
       {
-        copyToDisk();
+        flushToMappedByteBuffer();
         inMemoryStore.clear();
         maximumExpectedSizeOnDisk = 0;
       }
@@ -315,7 +438,7 @@
       Set<ByteSequence> values = inMemoryStore.get(key);
       if (values == null)
       {
-        values = new ConcurrentSkipListSet<>();
+        values = new ConcurrentHashSet<>();
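+        // another thread may have raced us to insert a set for this key; putIfAbsent tells us who won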
         Set<ByteSequence> existingValues = inMemoryStore.putIfAbsent(key, values);
         if (existingValues != null)
         {
@@ -326,10 +449,12 @@
       maximumExpectedSizeOnDisk += recordSize;
     }
 
-    private void copyToDisk() throws IOException
+    private void flushToMappedByteBuffer() throws IOException
     {
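+      // Copying into a TreeMap sorts the keys: puts targeted an unsorted
+      // ConcurrentHashMap for speed, so ordering is deferred to flush time.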
+      final SortedMap<ByteSequence, Set<ByteSequence>> sortedStore = new TreeMap<>(inMemoryStore);
+
       MappedByteBuffer byteBuffer = nextBuffer();
-      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
+      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : sortedStore.entrySet())
       {
         ByteSequence key = mapEntry.getKey();
         // FIXME JNR merge values before put
@@ -352,8 +477,9 @@
 
     private MappedByteBuffer nextBuffer() throws IOException
     {
-      // FIXME JNR bufferSize is an acceptable over approximation
-      return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), bufferSize);
+      // FIXME JNR once duplicate keys are merged during phase one,
+      // maximumExpectedSizeOnDisk will be an acceptable over-approximation
+      return fileChannel.map(MapMode.READ_WRITE, getLastPosition(bufferPositions), maximumExpectedSizeOnDisk);
     }
 
     private int getLastPosition(List<Integer> l)
@@ -371,14 +497,16 @@
       byteBuffer.putInt(b.length());
       // Need to do all of this because b.copyTo(byteBuffer) calls ByteBuffer.flip().
       // Why does it do that?
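+      // flip() sets the limit to the current position and the position to zero,
+      // so save both beforehand and restore them after the copy.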
+      final int limitBeforeFlip = byteBuffer.limit();
       final int posBeforeFlip = byteBuffer.position();
       b.copyTo(byteBuffer);
-      byteBuffer.limit(bufferSize);
+      byteBuffer.limit(limitBeforeFlip);
       byteBuffer.position(posBeforeFlip + b.length());
     }
 
-    void flush()
+    void flush() throws IOException
     {
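+      // persist any data still held in memory before writing the buffer index file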
+      flushToMappedByteBuffer();
       writeBufferIndexFile();
     }
 
@@ -401,7 +529,7 @@
       String treeName = "/" + file.getParentFile().getName() + "/" + file.getName();
       return getClass().getSimpleName()
           + "(treeName=\"" + treeName + "\""
-          + ", currentBuffer has " + inMemoryStore.size() + " record(s)"
+          + ", current buffer holds " + inMemoryStore.size() + " record(s)"
           + " and " + (bufferSize - maximumExpectedSizeOnDisk) + " byte(s) remaining)";
     }
   }
@@ -984,9 +1112,16 @@
     @Override
     public void close()
     {
-      for (Buffer buffer : treeNameToBufferMap.values())
+      try
       {
-        buffer.flush();
+        for (Buffer buffer : treeNameToBufferMap.values())
+        {
+          buffer.flush();
+        }
+      }
+      catch (IOException e)
+      {
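+        // close() does not declare IOException, so wrap it in the backend's runtime exception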
+        throw new StorageRuntimeException(e);
       }
     }
 
@@ -1632,7 +1767,7 @@
     ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
     scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
 
-    final Set<TreeName> treeNames = inStorage.listTrees(); // FIXME JNR rename to listTreeNames()?
+    final Set<TreeName> treeNames = inStorage.listTrees();
     ExecutorService dbService = Executors.newFixedThreadPool(treeNames.size());
     try (Importer importer = outStorage.startImport())
     {
@@ -1674,6 +1809,11 @@
           // key conflicts == merge EntryIDSets
           return new ImportIDSetsMerger(index);
         }
+        else if (treeName.getIndexId().equals(ID2CHILDREN_COUNT_NAME))
+        {
+          // key conflicts == sum values
+          // TODO JNR
+        }
         else if (treeName.getIndexId().equals(DN2ID_INDEX_NAME)
             || treeName.getIndexId().equals(DN2URI_INDEX_NAME)
             || isVLVIndex(entryContainer, treeName))
@@ -1836,17 +1976,11 @@
         }
         else if (values.size() == 1)
         {
+          // Avoids unnecessary decoding + encoding
           return values.iterator().next();
         }
 
-        ImportIDSet idSet = new ImportIDSet(ByteString.empty(), EntryIDSet.newDefinedSet(), index.getIndexEntryLimit());
-        for (ByteString value : values)
-        {
-          // FIXME JNR Can we make this more efficient?
-          // go through long[] + sort in the end?
-          idSet.merge(index.decodeValue(ByteString.empty(), value));
-        }
-        return index.toValue(idSet);
+        return index.toValue(buildEntryIDSet(values));
       }
       finally
       {
@@ -1855,6 +1989,31 @@
         values.clear();
       }
     }
+
+    private EntryIDSet buildEntryIDSet(Set<ByteString> values)
+    {
+      // accumulate in array
+      int i = 0;
+      long[] entryIDs = new long[index.getIndexEntryLimit()];
+      for (ByteString value : values)
+      {
+        final EntryIDSet entryIDSet = index.decodeValue(ByteString.empty(), value);
+        if (!entryIDSet.isDefined() || i + entryIDSet.size() >= index.getIndexEntryLimit())
+        {
+          // above index entry limit
+          return EntryIDSet.newUndefinedSet();
+        }
+
+        for (EntryID entryID : entryIDSet)
+        {
+          entryIDs[i++] = entryID.longValue();
+        }
+      }
+
+      // due to how the entryIDSets are built, there should not be any duplicate entryIDs
+      // Only the first i slots were filled; trim the zero-filled tail
+      // so it does not end up in the defined set.
+      final long[] definedIDs = Arrays.copyOf(entryIDs, i);
+      Arrays.sort(definedIDs);
+      return EntryIDSet.newDefinedSet(definedIDs);
+    }
   }
 
   /** Task used to migrate excluded branch. */
@@ -2093,10 +2252,12 @@
       {
         processDN2ID(suffix, entry.getName(), entryID);
       }
+
       processDN2URI(suffix, entry);
       processIndexes(suffix, entry, entryID);
       processVLVIndexes(suffix, entry, entryID);
       id2EntryPutTask.put(suffix, entryID, entry);
+
       importCount.getAndIncrement();
     }
 
@@ -2267,9 +2428,9 @@
     @Override
     public boolean insert(final DN dn, final EntryID entryID)
     {
-      final AtomicBoolean result = new AtomicBoolean();
       try
       {
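+        // the AtomicBoolean carries the insert result out of the WriteOperation callback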
+        final AtomicBoolean result = new AtomicBoolean();
         storage.write(new WriteOperation()
         {
           @Override
@@ -2278,12 +2439,12 @@
             result.set(suffix.getDN2ID().insert(txn, dn, entryID));
           }
         });
+        return result.get();
       }
       catch (Exception e)
       {
         throw new StorageRuntimeException(e);
       }
-      return result.get();
     }
 
     @Override

--
Gitblit v1.10.0