From 5d07ec161328a94de355aa4bf93918a2da5a8602 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 30 Apr 2015 14:20:06 +0000
Subject: [PATCH] OPENDJ-1801 (CR-6815) Revise usage of storage.open() and startImport()

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java     |   23 +++
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java     |   21 +-
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java         |  122 ++++++++++-------
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java            |    5 
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java      |    7 
 opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java |  166 +++++++++++++++--------
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java         |   29 +++
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java    |   45 ++++--
 8 files changed, 274 insertions(+), 144 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
index ade7349..c1e9d80 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
@@ -46,6 +46,8 @@
 import java.util.ListIterator;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedDeque;
 
 import org.forgerock.i18n.LocalizableMessage;
 import org.forgerock.i18n.slf4j.LocalizedLogger;
@@ -85,7 +87,6 @@
 import com.persistit.Persistit;
 import com.persistit.Transaction;
 import com.persistit.Tree;
-import com.persistit.TreeBuilder;
 import com.persistit.Value;
 import com.persistit.Volume;
 import com.persistit.VolumeSpecification;
@@ -247,26 +248,31 @@
   /** PersistIt implementation of the {@link Importer} interface. */
   private final class ImporterImpl implements Importer
   {
-    private final TreeBuilder importer = new TreeBuilder(db);
-    private final Key importKey = new Key(db);
-    private final Value importValue = new Value(db);
     private final Map<TreeName, Tree> trees = new HashMap<TreeName, Tree>();
+    private final Queue<Map<TreeName, Exchange>> allExchanges = new ConcurrentLinkedDeque<>();
+    private final ThreadLocal<Map<TreeName, Exchange>> exchanges = new ThreadLocal<Map<TreeName, Exchange>>()
+    {
+      @Override
+      protected Map<TreeName, Exchange> initialValue()
+      {
+        final Map<TreeName, Exchange> value = new HashMap<>();
+        allExchanges.add(value);
+        return value;
+      }
+    };
 
     @Override
     public void close()
     {
-      try
+      for (Map<TreeName, Exchange> map : allExchanges)
       {
-        importer.merge();
+        for (Exchange exchange : map.values())
+        {
+          db.releaseExchange(exchange);
+        }
+        map.clear();
       }
-      catch (final Exception e)
-      {
-        throw new StorageRuntimeException(e);
-      }
-      finally
-      {
-        PersistItStorage.this.close();
-      }
+      PersistItStorage.this.close();
     }
 
     @Override
@@ -288,16 +294,59 @@
     {
       try
       {
-        final Tree tree = trees.get(treeName);
-        importer.store(tree,
-            bytesToKey(importKey, key),
-            bytesToValue(importValue, value));
+        final Exchange ex = getExchangeFromCache(treeName);
+        bytesToKey(ex.getKey(), key);
+        bytesToValue(ex.getValue(), value);
+        ex.store();
       }
       catch (final Exception e)
       {
         throw new StorageRuntimeException(e);
       }
     }
+
+    @Override
+    public boolean delete(final TreeName treeName, final ByteSequence key)
+    {
+      try
+      {
+        final Exchange ex = getExchangeFromCache(treeName);
+        bytesToKey(ex.getKey(), key);
+        return ex.remove();
+      }
+      catch (final PersistitException e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+    }
+
+    @Override
+    public ByteString read(final TreeName treeName, final ByteSequence key)
+    {
+      try
+      {
+        final Exchange ex = getExchangeFromCache(treeName);
+        bytesToKey(ex.getKey(), key);
+        ex.fetch();
+        return valueToBytes(ex.getValue());
+      }
+      catch (final PersistitException e)
+      {
+        throw new StorageRuntimeException(e);
+      }
+    }
+
+    private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
+    {
+      Map<TreeName, Exchange> threadExchanges = exchanges.get();
+      Exchange exchange = threadExchanges.get(treeName);
+      if (exchange == null)
+      {
+        exchange = getNewExchange(treeName, false);
+        threadExchanges.put(treeName, exchange);
+      }
+      return exchange;
+    }
   }
 
   /** PersistIt implementation of the {@link WriteableTransaction} interface. */
@@ -306,8 +355,7 @@
     private final Map<TreeName, Exchange> exchanges = new HashMap<TreeName, Exchange>();
 
     @Override
-    public void put(final TreeName treeName, final ByteSequence key,
-        final ByteSequence value)
+    public void put(final TreeName treeName, final ByteSequence key, final ByteSequence value)
     {
       try
       {
@@ -475,8 +523,7 @@
       return b1.equals(b2);
     }
 
-    private Exchange getExchangeFromCache(final TreeName treeName)
-        throws PersistitException
+    private Exchange getExchangeFromCache(final TreeName treeName) throws PersistitException
     {
       Exchange exchange = exchanges.get(treeName);
       if (exchange == null)
@@ -495,34 +542,18 @@
       }
       exchanges.clear();
     }
-
-    private Exchange getNewExchange(final TreeName treeName, final boolean create)
-        throws PersistitException
-    {
-      return db.getExchange(volume, mangleTreeName(treeName), create);
-    }
   }
 
-  private static void clearAndCreateDbDir(final File dbDir)
+  private Exchange getNewExchange(final TreeName treeName, final boolean create) throws PersistitException
   {
-    if (dbDir.exists())
-    {
-      for (final File child : dbDir.listFiles())
-      {
-        child.delete();
-      }
-    }
-    else
-    {
-      dbDir.mkdirs();
-    }
+    return db.getExchange(volume, mangleTreeName(treeName), create);
   }
 
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+  private final ServerContext serverContext;
   private File backendDirectory;
   private Persistit db;
   private Volume volume;
-  private Configuration dbCfg;
   private PersistitBackendCfg config;
   private DiskSpaceMonitor diskMonitor;
   private MemoryQuota memQuota;
@@ -540,30 +571,36 @@
   // FIXME: should be package private once importer is decoupled.
   public PersistItStorage(final PersistitBackendCfg cfg, ServerContext serverContext) throws ConfigException
   {
+    this.serverContext = serverContext;
     backendDirectory = new File(getFileForPath(cfg.getDBDirectory()), cfg.getBackendId());
     config = cfg;
-    dbCfg = new Configuration();
+    cfg.addPersistitChangeListener(this);
+  }
+
+  private Configuration buildConfiguration()
+  {
+    final Configuration dbCfg = new Configuration();
     dbCfg.setLogFile(new File(backendDirectory, VOLUME_NAME + ".log").getPath());
     dbCfg.setJournalPath(new File(backendDirectory, VOLUME_NAME + "_journal").getPath());
     dbCfg.setVolumeList(asList(new VolumeSpecification(new File(backendDirectory, VOLUME_NAME).getPath(), null,
         BUFFER_SIZE, 4096, Long.MAX_VALUE / BUFFER_SIZE, 2048, true, false, false)));
-    final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg();
+    final BufferPoolConfiguration bufferPoolCfg = getBufferPoolCfg(dbCfg);
     bufferPoolCfg.setMaximumCount(Integer.MAX_VALUE);
 
     diskMonitor = serverContext.getDiskSpaceMonitor();
     memQuota = serverContext.getMemoryQuota();
-    if (cfg.getDBCacheSize() > 0)
+    if (config.getDBCacheSize() > 0)
     {
-      bufferPoolCfg.setMaximumMemory(cfg.getDBCacheSize());
-      memQuota.acquireMemory(cfg.getDBCacheSize());
+      bufferPoolCfg.setMaximumMemory(config.getDBCacheSize());
+      memQuota.acquireMemory(config.getDBCacheSize());
     }
     else
     {
-      bufferPoolCfg.setFraction(cfg.getDBCachePercent() / 100.0f);
-      memQuota.acquireMemory(memQuota.memPercentToBytes(cfg.getDBCachePercent()));
+      bufferPoolCfg.setFraction(config.getDBCachePercent() / 100.0f);
+      memQuota.acquireMemory(memQuota.memPercentToBytes(config.getDBCachePercent()));
     }
-    dbCfg.setCommitPolicy(cfg.isDBTxnNoSync() ? SOFT : GROUP);
-    cfg.addPersistitChangeListener(this);
+    dbCfg.setCommitPolicy(config.isDBTxnNoSync() ? SOFT : GROUP);
+    return dbCfg;
   }
 
   /** {@inheritDoc} */
@@ -594,21 +631,31 @@
     diskMonitor.deregisterMonitoredDirectory(getDirectory(), this);
   }
 
-  private BufferPoolConfiguration getBufferPoolCfg()
+  private static BufferPoolConfiguration getBufferPoolCfg(Configuration dbCfg)
   {
     return dbCfg.getBufferPoolMap().get(BUFFER_SIZE);
   }
 
   /** {@inheritDoc} */
   @Override
-  public void open() throws Exception
+  public void open() throws ConfigException, StorageRuntimeException
+  {
+    open0(buildConfiguration());
+  }
+
+  private void open0(final Configuration dbCfg) throws ConfigException
   {
     setupStorageFiles();
     try
     {
+      if (db != null)
+      {
+        throw new IllegalStateException(
+            "Database is already open, either the backend is enabled or an import is currently running.");
+      }
       db = new Persistit(dbCfg);
 
-      final long bufferCount = getBufferPoolCfg().computeBufferCount(db.getAvailableHeap());
+      final long bufferCount = getBufferPoolCfg(dbCfg).computeBufferCount(db.getAvailableHeap());
       final long totalSize = bufferCount * BUFFER_SIZE / 1024;
       logger.info(NOTE_PERSISTIT_MEMORY_CFG, config.getBackendId(),
           bufferCount, BUFFER_SIZE, totalSize);
@@ -676,14 +723,13 @@
 
   /** {@inheritDoc} */
   @Override
-  public Importer startImport() throws Exception
+  public Importer startImport() throws ConfigException, StorageRuntimeException
   {
-    clearAndCreateDbDir(backendDirectory);
-    open();
+    open0(buildConfiguration());
     return new ImporterImpl();
   }
 
-  private String mangleTreeName(final TreeName treeName)
+  private static String mangleTreeName(final TreeName treeName)
   {
     StringBuilder mangled = new StringBuilder();
     String name = treeName.toString();
@@ -879,19 +925,19 @@
    * TODO: it would be nice to use the low-level key/value APIs. They seem quite
    * inefficient at the moment for simple byte arrays.
    */
-  private Key bytesToKey(final Key key, final ByteSequence bytes)
+  private static Key bytesToKey(final Key key, final ByteSequence bytes)
   {
     final byte[] tmp = bytes.toByteArray();
     return key.clear().appendByteArray(tmp, 0, tmp.length);
   }
 
-  private Value bytesToValue(final Value value, final ByteSequence bytes)
+  private static Value bytesToValue(final Value value, final ByteSequence bytes)
   {
     value.clear().putByteArray(bytes.toByteArray());
     return value;
   }
 
-  private ByteString valueToBytes(final Value value)
+  private static ByteString valueToBytes(final Value value)
   {
     if (value.isDefined())
     {
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
index 5ff09d8..e332072 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/DefaultIndex.java
@@ -42,6 +42,7 @@
 import org.opends.server.backends.pluggable.EntryIDSet.EntryIDSetCodec;
 import org.opends.server.backends.pluggable.State.IndexFlag;
 import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
 import org.opends.server.backends.pluggable.spi.ReadableTransaction;
 import org.opends.server.backends.pluggable.spi.StorageRuntimeException;
 import org.opends.server.backends.pluggable.spi.TreeName;
@@ -124,32 +125,32 @@
   }
 
   @Override
-  public final void importPut(WriteableTransaction txn, ImportIDSet idsToBeAdded) throws StorageRuntimeException
+  public final void importPut(Importer importer, ImportIDSet idsToBeAdded) throws StorageRuntimeException
   {
-    Reject.ifNull(txn, "txn must not be null");
+    Reject.ifNull(importer, "importer must not be null");
     Reject.ifNull(idsToBeAdded, "idsToBeAdded must not be null");
     ByteSequence key = idsToBeAdded.getKey();
-    ByteString value = txn.read(getName(), key);
+    ByteString value = importer.read(getName(), key);
     if (value != null)
     {
       final EntryIDSet entryIDSet = codec.decode(key, value);
       final ImportIDSet importIDSet = new ImportIDSet(key, entryIDSet, indexEntryLimit);
       importIDSet.merge(idsToBeAdded);
-      txn.put(getName(), key, importIDSet.valueToByteString(codec));
+      importer.put(getName(), key, importIDSet.valueToByteString(codec));
     }
     else
     {
-      txn.put(getName(), key, idsToBeAdded.valueToByteString(codec));
+      importer.put(getName(), key, idsToBeAdded.valueToByteString(codec));
     }
   }
 
   @Override
-  public final void importRemove(WriteableTransaction txn, ImportIDSet idsToBeRemoved) throws StorageRuntimeException
+  public final void importRemove(Importer importer, ImportIDSet idsToBeRemoved) throws StorageRuntimeException
   {
-    Reject.ifNull(txn, "txn must not be null");
+    Reject.ifNull(importer, "importer must not be null");
     Reject.ifNull(idsToBeRemoved, "idsToBeRemoved must not be null");
     ByteSequence key = idsToBeRemoved.getKey();
-    ByteString value = txn.read(getName(), key);
+    ByteString value = importer.read(getName(), key);
     if (value == null)
     {
       // Should never happen -- the keys should always be there.
@@ -161,11 +162,11 @@
     importIDSet.remove(idsToBeRemoved);
     if (importIDSet.isDefined() && importIDSet.size() == 0)
     {
-      txn.delete(getName(), key);
+      importer.delete(getName(), key);
     }
     else
     {
-      txn.put(getName(), key, importIDSet.valueToByteString(codec));
+      importer.put(getName(), key, importIDSet.valueToByteString(codec));
     }
   }
 
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
index 5259156..e922f95 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ID2Count.java
@@ -31,6 +31,7 @@
 import org.forgerock.util.promise.Function;
 import org.forgerock.util.promise.NeverThrowsException;
 import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
 import org.opends.server.backends.pluggable.spi.ReadableTransaction;
 import org.opends.server.backends.pluggable.spi.TreeName;
 import org.opends.server.backends.pluggable.spi.UpdateFunction;
@@ -90,8 +91,7 @@
 
   private void addToCounter(WriteableTransaction txn, EntryID entryID, final long delta)
   {
-    final long bucket = (Thread.currentThread().getId() & (SHARD_COUNT - 1));
-    final ByteSequence shardedKey = getKeyFromEntryIDAndBucket(entryID, bucket);
+    final ByteSequence shardedKey = getShardedKey(entryID);
     txn.update(getName(), shardedKey, new UpdateFunction()
     {
       @Override
@@ -103,6 +103,30 @@
     });
   }
 
+  void importPut(Importer importer, EntryID entryID, long total)
+  {
+    Reject.ifTrue(entryID.longValue() >= TOTAL_COUNT_ENTRY_ID.longValue(), "EntryID overflow.");
+    importPut0(importer, entryID, total);
+  }
+
+  void importPutTotalCount(Importer importer, long total)
+  {
+    importPut0(importer, TOTAL_COUNT_ENTRY_ID, total);
+  }
+
+  private void importPut0(Importer importer, EntryID entryID, final long delta)
+  {
+    Reject.ifNull(importer, "importer must not be null");
+    final ByteSequence shardedKey = getShardedKey(entryID);
+    importer.put(getName(), shardedKey, ByteString.valueOf(delta));
+  }
+
+  private ByteSequence getShardedKey(EntryID entryID)
+  {
+    final long bucket = (Thread.currentThread().getId() & (SHARD_COUNT - 1));
+    return getKeyFromEntryIDAndBucket(entryID, bucket);
+  }
+
   /**
    * Get the counter value for the specified key
    * @param txn The database transaction
@@ -168,5 +192,4 @@
 
     return counterValue;
   }
-
 }
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 90271fc..143d7d5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1061,7 +1061,7 @@
     }
   }
 
-  private void importPhaseTwo() throws InterruptedException, ExecutionException
+  private void importPhaseTwo() throws Exception
   {
     ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
     scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
@@ -1079,7 +1079,7 @@
    * Performs on-disk merge by reading several scratch files at once
    * and write their ordered content into the target indexes.
    */
-  private void processIndexFiles() throws InterruptedException, ExecutionException
+  private void processIndexFiles() throws Exception
   {
     if (bufferCount.get() == 0)
     {
@@ -1147,20 +1147,30 @@
     Semaphore permits = new Semaphore(buffers);
 
     // Start DN processing first.
-    List<Future<Void>> futures = new LinkedList<>();
-    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
-    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
-    getAll(futures);
+    Storage storage = rootContainer.getStorage();
+    storage.close();
+    try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport())
+    {
+      List<Future<Void>> futures = new LinkedList<>();
+      submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+      submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+      getAll(futures);
+    }
+    finally
+    {
+      storage.open();
+    }
+
     shutdownAll(dbService);
   }
 
-  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
-      Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
+  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs,
+      org.opends.server.backends.pluggable.spi.Importer importer,
+      ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
   {
     for (IndexManager indexMgr : indexMgrs)
     {
-      futures.add(dbService.submit(
-          new IndexDBWriteTask(rootContainer.getStorage(), indexMgr, permits, buffers, readAheadSize)));
+      futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)));
     }
   }
 
@@ -1706,7 +1716,7 @@
    */
   private final class IndexDBWriteTask implements Callable<Void>
   {
-    private final Storage storage;
+    private final org.opends.server.backends.pluggable.spi.Importer importer;
     private final IndexManager indexMgr;
     private final int cacheSize;
     /** indexID => DNState map */
@@ -1728,10 +1738,10 @@
     /**
      * Creates a new index DB writer.
      *
+     * @param importer
+     *          The importer
      * @param indexMgr
      *          The index manager.
-     * @param storage
-     *          Where to store data
      * @param permits
      *          The semaphore used for restricting the number of buffer allocations.
      * @param maxPermits
@@ -1739,9 +1749,10 @@
      * @param cacheSize
      *          The buffer cache size.
      */
-    public IndexDBWriteTask(Storage storage, IndexManager indexMgr, Semaphore permits, int maxPermits, int cacheSize)
+    public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr,
+        Semaphore permits, int maxPermits, int cacheSize)
     {
-      this.storage = storage;
+      this.importer = importer;
       this.indexMgr = indexMgr;
       this.permits = permits;
       this.maxPermits = maxPermits;
@@ -1822,7 +1833,7 @@
     }
 
     /** Finishes this task. */
-    private void endWriteTask(WriteableTransaction txn)
+    private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer)
     {
       isRunning = false;
 
@@ -1839,8 +1850,9 @@
         {
           for (DNState dnState : dnStateMap.values())
           {
-            dnState.flush(txn);
+            dnState.finalFlush(importer);
           }
+
           if (!isCanceled)
           {
             logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
@@ -1896,18 +1908,11 @@
     @Override
     public Void call() throws Exception
     {
-      storage.write(new WriteOperation()
-      {
-        @Override
-        public void run(WriteableTransaction txn) throws Exception
-        {
-          call0(txn);
-        }
-      });
+      call0(importer);
       return null;
     }
 
-    private void call0(WriteableTransaction txn) throws Exception
+    private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception
     {
       if (isCanceled)
       {
@@ -1936,7 +1941,7 @@
             {
               if (previousRecord != null)
               {
-                addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+                addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
               }
 
               // this is a new record
@@ -1960,7 +1965,7 @@
 
           if (previousRecord != null)
           {
-            addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+            addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
           }
         }
       }
@@ -1971,7 +1976,7 @@
       }
       finally
       {
-        endWriteTask(txn);
+        endWriteTask(importer);
       }
     }
 
@@ -1986,30 +1991,31 @@
       return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
     }
 
-    private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
-        throws DirectoryException
+    private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet,
+        ImportIDSet deleteSet) throws DirectoryException
     {
       keyCount.incrementAndGet();
       if (indexMgr.isDN2ID())
       {
-        addDN2ID(txn, indexID, insertSet);
+        addDN2ID(importer, indexID, insertSet);
       }
       else
       {
         if (!deleteSet.isDefined() || deleteSet.size() > 0)
         {
           final Index index = indexIDToIndexMap.get(indexID);
-          index.importRemove(txn, deleteSet);
+          index.importRemove(importer, deleteSet);
         }
         if (!insertSet.isDefined() || insertSet.size() > 0)
         {
           final Index index = indexIDToIndexMap.get(indexID);
-          index.importPut(txn, insertSet);
+          index.importPut(importer, insertSet);
         }
       }
     }
 
-    private void addDN2ID(WriteableTransaction txn, int indexID, ImportIDSet idSet) throws DirectoryException
+    private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet)
+        throws DirectoryException
     {
       DNState dnState = dnStateMap.get(indexID);
       if (dnState == null)
@@ -2017,9 +2023,9 @@
         dnState = new DNState(indexIDToECMap.get(indexID));
         dnStateMap.put(indexID, dnState);
       }
-      if (dnState.checkParent(txn, idSet))
+      if (dnState.checkParent(importer, idSet))
       {
-        dnState.writeToDN2ID(txn, idSet.getKey());
+        dnState.writeToDN2ID(importer, idSet.getKey());
       }
     }
 
@@ -2032,7 +2038,7 @@
      * This class is used to by a index DB merge thread performing DN processing
      * to keep track of the state of individual DN2ID index processing.
      */
-    final class DNState
+    private final class DNState
     {
       private static final int DN_STATE_CACHE_SIZE = 64 * KB;
 
@@ -2043,8 +2049,9 @@
       private ByteSequence parentDN;
       private final ByteStringBuilder lastDN = new ByteStringBuilder();
       private EntryID parentID, lastID, entryID;
+      private long totalNbEntries;
 
-      DNState(EntryContainer entryContainer)
+      private DNState(EntryContainer entryContainer)
       {
         this.entryContainer = entryContainer;
         dn2id = entryContainer.getDN2ID().getName();
@@ -2062,7 +2069,8 @@
       }
 
       /** Why do we still need this if we are checking parents in the first phase? */
-      private boolean checkParent(ReadableTransaction txn, ImportIDSet idSet) throws StorageRuntimeException
+      boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet)
+          throws StorageRuntimeException
       {
         entryID = idSet.iterator().next();
         parentDN = getParent(idSet.getKey());
@@ -2072,7 +2080,7 @@
           // If null is returned then this is a suffix DN.
           if (parentDN != null)
           {
-            parentID = get(txn, dn2id, parentDN);
+            parentID = get(importer, dn2id, parentDN);
             if (parentID == null)
             {
               // We have a missing parent. Maybe parent checking was turned off?
@@ -2145,43 +2153,53 @@
         return importCfg != null && importCfg.appendToExistingData();
       }
 
-      EntryID get(ReadableTransaction txn, TreeName dn2id, ByteSequence dn) throws StorageRuntimeException
+      private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn)
+          throws StorageRuntimeException
       {
-        ByteString value = txn.read(dn2id, dn);
+        ByteString value = importer.read(dn2id, dn);
         return value != null ? new EntryID(value) : null;
       }
 
-      public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
+      void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key)
+          throws DirectoryException
       {
-        txn.put(dn2id, key, entryID.toByteString());
+        importer.put(dn2id, key, entryID.toByteString());
         indexMgr.addTotDNCount(1);
         if (parentID != null)
         {
-          incrementChildrenCounter(txn);
+          incrementChildrenCounter(importer);
         }
       }
 
-      private void incrementChildrenCounter(WriteableTransaction txn)
+      private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer)
       {
         final AtomicLong counter = getId2childrenCounter();
         counter.incrementAndGet();
         if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
         {
-          flush(txn);
+          flush(importer);
         }
       }
 
-      private void flush(WriteableTransaction txn)
+      private void flush(org.opends.server.backends.pluggable.spi.Importer importer)
       {
         for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
         {
-          entryContainer.getID2ChildrenCount()
-              .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
+          final EntryID entryID = childrenCounter.getKey();
+          final long totalForEntryID = childrenCounter.getValue().get();
+          totalNbEntries += totalForEntryID;
+          entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID);
         }
         id2childrenCountTree.clear();
       }
 
 
+      void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer)
+      {
+        flush(importer);
+
+        entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
+      }
     }
   }
 
@@ -2957,7 +2975,7 @@
       indexKeyQueueMap.clear();
     }
 
-    private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
+    private void rebuildIndexesPhaseTwo() throws Exception
     {
       final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
       try
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java
index ddccb6e..2aa01f2 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Index.java
@@ -29,6 +29,7 @@
 import org.forgerock.opendj.ldap.ByteSequence;
 import org.forgerock.opendj.ldap.ByteString;
 import org.opends.server.backends.pluggable.spi.Cursor;
+import org.opends.server.backends.pluggable.spi.Importer;
 import org.opends.server.backends.pluggable.spi.ReadableTransaction;
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
 
@@ -44,10 +45,10 @@
   int getIndexEntryLimit();
 
   // Ignores trusted state.
-  void importPut(WriteableTransaction txn, ImportIDSet idsToBeAdded);
+  void importPut(Importer importer, ImportIDSet idsToBeAdded);
 
   // Ignores trusted state.
-  void importRemove(WriteableTransaction txn, ImportIDSet idsToBeRemoved);
+  void importRemove(Importer importer, ImportIDSet idsToBeRemoved);
 
   boolean isTrusted();
 
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
index ee6bc45..a820646 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
@@ -26,6 +26,7 @@
 package org.opends.server.backends.pluggable;
 
 import org.forgerock.i18n.slf4j.LocalizedLogger;
+import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ByteSequence;
 import org.forgerock.opendj.ldap.ByteString;
 import org.opends.server.backends.pluggable.spi.Cursor;
@@ -44,15 +45,11 @@
 import org.opends.server.types.DirectoryException;
 import org.opends.server.types.RestoreConfig;
 
-/**
- * Decorates a {@link Storage} with additional trace logging.
- */
+/** Decorates a {@link Storage} with additional trace logging. */
 @SuppressWarnings("javadoc")
 final class TracedStorage implements Storage
 {
-  /**
-   * Decorates an {@link Importer} with additional trace logging.
-   */
+  /** Decorates an {@link Importer} with additional trace logging. */
   private final class TracedImporter implements Importer
   {
     private final Importer importer;
@@ -63,14 +60,6 @@
     }
 
     @Override
-    public void close()
-    {
-      importer.close();
-      logger.trace("Storage@%s.Importer@%s.close(%s)",
-          storageId(), id(), backendId);
-    }
-
-    @Override
     public void createTree(final TreeName name)
     {
       importer.createTree(name);
@@ -86,6 +75,32 @@
           storageId(), id(), backendId, name, hex(key), hex(value));
     }
 
+    @Override
+    public ByteString read(TreeName name, ByteSequence key)
+    {
+      final ByteString value = importer.read(name, key);
+      logger.trace("Storage@%s.Importer@%s.read(%s, %s, %s) = %s",
+          storageId(), id(), backendId, name, hex(key), hex(value));
+      return value;
+    }
+
+    @Override
+    public boolean delete(TreeName name, ByteSequence key)
+    {
+      final boolean delete = importer.delete(name, key);
+      logger.trace("Storage@%s.Importer@%s.delete(%s, %s, %s) = %b",
+          storageId(), id(), backendId, name, hex(key), delete);
+      return delete;
+    }
+
+    @Override
+    public void close()
+    {
+      importer.close();
+      logger.trace("Storage@%s.Importer@%s.close(%s)",
+          storageId(), id(), backendId);
+    }
+
     private int id()
     {
       return System.identityHashCode(this);
@@ -294,7 +309,7 @@
   }
 
   @Override
-  public Importer startImport() throws Exception
+  public Importer startImport() throws ConfigException, StorageRuntimeException
   {
     final Importer importer = storage.startImport();
     if (logger.isTraceEnabled())
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java
index a31a73a..95e72fc 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Importer.java
@@ -28,6 +28,7 @@
 import java.io.Closeable;
 
 import org.forgerock.opendj.ldap.ByteSequence;
+import org.forgerock.opendj.ldap.ByteString;
 
 /**
  * Allows to run an import. For performance reasons, imports are run without transactions.
@@ -54,6 +55,28 @@
    */
   void put(TreeName treeName, ByteSequence key, ByteSequence value);
 
+  /**
+   * Deletes the record with the provided key, in the tree whose name is provided.
+   *
+   * @param treeName
+   *          the tree name
+   * @param key
+   *          the key of the record to delete
+   * @return {@code true} if the record could be deleted, {@code false} otherwise
+   */
+  boolean delete(TreeName treeName, ByteSequence key);
+
+  /**
+   * Reads the record's value associated to the provided key, in the tree whose name is provided.
+   *
+   * @param treeName
+   *          the tree name
+   * @param key
+   *          the record's key
+   * @return the record's value, or {@code null} if none exists
+   */
+  ByteString read(TreeName treeName, ByteSequence key);
+
   /** {@inheritDoc} */
   @Override
   void close();
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
index 3bf0247..b485282 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
@@ -27,6 +27,7 @@
 
 import java.io.Closeable;
 
+import org.forgerock.opendj.config.server.ConfigException;
 import org.opends.server.types.BackupConfig;
 import org.opends.server.types.BackupDirectory;
 import org.opends.server.types.DirectoryException;
@@ -42,11 +43,13 @@
    * Starts the import operation.
    *
    * @return a new Importer object which must be closed to release all resources
-   * @throws Exception
+   * @throws ConfigException
+   *           if there is a problem with the configuration
+   * @throws StorageRuntimeException
    *           if a problem occurs with the underlying storage engine
    * @see #close() to release all resources once import is finished
    */
-  Importer startImport() throws Exception;
+  Importer startImport() throws ConfigException, StorageRuntimeException;
 
   /**
    * Opens the storage engine to allow executing operations on it.

--
Gitblit v1.10.0