/*
 * The contents of this file are subject to the terms of the Common Development and
 * Distribution License (the License). You may not use this file except in compliance with the
 * License.
 *
 * You can obtain a copy of the License at legal/CDDLv1.0.txt. See the License for the
 * specific language governing permission and limitations under the License.
 *
 * When distributing Covered Software, include this CDDL Header Notice in each file and include
 * the License file at legal/CDDLv1.0.txt. If applicable, add the following below the CDDL
 * Header, with the fields enclosed by brackets [] replaced by your own identifying
 * information: "Portions Copyright [year] [name of copyright owner]".
 *
 * Copyright 2015 ForgeRock AS.
 */
package org.opends.server.backends.pluggable;

import static org.opends.server.backends.pluggable.CursorTransformer.*;

import java.util.NoSuchElementException;

import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.forgerock.opendj.ldap.ByteStringBuilder;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.opends.server.backends.pluggable.CursorTransformer.ValueTransformer;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.SequentialCursorDecorator;
import org.opends.server.backends.pluggable.spi.Cursor;
import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.SequentialCursor;
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.UpdateFunction;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;

/**
 * Store counters associated to a key. Counter value is sharded amongst multiple keys to allow concurrent update without
 * contention (at the price of a slower read).
 * <p>
 * Each shard is stored under the logical key followed by a single shard-id byte derived from the id of the updating
 * thread. The value of a counter is the sum of the values of all its shards.
 */
final class ShardedCounter extends AbstractTree {
  /**
   * Number of shards a counter may be split into.
   * <p>
   * Must be a power of 2 because the shard id is computed with a bit mask ({@code SHARD_COUNT - 1}) rather than a
   * modulo. It must also not exceed 256 because the shard id is stored in a single byte.
   *
   * @see <a href="https://en.wikipedia.org/wiki/Modulo_operation#Performance_issues">Performance issues</a>
   */
  private static final long SHARD_COUNT = 256;

  /** Decodes the stored value of a shard into a {@code Long}; the key is ignored. */
  private static final ValueTransformer<ByteString, ByteString, Long, NeverThrowsException> TO_LONG =
      new ValueTransformer<ByteString, ByteString, Long, NeverThrowsException>() {
        @Override
        public Long transform(ByteString key, ByteString value) {
          return decodeValue(value);
        }
      };

  /** Converts a sharded key back to its logical key by dropping the trailing shard-id byte. */
  private static final Function<ByteString, ByteString, NeverThrowsException> TO_KEY =
      new Function<ByteString, ByteString, NeverThrowsException>() {
        @Override
        public ByteString apply(ByteString shardedKey) {
          // -1 to remove the shard id.
          return shardedKey.subSequence(0, shardedKey.length() - 1);
        }
      };

  /**
   * Creates a sharded counter stored in the tree having the provided name.
   *
   * @param name
   *          the name of the tree holding the shards
   */
  ShardedCounter(TreeName name) {
    super(name);
  }

  /**
   * Opens a cursor iterating over the logical keys of this tree, each one being returned only once whatever its number
   * of shards. The values returned by the cursor are always {@code null}.
   *
   * @param txn
   *          the transaction used to open the underlying cursor
   * @return a cursor over the distinct logical keys
   */
  SequentialCursor<ByteString, Void> openCursor(ReadableTransaction txn) {
    return new UniqueKeysCursor<>(transformKeysAndValues(
        txn.openCursor(getName()),
        TO_KEY,
        CursorTransformer.<ByteString, ByteString, Void> constant(null)));
  }

  /**
   * Opens a cursor over every shard of this tree, exposing the logical key (shard id removed) and the decoded shard
   * value.
   */
  private Cursor<ByteString, Long> openCursor0(ReadableTransaction txn) {
    return transformKeysAndValues(txn.openCursor(getName()), TO_KEY, TO_LONG);
  }

  /**
   * Adds {@code delta} to the shard of {@code key} associated to the calling thread. A shard which does not exist yet
   * is considered to hold the value 0.
   *
   * @param txn
   *          the transaction used to perform the update
   * @param key
   *          the logical key of the counter
   * @param delta
   *          the (possibly negative) amount to add
   */
  void addCount(final WriteableTransaction txn, ByteSequence key, final long delta) {
    txn.update(getName(), getShardedKey(key), new UpdateFunction() {
      @Override
      public ByteSequence computeNewValue(ByteSequence oldValue) {
        final long currentValue = oldValue == null ? 0 : decodeValue(oldValue.toByteString());
        return encodeValue(currentValue + delta);
      }
    });
  }

  /**
   * Writes {@code delta} through the importer as the value of the shard of {@code key} associated to the calling
   * thread. Nothing is written when {@code delta} is 0.
   * <p>
   * NOTE(review): this is a plain {@code put}, not an addition; how the importer treats a shard which already exists
   * is not visible from this class.
   *
   * @param importer
   *          the importer receiving the record
   * @param key
   *          the logical key of the counter
   * @param delta
   *          the value to store
   */
  void importPut(Importer importer, ByteSequence key, long delta) {
    if (delta != 0) {
      importer.put(getName(), getShardedKey(key), encodeValue(delta));
    }
  }

  /**
   * Returns the value of the counter associated to {@code key}, computed as the sum of all its shards.
   *
   * @param txn
   *          the transaction used to read the shards
   * @param key
   *          the logical key of the counter
   * @return the counter value, 0 if no shard exists for this key
   */
  long getCount(final ReadableTransaction txn, ByteSequence key) {
    long counterValue = 0;
    try (final SequentialCursor<ByteString, Long> cursor = new ShardCursor(openCursor0(txn), key)) {
      while (cursor.next()) {
        counterValue += cursor.getValue();
      }
    }
    return counterValue;
  }

  /**
   * Deletes every shard of the counter associated to {@code key}.
   *
   * @param txn
   *          the transaction used to read and delete the shards
   * @param key
   *          the logical key of the counter
   * @return the value the counter had before being removed, 0 if no shard existed for this key
   */
  long removeCount(final WriteableTransaction txn, ByteSequence key) {
    long counterValue = 0;
    try (final SequentialCursor<ByteString, Long> cursor = new ShardCursor(openCursor0(txn), key)) {
      // Iterate over and remove all the thread local shards
      while (cursor.next()) {
        counterValue += cursor.getValue();
        cursor.delete();
      }
    }
    return counterValue;
  }

  /**
   * Decodes a counter value produced by {@link #encodeValue(long)}.
   *
   * @param value
   *          the encoded value, which must be 1, 4 or 8 bytes long
   * @return the decoded value
   * @throws IllegalArgumentException
   *           if the length of {@code value} is not 1, 4 or 8
   */
  static long decodeValue(ByteString value) {
    switch (value.length()) {
    case 1:
      return value.byteAt(0);
    case (Integer.SIZE / Byte.SIZE):
      return value.toInt();
    case (Long.SIZE / Byte.SIZE):
      return value.toLong();
    default:
      throw new IllegalArgumentException("Unsupported sharded-counter value format.");
    }
  }

  /**
   * Encodes a counter value using the shortest representation (1, 4 or 8 bytes) able to hold it without loss.
   *
   * @param value
   *          the value to encode
   * @return the encoded value
   */
  static ByteString encodeValue(long value) {
    // A narrowing cast which converts back to the same long proves that the value fits in the narrower type.
    final byte valueAsByte = (byte) value;
    if (valueAsByte == value) {
      return ByteString.wrap(new byte[] { valueAsByte });
    }
    final int valueAsInt = (int) value;
    if (valueAsInt == value) {
      return ByteString.valueOfInt(valueAsInt);
    }
    return ByteString.valueOfLong(value);
  }

  /** Returns the decimal representation of the decoded shard value. */
  @Override
  public String valueToString(ByteString value) {
    return String.valueOf(decodeValue(value));
  }

  /**
   * Builds the key of the shard used by the calling thread for {@code key}: the logical key followed by one byte
   * holding the low-order bits of the current thread id.
   */
  private static ByteSequence getShardedKey(ByteSequence key) {
    final byte bucket = (byte) (Thread.currentThread().getId() & (SHARD_COUNT - 1));
    return new ByteStringBuilder(key.length() + ByteStringBuilder.MAX_COMPACT_SIZE)
        .appendBytes(key)
        .appendByte(bucket);
  }

  /**
   * Restricts a cursor to the shards of a specific key.
   * <p>
   * NOTE(review): iteration stops at the first record whose logical key differs from the target key, so this
   * presumably relies on logical keys never being a prefix of one another - confirm against callers.
   */
  private static final class ShardCursor
      extends SequentialCursorDecorator<Cursor<ByteString, Long>, ByteString, Long> {
    private final ByteSequence targetKey;
    /** Whether the delegate has already been positioned on the target key. */
    private boolean initialized;

    ShardCursor(Cursor<ByteString, Long> delegate, ByteSequence targetKey) {
      super(delegate);
      this.targetKey = targetKey;
    }

    @Override
    public boolean next() {
      if (!initialized) {
        // First call: jump to the first record whose key is greater than or equal to the target key.
        initialized = true;
        return delegate.positionToKeyOrNext(targetKey) && isOnTargetKey();
      }
      return delegate.next() && isOnTargetKey();
    }

    private boolean isOnTargetKey() {
      return targetKey.equals(delegate.getKey());
    }
  }

  /**
   * Cursor that returns unique keys and null values. Ensure that {@link #getKey()} will return a different key after
   * each {@link #next()}.
   *
   * @param <K>
   *          the type of the keys
   */
  private static final class UniqueKeysCursor<K> implements SequentialCursor<K, Void> {
    /** Underlying cursor, always positioned on the first record following the records of the current key. */
    private final Cursor<K, ?> delegate;
    private boolean isDefined;
    private K key;

    private UniqueKeysCursor(Cursor<K, ?> cursor) {
      this.delegate = cursor;
      // Pre-position the delegate on the first record so that next() can read the key to expose.
      if (!delegate.isDefined()) {
        delegate.next();
      }
    }

    @Override
    public boolean next() {
      isDefined = delegate.isDefined();
      if (isDefined) {
        key = delegate.getKey();
        skipEntriesWithSameKey();
      }
      return isDefined;
    }

    private void skipEntriesWithSameKey() {
      throwIfUndefined(this);
      while (delegate.next() && key.equals(delegate.getKey())) {
        // Skip all entries having the same key.
      }
      // Delegate is one step beyond. When delegate.isDefined() return false, we have to return true once more.
      isDefined = true;
    }

    @Override
    public boolean isDefined() {
      return isDefined;
    }

    @Override
    public K getKey() throws NoSuchElementException {
      throwIfUndefined(this);
      return key;
    }

    @Override
    public Void getValue() throws NoSuchElementException {
      throwIfUndefined(this);
      return null;
    }

    @Override
    public void delete() throws NoSuchElementException, UnsupportedOperationException {
      throw new UnsupportedOperationException();
    }

    @Override
    public void close() {
      key = null;
      delegate.close();
    }

    private static void throwIfUndefined(SequentialCursor<?, ?> cursor) {
      if (!cursor.isDefined()) {
        throw new NoSuchElementException();
      }
    }
  }
}