mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
10.07.2015 3df9a0cef805b4f35fd6e4a7d6dc18701b72762e
OPENDJ-1893 (CR-6600) Do not hardcode Importer threadCount to 1

Problem was due to multiple threads sharing the same Persistit exchanges. Generally speaking DBs do not like mixing transactions and threads.
By using transactions for each DB access, the problem disappears.
Are we losing any performance by using full fledged transactions? Could we somehow do these operations in a thread safe manner without the need to use transactions?

Importer.java:
Use transactions for read/write operations on DN cache.
1 files modified
143 ■■■■■ changed files
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java 143 ●●●●● patch | view | raw | blame | history
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -79,6 +79,7 @@
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -103,6 +104,7 @@
import org.opends.server.backends.persistit.PersistItStorage;
import org.opends.server.backends.pluggable.AttributeIndex.MatchingRuleIndex;
import org.opends.server.backends.pluggable.spi.Cursor;
import org.opends.server.backends.pluggable.spi.ReadOperation;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.Storage;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
@@ -340,7 +342,6 @@
    {
      this.threadCount = importConfiguration.getThreadCount();
    }
    this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads
    // Determine the number of indexes.
    this.indexCount = getTotalIndexCount(backendCfg);
@@ -2485,11 +2486,10 @@
        {
          if (previousRecord != null)
          {
            // save the previous record
            bufferLen += writeRecord(previousRecord);
            resetStreams();
          }
          // this is a new record, reinitialize all
          // this is a new record
          previousRecord = b.currentRecord();
        }
@@ -3802,7 +3802,6 @@
    private static final String DB_NAME = "dn_cache";
    private final TreeName dnCache = new TreeName("", DB_NAME);
    private final Storage storage;
    private final WriteableTransaction txn;
    /**
     * Create a temporary DB environment and database to be used as a cache of
@@ -3830,8 +3829,14 @@
        storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
            DirectoryServer.getInstance().getServerContext());
        storage.open();
        txn = storage.getWriteableTransaction();
        txn.openTree(dnCache);
        storage.write(new WriteOperation()
        {
          @Override
          public void run(WriteableTransaction txn) throws Exception
          {
            txn.openTree(dnCache);
          }
        });
      }
      catch (Exception e)
      {
@@ -3851,7 +3856,7 @@
    private static final long FNV_PRIME = 0x100000001b3L;
    /** Hash the DN bytes. Uses the FNV-1a hash. */
    private ByteString hashCode(ByteString b)
    private ByteString fnv1AHashCode(ByteString b)
    {
      long hash = FNV_INIT;
      for (int i = 0; i < b.length(); i++)
@@ -3897,53 +3902,73 @@
    {
      // Use a compact representation for key
      // and a reversible representation for value
      final ByteString key = hashCode(dn.toNormalizedByteString());
      final ByteString key = fnv1AHashCode(dn.toNormalizedByteString());
      final ByteStringBuilder dnValue = new ByteStringBuilder().append(dn.toString());
      return insert(key, dnValue);
    }
    private boolean insert(ByteString key, final ByteStringBuilder dn) throws StorageRuntimeException
    private boolean insert(final ByteString key, final ByteStringBuilder dn) throws StorageRuntimeException
    {
      return txn.update(dnCache, key, new UpdateFunction()
      final AtomicBoolean updateResult = new AtomicBoolean();
      try
      {
        @Override
        public ByteSequence computeNewValue(ByteSequence existingDns)
        storage.write(new WriteOperation()
        {
          if (containsDN(existingDns, dn))
          @Override
          public void run(WriteableTransaction txn) throws Exception
          {
            // no change
            return existingDns;
          }
          else if (existingDns != null)
          {
            return addDN(existingDns, dn);
          }
          else
          {
            return singletonList(dn);
          }
        }
            updateResult.set(txn.update(dnCache, key, new UpdateFunction()
            {
              @Override
              public ByteSequence computeNewValue(ByteSequence existingDns)
              {
                if (containsDN(existingDns, dn))
                {
                  // no change
                  return existingDns;
                }
                else if (existingDns != null)
                {
                  return addDN(existingDns, dn);
                }
                else
                {
                  return singletonList(dn);
                }
              }
        /** Add the DN to the DNs because of a hash collision. */
        private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
        {
          final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
          builder.append(dnList);
          builder.append(dntoAdd.length());
          builder.append(dntoAdd);
          return builder;
        }
              /** Add the DN to the DNs because of a hash collision. */
              private ByteSequence addDN(final ByteSequence dnList, final ByteSequence dntoAdd)
              {
                final ByteStringBuilder builder = new ByteStringBuilder(dnList.length() + INT_SIZE + dntoAdd.length());
                builder.append(dnList);
                builder.append(dntoAdd.length());
                builder.append(dntoAdd);
                return builder;
              }
        /** Create a list of dn made of one element. */
        private ByteSequence singletonList(final ByteSequence dntoAdd)
        {
          final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
          singleton.append(dntoAdd.length());
          singleton.append(dntoAdd);
          return singleton;
        }
      });
              /** Create a list of dn made of one element. */
              private ByteSequence singletonList(final ByteSequence dntoAdd)
              {
                final ByteStringBuilder singleton = new ByteStringBuilder(dntoAdd.length() + INT_SIZE);
                singleton.append(dntoAdd.length());
                singleton.append(dntoAdd);
                return singleton;
              }
            }));
          }
        });
      }
      catch (StorageRuntimeException e)
      {
        throw e;
      }
      catch (Exception e)
      {
        throw new StorageRuntimeException(e);
      }
      return updateResult.get();
    }
    /** Return true if the specified DN is in the DNs saved as a result of hash collisions. */
@@ -3980,16 +4005,34 @@
     *         {@code false} if it is not.
     */
    @Override
    public boolean contains(DN dn)
    public boolean contains(final DN dn)
    {
      final ByteString key = hashCode(dn.toNormalizedByteString());
      final ByteString existingDns = txn.read(dnCache, key);
      if (existingDns != null)
      try
      {
        final ByteStringBuilder dnBytes = new ByteStringBuilder().append(dn.toString());
        return containsDN(existingDns, dnBytes);
        return storage.read(new ReadOperation<Boolean>()
        {
          @Override
          public Boolean run(ReadableTransaction txn) throws Exception
          {
            final ByteString key = fnv1AHashCode(dn.toNormalizedByteString());
            final ByteString existingDns = txn.read(dnCache, key);
            if (existingDns != null)
            {
              final ByteStringBuilder dnBytes = new ByteStringBuilder().append(dn.toString());
              return containsDN(existingDns, dnBytes);
            }
            return false;
          }
        });
      }
      return false;
      catch (StorageRuntimeException e)
      {
        throw e;
      }
      catch (Exception e)
      {
        throw new StorageRuntimeException(e);
      }
    }
  }