From 641e89ef0e15c9edde69f3b8cf82c7dd5f68687a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 30 Sep 2015 14:28:07 +0000
Subject: [PATCH] OPENDJ-2016: New on disk merge import strategy based on storage engine.

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java |  377 +++++++++++++++++++++++++++++++++++------------------
 1 files changed, 251 insertions(+), 126 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
index 8579cef..157c3a3 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
@@ -35,6 +35,7 @@
 import java.util.TreeSet;
 
 import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.util.Reject;
 import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 import org.opends.server.types.DirectoryException;
@@ -50,181 +51,305 @@
 @SuppressWarnings("javadoc")
 class IndexBuffer
 {
+
+  /** Internal interface for IndexBuffer implementor. */
+  private interface IndexBufferImplementor
+  {
+    void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException;
+
+    void put(Index index, ByteString key, EntryID entryID);
+
+    void put(VLVIndex index, ByteString sortKey);
+
+    void remove(VLVIndex index, ByteString sortKey);
+
+    void remove(Index index, ByteString key, EntryID entryID);
+  }
+
   /**
-   * The buffered records stored as a map from the record key to the
-   * buffered value for that key for each index.
+   * A buffered index is used to buffer multiple reads or writes to the same index key into a single read or write.
    * <p>
-   * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}).
-   * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE).
+   * It can only be used to buffer multiple reads and writes under the same transaction. The transaction may be null if
+   * it is known that there are no other concurrent updates to the index.
    */
-  private final SortedMap<Index, SortedMap<ByteString, BufferedIndexValues>> bufferedIndexes = new TreeMap<>();
+  private static final class DefaultIndexBuffer implements IndexBufferImplementor
+  {
+
+    /**
+     * The buffered records stored as a map from the record key to the buffered value for that key for each index.
+     * <p>
+     * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}).
+     * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE).
+     */
+    private final SortedMap<Index, SortedMap<ByteString, BufferedIndexValues>> bufferedIndexes = new TreeMap<>();
+
+    /**
+     * The buffered records stored as a set of buffered VLV values for each index.
+     * <p>
+     * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}).
+     * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE).
+     */
+    private final SortedMap<VLVIndex, BufferedVLVIndexValues> bufferedVLVIndexes = new TreeMap<>();
+
+    /**
+     * A simple class representing a pair of added and deleted indexed IDs. Initially both addedIDs and deletedIDs are
+     * {@code null} indicating that that the whole record should be deleted.
+     */
+    private static class BufferedIndexValues
+    {
+      private EntryIDSet addedEntryIDs;
+      private EntryIDSet deletedEntryIDs;
+
+      void addEntryID(EntryID entryID)
+      {
+        if (!remove(deletedEntryIDs, entryID))
+        {
+          if (this.addedEntryIDs == null)
+          {
+            this.addedEntryIDs = newDefinedSet();
+          }
+          this.addedEntryIDs.add(entryID);
+        }
+      }
+
+      void deleteEntryID(EntryID entryID)
+      {
+        if (!remove(addedEntryIDs, entryID))
+        {
+          if (this.deletedEntryIDs == null)
+          {
+            this.deletedEntryIDs = newDefinedSet();
+          }
+          this.deletedEntryIDs.add(entryID);
+        }
+      }
+
+      private static boolean remove(EntryIDSet entryIDs, EntryID entryID)
+      {
+        return entryIDs != null ? entryIDs.remove(entryID) : false;
+      }
+    }
+
+    /** A simple class representing a pair of added and deleted VLV values. */
+    private static class BufferedVLVIndexValues
+    {
+      private TreeSet<ByteString> addedSortKeys;
+      private TreeSet<ByteString> deletedSortKeys;
+
+      void addSortKey(ByteString sortKey)
+      {
+        if (!remove(deletedSortKeys, sortKey))
+        {
+          if (addedSortKeys == null)
+          {
+            addedSortKeys = new TreeSet<>();
+          }
+          addedSortKeys.add(sortKey);
+        }
+      }
+
+      void deleteSortKey(ByteString sortKey)
+      {
+        if (!remove(addedSortKeys, sortKey))
+        {
+          if (deletedSortKeys == null)
+          {
+            deletedSortKeys = new TreeSet<>();
+          }
+          deletedSortKeys.add(sortKey);
+        }
+      }
+
+      private static boolean remove(TreeSet<ByteString> sortKeys, ByteString sortKey)
+      {
+        return sortKeys != null ? sortKeys.remove(sortKey) : false;
+      }
+    }
+
+    private BufferedVLVIndexValues createOrGetBufferedVLVIndexValues(VLVIndex vlvIndex)
+    {
+      BufferedVLVIndexValues bufferedValues = bufferedVLVIndexes.get(vlvIndex);
+      if (bufferedValues == null)
+      {
+        bufferedValues = new BufferedVLVIndexValues();
+        bufferedVLVIndexes.put(vlvIndex, bufferedValues);
+      }
+      return bufferedValues;
+    }
+
+    private BufferedIndexValues createOrGetBufferedIndexValues(Index index, ByteString keyBytes)
+    {
+      Map<ByteString, BufferedIndexValues> bufferedOperations = createOrGetBufferedOperations(index);
+
+      BufferedIndexValues values = bufferedOperations.get(keyBytes);
+      if (values == null)
+      {
+        values = new BufferedIndexValues();
+        bufferedOperations.put(keyBytes, values);
+      }
+      return values;
+    }
+
+    private Map<ByteString, BufferedIndexValues> createOrGetBufferedOperations(Index index)
+    {
+      SortedMap<ByteString, BufferedIndexValues> bufferedOperations = bufferedIndexes.get(index);
+      if (bufferedOperations == null)
+      {
+        bufferedOperations = new TreeMap<>();
+        bufferedIndexes.put(index, bufferedOperations);
+      }
+      return bufferedOperations;
+    }
+
+    @Override
+    public void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException
+    {
+      // Indexes are stored in sorted map to prevent deadlock during flush with DB using pessimistic lock strategies.
+      for (Entry<Index, SortedMap<ByteString, BufferedIndexValues>> entry : bufferedIndexes.entrySet())
+      {
+        flushIndex(entry.getKey(), txn, entry.getValue());
+      }
+
+      for (Entry<VLVIndex, BufferedVLVIndexValues> entry : bufferedVLVIndexes.entrySet())
+      {
+        entry.getKey().updateIndex(txn, entry.getValue().addedSortKeys, entry.getValue().deletedSortKeys);
+      }
+    }
+
+    @Override
+    public void put(Index index, ByteString key, EntryID entryID)
+    {
+      createOrGetBufferedIndexValues(index, key).addEntryID(entryID);
+    }
+
+    @Override
+    public void put(VLVIndex index, ByteString sortKey)
+    {
+      createOrGetBufferedVLVIndexValues(index).addSortKey(sortKey);
+    }
+
+    @Override
+    public void remove(VLVIndex index, ByteString sortKey)
+    {
+      createOrGetBufferedVLVIndexValues(index).deleteSortKey(sortKey);
+    }
+
+    @Override
+    public void remove(Index index, ByteString key, EntryID entryID)
+    {
+      createOrGetBufferedIndexValues(index, key).deleteEntryID(entryID);
+    }
+
+    private static void flushIndex(Index index, WriteableTransaction txn,
+        Map<ByteString, BufferedIndexValues> bufferedValues)
+    {
+      for (Entry<ByteString, BufferedIndexValues> entry : bufferedValues.entrySet())
+      {
+        final BufferedIndexValues values = entry.getValue();
+        index.update(txn, entry.getKey(), values.deletedEntryIDs, values.addedEntryIDs);
+      }
+    }
+  }
 
   /**
-   * The buffered records stored as a set of buffered VLV values for each index.
-   * <p>
-   * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}).
-   * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE).
+   * IndexBuffer used during import which actually doesn't buffer modifications but forward those directly to the
+   * supplied {@link WriteableTransaction}.
    */
-  private final SortedMap<VLVIndex, BufferedVLVIndexValues> bufferedVLVIndexes = new TreeMap<>();
-
-  /**
-   * A simple class representing a pair of added and deleted indexed IDs. Initially both addedIDs
-   * and deletedIDs are {@code null} indicating that that the whole record should be deleted.
-   */
-  private static class BufferedIndexValues
+  private static final class ImportIndexBuffer implements IndexBufferImplementor
   {
-    private EntryIDSet addedEntryIDs;
-    private EntryIDSet deletedEntryIDs;
+    private final WriteableTransaction txn;
+    private final EntryID expectedEntryID;
+    private final ByteString encodedEntryID;
 
-    void addEntryID(EntryID entryID)
+    ImportIndexBuffer(WriteableTransaction txn, EntryID expectedEntryID)
     {
-      if (!remove(deletedEntryIDs, entryID))
-      {
-        if (this.addedEntryIDs == null)
-        {
-          this.addedEntryIDs = newDefinedSet();
-        }
-        this.addedEntryIDs.add(entryID);
-      }
+      this.txn = txn;
+      this.expectedEntryID = expectedEntryID;
+      this.encodedEntryID = CODEC_V2.encode(EntryIDSet.newDefinedSet(expectedEntryID.longValue()));
     }
 
-    void deleteEntryID(EntryID entryID)
+    @Override
+    public void put(Index index, ByteString key, EntryID entryID)
     {
-      if (!remove(addedEntryIDs, entryID))
-      {
-        if (this.deletedEntryIDs == null)
-        {
-          this.deletedEntryIDs = newDefinedSet();
-        }
-        this.deletedEntryIDs.add(entryID);
-      }
+      Reject.ifFalse(this.expectedEntryID.equals(entryID), "Unexpected entryID");
+      txn.put(index.getName(), key, encodedEntryID);
     }
 
-    private static boolean remove(EntryIDSet entryIDs, EntryID entryID)
+    @Override
+    public void put(VLVIndex index, ByteString sortKey)
     {
-      return entryIDs != null ? entryIDs.remove(entryID) : false;
+      txn.put(index.getName(), sortKey, index.toValue());
+    }
+
+    @Override
+    public void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException
+    {
+      // Nothing to do
+    }
+
+    @Override
+    public void remove(VLVIndex index, ByteString sortKey)
+    {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void remove(Index index, ByteString key, EntryID entryID)
+    {
+      throw new UnsupportedOperationException();
     }
   }
 
-  /** A simple class representing a pair of added and deleted VLV values. */
-  private static class BufferedVLVIndexValues
+  private final IndexBufferImplementor impl;
+
+  static IndexBuffer newImportIndexBuffer(WriteableTransaction txn, EntryID entryID)
   {
-    private TreeSet<ByteString> addedSortKeys;
-    private TreeSet<ByteString> deletedSortKeys;
-
-    void addSortKey(ByteString sortKey)
-    {
-      if (!remove(deletedSortKeys, sortKey))
-      {
-        if (addedSortKeys == null)
-        {
-          addedSortKeys = new TreeSet<>();
-        }
-        addedSortKeys.add(sortKey);
-      }
-    }
-
-    void deleteSortKey(ByteString sortKey)
-    {
-      if (!remove(addedSortKeys, sortKey))
-      {
-        if (deletedSortKeys == null)
-        {
-          deletedSortKeys = new TreeSet<>();
-        }
-        deletedSortKeys.add(sortKey);
-      }
-    }
-
-    private static boolean remove(TreeSet<ByteString> sortKeys, ByteString sortKey)
-    {
-      return sortKeys != null ? sortKeys.remove(sortKey) : false;
-    }
+    return new IndexBuffer(new ImportIndexBuffer(txn, entryID));
   }
 
-  private BufferedVLVIndexValues createOrGetBufferedVLVIndexValues(VLVIndex vlvIndex)
+  public IndexBuffer()
   {
-    BufferedVLVIndexValues bufferedValues = bufferedVLVIndexes.get(vlvIndex);
-    if (bufferedValues == null)
-    {
-      bufferedValues = new BufferedVLVIndexValues();
-      bufferedVLVIndexes.put(vlvIndex, bufferedValues);
-    }
-    return bufferedValues;
+    this(new DefaultIndexBuffer());
   }
 
-  private BufferedIndexValues createOrGetBufferedIndexValues(Index index, ByteString keyBytes)
+  private IndexBuffer(IndexBufferImplementor impl)
   {
-    Map<ByteString, BufferedIndexValues> bufferedOperations = createOrGetBufferedOperations(index);
-
-    BufferedIndexValues values = bufferedOperations.get(keyBytes);
-    if (values == null)
-    {
-      values = new BufferedIndexValues();
-      bufferedOperations.put(keyBytes, values);
-    }
-    return values;
-  }
-
-  private Map<ByteString, BufferedIndexValues> createOrGetBufferedOperations(Index index)
-  {
-    SortedMap<ByteString, BufferedIndexValues> bufferedOperations = bufferedIndexes.get(index);
-    if (bufferedOperations == null)
-    {
-      bufferedOperations = new TreeMap<>();
-      bufferedIndexes.put(index, bufferedOperations);
-    }
-    return bufferedOperations;
+    this.impl = impl;
   }
 
   /**
    * Flush the buffered index changes to storage.
    *
-   * @param txn a non null transaction
-   * @throws StorageRuntimeException If an error occurs in the storage.
-   * @throws DirectoryException If a Directory Server error occurs.
+   * @param txn
+   *          a non null transaction
+   * @throws StorageRuntimeException
+   *           If an error occurs in the storage.
+   * @throws DirectoryException
+   *           If a Directory Server error occurs.
    */
   void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException
   {
-    // Indexes are stored in sorted map to prevent deadlock during flush with DB using pessimistic lock strategies.
-    for (Entry<Index, SortedMap<ByteString, BufferedIndexValues>> entry : bufferedIndexes.entrySet())
-    {
-      flushIndex(entry.getKey(), txn, entry.getValue());
-    }
-
-    for (Entry<VLVIndex, BufferedVLVIndexValues> entry : bufferedVLVIndexes.entrySet())
-    {
-      entry.getKey().updateIndex(txn, entry.getValue().addedSortKeys, entry.getValue().deletedSortKeys);
-    }
+    impl.flush(txn);
   }
 
   void put(Index index, ByteString key, EntryID entryID)
   {
-    createOrGetBufferedIndexValues(index, key).addEntryID(entryID);
+    impl.put(index, key, entryID);
   }
 
   void put(VLVIndex index, ByteString sortKey)
   {
-    createOrGetBufferedVLVIndexValues(index).addSortKey(sortKey);
+    impl.put(index, sortKey);
   }
 
   void remove(VLVIndex index, ByteString sortKey)
   {
-    createOrGetBufferedVLVIndexValues(index).deleteSortKey(sortKey);
+    impl.remove(index, sortKey);
   }
 
   void remove(Index index, ByteString key, EntryID entryID)
   {
-    createOrGetBufferedIndexValues(index, key).deleteEntryID(entryID);
+    impl.remove(index, key, entryID);
   }
 
-  private static void flushIndex(Index index, WriteableTransaction txn,
-      Map<ByteString, BufferedIndexValues> bufferedValues)
-  {
-    for (Entry<ByteString, BufferedIndexValues> entry : bufferedValues.entrySet())
-    {
-      final BufferedIndexValues values = entry.getValue();
-      index.update(txn, entry.getKey(), values.deletedEntryIDs, values.addedEntryIDs);
-    }
-  }
 }

--
Gitblit v1.10.0