From b88a555f5a584c355442ee8db6d218c9fe95fa36 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <ylecaillez@forgerock.com>
Date: Tue, 17 Nov 2015 10:16:04 +0000
Subject: [PATCH] OPENDJ-2393: Possible index corruption
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java | 42 -
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java | 24
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/WriteOperation.java | 3
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/CursorTransformer.java | 33 +
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VerifyJob.java | 28
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Cursor.java | 4
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2ChildrenCount.java | 211 +++++++++++
opendj-dsml-servlet/src/license/THIRD-PARTY.properties | 21 +
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java | 2
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ShardedCounter.java | 299 +++++++++++++++
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java | 35 +
/dev/null | 246 ------------
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java | 2
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2ChildrenCountTest.java | 19
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryContainer.java | 43 +
opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java | 23
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java | 56 ++
opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java | 22
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java | 2
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/AbstractTree.java | 11
20 files changed, 741 insertions(+), 385 deletions(-)
diff --git a/opendj-dsml-servlet/src/license/THIRD-PARTY.properties b/opendj-dsml-servlet/src/license/THIRD-PARTY.properties
new file mode 100644
index 0000000..532605a
--- /dev/null
+++ b/opendj-dsml-servlet/src/license/THIRD-PARTY.properties
@@ -0,0 +1,21 @@
+# Generated by org.codehaus.mojo.license.AddThirdPartyMojo
+#-------------------------------------------------------------------------------
+# Already used licenses in project :
+# - Apache Software License, Version 2.0
+# - BSD
+# - CC BY-NC-ND 3.0
+# - Common Development and Distribution License 1.0
+# - Common Development and Distribution License 1.1
+# - Dual licensed (CDDL and GPL)
+# - Dual licensed (CDDL and GPLv2+CE)
+# - The GNU Lesser General Public License, version 2.0 with Classpath Exception
+# - The GNU Lesser General Public License, version 2.1
+# - The GNU Lesser General Public License, version 3.0
+# - The MIT License
+# - The Sleepycat License
+#-------------------------------------------------------------------------------
+# Please fill the missing licenses for dependencies :
+#
+#
+#Tue Nov 17 10:59:10 CET 2015
+com.sleepycat--je--5.0.104=The Sleepycat License
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
index d6c6876..8b90b1f 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pdb/PDBStorage.java
@@ -184,7 +184,7 @@
{
exchange.remove();
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -439,7 +439,7 @@
bytesToValue(ex.getValue(), value);
ex.store();
}
- catch (final Exception e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -454,7 +454,7 @@
bytesToKey(ex.getKey(), key);
return ex.remove();
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -469,7 +469,7 @@
ex = getExchangeFromCache(treeName);
ex.removeTree();
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -507,7 +507,7 @@
*/
return new CursorImpl(getNewExchange(treeName, false));
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -526,7 +526,7 @@
{
getExchangeFromCache(treeName);
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -543,7 +543,7 @@
ex.fetch();
return valueToBytes(ex.getValue());
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -574,7 +574,7 @@
}
return false;
}
- catch (final Exception e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -592,7 +592,7 @@
put(treeName, dummyKey, ByteString.empty());
delete(treeName, dummyKey);
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -667,7 +667,7 @@
{
// ignore missing trees.
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
@@ -852,7 +852,7 @@
catch(final InUseException e) {
throw new StorageInUseException(e);
}
- catch (final PersistitException e)
+ catch (final PersistitException | RollbackException e)
{
throw new StorageRuntimeException(e);
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/AbstractTree.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/AbstractTree.java
index 08b561b..539a26c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/AbstractTree.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/AbstractTree.java
@@ -50,11 +50,11 @@
public final void open(WriteableTransaction txn, boolean createOnDemand) throws StorageRuntimeException
{
txn.openTree(name, createOnDemand);
- open0(txn);
+ afterOpen(txn);
}
/** Override in order to perform any additional initialization after the index has opened. */
- void open0(WriteableTransaction txn) throws StorageRuntimeException
+ void afterOpen(WriteableTransaction txn) throws StorageRuntimeException
{
// Do nothing by default.
}
@@ -62,9 +62,16 @@
@Override
public final void delete(WriteableTransaction txn) throws StorageRuntimeException
{
+ beforeDelete(txn);
txn.deleteTree(name);
}
+ /** Override in order to perform any additional operation before index tree deletion. */
+ void beforeDelete(WriteableTransaction txn) throws StorageRuntimeException
+ {
+ // Do nothing by default.
+ }
+
@Override
public final long getRecordCount(ReadableTransaction txn) throws StorageRuntimeException
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/CursorTransformer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/CursorTransformer.java
index 7bd3059..0f565ed 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/CursorTransformer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/CursorTransformer.java
@@ -30,6 +30,7 @@
import java.util.NoSuchElementException;
import org.forgerock.opendj.ldap.ByteSequence;
+import org.forgerock.opendj.ldap.Functions;
import org.forgerock.util.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.opends.server.backends.pluggable.spi.Cursor;
@@ -62,11 +63,11 @@
VO transform(KI key, VI value) throws E;
}
- private static final Function<Object, Object, NeverThrowsException> NO_TRANSFORM =
- new Function<Object, Object, NeverThrowsException>()
+ private final static ValueTransformer<?, ?, ?, NeverThrowsException> KEEP_VALUES_UNCHANGED =
+ new ValueTransformer<Object, Object, Object, NeverThrowsException>()
{
@Override
- public Object apply(Object value) throws NeverThrowsException
+ public Object transform(Object key, Object value) throws NeverThrowsException
{
return value;
}
@@ -93,20 +94,36 @@
return new CursorTransformer<>(new SequentialCursorAdapter<>(input), keyTransformer, valueTransformer);
}
- @SuppressWarnings("unchecked")
static <KI, VI, VO> Cursor<KI, VO> transformValues(Cursor<KI, VI> input,
ValueTransformer<KI, VI, VO, ? extends Exception> valueTransformer)
{
- return transformKeysAndValues(input, (Function<KI, KI, NeverThrowsException>) NO_TRANSFORM, valueTransformer);
+ return transformKeysAndValues(input, Functions.<KI>identityFunction(), valueTransformer);
}
- @SuppressWarnings("unchecked")
static <KI, VI, VO> Cursor<KI, VO> transformValues(SequentialCursor<KI, VI> input,
ValueTransformer<KI, VI, VO, ? extends Exception> valueTransformer)
{
// SequentialCursorAdapter constructor never throws
- return transformKeysAndValues(new SequentialCursorAdapter<>(input),
- (Function<KI, KI, NeverThrowsException>) NO_TRANSFORM, valueTransformer);
+ return transformKeysAndValues(new SequentialCursorAdapter<>(input), Functions.<KI> identityFunction(),
+ valueTransformer);
+ }
+
+ @SuppressWarnings("unchecked")
+ static <K, V> ValueTransformer<K, V, V, NeverThrowsException> keepValuesUnchanged()
+ {
+ return (ValueTransformer<K, V, V, NeverThrowsException>) KEEP_VALUES_UNCHANGED;
+ }
+
+ static <K, VI, VO> ValueTransformer<K, VI, VO, NeverThrowsException> constant(final VO constant)
+ {
+ return new ValueTransformer<K, VI, VO, NeverThrowsException>()
+ {
+ @Override
+ public VO transform(K key, VI value) throws NeverThrowsException
+ {
+ return constant;
+ }
+ };
}
private CursorTransformer(Cursor<KI, VI> input, Function<KI, KO, ? extends Exception> keyTransformer,
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
index cfc6bf6..876b13c 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
@@ -66,8 +66,11 @@
* A flag to indicate if this index should be trusted to be consistent with the entries tree.
* If not trusted, we assume that existing entryIDSets for a key is still accurate. However, keys
* that do not exist are undefined instead of an empty entryIDSet. The following rules will be
- * observed when the index is not trusted: - no entryIDs will be added to a non-existing key. -
- * undefined entryIdSet will be returned whenever a key is not found.
+ * observed when the index is not trusted:
+ * <ul>
+ * <li>no entryIDs will be added to a non-existing key.</li>
+ * <li>undefined entryIdSet will be returned whenever a key is not found.</li>
+ * </ul>
*/
private volatile boolean trusted;
@@ -95,7 +98,7 @@
}
@Override
- final void open0(WriteableTransaction txn)
+ final void afterOpen(WriteableTransaction txn)
{
final EnumSet<IndexFlag> flags = state.getIndexFlags(txn, getName());
codec = flags.contains(COMPACTED) ? CODEC_V2 : CODEC_V1;
@@ -154,21 +157,6 @@
public final void update(final WriteableTransaction txn, final ByteString key, final EntryIDSet deletedIDs,
final EntryIDSet addedIDs) throws StorageRuntimeException
{
- /*
- * Check the special condition where both deletedIDs and addedIDs are null. This is used when
- * deleting entries must be completely removed.
- */
- if (deletedIDs == null && addedIDs == null)
- {
- boolean success = txn.delete(getName(), key);
- if (!success && logger.isTraceEnabled())
- {
- logger.trace("The expected key does not exist in the index %s.\nKey:%s ",
- getName(), key.toHexPlusAsciiString(4));
- }
- return;
- }
-
// Handle cases where nothing is changed early to avoid DB access.
if (isNullOrEmpty(deletedIDs) && isNullOrEmpty(addedIDs))
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryContainer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryContainer.java
index 9ff764e..b4028c1 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryContainer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryContainer.java
@@ -161,7 +161,7 @@
/** The entry tree maps an entry ID (8 bytes) to a complete encoded entry. */
private ID2Entry id2entry;
/** Store the number of children for each entry. */
- private final ID2Count id2childrenCount;
+ private final ID2ChildrenCount id2childrenCount;
/** The referral tree maps a normalized DN string to labeled URIs. */
private final DN2URI dn2uri;
/** The state tree maps a config DN to config entries. */
@@ -416,7 +416,7 @@
this.storage = storage;
this.rootContainer = rootContainer;
this.treePrefix = baseDN.toNormalizedUrlSafeString();
- this.id2childrenCount = new ID2Count(getIndexName(ID2CHILDREN_COUNT_TREE_NAME));
+ this.id2childrenCount = new ID2ChildrenCount(getIndexName(ID2CHILDREN_COUNT_TREE_NAME));
this.dn2id = new DN2ID(getIndexName(DN2ID_TREE_NAME), baseDN);
this.dn2uri = new DN2URI(getIndexName(REFERRAL_TREE_NAME), this);
this.state = new State(getIndexName(STATE_TREE_NAME));
@@ -565,7 +565,7 @@
*
* @return The children tree.
*/
- ID2Count getID2ChildrenCount()
+ ID2ChildrenCount getID2ChildrenCount()
{
return id2childrenCount;
}
@@ -1458,6 +1458,7 @@
@Override
public void run(WriteableTransaction txn) throws Exception
{
+ // No need to call indexBuffer.reset() since IndexBuffer content will be the same for each retry attempt.
try
{
// Check whether the entry already exists.
@@ -1515,6 +1516,7 @@
}
catch (Exception e)
{
+ writeTrustState(indexBuffer);
throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class);
}
@@ -1525,6 +1527,28 @@
}
}
+ private void writeTrustState(final IndexBuffer indexBuffer)
+ {
+ // Transaction modifying the index has been rolled back.
+ // Ensure that the index trusted state is persisted.
+ try
+ {
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ indexBuffer.writeTrustState(txn);
+ }
+ });
+ }
+ catch (Exception e)
+ {
+ // Cannot throw because this method is used in a catch block and we do not want to hide the real exception.
+ logger.traceException(e);
+ }
+ }
+
void importEntry(WriteableTransaction txn, EntryID entryID, Entry entry) throws DirectoryException,
StorageRuntimeException
{
@@ -1555,6 +1579,7 @@
void deleteEntry(final DN entryDN, final DeleteOperation deleteOperation)
throws DirectoryException, StorageRuntimeException, CanceledOperationException
{
+ final IndexBuffer indexBuffer = new IndexBuffer();
try
{
storage.write(new WriteOperation()
@@ -1562,6 +1587,7 @@
@Override
public void run(WriteableTransaction txn) throws Exception
{
+ indexBuffer.reset();
try
{
// Check for referral entries above the target entry.
@@ -1629,7 +1655,6 @@
// Now update id2entry, dn2uri, and id2childrenCount in key order.
id2childrenCount.updateCount(txn, parentID, -1);
- final IndexBuffer indexBuffer = new IndexBuffer();
final EntryCache<?> entryCache = DirectoryServer.getEntryCache();
boolean isBaseEntry = true;
try (final Cursor<EntryID, Entry> cursor = id2entry.openCursor(txn))
@@ -1713,6 +1738,7 @@
}
catch (Exception e)
{
+ writeTrustState(indexBuffer);
throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class);
}
}
@@ -1844,6 +1870,7 @@
void replaceEntry(final Entry oldEntry, final Entry newEntry, final ModifyOperation modifyOperation)
throws StorageRuntimeException, DirectoryException, CanceledOperationException
{
+ final IndexBuffer indexBuffer = new IndexBuffer();
try
{
storage.write(new WriteOperation()
@@ -1851,6 +1878,7 @@
@Override
public void run(WriteableTransaction txn) throws Exception
{
+ indexBuffer.reset();
try
{
EntryID entryID = dn2id.get(txn, newEntry.getName());
@@ -1883,9 +1911,7 @@
dn2uri.replaceEntry(txn, oldEntry, newEntry);
}
-
// Update the indexes.
- final IndexBuffer indexBuffer = new IndexBuffer();
if (modifyOperation != null)
{
// In this case we know from the operation what the modifications were.
@@ -1933,6 +1959,7 @@
}
catch (Exception e)
{
+ writeTrustState(indexBuffer);
throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class);
}
}
@@ -1960,6 +1987,7 @@
void renameEntry(final DN oldTargetDN, final Entry newTargetEntry, final ModifyDNOperation modifyDNOperation)
throws StorageRuntimeException, DirectoryException, CanceledOperationException
{
+ final IndexBuffer indexBuffer = new IndexBuffer();
try
{
storage.write(new WriteOperation()
@@ -1967,6 +1995,7 @@
@Override
public void run(WriteableTransaction txn) throws Exception
{
+ indexBuffer.reset();
try
{
// Validate the request.
@@ -2027,7 +2056,6 @@
id2childrenCount.updateCount(txn, oldSuperiorID, -1);
id2childrenCount.updateCount(txn, newSuperiorID, 1);
}
- final IndexBuffer indexBuffer = new IndexBuffer();
boolean isBaseEntry = true;
try (final Cursor<EntryID, Entry> cursor = id2entry.openCursor(txn))
{
@@ -2178,6 +2206,7 @@
}
catch (Exception e)
{
+ writeTrustState(indexBuffer);
throwAllowedExceptionTypes(e, DirectoryException.class, CanceledOperationException.class);
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java
index 8e56910..599ffe5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/EntryIDSet.java
@@ -545,7 +545,7 @@
private static int getEstimatedSize(EntryIDSet idSet)
{
checkNotNull(idSet, "idSet must not be null");
- return idSet.getIDs().length * LONG_SIZE + INT_SIZE;
+ return idSet.getIDs().length * ByteStringBuilder.MAX_COMPACT_SIZE + INT_SIZE;
}
private static long[] decodeRaw(ByteSequenceReader reader, int nbEntriesToDecode)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2ChildrenCount.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2ChildrenCount.java
new file mode 100644
index 0000000..73d42d1
--- /dev/null
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2ChildrenCount.java
@@ -0,0 +1,211 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2015 ForgeRock AS
+ */
+package org.opends.server.backends.pluggable;
+
+import static org.opends.server.backends.pluggable.CursorTransformer.*;
+
+import org.forgerock.opendj.ldap.ByteSequence;
+import org.forgerock.opendj.ldap.ByteSequenceReader;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Function;
+import org.forgerock.util.Reject;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector;
+import org.opends.server.backends.pluggable.spi.Importer;
+import org.opends.server.backends.pluggable.spi.ReadableTransaction;
+import org.opends.server.backends.pluggable.spi.SequentialCursor;
+import org.opends.server.backends.pluggable.spi.TreeName;
+import org.opends.server.backends.pluggable.spi.WriteableTransaction;
+
+import com.forgerock.opendj.util.PackedLong;
+
+/** Maintain counters reflecting the total number of entries and the number of immediate children for each entry. */
+final class ID2ChildrenCount extends AbstractTree
+{
+ private static final EntryID TOTAL_COUNT_ENTRY_ID = new EntryID(PackedLong.COMPACTED_MAX_VALUE);
+
+ private static final Function<ByteString, EntryID, NeverThrowsException> TO_ENTRY_ID =
+ new Function<ByteString, EntryID, NeverThrowsException>()
+ {
+ @Override
+ public EntryID apply(ByteString value) throws NeverThrowsException
+ {
+ return new EntryID(value.asReader().readCompactUnsignedLong());
+ }
+ };
+
+ private final ShardedCounter counter;
+
+ ID2ChildrenCount(TreeName name)
+ {
+ super(name);
+ this.counter = new ShardedCounter(name);
+ }
+
+ SequentialCursor<EntryID, Void> openCursor(ReadableTransaction txn)
+ {
+ return transformKeysAndValues(counter.openCursor(txn),
+ TO_ENTRY_ID, CursorTransformer.<ByteString, Void> keepValuesUnchanged());
+ }
+
+ /**
+ * Updates the number of children for a given entry without updating the total number of entries.
+ * <p>
+ * Implementation note: this method accepts a {@code null} entryID in order to eliminate null checks in client code.
+ * In particular, client code has to deal with the special case where a target entry does not have a parent because
+ * the target entry is a base entry within the backend.
+ *
+ * @param txn storage transaction
+ * @param entryID The entryID identifying to the counter, which may be
+ * {@code null} in which case calling this method has no effect.
+ * @param delta The value to add. Can be negative to decrease counter value.
+ */
+ void updateCount(final WriteableTransaction txn, final EntryID entryID, final long delta) {
+ if (entryID != null)
+ {
+ addToCounter(txn, entryID, delta);
+ }
+ }
+
+ /**
+ * Updates the total number of entries which should be the sum of all counters.
+ * @param txn storage transaction
+ * @param delta The value to add. Can be negative to decrease counter value.
+ */
+ void updateTotalCount(final WriteableTransaction txn, final long delta) {
+ addToCounter(txn, TOTAL_COUNT_ENTRY_ID, delta);
+ }
+
+ private void addToCounter(WriteableTransaction txn, EntryID entryID, final long delta)
+ {
+ counter.addCount(txn, toKey(entryID), delta);
+ }
+
+ void importPut(Importer importer, EntryID entryID, long total)
+ {
+ Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow.");
+ importPut0(importer, entryID, total);
+ }
+
+ void importPutTotalCount(Importer importer, long total)
+ {
+ importPut0(importer, TOTAL_COUNT_ENTRY_ID, total);
+ }
+
+ private void importPut0(Importer importer, EntryID entryID, final long delta)
+ {
+ counter.importPut(importer, toKey(entryID), delta);
+ }
+
+ @Override
+ public String keyToString(ByteString key)
+ {
+ ByteSequenceReader keyReader = key.asReader();
+ long keyID = keyReader.readCompactUnsignedLong();
+ long shardBucket = keyReader.readByte();
+ return (keyID == TOTAL_COUNT_ENTRY_ID.longValue() ? "Total Children Count" : keyID) + "#" + shardBucket;
+ }
+
+ @Override
+ public String valueToString(ByteString value)
+ {
+ return counter.valueToString(value);
+ }
+
+ @Override
+ public ByteString generateKey(String data)
+ {
+ return new EntryID(Long.parseLong(data)).toByteString();
+ }
+
+ /**
+ * Get the number of children for the given entry.
+ * @param txn storage transaction
+ * @param entryID The entryID identifying to the counter
+ * @return Value of the counter. 0 if no counter is associated yet.
+ */
+ long getCount(ReadableTransaction txn, EntryID entryID)
+ {
+ return counter.getCount(txn, toKey(entryID));
+ }
+
+ /**
+ * Get the total number of entries.
+ * @param txn storage transaction
+ * @return Sum of all the counter contained in this tree
+ */
+ long getTotalCount(ReadableTransaction txn)
+ {
+ return getCount(txn, TOTAL_COUNT_ENTRY_ID);
+ }
+
+ /**
+ * Removes the counter associated to the given entry, but does not update the total count.
+ * @param txn storage transaction
+ * @param entryID The entryID identifying the counter
+ * @return Value of the counter before it's deletion.
+ */
+ long removeCount(final WriteableTransaction txn, final EntryID entryID) {
+ return counter.removeCount(txn, toKey(entryID));
+ }
+
+ private static ByteSequence toKey(EntryID entryID)
+ {
+ return new ByteStringBuilder(ByteStringBuilder.MAX_COMPACT_SIZE).appendCompactUnsigned(entryID.longValue());
+ }
+
+ static Collector<Long, ByteString> getSumLongCollectorInstance()
+ {
+ return ShardedCounterCollector.INSTANCE;
+ }
+
+ /**
+ * {@link Collector} that accepts sharded-counter values encoded into {@link ByteString} objects and produces a
+ * {@link ByteString} representing the sum of the sharded-counter values.
+ */
+ private static final class ShardedCounterCollector implements Collector<Long, ByteString>
+ {
+ private static final Collector<Long, ByteString> INSTANCE = new ShardedCounterCollector();
+
+ @Override
+ public Long get()
+ {
+ return 0L;
+ }
+
+ @Override
+ public Long accept(Long resultContainer, ByteString value)
+ {
+ return resultContainer + ShardedCounter.decodeValue(value);
+ }
+
+ @Override
+ public ByteString merge(Long resultContainer)
+ {
+ return ShardedCounter.encodeValue(resultContainer);
+ }
+ }
+}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
deleted file mode 100644
index 7cd6766..0000000
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/*
- * CDDL HEADER START
- *
- * The contents of this file are subject to the terms of the
- * Common Development and Distribution License, Version 1.0 only
- * (the "License"). You may not use this file except in compliance
- * with the License.
- *
- * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
- * or http://forgerock.org/license/CDDLv1.0.html.
- * See the License for the specific language governing permissions
- * and limitations under the License.
- *
- * When distributing Covered Code, include this CDDL HEADER in each
- * file and include the License file at legal-notices/CDDLv1_0.txt.
- * If applicable, add the following below this CDDL HEADER, with the
- * fields enclosed by brackets "[]" replaced with your own identifying
- * information:
- * Portions Copyright [yyyy] [name of copyright owner]
- *
- * CDDL HEADER END
- *
- * Copyright 2015 ForgeRock AS
- */
-package org.opends.server.backends.pluggable;
-
-import org.forgerock.opendj.ldap.ByteSequence;
-import org.forgerock.opendj.ldap.ByteSequenceReader;
-import org.forgerock.opendj.ldap.ByteString;
-import org.forgerock.opendj.ldap.ByteStringBuilder;
-import org.forgerock.util.Reject;
-import org.forgerock.util.Function;
-import org.forgerock.util.promise.NeverThrowsException;
-import org.opends.server.backends.pluggable.spi.Cursor;
-import org.opends.server.backends.pluggable.spi.Importer;
-import org.opends.server.backends.pluggable.spi.ReadableTransaction;
-import org.opends.server.backends.pluggable.spi.TreeName;
-import org.opends.server.backends.pluggable.spi.UpdateFunction;
-import org.opends.server.backends.pluggable.spi.WriteableTransaction;
-
-import com.forgerock.opendj.util.PackedLong;
-
-/**
- * Store a counter associated to a key. Counter value is sharded amongst multiple keys to allow concurrent
- * update without contention (at the price of a slower read).
- */
-final class ID2Count extends AbstractTree
-{
- /**
- * Must be a power of 2 @see <a href="http://en.wikipedia.org/wiki/Modulo_operation#Performance_issues">Performance
- * issues</a>
- */
- private static final long SHARD_COUNT = 4096;
- private static final int LONG_SIZE = Long.SIZE / Byte.SIZE;
- private static final EntryID TOTAL_COUNT_ENTRY_ID = new EntryID(PackedLong.COMPACTED_MAX_VALUE);
-
- ID2Count(TreeName name)
- {
- super(name);
- }
-
- Cursor<EntryID, Long> openCursor(ReadableTransaction txn) {
- return CursorTransformer.transformKeysAndValues(txn.openCursor(getName()),
- new Function<ByteString, EntryID, Exception>()
- {
- @Override
- public EntryID apply(ByteString value) throws Exception
- {
- return new EntryID(value.asReader().readCompactUnsignedLong());
- }
- }, new CursorTransformer.ValueTransformer<ByteString, ByteString, Long, NeverThrowsException>()
- {
- @Override
- public Long transform(ByteString key, ByteString value) throws NeverThrowsException
- {
- return fromValue(value);
- }
- });
- }
-
- /**
- * Updates the counter associated with the given key, but does not update the total count.
- * <p>
- * Implementation note: this method accepts a {@code null} entryID in order to eliminate null checks in client code.
- * In particular, client code has to deal with the special case where a target entry does not have a parent because
- * the target entry is a base entry within the backend.
- *
- * @param txn storage transaction
- * @param entryID The entryID identifying to the counter, which may be
- * {@code null} in which case calling this method has no effect.
- * @param delta The value to add. Can be negative to decrease counter value.
- */
- void updateCount(final WriteableTransaction txn, final EntryID entryID, final long delta) {
- if (entryID != null)
- {
- addToCounter(txn, entryID, delta);
- }
- }
-
- /**
- * Updates the total count which should be the sum of all counters.
- * @param txn storage transaction
- * @param delta The value to add. Can be negative to decrease counter value.
- */
- void updateTotalCount(final WriteableTransaction txn, final long delta) {
- addToCounter(txn, TOTAL_COUNT_ENTRY_ID, delta);
- }
-
- private void addToCounter(WriteableTransaction txn, EntryID entryID, final long delta)
- {
- final ByteSequence shardedKey = getShardedKey(entryID);
- txn.update(getName(), shardedKey, new UpdateFunction()
- {
- @Override
- public ByteSequence computeNewValue(ByteSequence oldValue)
- {
- final long currentValue = oldValue == null ? 0 : oldValue.asReader().readLong();
- final long newValue = currentValue + delta;
- return newValue == 0 ? null : toValue(newValue);
- }
- });
- }
-
- void importPut(Importer importer, EntryID entryID, long total)
- {
- Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow.");
- importPut0(importer, entryID, total);
- }
-
- void importPutTotalCount(Importer importer, long total)
- {
- importPut0(importer, TOTAL_COUNT_ENTRY_ID, total);
- }
-
- private void importPut0(Importer importer, EntryID entryID, final long delta)
- {
- Reject.ifNull(importer, "importer must not be null");
- if (delta != 0)
- {
- final ByteSequence shardedKey = getShardedKey(entryID);
- importer.put(getName(), shardedKey, toValue(delta));
- }
- }
-
- private ByteSequence getShardedKey(EntryID entryID)
- {
- final long bucket = Thread.currentThread().getId() & (SHARD_COUNT - 1);
- return getKeyFromEntryIDAndBucket(entryID, bucket);
- }
-
- ByteString toValue(final long count)
- {
- Reject.ifFalse(count != 0, "count must be != 0");
- return ByteString.valueOfLong(count);
- }
-
- long fromValue(ByteString value)
- {
- Reject.ifNull(value, "value must not be null");
- return value.toLong();
- }
-
- @Override
- public String keyToString(ByteString key)
- {
- ByteSequenceReader keyReader = key.asReader();
- long keyID = keyReader.readCompactUnsignedLong();
- long shardBucket = keyReader.readCompactUnsignedLong();
- return (keyID == TOTAL_COUNT_ENTRY_ID.longValue() ? "Total Children Count" : keyID) + "#" + shardBucket;
- }
-
- @Override
- public String valueToString(ByteString value)
- {
- return String.valueOf(fromValue(value));
- }
-
- @Override
- public ByteString generateKey(String data)
- {
- EntryID entryID = new EntryID(Long.parseLong(data));
- return entryID.toByteString();
- }
-
- /**
- * Get the counter value for the specified key
- * @param txn storage transaction
- * @param entryID The entryID identifying to the counter
- * @return Value of the counter. 0 if no counter is associated yet.
- */
- long getCount(ReadableTransaction txn, EntryID entryID)
- {
- long counterValue = 0;
- try(final Cursor<EntryID, Long> cursor = openCursor(txn)) {
- cursor.positionToKeyOrNext(getKeyFromEntryID(entryID));
- while (cursor.isDefined() && cursor.getKey().equals(entryID))
- {
- counterValue += cursor.getValue();
- cursor.next();
- }
- }
- return counterValue;
- }
-
- private static ByteSequence getKeyFromEntryID(EntryID entryID) {
- return new ByteStringBuilder(LONG_SIZE).appendCompactUnsigned(entryID.longValue());
- }
-
- private static ByteSequence getKeyFromEntryIDAndBucket(EntryID entryID, long bucket) {
- return new ByteStringBuilder(LONG_SIZE + LONG_SIZE).appendCompactUnsigned(entryID.longValue())
- .appendCompactUnsigned(bucket);
- }
-
- /**
- * Get the total counter value. The total counter maintain the sum of all
- * the counter contained in this tree.
- * @param txn storage transaction
- * @return Sum of all the counter contained in this tree
- */
- long getTotalCount(ReadableTransaction txn)
- {
- return getCount(txn, TOTAL_COUNT_ENTRY_ID);
- }
-
- /**
- * Removes the counter associated to the given key, but does not update the total count.
- * @param txn storage transaction
- * @param entryID The entryID identifying the counter
- * @return Value of the counter before it's deletion.
- */
- long removeCount(final WriteableTransaction txn, final EntryID entryID) {
- long counterValue = 0;
- try (final Cursor<ByteString, ByteString> cursor = txn.openCursor(getName())) {
- final ByteSequence encodedEntryID = getKeyFromEntryID(entryID);
- if (cursor.positionToKeyOrNext(encodedEntryID)) {
- // Iterate over and remove all the thread local shards
- while (cursor.isDefined() && cursor.getKey().startsWith(encodedEntryID)) {
- counterValue += cursor.getValue().asReader().readLong();
- cursor.delete();
- cursor.next();
- }
- }
- }
- return counterValue;
- }
-}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
index 58c68bc..d18c5a1 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Entry.java
@@ -248,7 +248,7 @@
}
@Override
- void open0(WriteableTransaction txn) throws StorageRuntimeException
+ void afterOpen(WriteableTransaction txn) throws StorageRuntimeException
{
// Make sure the tree is there and readable, even if the storage is READ_ONLY.
// Would be nice if there were a better way...
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
index c4682a3..9469591 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/IndexBuffer.java
@@ -55,6 +55,8 @@
{
void flush(WriteableTransaction txn) throws StorageRuntimeException, DirectoryException;
+ void writeTrustState(WriteableTransaction txn) throws StorageRuntimeException;
+
void put(Index index, ByteString key, EntryID entryID);
void put(VLVIndex index, ByteString sortKey);
@@ -62,6 +64,8 @@
void remove(VLVIndex index, ByteString sortKey);
void remove(Index index, ByteString key, EntryID entryID);
+
+ void reset();
}
/**
@@ -214,6 +218,21 @@
}
@Override
+ public void writeTrustState(WriteableTransaction txn)
+ {
+ // Indexes cache the index trust flag. Ensure that the cached value is written into the db.
+ for (Index index : bufferedIndexes.keySet())
+ {
+ index.setTrusted(txn, index.isTrusted());
+ }
+
+ for (VLVIndex index : bufferedVLVIndexes.keySet())
+ {
+ index.setTrusted(txn, index.isTrusted());
+ }
+ }
+
+ @Override
public void put(Index index, ByteString key, EntryID entryID)
{
createOrGetBufferedIndexValues(index, key).addEntryID(entryID);
@@ -246,6 +265,13 @@
index.update(txn, entry.getKey(), values.deletedEntryIDs, values.addedEntryIDs);
}
}
+
+ @Override
+ public void reset()
+ {
+ bufferedIndexes.clear();
+ bufferedVLVIndexes.clear();
+ }
}
/**
@@ -285,6 +311,12 @@
}
@Override
+ public void writeTrustState(WriteableTransaction txn)
+ {
+ // Nothing to do
+ }
+
+ @Override
public void remove(VLVIndex index, ByteString sortKey)
{
throw new UnsupportedOperationException();
@@ -295,6 +327,12 @@
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public void reset()
+ {
+ throw new UnsupportedOperationException();
+ }
}
private final IndexBufferImplementor impl;
@@ -329,6 +367,19 @@
impl.flush(txn);
}
+ /**
+ * Indexes might cache their trust state. This ensure that the cached state is persisted into the database.
+ *
+ * @param txn
+ * a non null transaction
+ * @throws StorageRuntimeException
+ * If an error occurs in the storage.
+ */
+ void writeTrustState(WriteableTransaction txn)
+ {
+ impl.writeTrustState(txn);
+ }
+
void put(Index index, ByteString key, EntryID entryID)
{
impl.put(index, key, entryID);
@@ -348,4 +399,9 @@
{
impl.remove(index, key, entryID);
}
+
+ void reset()
+ {
+ impl.reset();
+ }
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
index 875b0b1..76ff156 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/OnDiskMergeImporter.java
@@ -979,7 +979,7 @@
PhaseTwoProgressReporter progressReporter, boolean dn2idAlreadyImported)
{
final EntryContainer entryContainer = entryContainers.get(treeName.getBaseDN());
- final ID2Count id2count = entryContainer.getID2ChildrenCount();
+ final ID2ChildrenCount id2count = entryContainer.getID2ChildrenCount();
return new DN2IDImporterTask(progressReporter, importer, tempDir, bufferPool, entryContainer.getDN2ID(), chunk,
id2count, newCollector(entryContainer, id2count.getName()), dn2idAlreadyImported);
@@ -2207,7 +2207,7 @@
/**
* This task optionally copy the dn2id chunk into the database and takes advantages of it's cursoring to compute the
- * {@link ID2Count} index.
+ * {@link ID2ChildrenCount} index.
*/
private static final class DN2IDImporterTask implements Callable<Void>
{
@@ -2216,13 +2216,13 @@
private final File tempDir;
private final BufferPool bufferPool;
private final DN2ID dn2id;
- private final ID2Count id2count;
+ private final ID2ChildrenCount id2count;
private final Collector<?, ByteString> id2countCollector;
private final Chunk dn2IdSourceChunk;
private final Chunk dn2IdDestination;
DN2IDImporterTask(PhaseTwoProgressReporter progressReporter, Importer importer, File tempDir, BufferPool bufferPool,
- DN2ID dn2id, Chunk dn2IdChunk, ID2Count id2count, Collector<?, ByteString> id2countCollector,
+ DN2ID dn2id, Chunk dn2IdChunk, ID2ChildrenCount id2count, Collector<?, ByteString> id2countCollector,
boolean dn2idAlreadyImported)
{
this.reporter = progressReporter;
@@ -3037,7 +3037,7 @@
else if (isID2ChildrenCount(treeName))
{
// key conflicts == sum values
- return new AddLongCollector(entryContainer.getID2ChildrenCount());
+ return ID2ChildrenCount.getSumLongCollectorInstance();
}
else if (isDN2ID(treeName) || isDN2URI(treeName) || isVLVIndex(entryContainer, treeName))
{
@@ -3244,38 +3244,6 @@
}
}
- /**
- * {@link Collector} that accepts {@code long} values encoded into {@link ByteString} objects and produces a
- * {@link ByteString} representing the sum of the supplied {@code long}s.
- */
- static final class AddLongCollector implements Collector<Long, ByteString>
- {
- private final ID2Count id2count;
-
- AddLongCollector(ID2Count id2count)
- {
- this.id2count = id2count;
- }
-
- @Override
- public Long get()
- {
- return 0L;
- }
-
- @Override
- public Long accept(Long resultContainer, ByteString value)
- {
- return resultContainer + id2count.fromValue(value);
- }
-
- @Override
- public ByteString merge(Long resultContainer)
- {
- return id2count.toValue(resultContainer);
- }
- }
-
private static MeteredCursor<ByteString, ByteString> asProgressCursor(
SequentialCursor<ByteString, ByteString> delegate, String metricName, long totalSize)
{
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ShardedCounter.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ShardedCounter.java
new file mode 100644
index 0000000..534ebab
--- /dev/null
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ShardedCounter.java
@@ -0,0 +1,299 @@
+/*
+ * CDDL HEADER START
+ *
+ * The contents of this file are subject to the terms of the
+ * Common Development and Distribution License, Version 1.0 only
+ * (the "License"). You may not use this file except in compliance
+ * with the License.
+ *
+ * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt
+ * or http://forgerock.org/license/CDDLv1.0.html.
+ * See the License for the specific language governing permissions
+ * and limitations under the License.
+ *
+ * When distributing Covered Code, include this CDDL HEADER in each
+ * file and include the License file at legal-notices/CDDLv1_0.txt.
+ * If applicable, add the following below this CDDL HEADER, with the
+ * fields enclosed by brackets "[]" replaced with your own identifying
+ * information:
+ * Portions Copyright [yyyy] [name of copyright owner]
+ *
+ * CDDL HEADER END
+ *
+ * Copyright 2015 ForgeRock AS
+ */
+package org.opends.server.backends.pluggable;
+
+import static org.opends.server.backends.pluggable.CursorTransformer.*;
+
+import java.util.NoSuchElementException;
+
+import org.forgerock.opendj.ldap.ByteSequence;
+import org.forgerock.opendj.ldap.ByteString;
+import org.forgerock.opendj.ldap.ByteStringBuilder;
+import org.forgerock.util.Function;
+import org.forgerock.util.promise.NeverThrowsException;
+import org.opends.server.backends.pluggable.CursorTransformer.ValueTransformer;
+import org.opends.server.backends.pluggable.OnDiskMergeImporter.SequentialCursorDecorator;
+import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
+import org.opends.server.backends.pluggable.spi.ReadableTransaction;
+import org.opends.server.backends.pluggable.spi.SequentialCursor;
+import org.opends.server.backends.pluggable.spi.TreeName;
+import org.opends.server.backends.pluggable.spi.UpdateFunction;
+import org.opends.server.backends.pluggable.spi.WriteableTransaction;
+
+/**
+ * Store counters associated to a key. Counter value is sharded amongst multiple keys to allow concurrent update without
+ * contention (at the price of a slower read).
+ */
+final class ShardedCounter extends AbstractTree
+{
+ /**
+ * Must be a power of 2
+ * @see <a href="http://en.wikipedia.org/wiki/Modulo_operation#Performance_issues">Performance issues</a>
+ */
+ private static final long SHARD_COUNT = 256;
+
+ private static final ValueTransformer<ByteString, ByteString, Long, NeverThrowsException> TO_LONG =
+ new ValueTransformer<ByteString, ByteString, Long, NeverThrowsException>()
+ {
+ @Override
+ public Long transform(ByteString key, ByteString value)
+ {
+ return decodeValue(value);
+ }
+ };
+
+ private static final Function<ByteString, ByteString, NeverThrowsException> TO_KEY =
+ new Function<ByteString, ByteString, NeverThrowsException>()
+ {
+ @Override
+ public ByteString apply(ByteString shardedKey)
+ {
+ // -1 to remove the shard id.
+ return shardedKey.subSequence(0, shardedKey.length() - 1);
+ }
+ };
+
+ ShardedCounter(TreeName name)
+ {
+ super(name);
+ }
+
+ SequentialCursor<ByteString, Void> openCursor(ReadableTransaction txn)
+ {
+ return new UniqueKeysCursor<>(transformKeysAndValues(
+ txn.openCursor(getName()), TO_KEY,
+ CursorTransformer.<ByteString, ByteString, Void> constant(null)));
+ }
+
+ private Cursor<ByteString, Long> openCursor0(ReadableTransaction txn)
+ {
+ return transformKeysAndValues(txn.openCursor(getName()), TO_KEY, TO_LONG);
+ }
+
+ void addCount(final WriteableTransaction txn, ByteSequence key, final long delta)
+ {
+ txn.update(getName(), getShardedKey(key), new UpdateFunction()
+ {
+ @Override
+ public ByteSequence computeNewValue(ByteSequence oldValue)
+ {
+ final long currentValue = oldValue == null ? 0 : decodeValue(oldValue.toByteString());
+ return encodeValue(currentValue + delta);
+ }
+ });
+ }
+
+ void importPut(Importer importer, ByteSequence key, long delta)
+ {
+ if (delta != 0)
+ {
+ importer.put(getName(), getShardedKey(key), encodeValue(delta));
+ }
+ }
+
+ long getCount(final ReadableTransaction txn, ByteSequence key)
+ {
+ long counterValue = 0;
+ try (final SequentialCursor<ByteString, Long> cursor = new ShardCursor(openCursor0(txn), key))
+ {
+ while (cursor.next())
+ {
+ counterValue += cursor.getValue();
+ }
+ }
+ return counterValue;
+ }
+
+ long removeCount(final WriteableTransaction txn, ByteSequence key)
+ {
+ long counterValue = 0;
+ try (final SequentialCursor<ByteString, Long> cursor = new ShardCursor(openCursor0(txn), key))
+ {
+ // Iterate over and remove all the thread local shards
+ while (cursor.next())
+ {
+ counterValue += cursor.getValue();
+ cursor.delete();
+ }
+ }
+ return counterValue;
+ }
+
+ static long decodeValue(ByteString value)
+ {
+ switch (value.length())
+ {
+ case 1:
+ return value.byteAt(0);
+ case (Integer.SIZE / Byte.SIZE):
+ return value.toInt();
+ case (Long.SIZE / Byte.SIZE):
+ return value.toLong();
+ default:
+ throw new IllegalArgumentException("Unsupported sharded-counter value format.");
+ }
+ }
+
+ static ByteString encodeValue(long value)
+ {
+ final byte valueAsByte = (byte) value;
+ if (valueAsByte == value)
+ {
+ return ByteString.wrap(new byte[] { valueAsByte });
+ }
+ final int valueAsInt = (int) value;
+ if (valueAsInt == value)
+ {
+ return ByteString.valueOfInt(valueAsInt);
+ }
+ return ByteString.valueOfLong(value);
+ }
+
+ @Override
+ public String valueToString(ByteString value)
+ {
+ return String.valueOf(decodeValue(value));
+ }
+
+ private static ByteSequence getShardedKey(ByteSequence key)
+ {
+ final byte bucket = (byte) (Thread.currentThread().getId() & (SHARD_COUNT - 1));
+ return new ByteStringBuilder(key.length() + ByteStringBuilder.MAX_COMPACT_SIZE).appendBytes(key).appendByte(bucket);
+ }
+
+ /** Restricts a cursor to the shards of a specific key. */
+ private final class ShardCursor extends SequentialCursorDecorator<Cursor<ByteString, Long>, ByteString, Long>
+ {
+ private final ByteSequence targetKey;
+ private boolean initialized;
+
+ ShardCursor(Cursor<ByteString, Long> delegate, ByteSequence targetKey)
+ {
+ super(delegate);
+ this.targetKey = targetKey;
+ }
+
+ @Override
+ public boolean next()
+ {
+ if (!initialized)
+ {
+ initialized = true;
+ return delegate.positionToKeyOrNext(targetKey) && isOnTargetKey();
+ }
+ return delegate.next() && isOnTargetKey();
+ }
+
+ private boolean isOnTargetKey()
+ {
+ return targetKey.equals(delegate.getKey());
+ }
+ }
+
+ /**
+ * Cursor that returns unique keys and null values. Ensure that {@link #getKey()} will return a different key after
+ * each {@link #next()}.
+ */
+ static final class UniqueKeysCursor<K> implements SequentialCursor<K, Void>
+ {
+ private final Cursor<K, ?> delegate;
+ private boolean isDefined;
+ private K key;
+
+ UniqueKeysCursor(Cursor<K, ?> cursor)
+ {
+ this.delegate = cursor;
+ if (!delegate.isDefined())
+ {
+ delegate.next();
+ }
+ }
+
+ @Override
+ public boolean next()
+ {
+ isDefined = delegate.isDefined();
+ if (isDefined)
+ {
+ key = delegate.getKey();
+ skipEntriesWithSameKey();
+ }
+ return isDefined;
+ }
+
+ private void skipEntriesWithSameKey()
+ {
+ throwIfUndefined(this);
+ while (delegate.next() && key.equals(delegate.getKey()))
+ {
+ // Skip all entries having the same key.
+ }
+ // Delegate is one step beyond. When delegate.isDefined() return false, we have to return true once more.
+ isDefined = true;
+ }
+
+ @Override
+ public boolean isDefined()
+ {
+ return isDefined;
+ }
+
+ @Override
+ public K getKey() throws NoSuchElementException
+ {
+ throwIfUndefined(this);
+ return key;
+ }
+
+ @Override
+ public Void getValue() throws NoSuchElementException
+ {
+ throwIfUndefined(this);
+ return null;
+ }
+
+ @Override
+ public void delete() throws NoSuchElementException, UnsupportedOperationException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close()
+ {
+ key = null;
+ delegate.close();
+ }
+
+ private static void throwIfUndefined(SequentialCursor<?, ?> cursor)
+ {
+ if (!cursor.isDefined())
+ {
+ throw new NoSuchElementException();
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
index f133792..a1ff2a7 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VLVIndex.java
@@ -37,7 +37,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -86,16 +85,17 @@
* "tie-breaker" and ensures that keys correspond to one and only one entry. This ensures that all
* tree updates can be performed using lock-free operations.
*/
-@SuppressWarnings("javadoc")
class VLVIndex extends AbstractTree implements ConfigurationChangeListener<BackendVLVIndexCfg>, Closeable
{
+ private static final ByteString COUNT_KEY = ByteString.valueOfUtf8("nbRecords");
+
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
/** The VLV vlvIndex configuration. */
private BackendVLVIndexCfg config;
- /** The cached count of entries in this index. */
- private final AtomicInteger count = new AtomicInteger(0);
+ /** The count of entries in this index. */
+ private final ShardedCounter counter;
private DN baseDN;
private SearchScope scope;
@@ -116,7 +116,7 @@
ConfigException
{
super(new TreeName(entryContainer.getTreePrefix(), "vlv." + config.getName()));
-
+ this.counter = new ShardedCounter(new TreeName(entryContainer.getTreePrefix(), "counter.vlv." + config.getName()));
this.config = config;
this.baseDN = config.getBaseDN();
this.scope = convertScope(config.getScope());
@@ -163,9 +163,15 @@
}
@Override
- void open0(final WriteableTransaction txn) throws StorageRuntimeException
+ void afterOpen(final WriteableTransaction txn) throws StorageRuntimeException
{
- count.set((int) txn.getRecordCount(getName()));
+ counter.open(txn, true);
+ }
+
+ @Override
+ void beforeDelete(WriteableTransaction txn) throws StorageRuntimeException
+ {
+ counter.delete(txn);
}
@Override
@@ -438,13 +444,13 @@
{
txn.put(getName(), nextAddedKey, toValue());
nextAddedKey = nextOrNull(ai);
- count.incrementAndGet();
+ counter.addCount(txn, COUNT_KEY, 1);
}
else
{
txn.delete(getName(), nextDeletedKey);
nextDeletedKey = nextOrNull(di);
- count.decrementAndGet();
+ counter.addCount(txn, COUNT_KEY, -1);
}
}
}
@@ -494,11 +500,16 @@
{
try (Cursor<ByteString, ByteString> cursor = txn.openCursor(getName()))
{
- final long[] selectedIDs = readRange(cursor, count.get(), debugBuilder);
+ final long[] selectedIDs = readRange(cursor, getEntryCount(txn), debugBuilder);
return newDefinedSet(selectedIDs);
}
}
+ private int getEntryCount(final ReadableTransaction txn)
+ {
+ return (int) counter.getCount(txn, COUNT_KEY);
+ }
+
/**
* Reads a page of entries from the VLV which includes the nearest entry corresponding to the VLV
* assertion, {@code beforeCount} entries leading up to the nearest entry, and {@code afterCount}
@@ -508,7 +519,7 @@
final SearchOperation searchOperation, final VLVRequestControl vlvRequest)
throws DirectoryException
{
- final int currentCount = count.get();
+ final int currentCount = getEntryCount(txn);
final int beforeCount = vlvRequest.getBeforeCount();
final int afterCount = vlvRequest.getAfterCount();
final ByteString assertion = vlvRequest.getGreaterThanOrEqualAssertion();
@@ -611,7 +622,7 @@
private EntryIDSet evaluateVLVRequestByOffset(final ReadableTransaction txn, final SearchOperation searchOperation,
final VLVRequestControl vlvRequest, final StringBuilder debugBuilder) throws DirectoryException
{
- final int currentCount = count.get();
+ final int currentCount = getEntryCount(txn);
int beforeCount = vlvRequest.getBeforeCount();
int afterCount = vlvRequest.getAfterCount();
int targetOffset = vlvRequest.getOffset();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VerifyJob.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VerifyJob.java
index 85dd687..9a25354 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VerifyJob.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/VerifyJob.java
@@ -57,6 +57,7 @@
import org.opends.server.backends.pluggable.spi.Cursor;
import org.opends.server.backends.pluggable.spi.ReadOperation;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
+import org.opends.server.backends.pluggable.spi.SequentialCursor;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
import org.opends.server.core.DirectoryServer;
import org.opends.server.types.AttributeType;
@@ -104,7 +105,7 @@
/** The DN tree. */
private DN2ID dn2id;
/** The children tree. */
- private ID2Count id2childrenCount;
+ private ID2ChildrenCount id2childrenCount;
/** A list of the attribute indexes to be verified. */
private final ArrayList<AttributeIndex> attrIndexList = new ArrayList<>();
@@ -538,21 +539,16 @@
private void iterateID2ChildrenCount(ReadableTransaction txn) throws StorageRuntimeException
{
- Cursor<EntryID, Long> cursor = id2childrenCount.openCursor(txn);
- if (!cursor.next()) {
- return;
- }
-
- EntryID currentEntryID = new EntryID(-1);
- while(cursor.next()) {
- if (cursor.getKey().equals(currentEntryID)) {
- // Sharded cursor may return the same EntryID multiple times
- continue;
- }
- currentEntryID = cursor.getKey();
- if (!id2entry.containsEntryID(txn, currentEntryID)) {
- logger.trace("File id2ChildrenCount reference non-existing EntryID <%d>%n", currentEntryID);
- errorCount++;
+ try (final SequentialCursor<EntryID, Void> cursor = id2childrenCount.openCursor(txn))
+ {
+ while (cursor.next())
+ {
+ final EntryID entryID = cursor.getKey();
+ if (!id2entry.containsEntryID(txn, entryID))
+ {
+ logger.trace("File id2ChildrenCount references non-existing EntryID <%d>%n", entryID);
+ errorCount++;
+ }
}
}
}
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Cursor.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Cursor.java
index 87455d0..8084b92 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Cursor.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Cursor.java
@@ -50,8 +50,8 @@
*
* @param key
* the key where to position the cursor
- * @return {@code true} if the cursor could be positioned to the key,
- * {@code false} otherwise
+ * @return {@code true} if the cursor could be positioned to the key
+ * or the next one, {@code false} otherwise
*/
boolean positionToKeyOrNext(ByteSequence key);
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/WriteOperation.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/WriteOperation.java
index ed5ae15..5ce0487 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/WriteOperation.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/WriteOperation.java
@@ -32,7 +32,8 @@
public interface WriteOperation
{
/**
- * Executes a write operation.
+ * Executes a write operation. Implementation must be idempotent since
+ * operation might be retried (for example in case of optimistic locking failure).
*
* @param txn
* the write transaction where to execute the write operation
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2CountTest.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2ChildrenCountTest.java
similarity index 93%
rename from opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2CountTest.java
rename to opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2ChildrenCountTest.java
index 68419de..2728e06 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2CountTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/ID2ChildrenCountTest.java
@@ -57,18 +57,17 @@
import org.opends.server.extensions.DiskSpaceMonitor;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = { "precommit", "pluggablebackend" }, sequential = true)
-public class ID2CountTest extends DirectoryServerTestCase
+public class ID2ChildrenCountTest extends DirectoryServerTestCase
{
private final TreeName id2CountTreeName = new TreeName("base-dn", "index-id");
private ExecutorService parallelExecutor;
- private ID2Count id2Count;
+ private ID2ChildrenCount id2ChildrenCount;
private PDBStorage storage;
@BeforeClass
@@ -95,7 +94,7 @@
}
});
- id2Count = new ID2Count(id2CountTreeName);
+ id2ChildrenCount = new ID2ChildrenCount(id2CountTreeName);
parallelExecutor = Executors.newFixedThreadPool(32);
}
@@ -186,8 +185,8 @@
@Override
public void run(WriteableTransaction txn) throws Exception
{
- final long delta = id2Count.removeCount(txn, key);
- id2Count.updateTotalCount(txn, -delta);
+ final long delta = id2ChildrenCount.removeCount(txn, key);
+ id2ChildrenCount.updateTotalCount(txn, -delta);
l.handleResult(delta);
}
});
@@ -200,8 +199,8 @@
@Override
public void run(WriteableTransaction txn) throws Exception
{
- id2Count.updateCount(txn, key, delta);
- id2Count.updateTotalCount(txn, delta);
+ id2ChildrenCount.updateCount(txn, key, delta);
+ id2ChildrenCount.updateTotalCount(txn, delta);
}
});
}
@@ -212,7 +211,7 @@
@Override
public Long run(ReadableTransaction txn) throws Exception
{
- return id2Count.getCount(txn, key);
+ return id2ChildrenCount.getCount(txn, key);
}
});
}
@@ -223,7 +222,7 @@
@Override
public Long run(ReadableTransaction txn) throws Exception
{
- return id2Count.getTotalCount(txn);
+ return id2ChildrenCount.getTotalCount(txn);
}
});
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
index d4d00aa..27e1235 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/OnDiskMergeImporterTest.java
@@ -48,7 +48,6 @@
import org.mockito.Mockito;
import org.opends.server.DirectoryServerTestCase;
import org.opends.server.TestCaseUtils;
-import org.opends.server.backends.pluggable.OnDiskMergeImporter.AddLongCollector;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.BufferPool;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Chunk;
import org.opends.server.backends.pluggable.OnDiskMergeImporter.Collector;
@@ -125,22 +124,22 @@
@Test
@SuppressWarnings(value = { "unchecked", "resource" })
- public void testAddLongCollector()
+ public void testCounterCollector()
{
- final ID2Count id2count = new ID2Count(TreeName.valueOf("/dummy/dummy"));
final MeteredCursor<String, ByteString> source = cursorOf(
- Pair.of("key1", id2count.toValue(10)),
- Pair.of("key1", id2count.toValue(20)),
- Pair.of("key2", id2count.toValue(5)),
- Pair.of("key3", id2count.toValue(6)),
- Pair.of("key3", id2count.toValue(4)));
+ Pair.of("key1", ShardedCounter.encodeValue(10)),
+ Pair.of("key1", ShardedCounter.encodeValue(20)),
+ Pair.of("key2", ShardedCounter.encodeValue(5)),
+ Pair.of("key3", ShardedCounter.encodeValue(6)),
+ Pair.of("key3", ShardedCounter.encodeValue(4)));
final SequentialCursor<String, ByteString> expected = cursorOf(
- Pair.of("key1", id2count.toValue(30)),
- Pair.of("key2", id2count.toValue(5)),
- Pair.of("key3", id2count.toValue(10)));
+ Pair.of("key1", ShardedCounter.encodeValue(30)),
+ Pair.of("key2", ShardedCounter.encodeValue(5)),
+ Pair.of("key3", ShardedCounter.encodeValue(10)));
- final SequentialCursor<String, ByteString> result = new CollectorCursor<>(source, new AddLongCollector(id2count));
+ final SequentialCursor<String, ByteString> result =
+ new CollectorCursor<>(source, ID2ChildrenCount.getSumLongCollectorInstance());
assertThat(toPairs(result)).containsExactlyElementsOf(toPairs(expected));
}
diff --git a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
index ddbbaaf..588bc05 100644
--- a/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
+++ b/opendj-server-legacy/src/test/java/org/opends/server/backends/pluggable/PluggableBackendImplTestCase.java
@@ -1090,7 +1090,7 @@
{
final Storage storage = backend.getRootContainer().getStorage();
final DN2ID dn2ID = backend.getRootContainer().getEntryContainer(testBaseDN).getDN2ID();
- final ID2Count id2ChildrenCount = backend.getRootContainer().getEntryContainer(testBaseDN).getID2ChildrenCount();
+ final ID2ChildrenCount id2ChildrenCount = backend.getRootContainer().getEntryContainer(testBaseDN).getID2ChildrenCount();
final VerifyConfig config = new VerifyConfig();
config.setBaseDN(DN.valueOf("dc=test,dc=com"));
--
Gitblit v1.10.0