From 5d07ec161328a94de355aa4bf93918a2da5a8602 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 30 Apr 2015 14:20:06 +0000
Subject: [PATCH] OPENDJ-1801 (CR-6815) Revise usage of storage.open() and startImport()

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java |  122 +++++++++++++++++++++++-----------------
 1 file changed, 70 insertions(+), 52 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 90271fc..143d7d5 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -1061,7 +1061,7 @@
     }
   }
 
-  private void importPhaseTwo() throws InterruptedException, ExecutionException
+  private void importPhaseTwo() throws Exception
   {
     ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
     scheduleAtFixedRate(timerService, new SecondPhaseProgressTask());
@@ -1079,7 +1079,7 @@
    * Performs on-disk merge by reading several scratch files at once
    * and write their ordered content into the target indexes.
    */
-  private void processIndexFiles() throws InterruptedException, ExecutionException
+  private void processIndexFiles() throws Exception
   {
     if (bufferCount.get() == 0)
     {
@@ -1147,20 +1147,30 @@
     Semaphore permits = new Semaphore(buffers);
 
     // Start DN processing first.
-    List<Future<Void>> futures = new LinkedList<>();
-    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
-    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
-    getAll(futures);
+    Storage storage = rootContainer.getStorage();
+    storage.close();
+    try (final org.opends.server.backends.pluggable.spi.Importer importer = storage.startImport())
+    {
+      List<Future<Void>> futures = new LinkedList<>();
+      submitIndexDBWriteTasks(DNIndexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+      submitIndexDBWriteTasks(indexMgrList, importer, dbService, permits, buffers, readAheadSize, futures);
+      getAll(futures);
+    }
+    finally
+    {
+      storage.open();
+    }
+
     shutdownAll(dbService);
   }
 
-  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
-      Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
+  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs,
+      org.opends.server.backends.pluggable.spi.Importer importer,
+      ExecutorService dbService, Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
   {
     for (IndexManager indexMgr : indexMgrs)
     {
-      futures.add(dbService.submit(
-          new IndexDBWriteTask(rootContainer.getStorage(), indexMgr, permits, buffers, readAheadSize)));
+      futures.add(dbService.submit(new IndexDBWriteTask(importer, indexMgr, permits, buffers, readAheadSize)));
     }
   }
 
@@ -1706,7 +1716,7 @@
    */
   private final class IndexDBWriteTask implements Callable<Void>
   {
-    private final Storage storage;
+    private final org.opends.server.backends.pluggable.spi.Importer importer;
     private final IndexManager indexMgr;
     private final int cacheSize;
     /** indexID => DNState map */
@@ -1728,10 +1738,10 @@
     /**
      * Creates a new index DB writer.
      *
+     * @param importer
+     *          The importer
      * @param indexMgr
      *          The index manager.
-     * @param storage
-     *          Where to store data
      * @param permits
      *          The semaphore used for restricting the number of buffer allocations.
      * @param maxPermits
@@ -1739,9 +1749,10 @@
      * @param cacheSize
      *          The buffer cache size.
      */
-    public IndexDBWriteTask(Storage storage, IndexManager indexMgr, Semaphore permits, int maxPermits, int cacheSize)
+    public IndexDBWriteTask(org.opends.server.backends.pluggable.spi.Importer importer, IndexManager indexMgr,
+        Semaphore permits, int maxPermits, int cacheSize)
     {
-      this.storage = storage;
+      this.importer = importer;
       this.indexMgr = indexMgr;
       this.permits = permits;
       this.maxPermits = maxPermits;
@@ -1822,7 +1833,7 @@
     }
 
     /** Finishes this task. */
-    private void endWriteTask(WriteableTransaction txn)
+    private void endWriteTask(org.opends.server.backends.pluggable.spi.Importer importer)
     {
       isRunning = false;
 
@@ -1839,8 +1850,9 @@
         {
           for (DNState dnState : dnStateMap.values())
           {
-            dnState.flush(txn);
+            dnState.finalFlush(importer);
           }
+
           if (!isCanceled)
           {
             logger.info(NOTE_JEB_IMPORT_LDIF_DN_CLOSE, indexMgr.getDNCount());
@@ -1896,18 +1908,11 @@
     @Override
     public Void call() throws Exception
     {
-      storage.write(new WriteOperation()
-      {
-        @Override
-        public void run(WriteableTransaction txn) throws Exception
-        {
-          call0(txn);
-        }
-      });
+      call0(importer);
       return null;
     }
 
-    private void call0(WriteableTransaction txn) throws Exception
+    private void call0(org.opends.server.backends.pluggable.spi.Importer importer) throws Exception
     {
       if (isCanceled)
       {
@@ -1936,7 +1941,7 @@
             {
               if (previousRecord != null)
               {
-                addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+                addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
               }
 
               // this is a new record
@@ -1960,7 +1965,7 @@
 
           if (previousRecord != null)
           {
-            addToDB(txn, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
+            addToDB(importer, previousRecord.getIndexID(), insertIDSet, deleteIDSet);
           }
         }
       }
@@ -1971,7 +1976,7 @@
       }
       finally
       {
-        endWriteTask(txn);
+        endWriteTask(importer);
       }
     }
 
@@ -1986,30 +1991,31 @@
       return new ImportIDSet(record.getKey(), newDefinedSet(), index.getIndexEntryLimit());
     }
 
-    private void addToDB(WriteableTransaction txn, int indexID, ImportIDSet insertSet, ImportIDSet deleteSet)
-        throws DirectoryException
+    private void addToDB(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet insertSet,
+        ImportIDSet deleteSet) throws DirectoryException
     {
       keyCount.incrementAndGet();
       if (indexMgr.isDN2ID())
       {
-        addDN2ID(txn, indexID, insertSet);
+        addDN2ID(importer, indexID, insertSet);
       }
       else
       {
         if (!deleteSet.isDefined() || deleteSet.size() > 0)
         {
           final Index index = indexIDToIndexMap.get(indexID);
-          index.importRemove(txn, deleteSet);
+          index.importRemove(importer, deleteSet);
         }
         if (!insertSet.isDefined() || insertSet.size() > 0)
         {
           final Index index = indexIDToIndexMap.get(indexID);
-          index.importPut(txn, insertSet);
+          index.importPut(importer, insertSet);
         }
       }
     }
 
-    private void addDN2ID(WriteableTransaction txn, int indexID, ImportIDSet idSet) throws DirectoryException
+    private void addDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, int indexID, ImportIDSet idSet)
+        throws DirectoryException
     {
       DNState dnState = dnStateMap.get(indexID);
       if (dnState == null)
@@ -2017,9 +2023,9 @@
         dnState = new DNState(indexIDToECMap.get(indexID));
         dnStateMap.put(indexID, dnState);
       }
-      if (dnState.checkParent(txn, idSet))
+      if (dnState.checkParent(importer, idSet))
       {
-        dnState.writeToDN2ID(txn, idSet.getKey());
+        dnState.writeToDN2ID(importer, idSet.getKey());
       }
     }
 
@@ -2032,7 +2038,7 @@
      * This class is used to by a index DB merge thread performing DN processing
      * to keep track of the state of individual DN2ID index processing.
      */
-    final class DNState
+    private final class DNState
     {
       private static final int DN_STATE_CACHE_SIZE = 64 * KB;
 
@@ -2043,8 +2049,9 @@
       private ByteSequence parentDN;
       private final ByteStringBuilder lastDN = new ByteStringBuilder();
       private EntryID parentID, lastID, entryID;
+      private long totalNbEntries;
 
-      DNState(EntryContainer entryContainer)
+      private DNState(EntryContainer entryContainer)
       {
         this.entryContainer = entryContainer;
         dn2id = entryContainer.getDN2ID().getName();
@@ -2062,7 +2069,8 @@
       }
 
       /** Why do we still need this if we are checking parents in the first phase? */
-      private boolean checkParent(ReadableTransaction txn, ImportIDSet idSet) throws StorageRuntimeException
+      boolean checkParent(org.opends.server.backends.pluggable.spi.Importer importer, ImportIDSet idSet)
+          throws StorageRuntimeException
       {
         entryID = idSet.iterator().next();
         parentDN = getParent(idSet.getKey());
@@ -2072,7 +2080,7 @@
           // If null is returned then this is a suffix DN.
           if (parentDN != null)
           {
-            parentID = get(txn, dn2id, parentDN);
+            parentID = get(importer, dn2id, parentDN);
             if (parentID == null)
             {
               // We have a missing parent. Maybe parent checking was turned off?
@@ -2145,43 +2153,53 @@
         return importCfg != null && importCfg.appendToExistingData();
       }
 
-      EntryID get(ReadableTransaction txn, TreeName dn2id, ByteSequence dn) throws StorageRuntimeException
+      private EntryID get(org.opends.server.backends.pluggable.spi.Importer importer, TreeName dn2id, ByteSequence dn)
+          throws StorageRuntimeException
       {
-        ByteString value = txn.read(dn2id, dn);
+        ByteString value = importer.read(dn2id, dn);
         return value != null ? new EntryID(value) : null;
       }
 
-      public void writeToDN2ID(WriteableTransaction txn, ByteSequence key) throws DirectoryException
+      void writeToDN2ID(org.opends.server.backends.pluggable.spi.Importer importer, ByteSequence key)
+          throws DirectoryException
       {
-        txn.put(dn2id, key, entryID.toByteString());
+        importer.put(dn2id, key, entryID.toByteString());
         indexMgr.addTotDNCount(1);
         if (parentID != null)
         {
-          incrementChildrenCounter(txn);
+          incrementChildrenCounter(importer);
         }
       }
 
-      private void incrementChildrenCounter(WriteableTransaction txn)
+      private void incrementChildrenCounter(org.opends.server.backends.pluggable.spi.Importer importer)
       {
         final AtomicLong counter = getId2childrenCounter();
         counter.incrementAndGet();
         if (id2childrenCountTree.size() > DN_STATE_CACHE_SIZE)
         {
-          flush(txn);
+          flush(importer);
         }
       }
 
-      private void flush(WriteableTransaction txn)
+      private void flush(org.opends.server.backends.pluggable.spi.Importer importer)
       {
         for (Map.Entry<EntryID, AtomicLong> childrenCounter : id2childrenCountTree.entrySet())
         {
-          entryContainer.getID2ChildrenCount()
-              .addDelta(txn, childrenCounter.getKey(), childrenCounter.getValue().get());
+          final EntryID entryID = childrenCounter.getKey();
+          final long totalForEntryID = childrenCounter.getValue().get();
+          totalNbEntries += totalForEntryID;
+          entryContainer.getID2ChildrenCount().importPut(importer, entryID, totalForEntryID);
         }
         id2childrenCountTree.clear();
       }
 
 
+      void finalFlush(org.opends.server.backends.pluggable.spi.Importer importer)
+      {
+        flush(importer);
+
+        entryContainer.getID2ChildrenCount().importPutTotalCount(importer, totalNbEntries);
+      }
     }
   }
 
@@ -2957,7 +2975,7 @@
       indexKeyQueueMap.clear();
     }
 
-    private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
+    private void rebuildIndexesPhaseTwo() throws Exception
     {
       final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask());
       try

--
Gitblit v1.10.0