From e73561d3b0db47696c578736a50489a454ad6f9c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 17 Mar 2015 13:08:57 +0000
Subject: [PATCH] OPENDJ-1708 Persistit: no rebuild-index support

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java |  233 +++++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 141 insertions(+), 92 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index e685a1d..757b048 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -859,22 +859,17 @@
       InterruptedException, ExecutionException
   {
     this.rootContainer = rootContainer;
-    final long startTime = System.currentTimeMillis();
 
     try
     {
-      rootContainer.getStorage().write(new WriteOperation()
+      if (rebuildManager.rebuildConfig.isClearDegradedState())
       {
-        @Override
-        public void run(WriteableStorage txn) throws Exception
-        {
-          rebuildManager.initialize();
-          rebuildManager.printStartMessage(txn);
-          rebuildManager.rebuildIndexes(txn);
-          recursiveDelete(tempDir);
-          rebuildManager.printStopMessage(startTime);
-        }
-      });
+        clearDegradedState();
+      }
+      else
+      {
+        rebuildIndexes();
+      }
     }
     catch (Exception e)
     {
@@ -882,6 +877,54 @@
     }
   }
 
+  private void clearDegradedState() throws Exception
+  {
+    rootContainer.getStorage().write(new WriteOperation()
+    {
+      @Override
+      public void run(WriteableStorage txn) throws Exception
+      {
+        final long startTime = System.currentTimeMillis();
+        rebuildManager.initialize();
+        rebuildManager.printStartMessage(txn);
+        rebuildManager.clearDegradedState(txn);
+        recursiveDelete(tempDir);
+        rebuildManager.printStopMessage(startTime);
+      }
+    });
+  }
+
+  private void rebuildIndexes() throws Exception
+  {
+    final long startTime = System.currentTimeMillis();
+    final Storage storage = rootContainer.getStorage();
+    storage.write(new WriteOperation()
+    {
+      @Override
+      public void run(WriteableStorage txn) throws Exception
+      {
+        rebuildManager.initialize();
+        rebuildManager.printStartMessage(txn);
+        rebuildManager.preRebuildIndexes(txn);
+      }
+    });
+
+    rebuildManager.rebuildIndexesPhaseOne();
+    rebuildManager.throwIfCancelled();
+    rebuildManager.rebuildIndexesPhaseTwo();
+
+    storage.write(new WriteOperation()
+    {
+      @Override
+      public void run(WriteableStorage txn) throws Exception
+      {
+        rebuildManager.postRebuildIndexes(txn);
+      }
+    });
+    recursiveDelete(tempDir);
+    rebuildManager.printStopMessage(startTime);
+  }
+
   /**
    * Import a LDIF using the specified root container.
    *
@@ -924,7 +967,7 @@
       setIndexesTrusted(false);
 
       final long startTime = System.currentTimeMillis();
-      phaseOne(txn);
+      importPhaseOne(txn);
       isPhaseOneDone = true;
       final long phaseOneFinishTime = System.currentTimeMillis();
 
@@ -938,7 +981,7 @@
       }
 
       final long phaseTwoTime = System.currentTimeMillis();
-      phaseTwo(txn);
+      importPhaseTwo();
       if (isCanceled)
       {
         throw new InterruptedException("Import processing canceled.");
@@ -1035,7 +1078,16 @@
     }
   }
 
-  private void phaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
+  /**
+   * Reads all entries from id2entry, and:
+   * <ol>
+   * <li>compute how the entry is indexed for each index</li>
+   * <li>store the result of indexing entries into in-memory index buffers</li>
+   * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
+   * The scratch files will be read by phaseTwo to perform on-disk merge</li>
+   * </ol>
+   */
+  private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
   {
     initializeIndexBuffers();
 
@@ -1106,13 +1158,13 @@
     }
   }
 
-  private void phaseTwo(WriteableStorage txn) throws InterruptedException, ExecutionException
+  private void importPhaseTwo() throws InterruptedException, ExecutionException
   {
     ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
     scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
     try
     {
-      processIndexFiles(txn);
+      processIndexFiles();
     }
     finally
     {
@@ -1120,7 +1172,11 @@
     }
   }
 
-  private void processIndexFiles(WriteableStorage txn) throws InterruptedException, ExecutionException
+  /**
+   * Performs on-disk merge by reading several scratch files at once
+   * and write their ordered content into the target indexes.
+   */
+  private void processIndexFiles() throws InterruptedException, ExecutionException
   {
     if (bufferCount.get() == 0)
     {
@@ -1190,17 +1246,20 @@
     Semaphore permits = new Semaphore(buffers);
 
     // Start DN processing first.
-    submitIndexDBWriteTasks(DNIndexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
-    submitIndexDBWriteTasks(indexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
+    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
+    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
     getAll(futures);
     shutdownAll(dbService);
   }
 
-  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, WriteableStorage txn, ExecutorService dbService,
+  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
       Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
   {
     for (IndexManager indexMgr : indexMgrs)
     {
+      // avoid threading issues by allocating one writeable storage per thread
+      // DB transactions are generally tied to a single thread
+      WriteableStorage txn = this.rootContainer.getStorage().getWriteableStorage();
       futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, txn, permits, buffers, readAheadSize)));
     }
   }
@@ -1257,7 +1316,8 @@
 
                 while (success
                     && ByteSequence.COMPARATOR.compare(key, end) < 0
-                    && !importConfiguration.isCancelled() && !isCanceled)
+                    && !importConfiguration.isCancelled()
+                    && !isCanceled)
                 {
                   EntryID id = new EntryID(cursor.getValue());
                   Entry entry = entryContainer.getID2Entry().get(txn, id);
@@ -1908,7 +1968,7 @@
       }
       finally
       {
-        close(bufferFile, bufferIndexFile);
+        close(bufferFile, bufferIndexFile, txn);
 
         indexMgr.getBufferFile().delete();
         indexMgr.getBufferIndexFile().delete();
@@ -2767,8 +2827,7 @@
   /**
    * The rebuild index manager handles all rebuild index related processing.
    */
-  private class RebuildIndexManager extends ImportTask implements
-      DiskSpaceMonitorHandler
+  private class RebuildIndexManager extends ImportTask implements DiskSpaceMonitorHandler
   {
 
     /** Rebuild index configuration. */
@@ -2927,55 +2986,43 @@
       }
     }
 
-    /**
-     * Perform rebuild index processing.
-     *
-     * @param txn
-     *          The database transaction
-     * @throws InterruptedException
-     *           If an interrupted error occurred.
-     * @throws ExecutionException
-     *           If an Execution error occurred.
-     * @throws StorageRuntimeException
-     *           If an JEB error occurred.
-     */
-    public void rebuildIndexes(WriteableStorage txn)
-        throws InterruptedException, ExecutionException, StorageRuntimeException
+    private void clearDegradedState(WriteableStorage txn)
     {
-      this.txn = txn;
-      // Sets only the needed indexes.
-      setIndexesListsToBeRebuilt();
+      setIndexesListsToBeRebuilt(txn);
+      logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
+      postRebuildIndexes(txn);
+    }
 
-      if (!rebuildConfig.isClearDegradedState())
-      {
-        // If not in a 'clear degraded state' operation,
-        // need to rebuild the indexes.
-        setRebuildListIndexesTrusted(false);
-        clearIndexes(txn, true);
-        phaseOne();
-        if (isCanceled)
-        {
-          throw new InterruptedException("Rebuild Index canceled.");
-        }
-        phaseTwo();
-      }
-      else
-      {
-        logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
-      }
 
-      setRebuildListIndexesTrusted(true);
+    private void preRebuildIndexes(WriteableStorage txn)
+    {
+      setIndexesListsToBeRebuilt(txn);
+      setRebuildListIndexesTrusted(txn, false);
+      clearIndexes(txn, true);
+    }
+
+    private void throwIfCancelled() throws InterruptedException
+    {
+      if (isCanceled)
+      {
+        throw new InterruptedException("Rebuild Index canceled.");
+      }
+    }
+
+    private void postRebuildIndexes(WriteableStorage txn)
+    {
+      setRebuildListIndexesTrusted(txn, true);
     }
 
     @SuppressWarnings("fallthrough")
-    private void setIndexesListsToBeRebuilt() throws StorageRuntimeException
+    private void setIndexesListsToBeRebuilt(WriteableStorage txn) throws StorageRuntimeException
     {
       // Depends on rebuild mode, (re)building indexes' lists.
       final RebuildMode mode = rebuildConfig.getRebuildMode();
       switch (mode)
       {
       case ALL:
-        rebuildIndexMap(false);
+        rebuildIndexMap(txn, false);
         // falls through
       case DEGRADED:
         if (mode == RebuildMode.ALL
@@ -2991,7 +3038,7 @@
         if (mode == RebuildMode.DEGRADED
             || entryContainer.getAttributeIndexes().isEmpty())
         {
-          rebuildIndexMap(true); // only degraded.
+          rebuildIndexMap(txn, true); // only degraded.
         }
         if (mode == RebuildMode.ALL || vlvIndexes.isEmpty())
         {
@@ -3001,14 +3048,14 @@
 
       case USER_DEFINED:
         // false may be required if the user wants to rebuild specific index.
-        rebuildIndexMap(false);
+        rebuildIndexMap(txn, false);
         break;
       default:
         break;
       }
     }
 
-    private void rebuildIndexMap(final boolean onlyDegraded)
+    private void rebuildIndexMap(WriteableStorage txn, boolean onlyDegraded)
     {
       // rebuildList contains the user-selected index(in USER_DEFINED mode).
       final List<String> rebuildList = rebuildConfig.getRebuildList();
@@ -3020,7 +3067,7 @@
             || rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
         {
           // Get all existing indexes for all && degraded mode.
-          rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
+          rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
         }
         else if (!rebuildList.isEmpty())
         {
@@ -3029,46 +3076,46 @@
           {
             if (attributeType.getNameOrOID().toLowerCase().equals(index.toLowerCase()))
             {
-              rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
+              rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
             }
           }
         }
       }
     }
 
-    private void rebuildAttributeIndexes(final AttributeIndex attrIndex, final AttributeType attrType,
-        final boolean onlyDegraded) throws StorageRuntimeException
+    private void rebuildAttributeIndexes(WriteableStorage txn, AttributeIndex attrIndex, AttributeType attrType,
+        boolean onlyDegraded) throws StorageRuntimeException
     {
-      fillIndexMap(attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
-      fillIndexMap(attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
-      fillIndexMap(attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
-      fillIndexMap(attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
-      fillIndexMap(attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
+      fillIndexMap(txn, attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
+      fillIndexMap(txn, attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
+      fillIndexMap(txn, attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
+      fillIndexMap(txn, attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
+      fillIndexMap(txn, attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
 
       final Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
       if (!extensibleMap.isEmpty())
       {
         final Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
-        fillIndexMap(attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
+        fillIndexMap(txn, attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
         final Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
-        fillIndexMap(attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
+        fillIndexMap(txn, attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
       }
     }
 
-    private void fillIndexMap(final AttributeType attrType, final Collection<Index> indexes,
-        final ImportIndexType importIndexType, final boolean onlyDegraded)
+    private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Collection<Index> indexes,
+        ImportIndexType importIndexType, boolean onlyDegraded)
     {
       if (indexes != null && !indexes.isEmpty())
       {
         final List<Index> mutableCopy = new LinkedList<Index>(indexes);
         for (final Iterator<Index> it = mutableCopy.iterator(); it.hasNext();)
         {
-          final Index sharedIndex = it.next();
-          if (!onlyDegraded || !sharedIndex.isTrusted())
+          final Index index = it.next();
+          if (!onlyDegraded || !index.isTrusted())
           {
-            if (!rebuildConfig.isClearDegradedState() || sharedIndex.getRecordCount(txn) == 0)
+            if (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0)
             {
-              putInIdContainerMap(sharedIndex);
+              putInIdContainerMap(index);
             }
           }
           else
@@ -3084,10 +3131,11 @@
       }
     }
 
-    private void fillIndexMap(final AttributeType attrType, final Index index,
-        final ImportIndexType importIndexType, final boolean onlyDegraded)
+    private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Index index,
+        ImportIndexType importIndexType, boolean onlyDegraded)
     {
-      if (index != null && (!onlyDegraded || !index.isTrusted())
+      if (index != null
+          && (!onlyDegraded || !index.isTrusted())
           && (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0))
       {
         putInIdContainerMap(index);
@@ -3148,7 +3196,7 @@
       }
     }
 
-    private void setRebuildListIndexesTrusted(boolean trusted) throws StorageRuntimeException
+    private void setRebuildListIndexesTrusted(WriteableStorage txn, boolean trusted) throws StorageRuntimeException
     {
       try
       {
@@ -3158,7 +3206,7 @@
           ec.getID2Children().setTrusted(txn, trusted);
           ec.getID2Subtree().setTrusted(txn, trusted);
         }
-        setTrusted(indexMap.values(), trusted);
+        setTrusted(txn, indexMap.values(), trusted);
         if (!vlvIndexes.isEmpty())
         {
           for (VLVIndex vlvIndex : vlvIndexes)
@@ -3170,7 +3218,7 @@
         {
           for (Collection<Index> subIndexes : extensibleIndexMap.values())
           {
-            setTrusted(subIndexes, trusted);
+            setTrusted(txn, subIndexes, trusted);
           }
         }
       }
@@ -3180,7 +3228,7 @@
       }
     }
 
-    private void setTrusted(final Collection<Index> indexes, boolean trusted)
+    private void setTrusted(WriteableStorage txn, final Collection<Index> indexes, boolean trusted)
     {
       if (indexes != null && !indexes.isEmpty())
       {
@@ -3191,7 +3239,8 @@
       }
     }
 
-    private void phaseOne() throws StorageRuntimeException, InterruptedException,
+    /** @see Importer#importPhaseOne(WriteableStorage) */
+    private void rebuildIndexesPhaseOne() throws StorageRuntimeException, InterruptedException,
         ExecutionException
     {
       initializeIndexBuffers();
@@ -3217,12 +3266,12 @@
       indexKeyQueueMap.clear();
     }
 
-    private void phaseTwo() throws InterruptedException, ExecutionException
+    private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
     {
       final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
       try
       {
-        processIndexFiles(txn);
+        processIndexFiles();
       }
       finally
       {

--
Gitblit v1.10.0