From 641e89ef0e15c9edde69f3b8cf82c7dd5f68687a Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Wed, 30 Sep 2015 14:28:07 +0000
Subject: [PATCH] OPENDJ-2016: New on disk merge import strategy based on storage engine.
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java | 377 +++++++++++++++++++++++++++++++++++------------------
1 files changed, 251 insertions(+), 126 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
index 8579cef..157c3a3 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
@@ -35,6 +35,7 @@
import java.util.TreeSet;
import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.util.Reject;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
import org.opends.server.types.DirectoryException;
@@ -50,181 +51,305 @@
@SuppressWarnings("javadoc")
class IndexBuffer
{
+
+ /** Internal interface for IndexBuffer implementor. */
+ private interface IndexBufferImplementor
+ {
+ void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException;
+
+ void put(Index index, ByteString key, EntryID entryID);
+
+ void put(VLVIndex index, ByteString sortKey);
+
+ void remove(VLVIndex index, ByteString sortKey);
+
+ void remove(Index index, ByteString key, EntryID entryID);
+ }
+
/**
- * The buffered records stored as a map from the record key to the
- * buffered value for that key for each index.
+ * A buffered index is used to buffer multiple reads or writes to the same index key into a single read or write.
* <p>
- * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}).
- * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE).
+ * It can only be used to buffer multiple reads and writes under the same transaction. The transaction may be null if
+ * it is known that there are no other concurrent updates to the index.
*/
- private final SortedMap<Index, SortedMap<ByteString, BufferedIndexValues>> bufferedIndexes = new TreeMap<>();
+ private static final class DefaultIndexBuffer implements IndexBufferImplementor
+ {
+
+ /**
+ * The buffered records stored as a map from the record key to the buffered value for that key for each index.
+ * <p>
+ * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}).
+ * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE).
+ */
+ private final SortedMap<Index, SortedMap<ByteString, BufferedIndexValues>> bufferedIndexes = new TreeMap<>();
+
+ /**
+ * The buffered records stored as a set of buffered VLV values for each index.
+ * <p>
+ * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}).
+ * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE).
+ */
+ private final SortedMap<VLVIndex, BufferedVLVIndexValues> bufferedVLVIndexes = new TreeMap<>();
+
+ /**
+ * A simple class representing a pair of added and deleted indexed IDs. Initially both addedIDs and deletedIDs are
+ * {@code null} indicating that that the whole record should be deleted.
+ */
+ private static class BufferedIndexValues
+ {
+ private EntryIDSet addedEntryIDs;
+ private EntryIDSet deletedEntryIDs;
+
+ void addEntryID(EntryID entryID)
+ {
+ if (!remove(deletedEntryIDs, entryID))
+ {
+ if (this.addedEntryIDs == null)
+ {
+ this.addedEntryIDs = newDefinedSet();
+ }
+ this.addedEntryIDs.add(entryID);
+ }
+ }
+
+ void deleteEntryID(EntryID entryID)
+ {
+ if (!remove(addedEntryIDs, entryID))
+ {
+ if (this.deletedEntryIDs == null)
+ {
+ this.deletedEntryIDs = newDefinedSet();
+ }
+ this.deletedEntryIDs.add(entryID);
+ }
+ }
+
+ private static boolean remove(EntryIDSet entryIDs, EntryID entryID)
+ {
+ return entryIDs != null ? entryIDs.remove(entryID) : false;
+ }
+ }
+
+ /** A simple class representing a pair of added and deleted VLV values. */
+ private static class BufferedVLVIndexValues
+ {
+ private TreeSet<ByteString> addedSortKeys;
+ private TreeSet<ByteString> deletedSortKeys;
+
+ void addSortKey(ByteString sortKey)
+ {
+ if (!remove(deletedSortKeys, sortKey))
+ {
+ if (addedSortKeys == null)
+ {
+ addedSortKeys = new TreeSet<>();
+ }
+ addedSortKeys.add(sortKey);
+ }
+ }
+
+ void deleteSortKey(ByteString sortKey)
+ {
+ if (!remove(addedSortKeys, sortKey))
+ {
+ if (deletedSortKeys == null)
+ {
+ deletedSortKeys = new TreeSet<>();
+ }
+ deletedSortKeys.add(sortKey);
+ }
+ }
+
+ private static boolean remove(TreeSet<ByteString> sortKeys, ByteString sortKey)
+ {
+ return sortKeys != null ? sortKeys.remove(sortKey) : false;
+ }
+ }
+
+ private BufferedVLVIndexValues createOrGetBufferedVLVIndexValues(VLVIndex vlvIndex)
+ {
+ BufferedVLVIndexValues bufferedValues = bufferedVLVIndexes.get(vlvIndex);
+ if (bufferedValues == null)
+ {
+ bufferedValues = new BufferedVLVIndexValues();
+ bufferedVLVIndexes.put(vlvIndex, bufferedValues);
+ }
+ return bufferedValues;
+ }
+
+ private BufferedIndexValues createOrGetBufferedIndexValues(Index index, ByteString keyBytes)
+ {
+ Map<ByteString, BufferedIndexValues> bufferedOperations = createOrGetBufferedOperations(index);
+
+ BufferedIndexValues values = bufferedOperations.get(keyBytes);
+ if (values == null)
+ {
+ values = new BufferedIndexValues();
+ bufferedOperations.put(keyBytes, values);
+ }
+ return values;
+ }
+
+ private Map<ByteString, BufferedIndexValues> createOrGetBufferedOperations(Index index)
+ {
+ SortedMap<ByteString, BufferedIndexValues> bufferedOperations = bufferedIndexes.get(index);
+ if (bufferedOperations == null)
+ {
+ bufferedOperations = new TreeMap<>();
+ bufferedIndexes.put(index, bufferedOperations);
+ }
+ return bufferedOperations;
+ }
+
+ @Override
+ public void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException
+ {
+ // Indexes are stored in sorted map to prevent deadlock during flush with DB using pessimistic lock strategies.
+ for (Entry<Index, SortedMap<ByteString, BufferedIndexValues>> entry : bufferedIndexes.entrySet())
+ {
+ flushIndex(entry.getKey(), txn, entry.getValue());
+ }
+
+ for (Entry<VLVIndex, BufferedVLVIndexValues> entry : bufferedVLVIndexes.entrySet())
+ {
+ entry.getKey().updateIndex(txn, entry.getValue().addedSortKeys, entry.getValue().deletedSortKeys);
+ }
+ }
+
+ @Override
+ public void put(Index index, ByteString key, EntryID entryID)
+ {
+ createOrGetBufferedIndexValues(index, key).addEntryID(entryID);
+ }
+
+ @Override
+ public void put(VLVIndex index, ByteString sortKey)
+ {
+ createOrGetBufferedVLVIndexValues(index).addSortKey(sortKey);
+ }
+
+ @Override
+ public void remove(VLVIndex index, ByteString sortKey)
+ {
+ createOrGetBufferedVLVIndexValues(index).deleteSortKey(sortKey);
+ }
+
+ @Override
+ public void remove(Index index, ByteString key, EntryID entryID)
+ {
+ createOrGetBufferedIndexValues(index, key).deleteEntryID(entryID);
+ }
+
+ private static void flushIndex(Index index, WriteableTransaction txn,
+ Map<ByteString, BufferedIndexValues> bufferedValues)
+ {
+ for (Entry<ByteString, BufferedIndexValues> entry : bufferedValues.entrySet())
+ {
+ final BufferedIndexValues values = entry.getValue();
+ index.update(txn, entry.getKey(), values.deletedEntryIDs, values.addedEntryIDs);
+ }
+ }
+ }
/**
- * The buffered records stored as a set of buffered VLV values for each index.
- * <p>
- * The map is sorted by {@link TreeName}s to establish a deterministic iteration order (see {@link AbstractTree}).
- * This prevents potential deadlock for db having pessimistic lock strategy (e.g.: JE).
+ * IndexBuffer used during import which actually doesn't buffer modifications but forward those directly to the
+ * supplied {@link WriteableTransaction}.
*/
- private final SortedMap<VLVIndex, BufferedVLVIndexValues> bufferedVLVIndexes = new TreeMap<>();
-
- /**
- * A simple class representing a pair of added and deleted indexed IDs. Initially both addedIDs
- * and deletedIDs are {@code null} indicating that that the whole record should be deleted.
- */
- private static class BufferedIndexValues
+ private static final class ImportIndexBuffer implements IndexBufferImplementor
{
- private EntryIDSet addedEntryIDs;
- private EntryIDSet deletedEntryIDs;
+ private final WriteableTransaction txn;
+ private final EntryID expectedEntryID;
+ private final ByteString encodedEntryID;
- void addEntryID(EntryID entryID)
+ ImportIndexBuffer(WriteableTransaction txn, EntryID expectedEntryID)
{
- if (!remove(deletedEntryIDs, entryID))
- {
- if (this.addedEntryIDs == null)
- {
- this.addedEntryIDs = newDefinedSet();
- }
- this.addedEntryIDs.add(entryID);
- }
+ this.txn = txn;
+ this.expectedEntryID = expectedEntryID;
+ this.encodedEntryID = CODEC_V2.encode(EntryIDSet.newDefinedSet(expectedEntryID.longValue()));
}
- void deleteEntryID(EntryID entryID)
+ @Override
+ public void put(Index index, ByteString key, EntryID entryID)
{
- if (!remove(addedEntryIDs, entryID))
- {
- if (this.deletedEntryIDs == null)
- {
- this.deletedEntryIDs = newDefinedSet();
- }
- this.deletedEntryIDs.add(entryID);
- }
+ Reject.ifFalse(this.expectedEntryID.equals(entryID), "Unexpected entryID");
+ txn.put(index.getName(), key, encodedEntryID);
}
- private static boolean remove(EntryIDSet entryIDs, EntryID entryID)
+ @Override
+ public void put(VLVIndex index, ByteString sortKey)
{
- return entryIDs != null ? entryIDs.remove(entryID) : false;
+ txn.put(index.getName(), sortKey, index.toValue());
+ }
+
+ @Override
+ public void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException
+ {
+ // Nothing to do
+ }
+
+ @Override
+ public void remove(VLVIndex index, ByteString sortKey)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remove(Index index, ByteString key, EntryID entryID)
+ {
+ throw new UnsupportedOperationException();
}
}
- /** A simple class representing a pair of added and deleted VLV values. */
- private static class BufferedVLVIndexValues
+ private final IndexBufferImplementor impl;
+
+ static IndexBuffer newImportIndexBuffer(WriteableTransaction txn, EntryID entryID)
{
- private TreeSet<ByteString> addedSortKeys;
- private TreeSet<ByteString> deletedSortKeys;
-
- void addSortKey(ByteString sortKey)
- {
- if (!remove(deletedSortKeys, sortKey))
- {
- if (addedSortKeys == null)
- {
- addedSortKeys = new TreeSet<>();
- }
- addedSortKeys.add(sortKey);
- }
- }
-
- void deleteSortKey(ByteString sortKey)
- {
- if (!remove(addedSortKeys, sortKey))
- {
- if (deletedSortKeys == null)
- {
- deletedSortKeys = new TreeSet<>();
- }
- deletedSortKeys.add(sortKey);
- }
- }
-
- private static boolean remove(TreeSet<ByteString> sortKeys, ByteString sortKey)
- {
- return sortKeys != null ? sortKeys.remove(sortKey) : false;
- }
+ return new IndexBuffer(new ImportIndexBuffer(txn, entryID));
}
- private BufferedVLVIndexValues createOrGetBufferedVLVIndexValues(VLVIndex vlvIndex)
+ public IndexBuffer()
{
- BufferedVLVIndexValues bufferedValues = bufferedVLVIndexes.get(vlvIndex);
- if (bufferedValues == null)
- {
- bufferedValues = new BufferedVLVIndexValues();
- bufferedVLVIndexes.put(vlvIndex, bufferedValues);
- }
- return bufferedValues;
+ this(new DefaultIndexBuffer());
}
- private BufferedIndexValues createOrGetBufferedIndexValues(Index index, ByteString keyBytes)
+ private IndexBuffer(IndexBufferImplementor impl)
{
- Map<ByteString, BufferedIndexValues> bufferedOperations = createOrGetBufferedOperations(index);
-
- BufferedIndexValues values = bufferedOperations.get(keyBytes);
- if (values == null)
- {
- values = new BufferedIndexValues();
- bufferedOperations.put(keyBytes, values);
- }
- return values;
- }
-
- private Map<ByteString, BufferedIndexValues> createOrGetBufferedOperations(Index index)
- {
- SortedMap<ByteString, BufferedIndexValues> bufferedOperations = bufferedIndexes.get(index);
- if (bufferedOperations == null)
- {
- bufferedOperations = new TreeMap<>();
- bufferedIndexes.put(index, bufferedOperations);
- }
- return bufferedOperations;
+ this.impl = impl;
}
/**
* Flush the buffered index changes to storage.
*
- * @param txn a non null transaction
- * @throws StorageRuntimeException If an error occurs in the storage.
- * @throws DirectoryException If a Directory Server error occurs.
+ * @param txn
+ * a non null transaction
+ * @throws StorageRuntimeException
+ * If an error occurs in the storage.
+ * @throws DirectoryException
+ * If a Directory Server error occurs.
*/
void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException
{
- // Indexes are stored in sorted map to prevent deadlock during flush with DB using pessimistic lock strategies.
- for (Entry<Index, SortedMap<ByteString, BufferedIndexValues>> entry : bufferedIndexes.entrySet())
- {
- flushIndex(entry.getKey(), txn, entry.getValue());
- }
-
- for (Entry<VLVIndex, BufferedVLVIndexValues> entry : bufferedVLVIndexes.entrySet())
- {
- entry.getKey().updateIndex(txn, entry.getValue().addedSortKeys, entry.getValue().deletedSortKeys);
- }
+ impl.flush(txn);
}
void put(Index index, ByteString key, EntryID entryID)
{
- createOrGetBufferedIndexValues(index, key).addEntryID(entryID);
+ impl.put(index, key, entryID);
}
void put(VLVIndex index, ByteString sortKey)
{
- createOrGetBufferedVLVIndexValues(index).addSortKey(sortKey);
+ impl.put(index, sortKey);
}
void remove(VLVIndex index, ByteString sortKey)
{
- createOrGetBufferedVLVIndexValues(index).deleteSortKey(sortKey);
+ impl.remove(index, sortKey);
}
void remove(Index index, ByteString key, EntryID entryID)
{
- createOrGetBufferedIndexValues(index, key).deleteEntryID(entryID);
+ impl.remove(index, key, entryID);
}
- private static void flushIndex(Index index, WriteableTransaction txn,
- Map<ByteString, BufferedIndexValues> bufferedValues)
- {
- for (Entry<ByteString, BufferedIndexValues> entry : bufferedValues.entrySet())
- {
- final BufferedIndexValues values = entry.getValue();
- index.update(txn, entry.getKey(), values.deletedEntryIDs, values.addedEntryIDs);
- }
- }
}
--
Gitblit v1.10.0