From 3df9a0cef805b4f35fd6e4a7d6dc18701b72762e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 10 Apr 2015 09:07:55 +0000
Subject: [PATCH] OPENDJ-1893 (CR-6600) Do not hardcode Importer threadCount to 1
---
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 143 +++++++++++++++++++++++++++++++----------------
1 files changed, 93 insertions(+), 50 deletions(-)
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 0378630..41bec03 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -79,6 +79,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -103,6 +104,7 @@
import org.opends.server.backends.persistit.PersistItStorage;
import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.ReadOperation;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
@@ -340,7 +342,6 @@
{
this.threadCount = importConfiguration.getThreadCount();
}
- this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads
// Determine the number of indexes.
this.indexCount = getTotalIndexCount(backendCfg);
@@ -2485,11 +2486,10 @@
{
if (previousRecord != null)
{
- // save the previous record
bufferLen += writeRecord(previousRecord);
resetStreams();
}
- // this is a new record, reinitialize all
+ // this is a new record
previousRecord = b.currentRecord();
}
@@ -3802,7 +3802,6 @@
private static final String DB_NAME = "dn_cache";
private final TreeName dnCache = new TreeName("", DB_NAME);
private final Storage storage;
- private final WriteableTransaction txn;
/**
* Create a temporary DB environment and database to be used as a cache of
@@ -3830,8 +3829,14 @@
storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
DirectoryServer.getInstance().getServerContext());
storage.open();
- txn = storage.getWriteableTransaction();
- txn.openTree(dnCache);
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ txn.openTree(dnCache);
+ }
+ });
}
catch (Exception e)
{
@@ -3851,7 +3856,7 @@
private static final long FNV_PRIME = 0x100000001b3L;
/** Hash the DN bytes. Uses the FNV-1a hash. */
- private ByteString hashCode(ByteString b)
+ private ByteString fnv1AHashCode(ByteString b)
{
long hash = FNV_INIT;
for (int i = 0; i < b.length(); i++)
@@ -3897,53 +3902,73 @@
{
// Use a compact representation for key
// and a reversible representation for value
- final ByteString key = hashCode(dn.toNormalizedByteString());
+ final ByteString key = fnv1AHashCode(dn.toNormalizedByteString());
final ByteStringBuilder dnValue = new ByteStringBuilder().append(dn.toString());
return insert(key, dnValue);
}
- private boolean insert(ByteString key, final ByteStringBuilder dn) throws StorageRuntimeException
+ private boolean insert(final ByteString key, final ByteStringBuilder dn) throws StorageRuntimeException
{
- return txn.update(dnCache, key, new UpdateFunction()
+ final AtomicBoolean updateResult = new AtomicBoolean();
+ try
{
- @Override
- public ByteSequence computeNewValue(ByteSequence existingDns)
+ storage.write(new WriteOperation()
{
- if (containsDN(existingDns, dn))
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
{
- // no change
- return existingDns;
- }
- else if (existingDns != null)
- {
- return addDN(existingDns, dn);
- }
- else
- {
- return singletonList(dn);
- }
- }
+ updateResult.set(txn.update(dnCache, key, new UpdateFunction()
+ {
+ @Override
+ public ByteSequence computeNewValue(ByteSequence existingDns)
+ {
+ if (containsDN(existingDns, dn))
+ {
+ // no change
+ return existingDns;
+ }
+ else if (existingDns != null)
+ {
+ return addDN(existingDns, dn);
+ }
+ else
+ {
+ return singletonList(dn);
+ }
+ }
- /** Add the DN to the DNs because of a hash collision. */
- private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
- {
- final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
- builder.append(dnList);
- builder.append(dntoAdd.length());
- builder.append(dntoAdd);
- return builder;
- }
+ /** Add the DN to the DNs because of a hash collision. */
+ private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
+ {
+ final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
+ builder.append(dnList);
+ builder.append(dntoAdd.length());
+ builder.append(dntoAdd);
+ return builder;
+ }
- /** Create a list of dn made of one element. */
- private ByteSequence singletonList(final ByteSequence dntoAdd)
- {
- final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
- singleton.append(dntoAdd.length());
- singleton.append(dntoAdd);
- return singleton;
- }
- });
+ /** Create a list of dn made of one element. */
+ private ByteSequence singletonList(final ByteSequence dntoAdd)
+ {
+ final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
+ singleton.append(dntoAdd.length());
+ singleton.append(dntoAdd);
+ return singleton;
+ }
+ }));
+ }
+ });
+ }
+ catch (StorageRuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new StorageRuntimeException(e);
+ }
+ return updateResult.get();
}
/** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
@@ -3980,16 +4005,34 @@
* {@code false} if it is not.
*/
@Override
- public boolean contains(DN dn)
+ public boolean contains(final DN dn)
{
- final ByteString key = hashCode(dn.toNormalizedByteString());
- final ByteString existingDns = txn.read(dnCache, key);
- if (existingDns != null)
+ try
{
- final ByteStringBuilder dnBytes = new ByteStringBuilder().append(dn.toString());
- return containsDN(existingDns, dnBytes);
+ return storage.read(new ReadOperation<Boolean>()
+ {
+ @Override
+ public Boolean run(ReadableTransaction txn) throws Exception
+ {
+ final ByteString key = fnv1AHashCode(dn.toNormalizedByteString());
+ final ByteString existingDns = txn.read(dnCache, key);
+ if (existingDns != null)
+ {
+ final ByteStringBuilder dnBytes = new ByteStringBuilder().append(dn.toString());
+ return containsDN(existingDns, dnBytes);
+ }
+ return false;
+ }
+ });
}
- return false;
+ catch (StorageRuntimeException e)
+ {
+ throw e;
+ }
+ catch (Exception e)
+ {
+ throw new StorageRuntimeException(e);
+ }
}
}
--
Gitblit v1.10.0