From c5d246665c8d72aa524009a12af556f8fba76fe4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 26 May 2015 13:01:55 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java                   |    7 
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java                |   18 +
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java               |   46 ++++--
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java |  327 +++++++++++++++++++++++++++++++++++++++++++++-
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java                      |   10 +
 5 files changed, 374 insertions(+), 34 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java
index 744245c..6899251 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java
@@ -96,7 +96,7 @@
    */
   void put(final WriteableTransaction txn, DN dn, final EntryID entryID) throws StorageRuntimeException
   {
-    txn.put(getName(), toKey(dn), entryID.toByteString());
+    txn.put(getName(), toKey(dn), toValue(entryID));
   }
 
   boolean insert(final WriteableTransaction txn, DN dn, final EntryID entryID) throws StorageRuntimeException
@@ -112,7 +112,7 @@
           return oldEntryID;
         }
         // it did not exist before, insert the new value
-        return entryID.toByteString();
+        return toValue(entryID);
       }
     });
   }
@@ -122,6 +122,12 @@
     return dnToDNKey(dn, baseDN.size());
   }
 
+  ByteString toValue(final EntryID entryID)
+  {
+    // Encodes the entry ID exactly as EntryID.toByteString() does. TODO JNR do we want to use compacted longs?
+    return entryID.toByteString();
+  }
+
   /**
    * Remove a record from the DN tree.
    * @param txn a non null transaction
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
index a516ddf..c28ba83 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
@@ -60,12 +60,9 @@
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
   /** The limit on the number of entry IDs that may be indexed by one key. */
-  private int indexEntryLimit;
-
   private final State state;
-
   private final EntryContainer entryContainer;
-
+  private int indexEntryLimit;
   private EntryIDSetCodec codec;
 
   /**
@@ -124,11 +121,32 @@
           @Override
           public EntryIDSet transform(ByteString key, ByteString value) throws NeverThrowsException
           {
-            return codec.decode(key, value);
+            return decodeValue(key, value);
           }
         });
   }
 
+  EntryIDSet decodeValue(ByteSequence key, ByteString value) // decodes a stored value with this index's codec
+  {
+    return codec.decode(key, value);
+  }
+
+  ByteString toValue(EntryID entryID) // builds a defined set from the single ID, then encodes it
+  {
+    return codec.encode(newDefinedSet(entryID.longValue()));
+  }
+
+  ByteString toValue(EntryIDSet entryIDSet) // encodes the set with this index's codec
+  {
+    return codec.encode(entryIDSet);
+  }
+
+  ByteString toValue(ImportIDSet importIDSet) // lets the import set serialize itself with this index's codec
+  {
+    return importIDSet.valueToByteString(codec);
+  }
+
+  // TODO JNR rename to importUpsert() ?
   @Override
   public final void importPut(Importer importer, ImportIDSet idsToBeAdded) throws StorageRuntimeException
   {
@@ -138,14 +156,14 @@
     ByteString value = importer.read(getName(), key);
     if (value != null)
     {
-      final EntryIDSet entryIDSet = codec.decode(key, value);
+      final EntryIDSet entryIDSet = decodeValue(key, value);
       final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
       importIDSet.merge(idsToBeAdded);
-      importer.put(getName(), key, importIDSet.valueToByteString(codec));
+      importer.put(getName(), key, toValue(importIDSet));
     }
     else
     {
-      importer.put(getName(), key, idsToBeAdded.valueToByteString(codec));
+      importer.put(getName(), key, toValue(idsToBeAdded));
     }
   }
 
@@ -162,7 +180,7 @@
       throw new IllegalStateException("Expected to have a value associated to key " + key + " for index " + getName());
     }
 
-    final EntryIDSet entryIDSet = codec.decode(key, value);
+    final EntryIDSet entryIDSet = decodeValue(key, value);
     final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
     importIDSet.remove(idsToBeRemoved);
     if (importIDSet.isDefined() && importIDSet.size() == 0)
@@ -171,7 +189,7 @@
     }
     else
     {
-      importer.put(getName(), key, importIDSet.valueToByteString(codec));
+      importer.put(getName(), key, toValue(importIDSet));
     }
   }
 
@@ -218,7 +236,7 @@
         if (oldValue != null)
         {
           EntryIDSet entryIDSet = computeEntryIDSet(key, oldValue.toByteString(), deletedIDs, addedIDs);
-          ByteString after = codec.encode(entryIDSet);
+          ByteString after = toValue(entryIDSet);
           /*
            * If there are no more IDs then return null indicating that the record should be removed.
            * If index is not trusted then this will cause all subsequent reads for this key to
@@ -234,7 +252,7 @@
           }
           if (isNotEmpty(addedIDs))
           {
-            return codec.encode(addedIDs);
+            return toValue(addedIDs);
           }
         }
         return null; // no change.
@@ -254,7 +272,7 @@
 
   private EntryIDSet computeEntryIDSet(ByteString key, ByteString value, EntryIDSet deletedIDs, EntryIDSet addedIDs)
   {
-    EntryIDSet entryIDSet = codec.decode(key, value);
+    EntryIDSet entryIDSet = decodeValue(key, value);
     if (addedIDs != null)
     {
       if (entryIDSet.isDefined() && indexEntryLimit > 0)
@@ -300,7 +318,7 @@
       ByteString value = txn.read(getName(), key);
       if (value != null)
       {
-        return codec.decode(key, value);
+        return decodeValue(key, value);
       }
       return trusted ? newDefinedSet() : newUndefinedSet();
     }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java
index 3725947..3e3b2d2 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java
@@ -42,6 +42,7 @@
  * the configured ID limit. If the limit it reached, the class stops tracking
  * individual IDs and marks the set as undefined. This class is not thread safe.
  */
+@SuppressWarnings("javadoc")
 final class ImportIDSet implements Iterable<EntryID> {
 
   /** The encapsulated entryIDSet where elements are stored until reaching the limit. */
@@ -127,18 +128,23 @@
   boolean merge(ImportIDSet importIdSet)
   {
     checkNotNull(importIdSet, "importIdSet must not be null");
+    return merge(importIdSet.entryIDSet);
+  }
 
-    boolean definedBeforeMerge = isDefined();
-    final long mergedSize = addWithoutOverflow(entryIDSet.size(), importIdSet.entryIDSet.size());
+  boolean merge(EntryIDSet entryIDSet) // returns true only when this set goes from defined to undefined
+  {
+    checkNotNull(entryIDSet, "entryIDSet must not be null");
+    boolean definedBeforeMerge = this.entryIDSet.isDefined();
+    final long mergedSize = addWithoutOverflow(this.entryIDSet.size(), entryIDSet.size());
 
-    if (!definedBeforeMerge || !importIdSet.isDefined() || mergedSize > indexEntryLimitSize)
+    if (!definedBeforeMerge || !entryIDSet.isDefined() || mergedSize > indexEntryLimitSize)
     {
-      entryIDSet = newUndefinedSetWithKey(key);
+      this.entryIDSet = newUndefinedSetWithKey(key); // either side undefined, or limit exceeded: stop tracking IDs
       return definedBeforeMerge;
     }
-    else if (isDefined())
+    else if (this.entryIDSet.isDefined())
     {
-      entryIDSet.addAll(importIdSet.entryIDSet);
+      this.entryIDSet.addAll(entryIDSet);
     }
     return false;
   }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index dea8c2c..3f6170c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -28,6 +28,7 @@
 
 import static org.opends.messages.BackendMessages.*;
 import static org.opends.server.backends.pluggable.DnKeyFormat.*;
+import static org.opends.server.backends.pluggable.SuffixContainer.*;
 import static org.opends.server.core.DirectoryServer.*;
 import static org.opends.server.util.DynamicConstants.*;
 import static org.opends.server.util.ServerConstants.*;
@@ -331,6 +332,7 @@
       for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
       {
         ByteSequence key = mapEntry.getKey();
+        // FIXME JNR merge values before put
         for (ByteSequence value : mapEntry.getValue())
         {
           put(byteBuffer, key);
@@ -404,6 +406,118 @@
     }
   }
 
+  /** A cursor performing the "merge" phase of the on-disk merge: consecutive records with equal keys become one. */
+  private static final class MergingCursor<K, V> implements Cursor<K, V>
+  {
+    private final Cursor<K, V> delegate;
+    private final MergingConsumer<V> merger;
+    private K key;
+    private V value;
+    private boolean isDefined;
+
+    private MergingCursor(Cursor<K, V> cursor, MergingConsumer<V> merger)
+    {
+      this.delegate = cursor;
+      this.merger = merger;
+    }
+
+    @Override
+    public boolean next()
+    {
+      if (key == null) // no key has been read from the delegate yet
+      {
+        if (!delegate.next())
+        {
+          return isDefined = false;
+        }
+        key = delegate.getKey();
+        accumulateValues();
+        return isDefined = true;
+      }
+      else if (delegate.isDefined())
+      {
+        // accumulateValues() left the delegate on a record that has not been consumed yet
+        key = delegate.getKey();
+        accumulateValues();
+        return isDefined = true;
+      }
+      else
+      {
+        // no more data to compute
+        return isDefined = false;
+      }
+    }
+
+    private void accumulateValues()
+    {
+      while (delegate.isDefined() && key.equals(delegate.getKey())) // feed every value of the current key
+      {
+        merger.accept(delegate.getValue());
+        delegate.next();
+      }
+      value = merger.merge();
+    }
+
+    @Override
+    public boolean isDefined()
+    {
+      return isDefined;
+    }
+
+    @Override
+    public K getKey() throws NoSuchElementException
+    {
+      throwIfNotDefined();
+      return key;
+    }
+
+    @Override
+    public V getValue() throws NoSuchElementException
+    {
+      throwIfNotDefined();
+      return value;
+    }
+
+    private void throwIfNotDefined()
+    {
+      if (!isDefined())
+      {
+        throw new NoSuchElementException();
+      }
+    }
+
+    @Override
+    public void close()
+    {
+      delegate.close();
+      isDefined = false;
+    }
+
+    @Override
+    public boolean positionToKey(ByteSequence key)
+    {
+      return delegate.positionToKey(key); // NOTE(review): cached key/value/isDefined are not refreshed - confirm unused
+    }
+
+    @Override
+    public boolean positionToKeyOrNext(ByteSequence key)
+    {
+      return delegate.positionToKeyOrNext(key);
+    }
+
+    @Override
+    public boolean positionToLastKey()
+    {
+      return delegate.positionToLastKey();
+    }
+
+    @Override
+    public boolean positionToIndex(int index)
+    {
+      return delegate.positionToIndex(index);
+    }
+  }
+
   /** A cursor implementation aggregating several cursors and ordering them by their key value. */
   private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
   {
@@ -524,28 +638,24 @@
       return "not defined";
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean positionToKey(ByteSequence key)
     {
       throw notImplemented();
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean positionToKeyOrNext(ByteSequence key)
     {
       throw notImplemented();
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean positionToLastKey()
     {
       throw notImplemented();
     }
 
-    /** {@inheritDoc} */
     @Override
     public boolean positionToIndex(int index)
     {
@@ -1544,18 +1654,209 @@
       @Override
       public Void run(ReadableTransaction txn) throws Exception
       {
-        try (Cursor<ByteString, ByteString> cursor = txn.openCursor(treeName))
+        try (Cursor<ByteString, ByteString> cursor =
+            new MergingCursor<ByteString, ByteString>(txn.openCursor(treeName), getMerger(treeName)))
         {
           while (cursor.next())
-          {// FIXME JNR add merge phase
+          {
             output.put(treeName, cursor.getKey(), cursor.getValue());
           }
         }
         return null;
       }
+
+      private MergingConsumer<ByteString> getMerger(final TreeName treeName) throws DirectoryException
+      {
+        EntryContainer entryContainer = rootContainer.getEntryContainer(DN.valueOf(treeName.getBaseDN()));
+        final MatchingRuleIndex index = getIndex(entryContainer, treeName);
+        if (index != null)
+        {
+          // attribute index tree: several values for one key are merged into a single encoded ID set
+          return new ImportIDSetsMerger(index);
+        }
+        else if (treeName.getIndexId().equals(DN2ID_INDEX_NAME)
+            || treeName.getIndexId().equals(DN2URI_INDEX_NAME)
+            || isVLVIndex(entryContainer, treeName))
+        {
+          // dn2id, dn2uri and VLV trees: several values for one key make the merger throw
+          return new NoMultipleValuesConsumer<>();
+        }
+        throw new IllegalArgumentException("Unknown tree: " + treeName);
+      }
+
+      private boolean isVLVIndex(EntryContainer entryContainer, TreeName treeName) // true if treeName is a VLV index
+      {
+        for (VLVIndex vlvIndex : entryContainer.getVLVIndexes())
+        {
+          if (treeName.equals(vlvIndex.getName()))
+          {
+            return true;
+          }
+        }
+        return false;
+      }
+
+      private MatchingRuleIndex getIndex(EntryContainer entryContainer, TreeName treeName) // null when no index matches
+      {
+        for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
+        {
+          for (MatchingRuleIndex index : attrIndex.getNameToIndexes().values())
+          {
+            if (treeName.equals(index.getName()))
+            {
+              return index;
+            }
+          }
+        }
+        return null;
+      }
     });
   }
 
+  /**
+   * Local copy of the JDK 8 {@code Consumer} interface, reduced to its {@code accept} method.
+   * <p>
+   * FIXME Remove once we move to Java 8.
+   *
+   * @see java.util.function.Consumer Consumer from JDK 8
+   */
+  private static interface Consumer<T>
+  {
+    /**
+     * Performs this operation on the given argument.
+     *
+     * @param t
+     *          the input argument
+     */
+    void accept(T t);
+  }
+
+  /**
+   * A merging consumer that merges the supplied values.
+   * <p>
+   * Sample usage:
+   *
+   * <pre>
+   * while (it.hasNext())
+   * {
+   *   mergingConsumer.accept(it.next());
+   * }
+   * Object result = mergingConsumer.merge();
+   *
+   * </pre>
+   *
+   * @param <T>
+   *          the type of the arguments and the returned merged value
+   * @see java.util.function.Consumer Consumer from JDK 8
+   */
+  private static interface MergingConsumer<T> extends Consumer<T>
+  {
+    /**
+     * Merges the arguments provided via {@link Consumer#accept(Object)}.
+     *
+     * @return the merged value
+     */
+     T merge();
+  }
+
+  /** {@link MergingConsumer} that returns the single accepted value and throws when given several values. */
+  private static final class NoMultipleValuesConsumer<V> implements MergingConsumer<V>
+  {
+    private V first;
+    private boolean moreThanOne;
+
+    @Override
+    public void accept(V value)
+    {
+      if (first == null)
+      {
+        this.first = value;
+      }
+      else
+      {
+        moreThanOne = true;
+      }
+    }
+
+    @Override
+    public V merge()
+    {
+      // read the value before the reset below: returning the field after clearing it always yielded null
+      final V result = first;
+      final boolean mustThrow = moreThanOne;
+      first = null; // reset state so that this consumer can be reused
+      moreThanOne = false;
+      if (mustThrow)
+      {
+        throw new IllegalArgumentException("Expected a single value but several values were accepted");
+      }
+      return result;
+    }
+  }
+
+  /**
+   * {@link MergingConsumer} that accepts encoded {@link ByteString} values
+   * and produces a {@link ByteString} representing the merged {@link ImportIDSet}.
+   */
+  private static final class ImportIDSetsMerger implements MergingConsumer<ByteString>
+  {
+    private final MatchingRuleIndex index;
+    private final Set<ByteString> values = new HashSet<>();
+    private boolean aboveIndexEntryLimit;
+
+    private ImportIDSetsMerger(MatchingRuleIndex index)
+    {
+      this.index = index;
+    }
+
+    @Override
+    public void accept(ByteString value)
+    {
+      if (!aboveIndexEntryLimit)
+      {
+        // NOTE(review): a limit of 0 trips on the first value; DefaultIndex guards with "indexEntryLimit > 0" - confirm
+        if (values.size() < index.getIndexEntryLimit())
+        {
+          values.add(value);
+        }
+        else
+        {
+          aboveIndexEntryLimit = true;
+        }
+      }
+    }
+
+    @Override
+    public ByteString merge()
+    {
+      try
+      {
+        if (aboveIndexEntryLimit)
+        {
+          return index.toValue(EntryIDSet.newUndefinedSet());
+        }
+        else if (values.size() == 1)
+        {
+          return values.iterator().next(); // a single value is returned as is, without decoding
+        }
+
+        ImportIDSet idSet = new ImportIDSet(ByteString.empty(), EntryIDSet.newDefinedSet(), index.getIndexEntryLimit());
+        for (ByteString value : values)
+        {
+          // FIXME JNR Can we make this more efficient?
+          // go through long[] + sort in the end?
+          idSet.merge(index.decodeValue(ByteString.empty(), value));
+        }
+        return index.toValue(idSet);
+      }
+      finally
+      {
+        // reset state, whichever branch returned, so the merger can be reused
+        aboveIndexEntryLimit = false;
+        values.clear();
+      }
+    }
+  }
+
   /** Task used to migrate excluded branch. */
   private final class MigrateExcludedTask extends ImportTask
   {
@@ -1828,7 +2129,7 @@
     void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
     {
       DN2ID dn2id = suffix.getDN2ID();
-      importer.put(dn2id.getName(), dn2id.toKey(dn), entryID.toByteString());
+      importer.put(dn2id.getName(), dn2id.toKey(dn), dn2id.toValue(entryID));
     }
 
     private void processDN2URI(Suffix suffix, Entry entry)
@@ -1845,7 +2146,6 @@
     void processIndexes(Suffix suffix, Entry entry, EntryID entryID)
         throws StorageRuntimeException, InterruptedException
     {
-      final ByteString value = entryID.toByteString();
       for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
       {
         final AttributeType attrType = mapEntry.getKey();
@@ -1854,9 +2154,14 @@
         {
           for (MatchingRuleIndex index : attrIndex.getNameToIndexes().values())
           {
-            for (ByteString key : index.indexEntry(entry))
+            final Set<ByteString> keys = index.indexEntry(entry);
+            if (!keys.isEmpty())
             {
-              importer.put(index.getName(), key, value);
+              final ByteString value = index.toValue(entryID);
+              for (ByteString key : keys)
+              {
+                importer.put(index.getName(), key, value);
+              }
             }
           }
         }
@@ -1868,7 +2173,7 @@
       for (VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes())
       {
         ByteString key = vlvIndex.toKey(entry, entryID);
-        importer.put(vlvIndex.getName(), key, ByteString.empty());
+        importer.put(vlvIndex.getName(), key, vlvIndex.toValue());
       }
     }
   }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
index 546937e..abe553b 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
@@ -357,6 +357,11 @@
     return encodeVLVKey(entry, entryID.longValue());
   }
 
+  ByteString toValue() // the value stored for a VLV record is always empty
+  {
+    return ByteString.empty();
+  }
+
   private boolean shouldInclude(final Entry entry) throws DirectoryException
   {
     return entry.getName().matchesBaseAndScope(baseDN, scope) && filter.matchesEntry(entry);
@@ -437,7 +442,7 @@
     {
       if (nextDeletedKey == null || (nextAddedKey != null && nextAddedKey.compareTo(nextDeletedKey) < 0))
       {
-        txn.put(getName(), nextAddedKey, ByteString.empty());
+        txn.put(getName(), nextAddedKey, toValue());
         nextAddedKey = nextOrNull(ai);
         count.incrementAndGet();
       }

--
Gitblit v1.10.0