mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
30.20.2015 5d07ec161328a94de355aa4bf93918a2da5a8602
OPENDJ-1801 (CR-6815) Revise usage of storage.open() and startImport()

This change revives the use of Storage.startImport() for the import.

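In outline, the phase-two merge in Importer.processIndexFiles() now runs like this (condensed from the PersistItStorage and Importer diffs below; error handling elided):

    // The backend storage must be closed first: startImport() re-opens
    // the underlying database itself, in import mode.
    Storage storage = rootContainer.getStorage();
    storage.close();
    try (Importer importer = storage.startImport())
    {
      // Phase-two index DB write tasks now write through the Importer
      // instead of through a WriteableTransaction.
      List<Future<Void>> futures = new LinkedList<>();
      submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
      submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
      getAll(futures);
    }
    finally
    {
      storage.open(); // restore the backend to normal, transactional operation
    }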

pluggable/spi/Importer.java:
Added read() and delete().
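
read() and delete() let import code do transaction-less read-modify-write against a tree. A minimal sketch of the pattern, as used by DefaultIndex.importPut() in the diff below (mergedValue and newValue are hypothetical stand-ins for the codec-encoded entry ID sets):

    ByteString value = importer.read(getName(), key); // returns null when the key is absent
    if (value != null)
    {
      importer.put(getName(), key, mergedValue); // merge the existing IDs with the new ones, then overwrite
    }
    else
    {
      importer.put(getName(), key, newValue); // no existing record: store the new IDs directly
    }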

Storage.java:
Changed the exceptions thrown by startImport() from Exception to ConfigException and StorageRuntimeException.

PersistItStorage.java, TracedStorage.java:
Implemented the changes in Storage and Importer.


Index.java, DefaultIndex.java, Importer.java:
Used Importer instead of WriteableTransaction.

Importer.java:
For the import, used the spi Importer instead of WriteableTransaction.
In processIndexFiles(), closed the storage before starting the import.
In IndexDBWriteTask.endWriteTask(), called DNState.finalFlush() instead of DNState.flush().
In DNState, added finalFlush().
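
finalFlush() does a last flush() of the cached per-parent children counts and then records the accumulated grand total, as in the diff below (spi.Importer type name shortened for readability):

      void finalFlush(Importer importer)
      {
        flush(importer); // write whatever is still cached in id2childrenCountTree
        entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
      }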

ID2Count.java:
Added importPut(), importPutTotalCount() and importPut0() methods for import. Unfortunately, they somewhat duplicate the WriteableTransaction-based ones.
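
For example, DNState.flush() and finalFlush() in the diff below end up calling (counter keys are sharded by thread ID to limit contention):

    ID2Count id2childrenCount = entryContainer.getID2ChildrenCount();
    id2childrenCount.importPut(importer, entryID, totalForEntryID);   // one sharded counter record per parent entry
    id2childrenCount.importPutTotalCount(importer, totalNbEntries);   // grand total, written once at the end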
8 files modified, 418 lines changed:
opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java (166 lines)
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java (21 lines)
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java (29 lines)
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java (122 lines)
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java (5 lines)
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java (45 lines)
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java (23 lines)
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java (7 lines)
opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
@@ -46,6 +46,8 @@
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
import org.forgerock.i18n.LocalizableMessage;
import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -85,7 +87,6 @@
import com.persistit.Persistit;
import com.persistit.Transaction;
import com.persistit.Tree;
-import com.persistit.TreeBuilder;
import com.persistit.Value;
import com.persistit.Volume;
import com.persistit.VolumeSpecification;
@@ -247,26 +248,31 @@
  /** PersistIt implementation of the {@link Importer} interface. */
  private final class ImporterImpl implements Importer
  {
-    private final TreeBuilder importer = new TreeBuilder(db);
-    private final Key importKey = new Key(db);
-    private final Value importValue = new Value(db);
-    private final Map<TreeName, Tree> trees = new HashMap<TreeName, Tree>();
+    private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>();
+    private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>()
+    {
+      @Override
+      protected Map<TreeName, Exchange> initialValue()
+      {
+        final Map<TreeName, Exchange> value = new HashMap<>();
+        allExchanges.add(value);
+        return value;
+      }
+    };

     @Override
     public void close()
     {
-      try
+      for (Map<TreeName, Exchange> map : allExchanges)
       {
-        importer.merge();
+        for (Exchange exchange : map.values())
+        {
+          db.releaseExchange(exchange);
+        }
+        map.clear();
       }
-      catch (final Exception e)
-      {
-        throw new StorageRuntimeException(e);
-      }
-      finally
-      {
-        PersistItStorage.this.close();
-      }
+
+      PersistItStorage.this.close();
     }
    @Override
@@ -288,16 +294,59 @@
    {
      try
      {
-        final Tree tree = trees.get(treeName);
-        importer.store(tree,
-            bytesToKey(importKey, key),
-            bytesToValue(importValue, value));
+        final Exchange ex = getExchangeFromCache(treeName);
+        bytesToKey(ex.getKey(), key);
+        bytesToValue(ex.getValue(), value);
+        ex.store();
       }
       catch (final Exception e)
       {
         throw new StorageRuntimeException(e);
       }
     }
+
+    @Override
+    public boolean delete(final TreeName treeName, final ByteSequence key)
+    {
+      try
+      {
+        final Exchange ex = getExchangeFromCache(treeName);
+        bytesToKey(ex.getKey(), key);
+        return ex.remove();
+      }
+      catch (final PersistitException e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+    }
+
+    @Override
+    public ByteString read(final TreeName treeName, final ByteSequence key)
+    {
+      try
+      {
+        final Exchange ex = getExchangeFromCache(treeName);
+        bytesToKey(ex.getKey(), key);
+        ex.fetch();
+        return valueToBytes(ex.getValue());
+      }
+      catch (final PersistitException e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+    }
+
+    private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
+    {
+      Map<TreeName, Exchange> threadExchanges = exchanges.get();
+      Exchange exchange = threadExchanges.get(treeName);
+      if (exchange == null)
+      {
+        exchange = getNewExchange(treeName, false);
+        threadExchanges.put(treeName, exchange);
+      }
+      return exchange;
+    }
  }
  /** PersistIt implementation of the {@link WriteableTransaction} interface. */
@@ -306,8 +355,7 @@
    private final Map<TreeName, Exchange> exchanges = new HashMap<TreeName, Exchange>();
    @Override
-    public void put(final TreeName treeName, final ByteSequence key,
-        final ByteSequence value)
+    public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value)
    {
      try
      {
@@ -475,8 +523,7 @@
      return b1.equals(b2);
    }
-    private Exchange getExchangeFromCache(final TreeName treeName)
-        throws PersistitException
+    private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
    {
      Exchange exchange = exchanges.get(treeName);
      if (exchange == null)
@@ -495,34 +542,18 @@
      }
      exchanges.clear();
    }
-    private Exchange getNewExchange(final TreeName treeName, final boolean create)
-        throws PersistitException
-    {
-      return db.getExchange(volume, mangleTreeName(treeName), create);
-    }
   }

-  private static void clearAndCreateDbDir(final File dbDir)
+  private Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException
   {
-    if (dbDir.exists())
-    {
-      for (final File child : dbDir.listFiles())
-      {
-        child.delete();
-      }
-    }
-    else
-    {
-      dbDir.mkdirs();
-    }
+    return db.getExchange(volume, mangleTreeName(treeName), create);
   }
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  private final ServerContext serverContext;
  private File backendDirectory;
  private Persistit db;
  private Volume volume;
  private Configuration dbCfg;
  private PersistitBackendCfg config;
  private DiskSpaceMonitor diskMonitor;
  private MemoryQuota memQuota;
@@ -540,30 +571,36 @@
  // FIXME: should be package private once importer is decoupled.
  public PersistItStorage(final PersistitBackendCfg cfg, ServerContext serverContext) throws ConfigException
  {
    this.serverContext = serverContext;
    backendDirectory = new File(getFileForPath(cfg.getDBDirectory()), cfg.getBackendId());
    config = cfg;
-    dbCfg = new Configuration();
+    cfg.addPersistitChangeListener(this);
+  }
+
+  private Configuration buildConfiguration()
+  {
+    final Configuration dbCfg = new Configuration();
     dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath());
     dbCfg.setJournalPath(new File(backendDirectory, VOLUME_NAME + "_journal").getPath());
     dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null,
         BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false)));
-    final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg();
+    final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg);
    bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE);
    diskMonitor = serverContext.getDiskSpaceMonitor();
    memQuota = serverContext.getMemoryQuota();
-    if (cfg.getDBCacheSize() > 0)
+    if (config.getDBCacheSize() > 0)
     {
-      bufferPoolCfg.setMaximumMemory(cfg.getDBCacheSize());
-      memQuota.acquireMemory(cfg.getDBCacheSize());
+      bufferPoolCfg.setMaximumMemory(config.getDBCacheSize());
+      memQuota.acquireMemory(config.getDBCacheSize());
     }
     else
     {
-      bufferPoolCfg.setFraction(cfg.getDBCachePercent() / 100.0f);
-      memQuota.acquireMemory(memQuota.memPercentToBytes(cfg.getDBCachePercent()));
+      bufferPoolCfg.setFraction(config.getDBCachePercent() / 100.0f);
+      memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent()));
     }
-    dbCfg.setCommitPolicy(cfg.isDBTxnNoSync() ? SOFT : GROUP);
-    cfg.addPersistitChangeListener(this);
+    dbCfg.setCommitPolicy(config.isDBTxnNoSync() ? SOFT : GROUP);
+    return dbCfg;
   }
  /** {@inheritDoc} */
@@ -594,21 +631,31 @@
    diskMonitor.deregisterMonitoredDirectory(getDirectory(), this);
  }
-  private BufferPoolConfiguration getBufferPoolCfg()
+  private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg)
  {
    return dbCfg.getBufferPoolMap().get(BUFFER_SIZE);
  }
  /** {@inheritDoc} */
  @Override
-  public void open() throws Exception
+  public void open() throws ConfigException, StorageRuntimeException
   {
+    open0(buildConfiguration());
+  }
+
+  private void open0(final Configuration dbCfg) throws ConfigException
+  {
     setupStorageFiles();
     try
     {
+      if (db != null)
+      {
+        throw new IllegalStateException(
+            "Database is already open, either the backend is enabled or an import is currently running.");
+      }
       db = new Persistit(dbCfg);
-      final long bufferCount = getBufferPoolCfg().computeBufferCount(db.getAvailableHeap());
+      final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap());
      final long totalSize = bufferCount * BUFFER_SIZE / 1024;
      logger.info(NOTE_PERSISTIT_MEMORY_CFG, config.getBackendId(),
          bufferCount, BUFFER_SIZE, totalSize);
@@ -676,14 +723,13 @@
  /** {@inheritDoc} */
  @Override
-  public Importer startImport() throws Exception
+  public Importer startImport() throws ConfigException, StorageRuntimeException
   {
-    clearAndCreateDbDir(backendDirectory);
-    open();
+    open0(buildConfiguration());
     return new ImporterImpl();
   }

-  private String mangleTreeName(final TreeName treeName)
+  private static String mangleTreeName(final TreeName treeName)
  {
    StringBuilder mangled = new StringBuilder();
    String name = treeName.toString();
@@ -879,19 +925,19 @@
   * TODO: it would be nice to use the low-level key/value APIs. They seem quite
   * inefficient at the moment for simple byte arrays.
   */
-  private Key bytesToKey(final Key key, final ByteSequence bytes)
+  private static Key bytesToKey(final Key key, final ByteSequence bytes)
   {
     final byte[] tmp = bytes.toByteArray();
     return key.clear().appendByteArray(tmp, 0, tmp.length);
   }

-  private Value bytesToValue(final Value value, final ByteSequence bytes)
+  private static Value bytesToValue(final Value value, final ByteSequence bytes)
   {
     value.clear().putByteArray(bytes.toByteArray());
     return value;
   }

-  private ByteString valueToBytes(final Value value)
+  private static ByteString valueToBytes(final Value value)
  {
    if (value.isDefined())
    {
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
@@ -42,6 +42,7 @@
import org.opends.server.backends.pluggable.EntryIDSet.EntryIDSetCodec;
import org.opends.server.backends.pluggable.State.IndexFlag;
import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
import org.opends.server.backends.pluggable.spi.TreeName;
@@ -124,32 +125,32 @@
  }
  @Override
-  public final void importPut(WriteableTransaction txn, ImportIDSet idsToBeAdded) throws StorageRuntimeException
+  public final void importPut(Importer importer, ImportIDSet idsToBeAdded) throws StorageRuntimeException
   {
-    Reject.ifNull(txn, "txn must not be null");
+    Reject.ifNull(importer, "importer must not be null");
     Reject.ifNull(idsToBeAdded, "idsToBeAdded must not be null");

     ByteSequence key = idsToBeAdded.getKey();
-    ByteString value = txn.read(getName(), key);
+    ByteString value = importer.read(getName(), key);
    if (value != null)
    {
      final EntryIDSet entryIDSet = codec.decode(key, value);
      final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
      importIDSet.merge(idsToBeAdded);
-      txn.put(getName(), key, importIDSet.valueToByteString(codec));
+      importer.put(getName(), key, importIDSet.valueToByteString(codec));
     }
     else
     {
-      txn.put(getName(), key, idsToBeAdded.valueToByteString(codec));
+      importer.put(getName(), key, idsToBeAdded.valueToByteString(codec));
    }
  }
  @Override
-  public final void importRemove(WriteableTransaction txn, ImportIDSet idsToBeRemoved) throws StorageRuntimeException
+  public final void importRemove(Importer importer, ImportIDSet idsToBeRemoved) throws StorageRuntimeException
   {
-    Reject.ifNull(txn, "txn must not be null");
+    Reject.ifNull(importer, "importer must not be null");
     Reject.ifNull(idsToBeRemoved, "idsToBeRemoved must not be null");

     ByteSequence key = idsToBeRemoved.getKey();
-    ByteString value = txn.read(getName(), key);
+    ByteString value = importer.read(getName(), key);
    if (value == null)
    {
      // Should never happen -- the keys should always be there.
@@ -161,11 +162,11 @@
    importIDSet.remove(idsToBeRemoved);
    if (importIDSet.isDefined() && importIDSet.size() == 0)
    {
-      txn.delete(getName(), key);
+      importer.delete(getName(), key);
     }
     else
     {
-      txn.put(getName(), key, importIDSet.valueToByteString(codec));
+      importer.put(getName(), key, importIDSet.valueToByteString(codec));
    }
  }
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
@@ -31,6 +31,7 @@
import org.forgerock.util.promise.Function;
import org.forgerock.util.promise.NeverThrowsException;
import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.TreeName;
import org.opends.server.backends.pluggable.spi.UpdateFunction;
@@ -90,8 +91,7 @@
  private void addToCounter(WriteableTransaction txn, EntryID entryID, final long delta)
  {
-    final long bucket = (Thread.currentThread().getId() & (SHARD_COUNT - 1));
-    final ByteSequence shardedKey = getKeyFromEntryIDAndBucket(entryID, bucket);
+    final ByteSequence shardedKey = getShardedKey(entryID);
    txn.update(getName(), shardedKey, new UpdateFunction()
    {
      @Override
@@ -103,6 +103,30 @@
    });
  }
+  void importPut(Importer importer, EntryID entryID, long total)
+  {
+    Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow.");
+    importPut0(importer, entryID, total);
+  }
+
+  void importPutTotalCount(Importer importer, long total)
+  {
+    importPut0(importer, TOTAL_COUNT_ENTRY_ID, total);
+  }
+
+  private void importPut0(Importer importer, EntryID entryID, final long delta)
+  {
+    Reject.ifNull(importer, "importer must not be null");
+    final ByteSequence shardedKey = getShardedKey(entryID);
+    importer.put(getName(), shardedKey, ByteString.valueOf(delta));
+  }
+
+  private ByteSequence getShardedKey(EntryID entryID)
+  {
+    final long bucket = (Thread.currentThread().getId() & (SHARD_COUNT - 1));
+    return getKeyFromEntryIDAndBucket(entryID, bucket);
+  }
  /**
   * Get the counter value for the specified key
   * @param txn The database transaction
@@ -168,5 +192,4 @@
    return counterValue;
  }
}
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1061,7 +1061,7 @@
    }
  }
-  private void importPhaseTwo() throws InterruptedException, ExecutionException
+  private void importPhaseTwo() throws Exception
  {
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
@@ -1079,7 +1079,7 @@
   * Performs on-disk merge by reading several scratch files at once
   * and write their ordered content into the target indexes.
   */
-  private void processIndexFiles() throws InterruptedException, ExecutionException
+  private void processIndexFiles() throws Exception
  {
    if (bufferCount.get() == 0)
    {
@@ -1147,20 +1147,30 @@
    Semaphore permits = new Semaphore(buffers);
    // Start DN processing first.
-    List<Future<Void>> futures = new LinkedList<>();
-    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
-    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
-    getAll(futures);
+    Storage storage = rootContainer.getStorage();
+    storage.close();
+    try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport())
+    {
+      List<Future<Void>> futures = new LinkedList<>();
+      submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+      submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+      getAll(futures);
+    }
+    finally
+    {
+      storage.open();
+    }
    shutdownAll(dbService);
  }
-  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
-      Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
+  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs,
+      org.opends.server.backends.pluggable.spi.Importer importer,
+      ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
   {
     for (IndexManager indexMgr : indexMgrs)
     {
-      futures.add(dbService.submit(
-          new IndexDBWriteTask(rootContainer.getStorage(), indexMgr, permits, buffers, readAheadSize)));
+      futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)));
    }
  }
@@ -1706,7 +1716,7 @@
   */
  private final class IndexDBWriteTask implements Callable<Void>
  {
-    private final Storage storage;
+    private final org.opends.server.backends.pluggable.spi.Importer importer;
    private final IndexManager indexMgr;
    private final int cacheSize;
    /** indexID => DNState map */
@@ -1728,10 +1738,10 @@
    /**
     * Creates a new index DB writer.
     *
+     * @param importer
+     *          The importer
      * @param indexMgr
      *          The index manager.
-     * @param storage
-     *          Where to store data
     * @param permits
     *          The semaphore used for restricting the number of buffer allocations.
     * @param maxPermits
@@ -1739,9 +1749,10 @@
     * @param cacheSize
     *          The buffer cache size.
     */
-    public IndexDBWriteTask(Storage storage, IndexManager indexMgr, Semaphore permits, int maxPermits, int cacheSize)
+    public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr,
+        Semaphore permits, int maxPermits, int cacheSize)
     {
-      this.storage = storage;
+      this.importer = importer;
      this.indexMgr = indexMgr;
      this.permits = permits;
      this.maxPermits = maxPermits;
@@ -1822,7 +1833,7 @@
    }
    /** Finishes this task. */
-    private void endWriteTask(WriteableTransaction txn)
+    private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer)
    {
      isRunning = false;
@@ -1839,8 +1850,9 @@
        {
          for (DNState dnState : dnStateMap.values())
          {
-            dnState.flush(txn);
+            dnState.finalFlush(importer);
          }
          if (!isCanceled)
          {
            logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
@@ -1896,18 +1908,11 @@
    @Override
    public Void call() throws Exception
    {
-      storage.write(new WriteOperation()
-      {
-        @Override
-        public void run(WriteableTransaction txn) throws Exception
-        {
-          call0(txn);
-        }
-      });
+      call0(importer);
       return null;
     }

-    private void call0(WriteableTransaction txn) throws Exception
+    private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception
    {
      if (isCanceled)
      {
@@ -1936,7 +1941,7 @@
            {
              if (previousRecord != null)
              {
-                addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+                addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
              }
              // this is a new record
@@ -1960,7 +1965,7 @@
          if (previousRecord != null)
          {
-            addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+            addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
          }
        }
      }
@@ -1971,7 +1976,7 @@
      }
      finally
      {
-        endWriteTask(txn);
+        endWriteTask(importer);
      }
    }
@@ -1986,30 +1991,31 @@
      return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
    }
-    private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
-        throws DirectoryException
+    private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet,
+        ImportIDSet deleteSet) throws DirectoryException
    {
      keyCount.incrementAndGet();
      if (indexMgr.isDN2ID())
      {
-        addDN2ID(txn, indexID, insertSet);
+        addDN2ID(importer, indexID, insertSet);
      }
      else
      {
        if (!deleteSet.isDefined() || deleteSet.size() > 0)
        {
          final Index index = indexIDToIndexMap.get(indexID);
-          index.importRemove(txn, deleteSet);
+          index.importRemove(importer, deleteSet);
        }
        if (!insertSet.isDefined() || insertSet.size() > 0)
        {
          final Index index = indexIDToIndexMap.get(indexID);
-          index.importPut(txn, insertSet);
+          index.importPut(importer, insertSet);
        }
      }
    }
-    private void addDN2ID(WriteableTransaction txn, int indexID, ImportIDSet idSet) throws DirectoryException
+    private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet)
+        throws DirectoryException
    {
      DNState dnState = dnStateMap.get(indexID);
      if (dnState == null)
@@ -2017,9 +2023,9 @@
        dnState = new DNState(indexIDToECMap.get(indexID));
        dnStateMap.put(indexID, dnState);
      }
-      if (dnState.checkParent(txn, idSet))
+      if (dnState.checkParent(importer, idSet))
       {
-        dnState.writeToDN2ID(txn, idSet.getKey());
+        dnState.writeToDN2ID(importer, idSet.getKey());
      }
    }
@@ -2032,7 +2038,7 @@
     * This class is used to by a index DB merge thread performing DN processing
     * to keep track of the state of individual DN2ID index processing.
     */
-    final class DNState
+    private final class DNState
    {
      private static final int DN_STATE_CACHE_SIZE = 64 * KB;
@@ -2043,8 +2049,9 @@
      private ByteSequence parentDN;
      private final ByteStringBuilder lastDN = new ByteStringBuilder();
      private EntryID parentID, lastID, entryID;
+      private long totalNbEntries;

-      DNState(EntryContainer entryContainer)
+      private DNState(EntryContainer entryContainer)
      {
        this.entryContainer = entryContainer;
        dn2id = entryContainer.getDN2ID().getName();
@@ -2062,7 +2069,8 @@
      }
      /** Why do we still need this if we are checking parents in the first phase? */
-      private boolean checkParent(ReadableTransaction txn, ImportIDSet idSet) throws StorageRuntimeException
+      boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet)
+          throws StorageRuntimeException
      {
        entryID = idSet.iterator().next();
        parentDN = getParent(idSet.getKey());
@@ -2072,7 +2080,7 @@
          // If null is returned then this is a suffix DN.
          if (parentDN != null)
          {
-            parentID = get(txn, dn2id, parentDN);
+            parentID = get(importer, dn2id, parentDN);
            if (parentID == null)
            {
              // We have a missing parent. Maybe parent checking was turned off?
@@ -2145,43 +2153,53 @@
        return importCfg != null && importCfg.appendToExistingData();
      }
-      EntryID get(ReadableTransaction txn, TreeName dn2id, ByteSequence dn) throws StorageRuntimeException
+      private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn)
+          throws StorageRuntimeException
       {
-        ByteString value = txn.read(dn2id, dn);
+        ByteString value = importer.read(dn2id, dn);
        return value != null ? new EntryID(value) : null;
      }
-      public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
+      void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key)
+          throws DirectoryException
       {
-        txn.put(dn2id, key, entryID.toByteString());
+        importer.put(dn2id, key, entryID.toByteString());
        indexMgr.addTotDNCount(1);
        if (parentID != null)
        {
-          incrementChildrenCounter(txn);
+          incrementChildrenCounter(importer);
        }
      }
-      private void incrementChildrenCounter(WriteableTransaction txn)
+      private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer)
      {
        final AtomicLong counter = getId2childrenCounter();
        counter.incrementAndGet();
        if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
        {
-          flush(txn);
+          flush(importer);
         }
       }

-      private void flush(WriteableTransaction txn)
+      private void flush(org.opends.server.backends.pluggable.spi.Importer importer)
      {
        for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
        {
-          entryContainer.getID2ChildrenCount()
-              .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
+          final EntryID entryID = childrenCounter.getKey();
+          final long totalForEntryID = childrenCounter.getValue().get();
+          totalNbEntries += totalForEntryID;
+          entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID);
        }
        id2childrenCountTree.clear();
      }
+
+      void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer)
+      {
+        flush(importer);
+        entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
+      }
    }
  }
@@ -2957,7 +2975,7 @@
      indexKeyQueueMap.clear();
    }
-    private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
+    private void rebuildIndexesPhaseTwo() throws Exception
    {
      final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
      try
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java
@@ -29,6 +29,7 @@
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
import org.opends.server.backends.pluggable.spi.ReadableTransaction;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
@@ -44,10 +45,10 @@
  int getIndexEntryLimit();
  // Ignores trusted state.
-  void importPut(WriteableTransaction txn, ImportIDSet idsToBeAdded);
+  void importPut(Importer importer, ImportIDSet idsToBeAdded);
  // Ignores trusted state.
-  void importRemove(WriteableTransaction txn, ImportIDSet idsToBeRemoved);
+  void importRemove(Importer importer, ImportIDSet idsToBeRemoved);
  boolean isTrusted();
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
@@ -26,6 +26,7 @@
package org.opends.server.backends.pluggable;
import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
import org.opends.server.backends.pluggable.spi.Cursor;
@@ -44,15 +45,11 @@
import org.opends.server.types.DirectoryException;
import org.opends.server.types.RestoreConfig;
-/**
- * Decorates a {@link Storage} with additional trace logging.
- */
+/** Decorates a {@link Storage} with additional trace logging. */
@SuppressWarnings("javadoc")
final class TracedStorage implements Storage
{
-  /**
-   * Decorates an {@link Importer} with additional trace logging.
-   */
+  /** Decorates an {@link Importer} with additional trace logging. */
  private final class TracedImporter implements Importer
  {
    private final Importer importer;
@@ -63,14 +60,6 @@
    }
    @Override
-    public void close()
-    {
-      importer.close();
-      logger.trace("Storage@%s.Importer@%s.close(%s)",
-          storageId(), id(), backendId);
-    }
-
-    @Override
    public void createTree(final TreeName name)
    {
      importer.createTree(name);
@@ -86,6 +75,32 @@
          storageId(), id(), backendId, name, hex(key), hex(value));
    }
+    @Override
+    public ByteString read(TreeName name, ByteSequence key)
+    {
+      final ByteString value = importer.read(name, key);
+      logger.trace("Storage@%s.Importer@%s.read(%s, %s, %s) = %s",
+          storageId(), id(), backendId, name, hex(key), hex(value));
+      return value;
+    }
+
+    @Override
+    public boolean delete(TreeName name, ByteSequence key)
+    {
+      final boolean delete = importer.delete(name, key);
+      logger.trace("Storage@%s.Importer@%s.delete(%s, %s, %s) = %b",
+          storageId(), id(), backendId, name, hex(key), delete);
+      return delete;
+    }
+
+    @Override
+    public void close()
+    {
+      importer.close();
+      logger.trace("Storage@%s.Importer@%s.close(%s)",
+          storageId(), id(), backendId);
+    }
    private int id()
    {
      return System.identityHashCode(this);
@@ -294,7 +309,7 @@
  }
  @Override
-  public Importer startImport() throws Exception
+  public Importer startImport() throws ConfigException, StorageRuntimeException
  {
    final Importer importer = storage.startImport();
    if (logger.isTraceEnabled())
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java
@@ -28,6 +28,7 @@
import java.io.Closeable;
import org.forgerock.opendj.ldap.ByteSequence;
+import org.forgerock.opendj.ldap.ByteString;
/**
 * Allows to run an import. For performance reasons, imports are run without transactions.
@@ -54,6 +55,28 @@
   */
  void put(TreeName treeName, ByteSequence key, ByteSequence value);
+  /**
+   * Deletes the record with the provided key, in the tree whose name is provided.
+   *
+   * @param treeName
+   *          the tree name
+   * @param key
+   *          the key of the record to delete
+   * @return {@code true} if the record could be deleted, {@code false} otherwise
+   */
+  boolean delete(TreeName treeName, ByteSequence key);
+
+  /**
+   * Reads the record's value associated to the provided key, in the tree whose name is provided.
+   *
+   * @param treeName
+   *          the tree name
+   * @param key
+   *          the record's key
+   * @return the record's value, or {@code null} if none exists
+   */
+  ByteString read(TreeName treeName, ByteSequence key);
  /** {@inheritDoc} */
  @Override
  void close();
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
@@ -27,6 +27,7 @@
import java.io.Closeable;
+import org.forgerock.opendj.config.server.ConfigException;
import org.opends.server.types.BackupConfig;
import org.opends.server.types.BackupDirectory;
import org.opends.server.types.DirectoryException;
@@ -42,11 +43,13 @@
   * Starts the import operation.
   *
   * @return a new Importer object which must be closed to release all resources
-   * @throws Exception
+   * @throws ConfigException
+   *           if there is a problem with the configuration
+   * @throws StorageRuntimeException
+   *           if a problem occurs with the underlying storage engine
   * @see #close() to release all resources once import is finished
   */
-  Importer startImport() throws Exception;
+  Importer startImport() throws ConfigException, StorageRuntimeException;
  /**
   * Opens the storage engine to allow executing operations on it.