From 3df9a0cef805b4f35fd6e4a7d6dc18701b72762e Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Fri, 10 Apr 2015 09:07:55 +0000
Subject: [PATCH] OPENDJ-1893 (CR-6600) Do not hardcode Importer threadCount to 1

---
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java |  143 +++++++++++++++++++++++++++++++----------------
 1 files changed, 93 insertions(+), 50 deletions(-)

diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 0378630..41bec03 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -79,6 +79,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -103,6 +104,7 @@
 import org.opends.server.backends.persistit.PersistItStorage;
 import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
 import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.ReadOperation;
 import org.opends.server.backends.pluggable.spi.ReadableTransaction;
 import org.opends.server.backends.pluggable.spi.Storage;
 import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
@@ -340,7 +342,6 @@
     {
       this.threadCount = importConfiguration.getThreadCount();
     }
-    this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads
 
     // Determine the number of indexes.
     this.indexCount = getTotalIndexCount(backendCfg);
@@ -2485,11 +2486,10 @@
         {
           if (previousRecord != null)
           {
-            // save the previous record
             bufferLen += writeRecord(previousRecord);
             resetStreams();
           }
-          // this is a new record, reinitialize all
+          // this is a new record
           previousRecord = b.currentRecord();
         }
 
@@ -3802,7 +3802,6 @@
     private static final String DB_NAME = "dn_cache";
     private final TreeName dnCache = new TreeName("", DB_NAME);
     private final Storage storage;
-    private final WriteableTransaction txn;
 
     /**
      * Create a temporary DB environment and database to be used as a cache of
@@ -3830,8 +3829,14 @@
         storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
             DirectoryServer.getInstance().getServerContext());
         storage.open();
-        txn = storage.getWriteableTransaction();
-        txn.openTree(dnCache);
+        storage.write(new WriteOperation()
+        {
+          @Override
+          public void run(WriteableTransaction txn) throws Exception
+          {
+            txn.openTree(dnCache);
+          }
+        });
       }
       catch (Exception e)
       {
@@ -3851,7 +3856,7 @@
     private static final long FNV_PRIME = 0x100000001b3L;
 
     /** Hash the DN bytes. Uses the FNV-1a hash. */
-    private ByteString hashCode(ByteString b)
+    private ByteString fnv1AHashCode(ByteString b)
     {
       long hash = FNV_INIT;
       for (int i = 0; i < b.length(); i++)
@@ -3897,53 +3902,73 @@
     {
       // Use a compact representation for key
       // and a reversible representation for value
-      final ByteString key = hashCode(dn.toNormalizedByteString());
+      final ByteString key = fnv1AHashCode(dn.toNormalizedByteString());
       final ByteStringBuilder dnValue = new ByteStringBuilder().append(dn.toString());
 
       return insert(key, dnValue);
     }
 
-    private boolean insert(ByteString key, final ByteStringBuilder dn) throws StorageRuntimeException
+    private boolean insert(final ByteString key, final ByteStringBuilder dn) throws StorageRuntimeException
     {
-      return txn.update(dnCache, key, new UpdateFunction()
+      final AtomicBoolean updateResult = new AtomicBoolean();
+      try
       {
-        @Override
-        public ByteSequence computeNewValue(ByteSequence existingDns)
+        storage.write(new WriteOperation()
         {
-          if (containsDN(existingDns, dn))
+          @Override
+          public void run(WriteableTransaction txn) throws Exception
           {
-            // no change
-            return existingDns;
-          }
-          else if (existingDns != null)
-          {
-            return addDN(existingDns, dn);
-          }
-          else
-          {
-            return singletonList(dn);
-          }
-        }
+            updateResult.set(txn.update(dnCache, key, new UpdateFunction()
+            {
+              @Override
+              public ByteSequence computeNewValue(ByteSequence existingDns)
+              {
+                if (containsDN(existingDns, dn))
+                {
+                  // no change
+                  return existingDns;
+                }
+                else if (existingDns != null)
+                {
+                  return addDN(existingDns, dn);
+                }
+                else
+                {
+                  return singletonList(dn);
+                }
+              }
 
-        /** Add the DN to the DNs because of a hash collision. */
-        private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
-        {
-          final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
-          builder.append(dnList);
-          builder.append(dntoAdd.length());
-          builder.append(dntoAdd);
-          return builder;
-        }
+              /** Add the DN to the DNs because of a hash collision. */
+              private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
+              {
+                final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
+                builder.append(dnList);
+                builder.append(dntoAdd.length());
+                builder.append(dntoAdd);
+                return builder;
+              }
 
-        /** Create a list of dn made of one element. */
-        private ByteSequence singletonList(final ByteSequence dntoAdd)
-        {
-          final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
-          singleton.append(dntoAdd.length());
-          singleton.append(dntoAdd);
-          return singleton;
-        }
-      });
+              /** Create a list of dn made of one element. */
+              private ByteSequence singletonList(final ByteSequence dntoAdd)
+              {
+                final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
+                singleton.append(dntoAdd.length());
+                singleton.append(dntoAdd);
+                return singleton;
+              }
+            }));
+          }
+        });
+      }
+      catch (StorageRuntimeException e)
+      {
+        throw e;
+      }
+      catch (Exception e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+      return updateResult.get();
     }
 
     /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
@@ -3980,16 +4005,34 @@
      *         {@code false} if it is not.
      */
     @Override
-    public boolean contains(DN dn)
+    public boolean contains(final DN dn)
     {
-      final ByteString key = hashCode(dn.toNormalizedByteString());
-      final ByteString existingDns = txn.read(dnCache, key);
-      if (existingDns != null)
+      try
       {
-        final ByteStringBuilder dnBytes = new ByteStringBuilder().append(dn.toString());
-        return containsDN(existingDns, dnBytes);
+        return storage.read(new ReadOperation<Boolean>()
+        {
+          @Override
+          public Boolean run(ReadableTransaction txn) throws Exception
+          {
+            final ByteString key = fnv1AHashCode(dn.toNormalizedByteString());
+            final ByteString existingDns = txn.read(dnCache, key);
+            if (existingDns != null)
+            {
+              final ByteStringBuilder dnBytes = new ByteStringBuilder().append(dn.toString());
+              return containsDN(existingDns, dnBytes);
+            }
+            return false;
+          }
+        });
       }
-      return false;
+      catch (StorageRuntimeException e)
+      {
+        throw e;
+      }
+      catch (Exception e)
+      {
+        throw new StorageRuntimeException(e);
+      }
     }
   }
 

--
Gitblit v1.10.0