mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.01.2015 c5d246665c8d72aa524009a12af556f8fba76fe4
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DN2ID.java
@@ -96,7 +96,7 @@
   */
  void put(final WriteableTransaction txn, DN dn, final EntryID entryID) throws StorageRuntimeException
  {
    txn.put(getName(), toKey(dn), entryID.toByteString());
    txn.put(getName(), toKey(dn), toValue(entryID));
  }
  boolean insert(final WriteableTransaction txn, DN dn, final EntryID entryID) throws StorageRuntimeException
@@ -112,7 +112,7 @@
          return oldEntryID;
        }
        // it did not exist before, insert the new value
        return entryID.toByteString();
        return toValue(entryID);
      }
    });
  }
@@ -122,6 +122,12 @@
    return dnToDNKey(dn, baseDN.size());
  }
  ByteString toValue(final EntryID entryID)
  {
    // TODO JNR do we want to use compacted longs?
    return entryID.toByteString();
  }
  /**
   * Remove a record from the DN tree.
   * @param txn a non null transaction
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
@@ -60,12 +60,9 @@
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /** The limit on the number of entry IDs that may be indexed by one key. */
  private int indexEntryLimit;
  private final State state;
  private final EntryContainer entryContainer;
  private int indexEntryLimit;
  private EntryIDSetCodec codec;
  /**
@@ -124,11 +121,32 @@
          @Override
          public EntryIDSet transform(ByteString key, ByteString value) throws NeverThrowsException
          {
            return codec.decode(key, value);
            return decodeValue(key, value);
          }
        });
  }
  EntryIDSet decodeValue(ByteSequence key, ByteString value)
  {
    return codec.decode(key, value);
  }
  ByteString toValue(EntryID entryID)
  {
    return codec.encode(newDefinedSet(entryID.longValue()));
  }
  ByteString toValue(EntryIDSet entryIDSet)
  {
    return codec.encode(entryIDSet);
  }
  ByteString toValue(ImportIDSet importIDSet)
  {
    return importIDSet.valueToByteString(codec);
  }
  // TODO JNR rename to importUpsert() ?
  @Override
  public final void importPut(Importer importer, ImportIDSet idsToBeAdded) throws StorageRuntimeException
  {
@@ -138,14 +156,14 @@
    ByteString value = importer.read(getName(), key);
    if (value != null)
    {
      final EntryIDSet entryIDSet = codec.decode(key, value);
      final EntryIDSet entryIDSet = decodeValue(key, value);
      final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
      importIDSet.merge(idsToBeAdded);
      importer.put(getName(), key, importIDSet.valueToByteString(codec));
      importer.put(getName(), key, toValue(importIDSet));
    }
    else
    {
      importer.put(getName(), key, idsToBeAdded.valueToByteString(codec));
      importer.put(getName(), key, toValue(idsToBeAdded));
    }
  }
@@ -162,7 +180,7 @@
      throw new IllegalStateException("Expected to have a value associated to key " + key + " for index " + getName());
    }
    final EntryIDSet entryIDSet = codec.decode(key, value);
    final EntryIDSet entryIDSet = decodeValue(key, value);
    final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
    importIDSet.remove(idsToBeRemoved);
    if (importIDSet.isDefined() && importIDSet.size() == 0)
@@ -171,7 +189,7 @@
    }
    else
    {
      importer.put(getName(), key, importIDSet.valueToByteString(codec));
      importer.put(getName(), key, toValue(importIDSet));
    }
  }
@@ -218,7 +236,7 @@
        if (oldValue != null)
        {
          EntryIDSet entryIDSet = computeEntryIDSet(key, oldValue.toByteString(), deletedIDs, addedIDs);
          ByteString after = codec.encode(entryIDSet);
          ByteString after = toValue(entryIDSet);
          /*
           * If there are no more IDs then return null indicating that the record should be removed.
           * If index is not trusted then this will cause all subsequent reads for this key to
@@ -234,7 +252,7 @@
          }
          if (isNotEmpty(addedIDs))
          {
            return codec.encode(addedIDs);
            return toValue(addedIDs);
          }
        }
        return null; // no change.
@@ -254,7 +272,7 @@
  private EntryIDSet computeEntryIDSet(ByteString key, ByteString value, EntryIDSet deletedIDs, EntryIDSet addedIDs)
  {
    EntryIDSet entryIDSet = codec.decode(key, value);
    EntryIDSet entryIDSet = decodeValue(key, value);
    if (addedIDs != null)
    {
      if (entryIDSet.isDefined() && indexEntryLimit > 0)
@@ -300,7 +318,7 @@
      ByteString value = txn.read(getName(), key);
      if (value != null)
      {
        return codec.decode(key, value);
        return decodeValue(key, value);
      }
      return trusted ? newDefinedSet() : newUndefinedSet();
    }
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportIDSet.java
@@ -42,6 +42,7 @@
 * the configured ID limit. If the limit it reached, the class stops tracking
 * individual IDs and marks the set as undefined. This class is not thread safe.
 */
@SuppressWarnings("javadoc")
final class ImportIDSet implements Iterable<EntryID> {
  /** The encapsulated entryIDSet where elements are stored until reaching the limit. */
@@ -127,18 +128,23 @@
  boolean merge(ImportIDSet importIdSet)
  {
    checkNotNull(importIdSet, "importIdSet must not be null");
    return merge(importIdSet.entryIDSet);
  }
    boolean definedBeforeMerge = isDefined();
    final long mergedSize = addWithoutOverflow(entryIDSet.size(), importIdSet.entryIDSet.size());
  boolean merge(EntryIDSet entryIDSet)
  {
    checkNotNull(entryIDSet, "entryID must not be null");
    boolean definedBeforeMerge = this.entryIDSet.isDefined();
    final long mergedSize = addWithoutOverflow(this.entryIDSet.size(), entryIDSet.size());
    if (!definedBeforeMerge || !importIdSet.isDefined() || mergedSize > indexEntryLimitSize)
    if (!definedBeforeMerge || !entryIDSet.isDefined() || mergedSize > indexEntryLimitSize)
    {
      entryIDSet = newUndefinedSetWithKey(key);
      this.entryIDSet = newUndefinedSetWithKey(key);
      return definedBeforeMerge;
    }
    else if (isDefined())
    else if (this.entryIDSet.isDefined())
    {
      entryIDSet.addAll(importIdSet.entryIDSet);
      this.entryIDSet.addAll(entryIDSet);
    }
    return false;
  }
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeStorageImporter.java
@@ -28,6 +28,7 @@
import static org.opends.messages.BackendMessages.*;
import static org.opends.server.backends.pluggable.DnKeyFormat.*;
import static org.opends.server.backends.pluggable.SuffixContainer.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.DynamicConstants.*;
import static org.opends.server.util.ServerConstants.*;
@@ -331,6 +332,7 @@
      for (Map.Entry<ByteSequence, Set<ByteSequence>> mapEntry : inMemoryStore.entrySet())
      {
        ByteSequence key = mapEntry.getKey();
        // FIXME JNR merge values before put
        for (ByteSequence value : mapEntry.getValue())
        {
          put(byteBuffer, key);
@@ -404,6 +406,118 @@
    }
  }
  /** A cursor performing the "merge" phase of the on-disk merge. */
  private static final class MergingCursor<K, V> implements Cursor<K, V>
  {
    private final Cursor<K, V> delegate;
    private final MergingConsumer<V> merger;
    private K key;
    private V value;
    private boolean isDefined;
    private MergingCursor(Cursor<K, V> cursor, MergingConsumer<V> merger)
    {
      this.delegate = cursor;
      this.merger = merger;
    }
    @Override
    public boolean next()
    {
      if (key == null)
      {
        if (!delegate.next())
        {
          return isDefined = false;
        }
        key = delegate.getKey();
        accumulateValues();
        return isDefined = true;
      }
      else if (delegate.isDefined())
      {
        // we did yet not consume key/value from the delegate cursor
        key = delegate.getKey();
        accumulateValues();
        return isDefined = true;
      }
      else
      {
        // no more data to compute
        return isDefined = false;
      }
    }
    private void accumulateValues()
    {
      while (delegate.isDefined() && key.equals(delegate.getKey()))
      {
        merger.accept(delegate.getValue());
        delegate.next();
      }
      value = merger.merge();
    }
    @Override
    public boolean isDefined()
    {
      return isDefined;
    }
    @Override
    public K getKey() throws NoSuchElementException
    {
      throwIfNotDefined();
      return key;
    }
    @Override
    public V getValue() throws NoSuchElementException
    {
      throwIfNotDefined();
      return value;
    }
    private void throwIfNotDefined()
    {
      if (!isDefined())
      {
        throw new NoSuchElementException();
      }
    }
    @Override
    public void close()
    {
      delegate.close();
      isDefined = false;
    }
    @Override
    public boolean positionToKey(ByteSequence key)
    {
      return delegate.positionToKey(key);
    }
    @Override
    public boolean positionToKeyOrNext(ByteSequence key)
    {
      return delegate.positionToKeyOrNext(key);
    }
    @Override
    public boolean positionToLastKey()
    {
      return delegate.positionToLastKey();
    }
    @Override
    public boolean positionToIndex(int index)
    {
      return delegate.positionToIndex(index);
    }
  }
  /** A cursor implementation aggregating several cursors and ordering them by their key value. */
  private static final class CompositeCursor<K extends Comparable<? super K>, V> implements Cursor<K, V>
  {
@@ -524,28 +638,24 @@
      return "not defined";
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionToKey(ByteSequence key)
    {
      throw notImplemented();
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionToKeyOrNext(ByteSequence key)
    {
      throw notImplemented();
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionToLastKey()
    {
      throw notImplemented();
    }
    /** {@inheritDoc} */
    @Override
    public boolean positionToIndex(int index)
    {
@@ -1544,18 +1654,209 @@
      @Override
      public Void run(ReadableTransaction txn) throws Exception
      {
        try (Cursor<ByteString, ByteString> cursor = txn.openCursor(treeName))
        try (Cursor<ByteString, ByteString> cursor =
            new MergingCursor<ByteString, ByteString>(txn.openCursor(treeName), getMerger(treeName)))
        {
          while (cursor.next())
          {// FIXME JNR add merge phase
          {
            output.put(treeName, cursor.getKey(), cursor.getValue());
          }
        }
        return null;
      }
      private MergingConsumer<ByteString> getMerger(final TreeName treeName) throws DirectoryException
      {
        EntryContainer entryContainer = rootContainer.getEntryContainer(DN.valueOf(treeName.getBaseDN()));
        final MatchingRuleIndex index = getIndex(entryContainer, treeName);
        if (index != null)
        {
          // key conflicts == merge EntryIDSets
          return new ImportIDSetsMerger(index);
        }
        else if (treeName.getIndexId().equals(DN2ID_INDEX_NAME)
            || treeName.getIndexId().equals(DN2URI_INDEX_NAME)
            || isVLVIndex(entryContainer, treeName))
        {
          // key conflicts == exception
          return new NoMultipleValuesConsumer<>();
        }
        throw new IllegalArgumentException("Unknown tree: " + treeName);
      }
      private boolean isVLVIndex(EntryContainer entryContainer, TreeName treeName)
      {
        for (VLVIndex vlvIndex : entryContainer.getVLVIndexes())
        {
          if (treeName.equals(vlvIndex.getName()))
          {
            return true;
          }
        }
        return false;
      }
      private MatchingRuleIndex getIndex(EntryContainer entryContainer, TreeName treeName)
      {
        for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
        {
          for (MatchingRuleIndex index : attrIndex.getNameToIndexes().values())
          {
            if (treeName.equals(index.getName()))
            {
              return index;
            }
          }
        }
        return null;
      }
    });
  }
  /**
   * Copies JDK 8 Consumer.
   * <p>
   * FIXME Remove once we move to Java 8.
   *
   * @see java.util.function.Consumer Consumer from JDK 8
   */
  private static interface Consumer<T>
  {
    /**
     * Performs this operation on the given argument.
     *
     * @param t
     *          the input argument
     */
    void accept(T t);
  }
  /**
   * A merging consumer that merges the supplied values.
   * <p>
   * Sample usage:
   *
   * <pre>
   * while (it.hasNext())
   * {
   *   mergingConsumer.accept(it.next());
   * }
   * Object result = mergingConsumer.merge();
   *
   * <pre>
   *
   * @param <T>
   *          the type of the arguments and the returned merged value
   * @see java.util.function.Consumer Consumer from JDK 8
   */
  private static interface MergingConsumer<T> extends Consumer<T>
  {
    /**
     * Merges the arguments provided via {@link Consumer#accept(Object)}.
     *
     * @return the merged value
     */
     T merge();
  }
  /** {@link MergingConsumer} that throws an exception when given several values to accept. */
  private static final class NoMultipleValuesConsumer<V> implements MergingConsumer<V>
  {
    private V first;
    private boolean moreThanOne;
    @Override
    public void accept(V value)
    {
      if (first == null)
      {
        this.first = value;
      }
      else
      {
        moreThanOne = true;
      }
    }
    @Override
    public V merge()
    {
      final boolean mustThrow = moreThanOne;
      // clean up state
      first = null;
      moreThanOne = false;
      if (mustThrow)
      {
        throw new IllegalArgumentException();
      }
      return first;
    }
  }
  /**
   * {@link MergingConsumer} that accepts {@link ByteSequence} objects
   * and produces a {@link ByteSequence} representing the merged {@link ImportIDSet}.
   */
  private static final class ImportIDSetsMerger implements MergingConsumer<ByteString>
  {
    private final MatchingRuleIndex index;
    private final Set<ByteString> values = new HashSet<>();
    private boolean aboveIndexEntryLimit;
    private ImportIDSetsMerger(MatchingRuleIndex index)
    {
      this.index = index;
    }
    @Override
    public void accept(ByteString value)
    {
      if (!aboveIndexEntryLimit)
      {
        if (values.size() < index.getIndexEntryLimit())
        {
          values.add(value);
        }
        else
        {
          aboveIndexEntryLimit = true;
        }
      }
    }
    @Override
    public ByteString merge()
    {
      try
      {
        if (aboveIndexEntryLimit)
        {
          return index.toValue(EntryIDSet.newUndefinedSet());
        }
        else if (values.size() == 1)
        {
          return values.iterator().next();
        }
        ImportIDSet idSet = new ImportIDSet(ByteString.empty(), EntryIDSet.newDefinedSet(), index.getIndexEntryLimit());
        for (ByteString value : values)
        {
          // FIXME JNR Can we make this more efficient?
          // go through long[] + sort in the end?
          idSet.merge(index.decodeValue(ByteString.empty(), value));
        }
        return index.toValue(idSet);
      }
      finally
      {
        // reset state
        aboveIndexEntryLimit = false;
        values.clear();
      }
    }
  }
  /** Task used to migrate excluded branch. */
  private final class MigrateExcludedTask extends ImportTask
  {
@@ -1828,7 +2129,7 @@
    void processDN2ID(Suffix suffix, DN dn, EntryID entryID)
    {
      DN2ID dn2id = suffix.getDN2ID();
      importer.put(dn2id.getName(), dn2id.toKey(dn), entryID.toByteString());
      importer.put(dn2id.getName(), dn2id.toKey(dn), dn2id.toValue(entryID));
    }
    private void processDN2URI(Suffix suffix, Entry entry)
@@ -1845,7 +2146,6 @@
    void processIndexes(Suffix suffix, Entry entry, EntryID entryID)
        throws StorageRuntimeException, InterruptedException
    {
      final ByteString value = entryID.toByteString();
      for (Map.Entry<AttributeType, AttributeIndex> mapEntry : suffix.getAttrIndexMap().entrySet())
      {
        final AttributeType attrType = mapEntry.getKey();
@@ -1854,9 +2154,14 @@
        {
          for (MatchingRuleIndex index : attrIndex.getNameToIndexes().values())
          {
            for (ByteString key : index.indexEntry(entry))
            final Set<ByteString> keys = index.indexEntry(entry);
            if (!keys.isEmpty())
            {
              importer.put(index.getName(), key, value);
              final ByteString value = index.toValue(entryID);
              for (ByteString key : keys)
              {
                importer.put(index.getName(), key, value);
              }
            }
          }
        }
@@ -1868,7 +2173,7 @@
      for (VLVIndex vlvIndex : suffix.getEntryContainer().getVLVIndexes())
      {
        ByteString key = vlvIndex.toKey(entry, entryID);
        importer.put(vlvIndex.getName(), key, ByteString.empty());
        importer.put(vlvIndex.getName(), key, vlvIndex.toValue());
      }
    }
  }
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
@@ -357,6 +357,11 @@
    return encodeVLVKey(entry, entryID.longValue());
  }
  ByteString toValue()
  {
    return ByteString.empty();
  }
  private boolean shouldInclude(final Entry entry) throws DirectoryException
  {
    return entry.getName().matchesBaseAndScope(baseDN, scope) && filter.matchesEntry(entry);
@@ -437,7 +442,7 @@
    {
      if (nextDeletedKey == null || (nextAddedKey != null && nextAddedKey.compareTo(nextDeletedKey) < 0))
      {
        txn.put(getName(), nextAddedKey, ByteString.empty());
        txn.put(getName(), nextAddedKey, toValue());
        nextAddedKey = nextOrNull(ai);
        count.incrementAndGet();
      }