OPENDJ-2393: Possible index corruption
When an operation is cancelled or fails with an exception, the transaction is rolled
back, and any pending update of the indexes' trusted state is rolled back with it.
To avoid losing that information, the trusted state is now persisted using a separate
transaction whenever the original transaction is rolled back.
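Condensed from the deleteEntry/replaceEntry/renameEntry hunks below, the pattern is
roughly the following (sketch only; writeTrustState re-persists the cached trusted
flags of the buffered indexes using a fresh storage.write call):

final IndexBuffer indexBuffer = new IndexBuffer();
try
{
  storage.write(new WriteOperation()
  {
    @Override
    public void run(WriteableTransaction txn) throws Exception
    {
      indexBuffer.reset();
      // ... update id2entry, dn2id and the attribute indexes through indexBuffer ...
    }
  });
}
catch (Exception e)
{
  // The transaction above has been rolled back, taking any trusted-state update with it.
  // Persist the cached trusted state with a brand new transaction, then rethrow.
  writeTrustState(indexBuffer);
  throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class);
}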
The number of keys contained in a VLV index was cached in memory. This number could
become incorrect when a transaction updating the index keys was aborted. The number
of keys is now persisted using a sharded counter.
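The sharded counter spreads one logical counter over many physical records, one per
shard bucket: writers touch a single shard selected from their thread id, readers sum
every shard of the key. A self-contained in-memory analogue of the idea (illustration
only, not the backend's ShardedCounter class):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

final class InMemoryShardedCounter
{
  private static final int SHARD_COUNT = 256; // must be a power of 2, as in ShardedCounter

  private final Map<String, LongAdder> shards = new ConcurrentHashMap<>();

  void add(String key, long delta)
  {
    // Each thread updates its own shard, so concurrent writers do not contend on one record.
    int bucket = (int) (Thread.currentThread().getId() & (SHARD_COUNT - 1));
    shards.computeIfAbsent(key + '#' + bucket, k -> new LongAdder()).add(delta);
  }

  long get(String key)
  {
    // Reads pay the price: the logical value is the sum of every shard of the key.
    long total = 0;
    for (Map.Entry<String, LongAdder> shard : shards.entrySet())
    {
      if (shard.getKey().startsWith(key + '#'))
      {
        total += shard.getValue().sum();
      }
    }
    return total;
  }
}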
Incorrect PDB RollbackException handling could have caused operations to abort
rather than be retried.
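The intent, shown with hypothetical names (this is not the backend's real retry loop):
a rollback raised inside a write must be recognised by the retrying caller, otherwise
the whole operation aborts on the first optimistic-locking conflict.

final class RetryOnRollbackSketch
{
  /** Stand-in for the storage-level rollback signal; hypothetical, for illustration only. */
  static final class RollbackSignal extends RuntimeException
  {
  }

  interface TxnBody
  {
    void run() throws Exception;
  }

  static void writeWithRetry(TxnBody body, int maxAttempts) throws Exception
  {
    for (int attempt = 1;; attempt++)
    {
      try
      {
        body.run(); // must be idempotent: it may be re-executed after a rollback
        return;
      }
      catch (RollbackSignal e)
      {
        if (attempt == maxAttempts)
        {
          throw e; // give up and surface the failure
        }
        // otherwise loop: the operation is retried, not aborted
      }
    }
  }
}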
Refactoring: fixed comments, removed dead code (obsoleted by the id2children &
id2subtree removal).
Use a specific compact encoding for sharded-counter values.
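The compact encoding keeps small counter values in a single byte and only falls back
to 4 or 8 bytes when needed, so the common per-entry child-count deltas cost one byte
per shard record. A standalone sketch of the same idea (mirroring the
ShardedCounter.encodeValue/decodeValue hunks below, but written with java.nio so it
compiles on its own):

import java.nio.ByteBuffer;

final class CompactLongCodec
{
  /** Encodes a long on 1, 4 or 8 bytes, whichever is the smallest lossless representation. */
  static byte[] encode(long value)
  {
    if ((byte) value == value)
    {
      return new byte[] { (byte) value };
    }
    if ((int) value == value)
    {
      return ByteBuffer.allocate(Integer.BYTES).putInt((int) value).array();
    }
    return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
  }

  /** Decodes a value produced by encode(long), using the length to select the format. */
  static long decode(byte[] value)
  {
    switch (value.length)
    {
    case 1:
      return value[0];
    case Integer.BYTES:
      return ByteBuffer.wrap(value).getInt();
    case Long.BYTES:
      return ByteBuffer.wrap(value).getLong();
    default:
      throw new IllegalArgumentException("Unsupported compact value length: " + value.length);
    }
  }
}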
1 file deleted
3 files added
1 file renamed
15 files modified
| New file |
| | |
| | | # Generated by org.codehaus.mojo.license.AddThirdPartyMojo |
| | | #------------------------------------------------------------------------------- |
| | | # Already used licenses in project : |
| | | # - Apache Software License, Version 2.0 |
| | | # - BSD |
| | | # - CC BY-NC-ND 3.0 |
| | | # - Common Development and Distribution License 1.0 |
| | | # - Common Development and Distribution License 1.1 |
| | | # - Dual licensed (CDDL and GPL) |
| | | # - Dual licensed (CDDL and GPLv2+CE) |
| | | # - The GNU Lesser General Public License, version 2.0 with Classpath Exception |
| | | # - The GNU Lesser General Public License, version 2.1 |
| | | # - The GNU Lesser General Public License, version 3.0 |
| | | # - The MIT License |
| | | # - The Sleepycat License |
| | | #------------------------------------------------------------------------------- |
| | | # Please fill the missing licenses for dependencies : |
| | | # |
| | | # |
| | | #Tue Nov 17 10:59:10 CET 2015 |
| | | com.sleepycat--je--5.0.104=The Sleepycat License |
| | |
| | | { |
| | | exchange.remove(); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | bytesToValue(ex.getValue(), value); |
| | | ex.store(); |
| | | } |
| | | catch (final Exception e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | bytesToKey(ex.getKey(), key); |
| | | return ex.remove(); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | ex = getExchangeFromCache(treeName); |
| | | ex.removeTree(); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | */ |
| | | return new CursorImpl(getNewExchange(treeName, false)); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | { |
| | | getExchangeFromCache(treeName); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | ex.fetch(); |
| | | return valueToBytes(ex.getValue()); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | } |
| | | return false; |
| | | } |
| | | catch (final Exception e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | put(treeName, dummyKey, ByteString.empty()); |
| | | delete(treeName, dummyKey); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | { |
| | | // ignore missing trees. |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | catch(final InUseException e) { |
| | | throw new StorageInUseException(e); |
| | | } |
| | | catch (final PersistitException e) |
| | | catch (final PersistitException | RollbackException e) |
| | | { |
| | | throw new StorageRuntimeException(e); |
| | | } |
| | |
| | | public final void open(WriteableTransaction txn, boolean createOnDemand) throws StorageRuntimeException |
| | | { |
| | | txn.openTree(name, createOnDemand); |
| | | open0(txn); |
| | | afterOpen(txn); |
| | | } |
| | | |
| | | /** Override in order to perform any additional initialization after the index has opened. */ |
| | | void open0(WriteableTransaction txn) throws StorageRuntimeException |
| | | void afterOpen(WriteableTransaction txn) throws StorageRuntimeException |
| | | { |
| | | // Do nothing by default. |
| | | } |
| | |
| | | @Override |
| | | public final void delete(WriteableTransaction txn) throws StorageRuntimeException |
| | | { |
| | | beforeDelete(txn); |
| | | txn.deleteTree(name); |
| | | } |
| | | |
| | | /** Override in order to perform any additional operation before index tree deletion. */ |
| | | void beforeDelete(WriteableTransaction txn) throws StorageRuntimeException |
| | | { |
| | | // Do nothing by default. |
| | | } |
| | | |
| | | @Override |
| | | public final long getRecordCount(ReadableTransaction txn) throws StorageRuntimeException |
| | | { |
| | |
| | | import java.util.NoSuchElementException; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.Functions; |
| | | import org.forgerock.util.Function; |
| | | import org.forgerock.util.promise.NeverThrowsException; |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | |
| | | VO transform(KI key, VI value) throws E; |
| | | } |
| | | |
| | | private static final Function<Object, Object, NeverThrowsException> NO_TRANSFORM = |
| | | new Function<Object, Object, NeverThrowsException>() |
| | | private final static ValueTransformer<?, ?, ?, NeverThrowsException> KEEP_VALUES_UNCHANGED = |
| | | new ValueTransformer<Object, Object, Object, NeverThrowsException>() |
| | | { |
| | | @Override |
| | | public Object apply(Object value) throws NeverThrowsException |
| | | public Object transform(Object key, Object value) throws NeverThrowsException |
| | | { |
| | | return value; |
| | | } |
| | |
| | | return new CursorTransformer<>(new SequentialCursorAdapter<>(input), keyTransformer, valueTransformer); |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | static <KI, VI, VO> Cursor<KI, VO> transformValues(Cursor<KI, VI> input, |
| | | ValueTransformer<KI, VI, VO, ? extends Exception> valueTransformer) |
| | | { |
| | | return transformKeysAndValues(input, (Function<KI, KI, NeverThrowsException>) NO_TRANSFORM, valueTransformer); |
| | | return transformKeysAndValues(input, Functions.<KI>identityFunction(), valueTransformer); |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | static <KI, VI, VO> Cursor<KI, VO> transformValues(SequentialCursor<KI, VI> input, |
| | | ValueTransformer<KI, VI, VO, ? extends Exception> valueTransformer) |
| | | { |
| | | // SequentialCursorAdapter constructor never throws |
| | | return transformKeysAndValues(new SequentialCursorAdapter<>(input), |
| | | (Function<KI, KI, NeverThrowsException>) NO_TRANSFORM, valueTransformer); |
| | | return transformKeysAndValues(new SequentialCursorAdapter<>(input), Functions.<KI> identityFunction(), |
| | | valueTransformer); |
| | | } |
| | | |
| | | @SuppressWarnings("unchecked") |
| | | static <K, V> ValueTransformer<K, V, V, NeverThrowsException> keepValuesUnchanged() |
| | | { |
| | | return (ValueTransformer<K, V, V, NeverThrowsException>) KEEP_VALUES_UNCHANGED; |
| | | } |
| | | |
| | | static <K, VI, VO> ValueTransformer<K, VI, VO, NeverThrowsException> constant(final VO constant) |
| | | { |
| | | return new ValueTransformer<K, VI, VO, NeverThrowsException>() |
| | | { |
| | | @Override |
| | | public VO transform(K key, VI value) throws NeverThrowsException |
| | | { |
| | | return constant; |
| | | } |
| | | }; |
| | | } |
| | | |
| | | private CursorTransformer(Cursor<KI, VI> input, Function<KI, KO, ? extends Exception> keyTransformer, |
| | |
| | | * A flag to indicate if this index should be trusted to be consistent with the entries tree. |
| | | * If not trusted, we assume that existing entryIDSets for a key is still accurate. However, keys |
| | | * that do not exist are undefined instead of an empty entryIDSet. The following rules will be |
| | | * observed when the index is not trusted: - no entryIDs will be added to a non-existing key. - |
| | | * undefined entryIdSet will be returned whenever a key is not found. |
| | | * observed when the index is not trusted: |
| | | * <ul> |
| | | * <li>no entryIDs will be added to a non-existing key.</li> |
| | | * <li>undefined entryIdSet will be returned whenever a key is not found.</li> |
| | | * </ul> |
| | | */ |
| | | private volatile boolean trusted; |
| | | |
| | |
| | | } |
| | | |
| | | @Override |
| | | final void open0(WriteableTransaction txn) |
| | | final void afterOpen(WriteableTransaction txn) |
| | | { |
| | | final EnumSet<IndexFlag> flags = state.getIndexFlags(txn, getName()); |
| | | codec = flags.contains(COMPACTED) ? CODEC_V2 : CODEC_V1; |
| | |
| | | public final void update(final WriteableTransaction txn, final ByteString key, final EntryIDSet deletedIDs, |
| | | final EntryIDSet addedIDs) throws StorageRuntimeException |
| | | { |
| | | /* |
| | | * Check the special condition where both deletedIDs and addedIDs are null. This is used when |
| | | * deleting entries must be completely removed. |
| | | */ |
| | | if (deletedIDs == null && addedIDs == null) |
| | | { |
| | | boolean success = txn.delete(getName(), key); |
| | | if (!success && logger.isTraceEnabled()) |
| | | { |
| | | logger.trace("The expected key does not exist in the index %s.\nKey:%s ", |
| | | getName(), key.toHexPlusAsciiString(4)); |
| | | } |
| | | return; |
| | | } |
| | | |
| | | // Handle cases where nothing is changed early to avoid DB access. |
| | | if (isNullOrEmpty(deletedIDs) && isNullOrEmpty(addedIDs)) |
| | | { |
| | |
| | | /** The entry tree maps an entry ID (8 bytes) to a complete encoded entry. */ |
| | | private ID2Entry id2entry; |
| | | /** Store the number of children for each entry. */ |
| | | private final ID2Count id2childrenCount; |
| | | private final ID2ChildrenCount id2childrenCount; |
| | | /** The referral tree maps a normalized DN string to labeled URIs. */ |
| | | private final DN2URI dn2uri; |
| | | /** The state tree maps a config DN to config entries. */ |
| | |
| | | this.storage = storage; |
| | | this.rootContainer = rootContainer; |
| | | this.treePrefix = baseDN.toNormalizedUrlSafeString(); |
| | | this.id2childrenCount = new ID2Count(getIndexName(ID2CHILDREN_COUNT_TREE_NAME)); |
| | | this.id2childrenCount = new ID2ChildrenCount(getIndexName(ID2CHILDREN_COUNT_TREE_NAME)); |
| | | this.dn2id = new DN2ID(getIndexName(DN2ID_TREE_NAME), baseDN); |
| | | this.dn2uri = new DN2URI(getIndexName(REFERRAL_TREE_NAME), this); |
| | | this.state = new State(getIndexName(STATE_TREE_NAME)); |
| | |
| | | * |
| | | * @return The children tree. |
| | | */ |
| | | ID2Count getID2ChildrenCount() |
| | | ID2ChildrenCount getID2ChildrenCount() |
| | | { |
| | | return id2childrenCount; |
| | | } |
| | |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | // No need to call indexBuffer.reset() since IndexBuffer content will be the same for each retry attempt. |
| | | try |
| | | { |
| | | // Check whether the entry already exists. |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | writeTrustState(indexBuffer); |
| | | throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class); |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | private void writeTrustState(final IndexBuffer indexBuffer) |
| | | { |
| | | // Transaction modifying the index has been rolled back. |
| | | // Ensure that the index trusted state is persisted. |
| | | try |
| | | { |
| | | storage.write(new WriteOperation() |
| | | { |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | indexBuffer.writeTrustState(txn); |
| | | } |
| | | }); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | // Cannot throw because this method is used in a catch block and we do not want to hide the real exception. |
| | | logger.traceException(e); |
| | | } |
| | | } |
| | | |
| | | void importEntry(WriteableTransaction txn, EntryID entryID, Entry entry) throws DirectoryException, |
| | | StorageRuntimeException |
| | | { |
| | |
| | | void deleteEntry(final DN entryDN, final DeleteOperation deleteOperation) |
| | | throws DirectoryException, StorageRuntimeException, CanceledOperationException |
| | | { |
| | | final IndexBuffer indexBuffer = new IndexBuffer(); |
| | | try |
| | | { |
| | | storage.write(new WriteOperation() |
| | |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | indexBuffer.reset(); |
| | | try |
| | | { |
| | | // Check for referral entries above the target entry. |
| | |
| | | |
| | | // Now update id2entry, dn2uri, and id2childrenCount in key order. |
| | | id2childrenCount.updateCount(txn, parentID, -1); |
| | | final IndexBuffer indexBuffer = new IndexBuffer(); |
| | | final EntryCache<?> entryCache = DirectoryServer.getEntryCache(); |
| | | boolean isBaseEntry = true; |
| | | try (final Cursor<EntryID, Entry> cursor = id2entry.openCursor(txn)) |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | writeTrustState(indexBuffer); |
| | | throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class); |
| | | } |
| | | } |
| | |
| | | void replaceEntry(final Entry oldEntry, final Entry newEntry, final ModifyOperation modifyOperation) |
| | | throws StorageRuntimeException, DirectoryException, CanceledOperationException |
| | | { |
| | | final IndexBuffer indexBuffer = new IndexBuffer(); |
| | | try |
| | | { |
| | | storage.write(new WriteOperation() |
| | |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | indexBuffer.reset(); |
| | | try |
| | | { |
| | | EntryID entryID = dn2id.get(txn, newEntry.getName()); |
| | |
| | | dn2uri.replaceEntry(txn, oldEntry, newEntry); |
| | | } |
| | | |
| | | |
| | | // Update the indexes. |
| | | final IndexBuffer indexBuffer = new IndexBuffer(); |
| | | if (modifyOperation != null) |
| | | { |
| | | // In this case we know from the operation what the modifications were. |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | writeTrustState(indexBuffer); |
| | | throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class); |
| | | } |
| | | } |
| | |
| | | void renameEntry(final DN oldTargetDN, final Entry newTargetEntry, final ModifyDNOperation modifyDNOperation) |
| | | throws StorageRuntimeException, DirectoryException, CanceledOperationException |
| | | { |
| | | final IndexBuffer indexBuffer = new IndexBuffer(); |
| | | try |
| | | { |
| | | storage.write(new WriteOperation() |
| | |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | indexBuffer.reset(); |
| | | try |
| | | { |
| | | // Validate the request. |
| | |
| | | id2childrenCount.updateCount(txn, oldSuperiorID, -1); |
| | | id2childrenCount.updateCount(txn, newSuperiorID, 1); |
| | | } |
| | | final IndexBuffer indexBuffer = new IndexBuffer(); |
| | | boolean isBaseEntry = true; |
| | | try (final Cursor<EntryID, Entry> cursor = id2entry.openCursor(txn)) |
| | | { |
| | |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | writeTrustState(indexBuffer); |
| | | throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class); |
| | | } |
| | | } |
| | |
| | | private static int getEstimatedSize(EntryIDSet idSet) |
| | | { |
| | | checkNotNull(idSet, "idSet must not be null"); |
| | | return idSet.getIDs().length * LONG_SIZE + INT_SIZE; |
| | | return idSet.getIDs().length * ByteStringBuilder.MAX_COMPACT_SIZE + INT_SIZE; |
| | | } |
| | | |
| | | private static long[] decodeRaw(ByteSequenceReader reader, int nbEntriesToDecode) |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2015 ForgeRock AS |
| | | */ |
| | | package org.opends.server.backends.pluggable; |
| | | |
| | | import static org.opends.server.backends.pluggable.CursorTransformer.*; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteSequenceReader; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.forgerock.util.Function; |
| | | import org.forgerock.util.Reject; |
| | | import org.forgerock.util.promise.NeverThrowsException; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector; |
| | | import org.opends.server.backends.pluggable.spi.Importer; |
| | | import org.opends.server.backends.pluggable.spi.ReadableTransaction; |
| | | import org.opends.server.backends.pluggable.spi.SequentialCursor; |
| | | import org.opends.server.backends.pluggable.spi.TreeName; |
| | | import org.opends.server.backends.pluggable.spi.WriteableTransaction; |
| | | |
| | | import com.forgerock.opendj.util.PackedLong; |
| | | |
| | | /** Maintain counters reflecting the total number of entries and the number of immediate children for each entry. */ |
| | | final class ID2ChildrenCount extends AbstractTree |
| | | { |
| | | private static final EntryID TOTAL_COUNT_ENTRY_ID = new EntryID(PackedLong.COMPACTED_MAX_VALUE); |
| | | |
| | | private static final Function<ByteString, EntryID, NeverThrowsException> TO_ENTRY_ID = |
| | | new Function<ByteString, EntryID, NeverThrowsException>() |
| | | { |
| | | @Override |
| | | public EntryID apply(ByteString value) throws NeverThrowsException |
| | | { |
| | | return new EntryID(value.asReader().readCompactUnsignedLong()); |
| | | } |
| | | }; |
| | | |
| | | private final ShardedCounter counter; |
| | | |
| | | ID2ChildrenCount(TreeName name) |
| | | { |
| | | super(name); |
| | | this.counter = new ShardedCounter(name); |
| | | } |
| | | |
| | | SequentialCursor<EntryID, Void> openCursor(ReadableTransaction txn) |
| | | { |
| | | return transformKeysAndValues(counter.openCursor(txn), |
| | | TO_ENTRY_ID, CursorTransformer.<ByteString, Void> keepValuesUnchanged()); |
| | | } |
| | | |
| | | /** |
| | | * Updates the number of children for a given entry without updating the total number of entries. |
| | | * <p> |
| | | * Implementation note: this method accepts a {@code null} entryID in order to eliminate null checks in client code. |
| | | * In particular, client code has to deal with the special case where a target entry does not have a parent because |
| | | * the target entry is a base entry within the backend. |
| | | * |
| | | * @param txn storage transaction |
| | | * @param entryID The entryID identifying the counter, which may be
| | | * {@code null} in which case calling this method has no effect. |
| | | * @param delta The value to add. Can be negative to decrease counter value. |
| | | */ |
| | | void updateCount(final WriteableTransaction txn, final EntryID entryID, final long delta) { |
| | | if (entryID != null) |
| | | { |
| | | addToCounter(txn, entryID, delta); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Updates the total number of entries which should be the sum of all counters. |
| | | * @param txn storage transaction |
| | | * @param delta The value to add. Can be negative to decrease counter value. |
| | | */ |
| | | void updateTotalCount(final WriteableTransaction txn, final long delta) { |
| | | addToCounter(txn, TOTAL_COUNT_ENTRY_ID, delta); |
| | | } |
| | | |
| | | private void addToCounter(WriteableTransaction txn, EntryID entryID, final long delta) |
| | | { |
| | | counter.addCount(txn, toKey(entryID), delta); |
| | | } |
| | | |
| | | void importPut(Importer importer, EntryID entryID, long total) |
| | | { |
| | | Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow."); |
| | | importPut0(importer, entryID, total); |
| | | } |
| | | |
| | | void importPutTotalCount(Importer importer, long total) |
| | | { |
| | | importPut0(importer, TOTAL_COUNT_ENTRY_ID, total); |
| | | } |
| | | |
| | | private void importPut0(Importer importer, EntryID entryID, final long delta) |
| | | { |
| | | counter.importPut(importer, toKey(entryID), delta); |
| | | } |
| | | |
| | | @Override |
| | | public String keyToString(ByteString key) |
| | | { |
| | | ByteSequenceReader keyReader = key.asReader(); |
| | | long keyID = keyReader.readCompactUnsignedLong(); |
| | | long shardBucket = keyReader.readByte(); |
| | | return (keyID == TOTAL_COUNT_ENTRY_ID.longValue() ? "Total Children Count" : keyID) + "#" + shardBucket; |
| | | } |
| | | |
| | | @Override |
| | | public String valueToString(ByteString value) |
| | | { |
| | | return counter.valueToString(value); |
| | | } |
| | | |
| | | @Override |
| | | public ByteString generateKey(String data) |
| | | { |
| | | return new EntryID(Long.parseLong(data)).toByteString(); |
| | | } |
| | | |
| | | /** |
| | | * Get the number of children for the given entry. |
| | | * @param txn storage transaction |
| | | * @param entryID The entryID identifying the counter
| | | * @return Value of the counter. 0 if no counter is associated yet. |
| | | */ |
| | | long getCount(ReadableTransaction txn, EntryID entryID) |
| | | { |
| | | return counter.getCount(txn, toKey(entryID)); |
| | | } |
| | | |
| | | /** |
| | | * Get the total number of entries. |
| | | * @param txn storage transaction |
| | | * @return Sum of all the counters contained in this tree
| | | */ |
| | | long getTotalCount(ReadableTransaction txn) |
| | | { |
| | | return getCount(txn, TOTAL_COUNT_ENTRY_ID); |
| | | } |
| | | |
| | | /** |
| | | * Removes the counter associated to the given entry, but does not update the total count. |
| | | * @param txn storage transaction |
| | | * @param entryID The entryID identifying the counter |
| | | * @return Value of the counter before its deletion.
| | | */ |
| | | long removeCount(final WriteableTransaction txn, final EntryID entryID) { |
| | | return counter.removeCount(txn, toKey(entryID)); |
| | | } |
| | | |
| | | private static ByteSequence toKey(EntryID entryID) |
| | | { |
| | | return new ByteStringBuilder(ByteStringBuilder.MAX_COMPACT_SIZE).appendCompactUnsigned(entryID.longValue()); |
| | | } |
| | | |
| | | static Collector<Long, ByteString> getSumLongCollectorInstance() |
| | | { |
| | | return ShardedCounterCollector.INSTANCE; |
| | | } |
| | | |
| | | /** |
| | | * {@link Collector} that accepts sharded-counter values encoded into {@link ByteString} objects and produces a |
| | | * {@link ByteString} representing the sum of the sharded-counter values. |
| | | */ |
| | | private static final class ShardedCounterCollector implements Collector<Long, ByteString> |
| | | { |
| | | private static final Collector<Long, ByteString> INSTANCE = new ShardedCounterCollector(); |
| | | |
| | | @Override |
| | | public Long get() |
| | | { |
| | | return 0L; |
| | | } |
| | | |
| | | @Override |
| | | public Long accept(Long resultContainer, ByteString value) |
| | | { |
| | | return resultContainer + ShardedCounter.decodeValue(value); |
| | | } |
| | | |
| | | @Override |
| | | public ByteString merge(Long resultContainer) |
| | | { |
| | | return ShardedCounter.encodeValue(resultContainer); |
| | | } |
| | | } |
| | | } |
| | |
| | | } |
| | | |
| | | @Override |
| | | void open0(WriteableTransaction txn) throws StorageRuntimeException |
| | | void afterOpen(WriteableTransaction txn) throws StorageRuntimeException |
| | | { |
| | | // Make sure the tree is there and readable, even if the storage is READ_ONLY. |
| | | // Would be nice if there were a better way... |
| | |
| | | { |
| | | void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException; |
| | | |
| | | void writeTrustState(WriteableTransaction txn) throws StorageRuntimeException; |
| | | |
| | | void put(Index index, ByteString key, EntryID entryID); |
| | | |
| | | void put(VLVIndex index, ByteString sortKey); |
| | |
| | | void remove(VLVIndex index, ByteString sortKey); |
| | | |
| | | void remove(Index index, ByteString key, EntryID entryID); |
| | | |
| | | void reset(); |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void writeTrustState(WriteableTransaction txn) |
| | | { |
| | | // Indexes cache the index trust flag. Ensure that the cached value is written into the db. |
| | | for (Index index : bufferedIndexes.keySet()) |
| | | { |
| | | index.setTrusted(txn, index.isTrusted()); |
| | | } |
| | | |
| | | for (VLVIndex index : bufferedVLVIndexes.keySet()) |
| | | { |
| | | index.setTrusted(txn, index.isTrusted()); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void put(Index index, ByteString key, EntryID entryID) |
| | | { |
| | | createOrGetBufferedIndexValues(index, key).addEntryID(entryID); |
| | |
| | | index.update(txn, entry.getKey(), values.deletedEntryIDs, values.addedEntryIDs); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public void reset() |
| | | { |
| | | bufferedIndexes.clear(); |
| | | bufferedVLVIndexes.clear(); |
| | | } |
| | | } |
| | | |
| | | /** |
| | |
| | | } |
| | | |
| | | @Override |
| | | public void writeTrustState(WriteableTransaction txn) |
| | | { |
| | | // Nothing to do |
| | | } |
| | | |
| | | @Override |
| | | public void remove(VLVIndex index, ByteString sortKey) |
| | | { |
| | | throw new UnsupportedOperationException(); |
| | |
| | | { |
| | | throw new UnsupportedOperationException(); |
| | | } |
| | | |
| | | @Override |
| | | public void reset() |
| | | { |
| | | throw new UnsupportedOperationException(); |
| | | } |
| | | } |
| | | |
| | | private final IndexBufferImplementor impl; |
| | |
| | | impl.flush(txn); |
| | | } |
| | | |
| | | /** |
| | | * Indexes might cache their trust state. This ensures that the cached state is persisted into the database.
| | | * |
| | | * @param txn |
| | | * a non null transaction |
| | | * @throws StorageRuntimeException |
| | | * If an error occurs in the storage. |
| | | */ |
| | | void writeTrustState(WriteableTransaction txn) |
| | | { |
| | | impl.writeTrustState(txn); |
| | | } |
| | | |
| | | void put(Index index, ByteString key, EntryID entryID) |
| | | { |
| | | impl.put(index, key, entryID); |
| | |
| | | { |
| | | impl.remove(index, key, entryID); |
| | | } |
| | | |
| | | void reset() |
| | | { |
| | | impl.reset(); |
| | | } |
| | | } |
| | |
| | | PhaseTwoProgressReporter progressReporter, boolean dn2idAlreadyImported) |
| | | { |
| | | final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN()); |
| | | final ID2Count id2count = entryContainer.getID2ChildrenCount(); |
| | | final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount(); |
| | | |
| | | return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), chunk, |
| | | id2count, newCollector(entryContainer, id2count.getName()), dn2idAlreadyImported); |
| | |
| | | |
| | | /** |
| | | * This task optionally copies the dn2id chunk into the database and takes advantage of its cursoring to compute the
| | | * {@link ID2Count} index. |
| | | * {@link ID2ChildrenCount} index. |
| | | */ |
| | | private static final class DN2IDImporterTask implements Callable<Void> |
| | | { |
| | |
| | | private final File tempDir; |
| | | private final BufferPool bufferPool; |
| | | private final DN2ID dn2id; |
| | | private final ID2Count id2count; |
| | | private final ID2ChildrenCount id2count; |
| | | private final Collector<?, ByteString> id2countCollector; |
| | | private final Chunk dn2IdSourceChunk; |
| | | private final Chunk dn2IdDestination; |
| | | |
| | | DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool, |
| | | DN2ID dn2id, Chunk dn2IdChunk, ID2Count id2count, Collector<?, ByteString> id2countCollector, |
| | | DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count, Collector<?, ByteString> id2countCollector, |
| | | boolean dn2idAlreadyImported) |
| | | { |
| | | this.reporter = progressReporter; |
| | |
| | | else if (isID2ChildrenCount(treeName)) |
| | | { |
| | | // key conflicts == sum values |
| | | return new AddLongCollector(entryContainer.getID2ChildrenCount()); |
| | | return ID2ChildrenCount.getSumLongCollectorInstance(); |
| | | } |
| | | else if (isDN2ID(treeName) || isDN2URI(treeName) || isVLVIndex(entryContainer, treeName)) |
| | | { |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * {@link Collector} that accepts {@code long} values encoded into {@link ByteString} objects and produces a |
| | | * {@link ByteString} representing the sum of the supplied {@code long}s. |
| | | */ |
| | | static final class AddLongCollector implements Collector<Long, ByteString> |
| | | { |
| | | private final ID2Count id2count; |
| | | |
| | | AddLongCollector(ID2Count id2count) |
| | | { |
| | | this.id2count = id2count; |
| | | } |
| | | |
| | | @Override |
| | | public Long get() |
| | | { |
| | | return 0L; |
| | | } |
| | | |
| | | @Override |
| | | public Long accept(Long resultContainer, ByteString value) |
| | | { |
| | | return resultContainer + id2count.fromValue(value); |
| | | } |
| | | |
| | | @Override |
| | | public ByteString merge(Long resultContainer) |
| | | { |
| | | return id2count.toValue(resultContainer); |
| | | } |
| | | } |
| | | |
| | | private static MeteredCursor<ByteString, ByteString> asProgressCursor( |
| | | SequentialCursor<ByteString, ByteString> delegate, String metricName, long totalSize) |
| | | { |
| New file |
| | |
| | | /* |
| | | * CDDL HEADER START |
| | | * |
| | | * The contents of this file are subject to the terms of the |
| | | * Common Development and Distribution License, Version 1.0 only |
| | | * (the "License"). You may not use this file except in compliance |
| | | * with the License. |
| | | * |
| | | * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt |
| | | * or http://forgerock.org/license/CDDLv1.0.html. |
| | | * See the License for the specific language governing permissions |
| | | * and limitations under the License. |
| | | * |
| | | * When distributing Covered Code, include this CDDL HEADER in each |
| | | * file and include the License file at legal-notices/CDDLv1_0.txt. |
| | | * If applicable, add the following below this CDDL HEADER, with the |
| | | * fields enclosed by brackets "[]" replaced with your own identifying |
| | | * information: |
| | | * Portions Copyright [yyyy] [name of copyright owner] |
| | | * |
| | | * CDDL HEADER END |
| | | * |
| | | * Copyright 2015 ForgeRock AS |
| | | */ |
| | | package org.opends.server.backends.pluggable; |
| | | |
| | | import static org.opends.server.backends.pluggable.CursorTransformer.*; |
| | | |
| | | import java.util.NoSuchElementException; |
| | | |
| | | import org.forgerock.opendj.ldap.ByteSequence; |
| | | import org.forgerock.opendj.ldap.ByteString; |
| | | import org.forgerock.opendj.ldap.ByteStringBuilder; |
| | | import org.forgerock.util.Function; |
| | | import org.forgerock.util.promise.NeverThrowsException; |
| | | import org.opends.server.backends.pluggable.CursorTransformer.ValueTransformer; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeImporter.SequentialCursorDecorator; |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | | import org.opends.server.backends.pluggable.spi.Importer; |
| | | import org.opends.server.backends.pluggable.spi.ReadableTransaction; |
| | | import org.opends.server.backends.pluggable.spi.SequentialCursor; |
| | | import org.opends.server.backends.pluggable.spi.TreeName; |
| | | import org.opends.server.backends.pluggable.spi.UpdateFunction; |
| | | import org.opends.server.backends.pluggable.spi.WriteableTransaction; |
| | | |
| | | /** |
| | | * Stores counters associated with a key. The counter value is sharded amongst multiple keys to allow concurrent updates without
| | | * contention (at the price of slower reads).
| | | */ |
| | | final class ShardedCounter extends AbstractTree |
| | | { |
| | | /** |
| | | * Must be a power of 2 |
| | | * @see <a href="http://en.wikipedia.org/wiki/Modulo_operation#Performance_issues">Performance issues</a> |
| | | */ |
| | | private static final long SHARD_COUNT = 256; |
| | | |
| | | private static final ValueTransformer<ByteString, ByteString, Long, NeverThrowsException> TO_LONG = |
| | | new ValueTransformer<ByteString, ByteString, Long, NeverThrowsException>() |
| | | { |
| | | @Override |
| | | public Long transform(ByteString key, ByteString value) |
| | | { |
| | | return decodeValue(value); |
| | | } |
| | | }; |
| | | |
| | | private static final Function<ByteString, ByteString, NeverThrowsException> TO_KEY = |
| | | new Function<ByteString, ByteString, NeverThrowsException>() |
| | | { |
| | | @Override |
| | | public ByteString apply(ByteString shardedKey) |
| | | { |
| | | // -1 to remove the shard id. |
| | | return shardedKey.subSequence(0, shardedKey.length() - 1); |
| | | } |
| | | }; |
| | | |
| | | ShardedCounter(TreeName name) |
| | | { |
| | | super(name); |
| | | } |
| | | |
| | | SequentialCursor<ByteString, Void> openCursor(ReadableTransaction txn) |
| | | { |
| | | return new UniqueKeysCursor<>(transformKeysAndValues( |
| | | txn.openCursor(getName()), TO_KEY, |
| | | CursorTransformer.<ByteString, ByteString, Void> constant(null))); |
| | | } |
| | | |
| | | private Cursor<ByteString, Long> openCursor0(ReadableTransaction txn) |
| | | { |
| | | return transformKeysAndValues(txn.openCursor(getName()), TO_KEY, TO_LONG); |
| | | } |
| | | |
| | | void addCount(final WriteableTransaction txn, ByteSequence key, final long delta) |
| | | { |
| | | txn.update(getName(), getShardedKey(key), new UpdateFunction() |
| | | { |
| | | @Override |
| | | public ByteSequence computeNewValue(ByteSequence oldValue) |
| | | { |
| | | final long currentValue = oldValue == null ? 0 : decodeValue(oldValue.toByteString()); |
| | | return encodeValue(currentValue + delta); |
| | | } |
| | | }); |
| | | } |
| | | |
| | | void importPut(Importer importer, ByteSequence key, long delta) |
| | | { |
| | | if (delta != 0) |
| | | { |
| | | importer.put(getName(), getShardedKey(key), encodeValue(delta)); |
| | | } |
| | | } |
| | | |
| | | long getCount(final ReadableTransaction txn, ByteSequence key) |
| | | { |
| | | long counterValue = 0; |
| | | try (final SequentialCursor<ByteString, Long> cursor = new ShardCursor(openCursor0(txn), key)) |
| | | { |
| | | while (cursor.next()) |
| | | { |
| | | counterValue += cursor.getValue(); |
| | | } |
| | | } |
| | | return counterValue; |
| | | } |
| | | |
| | | long removeCount(final WriteableTransaction txn, ByteSequence key) |
| | | { |
| | | long counterValue = 0; |
| | | try (final SequentialCursor<ByteString, Long> cursor = new ShardCursor(openCursor0(txn), key)) |
| | | { |
| | | // Iterate over and remove all the thread local shards |
| | | while (cursor.next()) |
| | | { |
| | | counterValue += cursor.getValue(); |
| | | cursor.delete(); |
| | | } |
| | | } |
| | | return counterValue; |
| | | } |
| | | |
| | | static long decodeValue(ByteString value) |
| | | { |
| | | switch (value.length()) |
| | | { |
| | | case 1: |
| | | return value.byteAt(0); |
| | | case (Integer.SIZE / Byte.SIZE): |
| | | return value.toInt(); |
| | | case (Long.SIZE / Byte.SIZE): |
| | | return value.toLong(); |
| | | default: |
| | | throw new IllegalArgumentException("Unsupported sharded-counter value format."); |
| | | } |
| | | } |
| | | |
| | | static ByteString encodeValue(long value) |
| | | { |
| | | final byte valueAsByte = (byte) value; |
| | | if (valueAsByte == value) |
| | | { |
| | | return ByteString.wrap(new byte[] { valueAsByte }); |
| | | } |
| | | final int valueAsInt = (int) value; |
| | | if (valueAsInt == value) |
| | | { |
| | | return ByteString.valueOfInt(valueAsInt); |
| | | } |
| | | return ByteString.valueOfLong(value); |
| | | } |
| | | |
| | | @Override |
| | | public String valueToString(ByteString value) |
| | | { |
| | | return String.valueOf(decodeValue(value)); |
| | | } |
| | | |
| | | private static ByteSequence getShardedKey(ByteSequence key) |
| | | { |
| | | final byte bucket = (byte) (Thread.currentThread().getId() & (SHARD_COUNT - 1)); |
| | | return new ByteStringBuilder(key.length() + ByteStringBuilder.MAX_COMPACT_SIZE).appendBytes(key).appendByte(bucket); |
| | | } |
| | | |
| | | /** Restricts a cursor to the shards of a specific key. */ |
| | | private final class ShardCursor extends SequentialCursorDecorator<Cursor<ByteString, Long>, ByteString, Long> |
| | | { |
| | | private final ByteSequence targetKey; |
| | | private boolean initialized; |
| | | |
| | | ShardCursor(Cursor<ByteString, Long> delegate, ByteSequence targetKey) |
| | | { |
| | | super(delegate); |
| | | this.targetKey = targetKey; |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | if (!initialized) |
| | | { |
| | | initialized = true; |
| | | return delegate.positionToKeyOrNext(targetKey) && isOnTargetKey(); |
| | | } |
| | | return delegate.next() && isOnTargetKey(); |
| | | } |
| | | |
| | | private boolean isOnTargetKey() |
| | | { |
| | | return targetKey.equals(delegate.getKey()); |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * Cursor that returns unique keys and null values. Ensures that {@link #getKey()} will return a different key after
| | | * each {@link #next()}. |
| | | */ |
| | | static final class UniqueKeysCursor<K> implements SequentialCursor<K, Void> |
| | | { |
| | | private final Cursor<K, ?> delegate; |
| | | private boolean isDefined; |
| | | private K key; |
| | | |
| | | UniqueKeysCursor(Cursor<K, ?> cursor) |
| | | { |
| | | this.delegate = cursor; |
| | | if (!delegate.isDefined()) |
| | | { |
| | | delegate.next(); |
| | | } |
| | | } |
| | | |
| | | @Override |
| | | public boolean next() |
| | | { |
| | | isDefined = delegate.isDefined(); |
| | | if (isDefined) |
| | | { |
| | | key = delegate.getKey(); |
| | | skipEntriesWithSameKey(); |
| | | } |
| | | return isDefined; |
| | | } |
| | | |
| | | private void skipEntriesWithSameKey() |
| | | { |
| | | throwIfUndefined(this); |
| | | while (delegate.next() && key.equals(delegate.getKey())) |
| | | { |
| | | // Skip all entries having the same key. |
| | | } |
| | | // Delegate is one step beyond. When delegate.isDefined() returns false, we have to return true once more.
| | | isDefined = true; |
| | | } |
| | | |
| | | @Override |
| | | public boolean isDefined() |
| | | { |
| | | return isDefined; |
| | | } |
| | | |
| | | @Override |
| | | public K getKey() throws NoSuchElementException |
| | | { |
| | | throwIfUndefined(this); |
| | | return key; |
| | | } |
| | | |
| | | @Override |
| | | public Void getValue() throws NoSuchElementException |
| | | { |
| | | throwIfUndefined(this); |
| | | return null; |
| | | } |
| | | |
| | | @Override |
| | | public void delete() throws NoSuchElementException, UnsupportedOperationException |
| | | { |
| | | throw new UnsupportedOperationException(); |
| | | } |
| | | |
| | | @Override |
| | | public void close() |
| | | { |
| | | key = null; |
| | | delegate.close(); |
| | | } |
| | | |
| | | private static void throwIfUndefined(SequentialCursor<?, ?> cursor) |
| | | { |
| | | if (!cursor.isDefined()) |
| | | { |
| | | throw new NoSuchElementException(); |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | import java.util.LinkedList; |
| | | import java.util.List; |
| | | import java.util.TreeSet; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | | import org.forgerock.i18n.slf4j.LocalizedLogger; |
| | |
| | | * "tie-breaker" and ensures that keys correspond to one and only one entry. This ensures that all |
| | | * tree updates can be performed using lock-free operations. |
| | | */ |
| | | @SuppressWarnings("javadoc") |
| | | class VLVIndex extends AbstractTree implements ConfigurationChangeListener<BackendVLVIndexCfg>, Closeable |
| | | { |
| | | private static final ByteString COUNT_KEY = ByteString.valueOfUtf8("nbRecords"); |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | /** The VLV vlvIndex configuration. */ |
| | | private BackendVLVIndexCfg config; |
| | | |
| | | /** The cached count of entries in this index. */ |
| | | private final AtomicInteger count = new AtomicInteger(0); |
| | | /** The count of entries in this index. */ |
| | | private final ShardedCounter counter; |
| | | |
| | | private DN baseDN; |
| | | private SearchScope scope; |
| | |
| | | ConfigException |
| | | { |
| | | super(new TreeName(entryContainer.getTreePrefix(), "vlv." + config.getName())); |
| | | |
| | | this.counter = new ShardedCounter(new TreeName(entryContainer.getTreePrefix(), "counter.vlv." + config.getName())); |
| | | this.config = config; |
| | | this.baseDN = config.getBaseDN(); |
| | | this.scope = convertScope(config.getScope()); |
| | |
| | | } |
| | | |
| | | @Override |
| | | void open0(final WriteableTransaction txn) throws StorageRuntimeException |
| | | void afterOpen(final WriteableTransaction txn) throws StorageRuntimeException |
| | | { |
| | | count.set((int) txn.getRecordCount(getName())); |
| | | counter.open(txn, true); |
| | | } |
| | | |
| | | @Override |
| | | void beforeDelete(WriteableTransaction txn) throws StorageRuntimeException |
| | | { |
| | | counter.delete(txn); |
| | | } |
| | | |
| | | @Override |
| | |
| | | { |
| | | txn.put(getName(), nextAddedKey, toValue()); |
| | | nextAddedKey = nextOrNull(ai); |
| | | count.incrementAndGet(); |
| | | counter.addCount(txn, COUNT_KEY, 1); |
| | | } |
| | | else |
| | | { |
| | | txn.delete(getName(), nextDeletedKey); |
| | | nextDeletedKey = nextOrNull(di); |
| | | count.decrementAndGet(); |
| | | counter.addCount(txn, COUNT_KEY, -1); |
| | | } |
| | | } |
| | | } |
| | |
| | | { |
| | | try (Cursor<ByteString, ByteString> cursor = txn.openCursor(getName())) |
| | | { |
| | | final long[] selectedIDs = readRange(cursor, count.get(), debugBuilder); |
| | | final long[] selectedIDs = readRange(cursor, getEntryCount(txn), debugBuilder); |
| | | return newDefinedSet(selectedIDs); |
| | | } |
| | | } |
| | | |
| | | private int getEntryCount(final ReadableTransaction txn) |
| | | { |
| | | return (int) counter.getCount(txn, COUNT_KEY); |
| | | } |
| | | |
| | | /** |
| | | * Reads a page of entries from the VLV which includes the nearest entry corresponding to the VLV |
| | | * assertion, {@code beforeCount} entries leading up to the nearest entry, and {@code afterCount} |
| | |
| | | final SearchOperation searchOperation, final VLVRequestControl vlvRequest) |
| | | throws DirectoryException |
| | | { |
| | | final int currentCount = count.get(); |
| | | final int currentCount = getEntryCount(txn); |
| | | final int beforeCount = vlvRequest.getBeforeCount(); |
| | | final int afterCount = vlvRequest.getAfterCount(); |
| | | final ByteString assertion = vlvRequest.getGreaterThanOrEqualAssertion(); |
| | |
| | | private EntryIDSet evaluateVLVRequestByOffset(final ReadableTransaction txn, final SearchOperation searchOperation, |
| | | final VLVRequestControl vlvRequest, final StringBuilder debugBuilder) throws DirectoryException |
| | | { |
| | | final int currentCount = count.get(); |
| | | final int currentCount = getEntryCount(txn); |
| | | int beforeCount = vlvRequest.getBeforeCount(); |
| | | int afterCount = vlvRequest.getAfterCount(); |
| | | int targetOffset = vlvRequest.getOffset(); |
| | |
| | | import org.opends.server.backends.pluggable.spi.Cursor; |
| | | import org.opends.server.backends.pluggable.spi.ReadOperation; |
| | | import org.opends.server.backends.pluggable.spi.ReadableTransaction; |
| | | import org.opends.server.backends.pluggable.spi.SequentialCursor; |
| | | import org.opends.server.backends.pluggable.spi.StorageRuntimeException; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.types.AttributeType; |
| | |
| | | /** The DN tree. */ |
| | | private DN2ID dn2id; |
| | | /** The children tree. */ |
| | | private ID2Count id2childrenCount; |
| | | private ID2ChildrenCount id2childrenCount; |
| | | |
| | | /** A list of the attribute indexes to be verified. */ |
| | | private final ArrayList<AttributeIndex> attrIndexList = new ArrayList<>(); |
| | |
| | | |
| | | private void iterateID2ChildrenCount(ReadableTransaction txn) throws StorageRuntimeException |
| | | { |
| | | Cursor<EntryID, Long> cursor = id2childrenCount.openCursor(txn); |
| | | if (!cursor.next()) { |
| | | return; |
| | | } |
| | | |
| | | EntryID currentEntryID = new EntryID(-1); |
| | | while(cursor.next()) { |
| | | if (cursor.getKey().equals(currentEntryID)) { |
| | | // Sharded cursor may return the same EntryID multiple times |
| | | continue; |
| | | } |
| | | currentEntryID = cursor.getKey(); |
| | | if (!id2entry.containsEntryID(txn, currentEntryID)) { |
| | | logger.trace("File id2ChildrenCount reference non-existing EntryID <%d>%n", currentEntryID); |
| | | errorCount++; |
| | | try (final SequentialCursor<EntryID, Void> cursor = id2childrenCount.openCursor(txn)) |
| | | { |
| | | while (cursor.next()) |
| | | { |
| | | final EntryID entryID = cursor.getKey(); |
| | | if (!id2entry.containsEntryID(txn, entryID)) |
| | | { |
| | | logger.trace("File id2ChildrenCount references non-existing EntryID <%d>%n", entryID); |
| | | errorCount++; |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | * |
| | | * @param key |
| | | * the key where to position the cursor |
| | | * @return {@code true} if the cursor could be positioned to the key, |
| | | * {@code false} otherwise |
| | | * @return {@code true} if the cursor could be positioned to the key |
| | | * or the next one, {@code false} otherwise |
| | | */ |
| | | boolean positionToKeyOrNext(ByteSequence key); |
| | | |
| | |
| | | public interface WriteOperation |
| | | { |
| | | /** |
| | | * Executes a write operation. |
| | | * Executes a write operation. Implementations must be idempotent since the
| | | * operation might be retried (for example, in case of an optimistic locking failure).
| | | * |
| | | * @param txn |
| | | * the write transaction where to execute the write operation |
| File was renamed from opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2CountTest.java |
| | |
| | | import org.opends.server.extensions.DiskSpaceMonitor; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.testng.annotations.AfterClass; |
| | | import org.testng.annotations.AfterMethod; |
| | | import org.testng.annotations.BeforeClass; |
| | | import org.testng.annotations.BeforeMethod; |
| | | import org.testng.annotations.Test; |
| | | |
| | | @Test(groups = { "precommit", "pluggablebackend" }, sequential = true) |
| | | public class ID2CountTest extends DirectoryServerTestCase |
| | | public class ID2ChildrenCountTest extends DirectoryServerTestCase |
| | | { |
| | | private final TreeName id2CountTreeName = new TreeName("base-dn", "index-id"); |
| | | private ExecutorService parallelExecutor; |
| | | private ID2Count id2Count; |
| | | private ID2ChildrenCount id2ChildrenCount; |
| | | private PDBStorage storage; |
| | | |
| | | @BeforeClass |
| | |
| | | } |
| | | }); |
| | | |
| | | id2Count = new ID2Count(id2CountTreeName); |
| | | id2ChildrenCount = new ID2ChildrenCount(id2CountTreeName); |
| | | |
| | | parallelExecutor = Executors.newFixedThreadPool(32); |
| | | } |
| | |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | final long delta = id2Count.removeCount(txn, key); |
| | | id2Count.updateTotalCount(txn, -delta); |
| | | final long delta = id2ChildrenCount.removeCount(txn, key); |
| | | id2ChildrenCount.updateTotalCount(txn, -delta); |
| | | l.handleResult(delta); |
| | | } |
| | | }); |
| | |
| | | @Override |
| | | public void run(WriteableTransaction txn) throws Exception |
| | | { |
| | | id2Count.updateCount(txn, key, delta); |
| | | id2Count.updateTotalCount(txn, delta); |
| | | id2ChildrenCount.updateCount(txn, key, delta); |
| | | id2ChildrenCount.updateTotalCount(txn, delta); |
| | | } |
| | | }); |
| | | } |
| | |
| | | @Override |
| | | public Long run(ReadableTransaction txn) throws Exception |
| | | { |
| | | return id2Count.getCount(txn, key); |
| | | return id2ChildrenCount.getCount(txn, key); |
| | | } |
| | | }); |
| | | } |
| | |
| | | @Override |
| | | public Long run(ReadableTransaction txn) throws Exception |
| | | { |
| | | return id2Count.getTotalCount(txn); |
| | | return id2ChildrenCount.getTotalCount(txn); |
| | | } |
| | | }); |
| | | } |
| | |
| | | import org.mockito.Mockito; |
| | | import org.opends.server.DirectoryServerTestCase; |
| | | import org.opends.server.TestCaseUtils; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeImporter.AddLongCollector; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeImporter.Chunk; |
| | | import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector; |
| | |
| | | |
| | | @Test |
| | | @SuppressWarnings(value = { "unchecked", "resource" }) |
| | | public void testAddLongCollector() |
| | | public void testCounterCollector() |
| | | { |
| | | final ID2Count id2count = new ID2Count(TreeName.valueOf("/dummy/dummy")); |
| | | final MeteredCursor<String, ByteString> source = cursorOf( |
| | | Pair.of("key1", id2count.toValue(10)), |
| | | Pair.of("key1", id2count.toValue(20)), |
| | | Pair.of("key2", id2count.toValue(5)), |
| | | Pair.of("key3", id2count.toValue(6)), |
| | | Pair.of("key3", id2count.toValue(4))); |
| | | Pair.of("key1", ShardedCounter.encodeValue(10)), |
| | | Pair.of("key1", ShardedCounter.encodeValue(20)), |
| | | Pair.of("key2", ShardedCounter.encodeValue(5)), |
| | | Pair.of("key3", ShardedCounter.encodeValue(6)), |
| | | Pair.of("key3", ShardedCounter.encodeValue(4))); |
| | | |
| | | final SequentialCursor<String, ByteString> expected = cursorOf( |
| | | Pair.of("key1", id2count.toValue(30)), |
| | | Pair.of("key2", id2count.toValue(5)), |
| | | Pair.of("key3", id2count.toValue(10))); |
| | | Pair.of("key1", ShardedCounter.encodeValue(30)), |
| | | Pair.of("key2", ShardedCounter.encodeValue(5)), |
| | | Pair.of("key3", ShardedCounter.encodeValue(10))); |
| | | |
| | | final SequentialCursor<String, ByteString> result = new CollectorCursor<>(source, new AddLongCollector(id2count)); |
| | | final SequentialCursor<String, ByteString> result = |
| | | new CollectorCursor<>(source, ID2ChildrenCount.getSumLongCollectorInstance()); |
| | | |
| | | assertThat(toPairs(result)).containsExactlyElementsOf(toPairs(expected)); |
| | | } |
| | |
| | | { |
| | | final Storage storage = backend.getRootContainer().getStorage(); |
| | | final DN2ID dn2ID = backend.getRootContainer().getEntryContainer(testBaseDN).getDN2ID(); |
| | | final ID2Count id2ChildrenCount = backend.getRootContainer().getEntryContainer(testBaseDN).getID2ChildrenCount(); |
| | | final ID2ChildrenCount id2ChildrenCount = backend.getRootContainer().getEntryContainer(testBaseDN).getID2ChildrenCount(); |
| | | |
| | | final VerifyConfig config = new VerifyConfig(); |
| | | config.setBaseDN(DN.valueOf("dc=test,dc=com")); |