From c5d246665c8d72aa524009a12af556f8fba76fe4 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 26 May 2015 13:01:55 +0000
Subject: [PATCH] OPENDJ-2016 Implement new on disk merge import strategy based on storage engine
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java | 7
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java | 18 +
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java | 46 ++++--
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java | 327 +++++++++++++++++++++++++++++++++++++++++++++-
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java | 10 +
5 files changed, 374 insertions(+), 34 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java
index 744245c..6899251 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java
@@ -96,7 +96,7 @@
*/
void put(final WriteableTransaction txn, DN dn, final EntryID entryID) throws StorageRuntimeException
{
- txn.put(getName(), toKey(dn), entryID.toByteString());
+ txn.put(getName(), toKey(dn), toValue(entryID));
}
boolean insert(final WriteableTransaction txn, DN dn, final EntryID entryID) throws StorageRuntimeException
@@ -112,7 +112,7 @@
return oldEntryID;
}
// it did not exist before, insert the new value
- return entryID.toByteString();
+ return toValue(entryID);
}
});
}
@@ -122,6 +122,12 @@
return dnToDNKey(dn, baseDN.size());
}
+ ByteString toValue(final EntryID entryID)
+ {
+ // TODO JNR do we want to use compacted longs?
+ return entryID.toByteString();
+ }
+
/**
* Remove a record from the DN tree.
* @param txn a non null transaction
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
index a516ddf..c28ba83 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
@@ -60,12 +60,9 @@
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
/** The limit on the number of entry IDs that may be indexed by one key. */
- private int indexEntryLimit;
-
private final State state;
-
private final EntryContainer entryContainer;
-
+ private int indexEntryLimit;
private EntryIDSetCodec codec;
/**
@@ -124,11 +121,32 @@
@Override
public EntryIDSet transform(ByteString key, ByteString value) throws NeverThrowsException
{
- return codec.decode(key, value);
+ return decodeValue(key, value);
}
});
}
+ EntryIDSet decodeValue(ByteSequence key, ByteString value)
+ {
+ return codec.decode(key, value);
+ }
+
+ ByteString toValue(EntryID entryID)
+ {
+ return codec.encode(newDefinedSet(entryID.longValue()));
+ }
+
+ ByteString toValue(EntryIDSet entryIDSet)
+ {
+ return codec.encode(entryIDSet);
+ }
+
+ ByteString toValue(ImportIDSet importIDSet)
+ {
+ return importIDSet.valueToByteString(codec);
+ }
+
+ // TODO JNR rename to importUpsert() ?
@Override
public final void importPut(Importer importer, ImportIDSet idsToBeAdded) throws StorageRuntimeException
{
@@ -138,14 +156,14 @@
ByteString value = importer.read(getName(), key);
if (value != null)
{
- final EntryIDSet entryIDSet = codec.decode(key, value);
+ final EntryIDSet entryIDSet = decodeValue(key, value);
final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
importIDSet.merge(idsToBeAdded);
- importer.put(getName(), key, importIDSet.valueToByteString(codec));
+ importer.put(getName(), key, toValue(importIDSet));
}
else
{
- importer.put(getName(), key, idsToBeAdded.valueToByteString(codec));
+ importer.put(getName(), key, toValue(idsToBeAdded));
}
}
@@ -162,7 +180,7 @@
throw new IllegalStateException("Expected to have a value associated to key " + key + " for index " + getName());
}
- final EntryIDSet entryIDSet = codec.decode(key, value);
+ final EntryIDSet entryIDSet = decodeValue(key, value);
final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
importIDSet.remove(idsToBeRemoved);
if (importIDSet.isDefined() && importIDSet.size() == 0)
@@ -171,7 +189,7 @@
}
else
{
- importer.put(getName(), key, importIDSet.valueToByteString(codec));
+ importer.put(getName(), key, toValue(importIDSet));
}
}
@@ -218,7 +236,7 @@
if (oldValue != null)
{
EntryIDSet entryIDSet = computeEntryIDSet(key, oldValue.toByteString(), deletedIDs, addedIDs);
- ByteString after = codec.encode(entryIDSet);
+ ByteString after = toValue(entryIDSet);
/*
* If there are no more IDs then return null indicating that the record should be removed.
* If index is not trusted then this will cause all subsequent reads for this key to
@@ -234,7 +252,7 @@
}
if (isNotEmpty(addedIDs))
{
- return codec.encode(addedIDs);
+ return toValue(addedIDs);
}
}
return null; // no change.
@@ -254,7 +272,7 @@
private EntryIDSet computeEntryIDSet(ByteString key, ByteString value, EntryIDSet deletedIDs, EntryIDSet addedIDs)
{
- EntryIDSet entryIDSet = codec.decode(key, value);
+ EntryIDSet entryIDSet = decodeValue(key, value);
if (addedIDs != null)
{
if (entryIDSet.isDefined() && indexEntryLimit > 0)
@@ -300,7 +318,7 @@
ByteString value = txn.read(getName(), key);
if (value != null)
{
- return codec.decode(key, value);
+ return decodeValue(key, value);
}
return trusted ? newDefinedSet() : newUndefinedSet();
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java
index 3725947..3e3b2d2 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java
@@ -42,6 +42,7 @@
* the configured ID limit. If the limit it reached, the class stops tracking
* individual IDs and marks the set as undefined. This class is not thread safe.
*/
+@SuppressWarnings("javadoc")
final class ImportIDSet implements Iterable<EntryID> {
/** The encapsulated entryIDSet where elements are stored until reaching the limit. */
@@ -127,18 +128,23 @@
boolean merge(ImportIDSet importIdSet)
{
checkNotNull(importIdSet, "importIdSet must not be null");
+ return merge(importIdSet.entryIDSet);
+ }
- boolean definedBeforeMerge = isDefined();
- final long mergedSize = addWithoutOverflow(entryIDSet.size(), importIdSet.entryIDSet.size());
+ boolean merge(EntryIDSet entryIDSet)
+ {
+ checkNotNull(entryIDSet, "entryID must not be null");
+ boolean definedBeforeMerge = this.entryIDSet.isDefined();
+ final long mergedSize = addWithoutOverflow(this.entryIDSet.size(), entryIDSet.size());
- if (!definedBeforeMerge || !importIdSet.isDefined() || mergedSize > indexEntryLimitSize)
+ if (!definedBeforeMerge || !entryIDSet.isDefined() || mergedSize > indexEntryLimitSize)
{
- entryIDSet = newUndefinedSetWithKey(key);
+ this.entryIDSet = newUndefinedSetWithKey(key);
return definedBeforeMerge;
}
- else if (isDefined())
+ else if (this.entryIDSet.isDefined())
{
- entryIDSet.addAll(importIdSet.entryIDSet);
+ this.entryIDSet.addAll(entryIDSet);
}
return false;
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
index dea8c2c..3f6170c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -28,6 +28,7 @@
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.backends.pluggable.DnKeyFormat.*;
+import static org.opends.server.backends.pluggable.SuffixContainer.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
@@ -331,6 +332,7 @@
for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
{
ByteSequence key = mapEntry.getKey();
+ // FIXME JNR merge values before put
for (ByteSequence value : mapEntry.getValue())
{
put(byteBuffer, key);
@@ -404,6 +406,118 @@
}
}
+ /** A cursor performing the "merge" phase of the on-disk merge. */
+ private static final class MergingCursor<K, V> implements Cursor<K, V>
+ {
+ private final Cursor<K, V> delegate;
+ private final MergingConsumer<V> merger;
+ private K key;
+ private V value;
+ private boolean isDefined;
+
+ private MergingCursor(Cursor<K, V> cursor, MergingConsumer<V> merger)
+ {
+ this.delegate = cursor;
+ this.merger = merger;
+ }
+
+ @Override
+ public boolean next()
+ {
+ if (key == null)
+ {
+ if (!delegate.next())
+ {
+ return isDefined = false;
+ }
+ key = delegate.getKey();
+ accumulateValues();
+ return isDefined = true;
+ }
+ else if (delegate.isDefined())
+ {
+ // we did yet not consume key/value from the delegate cursor
+ key = delegate.getKey();
+ accumulateValues();
+ return isDefined = true;
+ }
+ else
+ {
+ // no more data to compute
+ return isDefined = false;
+ }
+ }
+
+ private void accumulateValues()
+ {
+ while (delegate.isDefined() && key.equals(delegate.getKey()))
+ {
+ merger.accept(delegate.getValue());
+ delegate.next();
+ }
+ value = merger.merge();
+ }
+
+ @Override
+ public boolean isDefined()
+ {
+ return isDefined;
+ }
+
+ @Override
+ public K getKey() throws NoSuchElementException
+ {
+ throwIfNotDefined();
+ return key;
+ }
+
+ @Override
+ public V getValue() throws NoSuchElementException
+ {
+ throwIfNotDefined();
+ return value;
+ }
+
+ private void throwIfNotDefined()
+ {
+ if (!isDefined())
+ {
+ throw new NoSuchElementException();
+ }
+ }
+
+ @Override
+ public void close()
+ {
+ delegate.close();
+ isDefined = false;
+ }
+
+ @Override
+ public boolean positionToKey(ByteSequence key)
+ {
+ return delegate.positionToKey(key);
+ }
+
+ @Override
+ public boolean positionToKeyOrNext(ByteSequence key)
+ {
+ return delegate.positionToKeyOrNext(key);
+ }
+
+ @Override
+ public boolean positionToLastKey()
+ {
+ return delegate.positionToLastKey();
+ }
+
+ @Override
+ public boolean positionToIndex(int index)
+ {
+ return delegate.positionToIndex(index);
+ }
+ }
+
/** A cursor implementation aggregating several cursors and ordering them by their key value. */
private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
{
@@ -524,28 +638,24 @@
return "not defined";
}
- /** {@inheritDoc} */
@Override
public boolean positionToKey(ByteSequence key)
{
throw notImplemented();
}
- /** {@inheritDoc} */
@Override
public boolean positionToKeyOrNext(ByteSequence key)
{
throw notImplemented();
}
- /** {@inheritDoc} */
@Override
public boolean positionToLastKey()
{
throw notImplemented();
}
- /** {@inheritDoc} */
@Override
public boolean positionToIndex(int index)
{
@@ -1544,18 +1654,209 @@
@Override
public Void run(ReadableTransaction txn) throws Exception
{
- try (Cursor<ByteString, ByteString> cursor = txn.openCursor(treeName))
+ try (Cursor<ByteString, ByteString> cursor =
+ new MergingCursor<ByteString, ByteString>(txn.openCursor(treeName), getMerger(treeName)))
{
while (cursor.next())
- {// FIXME JNR add merge phase
+ {
output.put(treeName, cursor.getKey(), cursor.getValue());
}
}
return null;
}
+
+ private MergingConsumer<ByteString> getMerger(final TreeName treeName) throws DirectoryException
+ {
+ EntryContainer entryContainer = rootContainer.getEntryContainer(DN.valueOf(treeName.getBaseDN()));
+ final MatchingRuleIndex index = getIndex(entryContainer, treeName);
+ if (index != null)
+ {
+ // key conflicts == merge EntryIDSets
+ return new ImportIDSetsMerger(index);
+ }
+ else if (treeName.getIndexId().equals(DN2ID_INDEX_NAME)
+ || treeName.getIndexId().equals(DN2URI_INDEX_NAME)
+ || isVLVIndex(entryContainer, treeName))
+ {
+ // key conflicts == exception
+ return new NoMultipleValuesConsumer<>();
+ }
+ throw new IllegalArgumentException("Unknown tree: " + treeName);
+ }
+
+ private boolean isVLVIndex(EntryContainer entryContainer, TreeName treeName)
+ {
+ for (VLVIndex vlvIndex : entryContainer.getVLVIndexes())
+ {
+ if (treeName.equals(vlvIndex.getName()))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private MatchingRuleIndex getIndex(EntryContainer entryContainer, TreeName treeName)
+ {
+ for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
+ {
+ for (MatchingRuleIndex index : attrIndex.getNameToIndexes().values())
+ {
+ if (treeName.equals(index.getName()))
+ {
+ return index;
+ }
+ }
+ }
+ return null;
+ }
});
}
+ /**
+ * Copies JDK 8 Consumer.
+ * <p>
+ * FIXME Remove once we move to Java 8.
+ *
+ * @see java.util.function.Consumer Consumer from JDK 8
+ */
+ private static interface Consumer<T>
+ {
+ /**
+ * Performs this operation on the given argument.
+ *
+ * @param t
+ * the input argument
+ */
+ void accept(T t);
+ }
+
+ /**
+ * A merging consumer that merges the supplied values.
+ * <p>
+ * Sample usage:
+ *
+ * <pre>
+ * while (it.hasNext())
+ * {
+ * mergingConsumer.accept(it.next());
+ * }
+ * Object result = mergingConsumer.merge();
+ *
+ * <pre>
+ *
+ * @param <T>
+ * the type of the arguments and the returned merged value
+ * @see java.util.function.Consumer Consumer from JDK 8
+ */
+ private static interface MergingConsumer<T> extends Consumer<T>
+ {
+ /**
+ * Merges the arguments provided via {@link Consumer#accept(Object)}.
+ *
+ * @return the merged value
+ */
+ T merge();
+ }
+
+ /** {@link MergingConsumer} that throws an exception when given several values to accept. */
+ private static final class NoMultipleValuesConsumer<V> implements MergingConsumer<V>
+ {
+ private V first;
+ private boolean moreThanOne;
+
+ @Override
+ public void accept(V value)
+ {
+ if (first == null)
+ {
+ this.first = value;
+ }
+ else
+ {
+ moreThanOne = true;
+ }
+ }
+
+ @Override
+ public V merge()
+ {
+ final boolean mustThrow = moreThanOne;
+ // clean up state
+ first = null;
+ moreThanOne = false;
+
+ if (mustThrow)
+ {
+ throw new IllegalArgumentException();
+ }
+ return first;
+ }
+ }
+
+ /**
+ * {@link MergingConsumer} that accepts {@link ByteSequence} objects
+ * and produces a {@link ByteSequence} representing the merged {@link ImportIDSet}.
+ */
+ private static final class ImportIDSetsMerger implements MergingConsumer<ByteString>
+ {
+ private final MatchingRuleIndex index;
+ private final Set<ByteString> values = new HashSet<>();
+ private boolean aboveIndexEntryLimit;
+
+ private ImportIDSetsMerger(MatchingRuleIndex index)
+ {
+ this.index = index;
+ }
+
+ @Override
+ public void accept(ByteString value)
+ {
+ if (!aboveIndexEntryLimit)
+ {
+ if (values.size() < index.getIndexEntryLimit())
+ {
+ values.add(value);
+ }
+ else
+ {
+ aboveIndexEntryLimit = true;
+ }
+ }
+ }
+
+ @Override
+ public ByteString merge()
+ {
+ try
+ {
+ if (aboveIndexEntryLimit)
+ {
+ return index.toValue(EntryIDSet.newUndefinedSet());
+ }
+ else if (values.size() == 1)
+ {
+ return values.iterator().next();
+ }
+
+ ImportIDSet idSet = new ImportIDSet(ByteString.empty(), EntryIDSet.newDefinedSet(), index.getIndexEntryLimit());
+ for (ByteString value : values)
+ {
+ // FIXME JNR Can we make this more efficient?
+ // go through long[] + sort in the end?
+ idSet.merge(index.decodeValue(ByteString.empty(), value));
+ }
+ return index.toValue(idSet);
+ }
+ finally
+ {
+ // reset state
+ aboveIndexEntryLimit = false;
+ values.clear();
+ }
+ }
+ }
+
/** Task used to migrate excluded branch. */
private final class MigrateExcludedTask extends ImportTask
{
@@ -1828,7 +2129,7 @@
void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
{
DN2ID dn2id = suffix.getDN2ID();
- importer.put(dn2id.getName(), dn2id.toKey(dn), entryID.toByteString());
+ importer.put(dn2id.getName(), dn2id.toKey(dn), dn2id.toValue(entryID));
}
private void processDN2URI(Suffix suffix, Entry entry)
@@ -1845,7 +2146,6 @@
void processIndexes(Suffix suffix, Entry entry, EntryID entryID)
throws StorageRuntimeException, InterruptedException
{
- final ByteString value = entryID.toByteString();
for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
{
final AttributeType attrType = mapEntry.getKey();
@@ -1854,9 +2154,14 @@
{
for (MatchingRuleIndex index : attrIndex.getNameToIndexes().values())
{
- for (ByteString key : index.indexEntry(entry))
+ final Set<ByteString> keys = index.indexEntry(entry);
+ if (!keys.isEmpty())
{
- importer.put(index.getName(), key, value);
+ final ByteString value = index.toValue(entryID);
+ for (ByteString key : keys)
+ {
+ importer.put(index.getName(), key, value);
+ }
}
}
}
@@ -1868,7 +2173,7 @@
for (VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes())
{
ByteString key = vlvIndex.toKey(entry, entryID);
- importer.put(vlvIndex.getName(), key, ByteString.empty());
+ importer.put(vlvIndex.getName(), key, vlvIndex.toValue());
}
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
index 546937e..abe553b 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
@@ -357,6 +357,11 @@
return encodeVLVKey(entry, entryID.longValue());
}
+ ByteString toValue()
+ {
+ return ByteString.empty();
+ }
+
private boolean shouldInclude(final Entry entry) throws DirectoryException
{
return entry.getName().matchesBaseAndScope(baseDN, scope) && filter.matchesEntry(entry);
@@ -437,7 +442,7 @@
{
if (nextDeletedKey == null || (nextAddedKey != null && nextAddedKey.compareTo(nextDeletedKey) < 0))
{
- txn.put(getName(), nextAddedKey, ByteString.empty());
+ txn.put(getName(), nextAddedKey, toValue());
nextAddedKey = nextOrNull(ai);
count.incrementAndGet();
}
--
Gitblit v1.10.0