mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.08.2015 e73561d3b0db47696c578736a50489a454ad6f9c
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -859,22 +859,17 @@
      InterruptedException, ExecutionException
  {
    this.rootContainer = rootContainer;
    final long startTime = System.currentTimeMillis();
    try
    {
      rootContainer.getStorage().write(new WriteOperation()
      if (rebuildManager.rebuildConfig.isClearDegradedState())
      {
        @Override
        public void run(WriteableStorage txn) throws Exception
        {
          rebuildManager.initialize();
          rebuildManager.printStartMessage(txn);
          rebuildManager.rebuildIndexes(txn);
          recursiveDelete(tempDir);
          rebuildManager.printStopMessage(startTime);
        }
      });
        clearDegradedState();
      }
      else
      {
        rebuildIndexes();
      }
    }
    catch (Exception e)
    {
@@ -882,6 +877,54 @@
    }
  }
  private void clearDegradedState() throws Exception
  {
    rootContainer.getStorage().write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        final long startTime = System.currentTimeMillis();
        rebuildManager.initialize();
        rebuildManager.printStartMessage(txn);
        rebuildManager.clearDegradedState(txn);
        recursiveDelete(tempDir);
        rebuildManager.printStopMessage(startTime);
      }
    });
  }
  private void rebuildIndexes() throws Exception
  {
    final long startTime = System.currentTimeMillis();
    final Storage storage = rootContainer.getStorage();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        rebuildManager.initialize();
        rebuildManager.printStartMessage(txn);
        rebuildManager.preRebuildIndexes(txn);
      }
    });
    rebuildManager.rebuildIndexesPhaseOne();
    rebuildManager.throwIfCancelled();
    rebuildManager.rebuildIndexesPhaseTwo();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        rebuildManager.postRebuildIndexes(txn);
      }
    });
    recursiveDelete(tempDir);
    rebuildManager.printStopMessage(startTime);
  }
  /**
   * Import a LDIF using the specified root container.
   *
@@ -924,7 +967,7 @@
      setIndexesTrusted(false);
      final long startTime = System.currentTimeMillis();
      phaseOne(txn);
      importPhaseOne(txn);
      isPhaseOneDone = true;
      final long phaseOneFinishTime = System.currentTimeMillis();
@@ -938,7 +981,7 @@
      }
      final long phaseTwoTime = System.currentTimeMillis();
      phaseTwo(txn);
      importPhaseTwo();
      if (isCanceled)
      {
        throw new InterruptedException("Import processing canceled.");
@@ -1035,7 +1078,16 @@
    }
  }
  private void phaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
  /**
   * Reads all entries from id2entry, and:
   * <ol>
   * <li>computes how the entry is indexed for each index</li>
   * <li>stores the result of indexing entries into in-memory index buffers</li>
   * <li>each time an in-memory index buffer is filled, sorts it and writes it to scratch files.
   * The scratch files will be read by phase two to perform the on-disk merge</li>
   * </ol>
   */
  private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
  {
    initializeIndexBuffers();
@@ -1106,13 +1158,13 @@
    }
  }
  private void phaseTwo(WriteableStorage txn) throws InterruptedException, ExecutionException
  private void importPhaseTwo() throws InterruptedException, ExecutionException
  {
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
    try
    {
      processIndexFiles(txn);
      processIndexFiles();
    }
    finally
    {
@@ -1120,7 +1172,11 @@
    }
  }
  private void processIndexFiles(WriteableStorage txn) throws InterruptedException, ExecutionException
  /**
   * Performs an on-disk merge by reading several scratch files at once
   * and writing their ordered content into the target indexes.
   */
  private void processIndexFiles() throws InterruptedException, ExecutionException
  {
    if (bufferCount.get() == 0)
    {
@@ -1190,17 +1246,20 @@
    Semaphore permits = new Semaphore(buffers);
    // Start DN processing first.
    submitIndexDBWriteTasks(DNIndexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
    submitIndexDBWriteTasks(indexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
    getAll(futures);
    shutdownAll(dbService);
  }
  /**
   * Submits one {@code IndexDBWriteTask} per index manager to the provided executor service
   * and appends the resulting futures to {@code futures}.
   *
   * @param indexMgrs
   *          the index managers for which a write task is submitted
   * @param dbService
   *          the executor service the tasks are submitted to
   * @param permits
   *          passed through unchanged to each {@code IndexDBWriteTask}
   * @param buffers
   *          passed through unchanged to each {@code IndexDBWriteTask}
   * @param readAheadSize
   *          passed through unchanged to each {@code IndexDBWriteTask}
   * @param futures
   *          the list receiving one future per submitted task
   */
  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, WriteableStorage txn, ExecutorService dbService,
  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
      Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
  {
    for (IndexManager indexMgr : indexMgrs)
    {
      // avoid threading issues by allocating one writeable storage per thread
      // DB transactions are generally tied to a single thread
      WriteableStorage txn = this.rootContainer.getStorage().getWriteableStorage();
      futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, txn, permits, buffers, readAheadSize)));
    }
  }
@@ -1257,7 +1316,8 @@
                while (success
                    && ByteSequence.COMPARATOR.compare(key, end) < 0
                    && !importConfiguration.isCancelled() && !isCanceled)
                    && !importConfiguration.isCancelled()
                    && !isCanceled)
                {
                  EntryID id = new EntryID(cursor.getValue());
                  Entry entry = entryContainer.getID2Entry().get(txn, id);
@@ -1908,7 +1968,7 @@
      }
      finally
      {
        close(bufferFile, bufferIndexFile);
        close(bufferFile, bufferIndexFile, txn);
        indexMgr.getBufferFile().delete();
        indexMgr.getBufferIndexFile().delete();
@@ -2767,8 +2827,7 @@
  /**
   * The rebuild index manager handles all rebuild index related processing.
   */
  private class RebuildIndexManager extends ImportTask implements
      DiskSpaceMonitorHandler
  private class RebuildIndexManager extends ImportTask implements DiskSpaceMonitorHandler
  {
    /** Rebuild index configuration. */
@@ -2927,55 +2986,43 @@
      }
    }
    /**
     * Perform rebuild index processing.
     *
     * @param txn
     *          The database transaction
     * @throws InterruptedException
     *           If an interrupted error occurred.
     * @throws ExecutionException
     *           If an Execution error occurred.
     * @throws StorageRuntimeException
     *           If an JEB error occurred.
     */
    public void rebuildIndexes(WriteableStorage txn)
        throws InterruptedException, ExecutionException, StorageRuntimeException
    private void clearDegradedState(WriteableStorage txn)
    {
      this.txn = txn;
      // Sets only the needed indexes.
      setIndexesListsToBeRebuilt();
      setIndexesListsToBeRebuilt(txn);
      logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
      postRebuildIndexes(txn);
    }
      if (!rebuildConfig.isClearDegradedState())
      {
        // If not in a 'clear degraded state' operation,
        // need to rebuild the indexes.
        setRebuildListIndexesTrusted(false);
        clearIndexes(txn, true);
        phaseOne();
        if (isCanceled)
        {
          throw new InterruptedException("Rebuild Index canceled.");
        }
        phaseTwo();
      }
      else
      {
        logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
      }
      setRebuildListIndexesTrusted(true);
    /**
     * Prepares the rebuild: selects the indexes to rebuild, marks them as not trusted
     * and clears them. The three steps run in this order against the provided transaction.
     *
     * @param txn
     *          the writeable storage passed to each of the three steps
     */
    private void preRebuildIndexes(WriteableStorage txn)
    {
      // Select the indexes to rebuild before touching their trusted flag.
      setIndexesListsToBeRebuilt(txn);
      setRebuildListIndexesTrusted(txn, false);
      // NOTE(review): the meaning of the 'true' flag is defined by clearIndexes, not visible here.
      clearIndexes(txn, true);
    }
    private void throwIfCancelled() throws InterruptedException
    {
      if (isCanceled)
      {
        throw new InterruptedException("Rebuild Index canceled.");
      }
    }
    /**
     * Finishes the rebuild by marking the indexes of the rebuild list as trusted.
     *
     * @param txn
     *          the writeable storage used to update the trusted flag
     */
    private void postRebuildIndexes(WriteableStorage txn)
    {
      setRebuildListIndexesTrusted(txn, true);
    }
    @SuppressWarnings("fallthrough")
    private void setIndexesListsToBeRebuilt() throws StorageRuntimeException
    private void setIndexesListsToBeRebuilt(WriteableStorage txn) throws StorageRuntimeException
    {
      // Depends on rebuild mode, (re)building indexes' lists.
      final RebuildMode mode = rebuildConfig.getRebuildMode();
      switch (mode)
      {
      case ALL:
        rebuildIndexMap(false);
        rebuildIndexMap(txn, false);
        // falls through
      case DEGRADED:
        if (mode == RebuildMode.ALL
@@ -2991,7 +3038,7 @@
        if (mode == RebuildMode.DEGRADED
            || entryContainer.getAttributeIndexes().isEmpty())
        {
          rebuildIndexMap(true); // only degraded.
          rebuildIndexMap(txn, true); // only degraded.
        }
        if (mode == RebuildMode.ALL || vlvIndexes.isEmpty())
        {
@@ -3001,14 +3048,14 @@
      case USER_DEFINED:
        // false may be required if the user wants to rebuild specific index.
        rebuildIndexMap(false);
        rebuildIndexMap(txn, false);
        break;
      default:
        break;
      }
    }
    private void rebuildIndexMap(final boolean onlyDegraded)
    private void rebuildIndexMap(WriteableStorage txn, boolean onlyDegraded)
    {
      // rebuildList contains the user-selected index(in USER_DEFINED mode).
      final List<String> rebuildList = rebuildConfig.getRebuildList();
@@ -3020,7 +3067,7 @@
            || rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
        {
          // Get all existing indexes for all && degraded mode.
          rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
          rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
        }
        else if (!rebuildList.isEmpty())
        {
@@ -3029,46 +3076,46 @@
          {
            if (attributeType.getNameOrOID().toLowerCase().equals(index.toLowerCase()))
            {
              rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
              rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
            }
          }
        }
      }
    }
    private void rebuildAttributeIndexes(final AttributeIndex attrIndex, final AttributeType attrType,
        final boolean onlyDegraded) throws StorageRuntimeException
    private void rebuildAttributeIndexes(WriteableStorage txn, AttributeIndex attrIndex, AttributeType attrType,
        boolean onlyDegraded) throws StorageRuntimeException
    {
      fillIndexMap(attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
      fillIndexMap(attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
      fillIndexMap(attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
      fillIndexMap(attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
      fillIndexMap(attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
      final Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
      if (!extensibleMap.isEmpty())
      {
        final Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
        fillIndexMap(attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
        fillIndexMap(txn, attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
        final Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
        fillIndexMap(attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
        fillIndexMap(txn, attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
      }
    }
    private void fillIndexMap(final AttributeType attrType, final Collection<Index> indexes,
        final ImportIndexType importIndexType, final boolean onlyDegraded)
    private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Collection<Index> indexes,
        ImportIndexType importIndexType, boolean onlyDegraded)
    {
      if (indexes != null && !indexes.isEmpty())
      {
        final List<Index> mutableCopy = new LinkedList<Index>(indexes);
        for (final Iterator<Index> it = mutableCopy.iterator(); it.hasNext();)
        {
          final Index sharedIndex = it.next();
          if (!onlyDegraded || !sharedIndex.isTrusted())
          final Index index = it.next();
          if (!onlyDegraded || !index.isTrusted())
          {
            if (!rebuildConfig.isClearDegradedState() || sharedIndex.getRecordCount(txn) == 0)
            if (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0)
            {
              putInIdContainerMap(sharedIndex);
              putInIdContainerMap(index);
            }
          }
          else
@@ -3084,10 +3131,11 @@
      }
    }
    private void fillIndexMap(final AttributeType attrType, final Index index,
        final ImportIndexType importIndexType, final boolean onlyDegraded)
    private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Index index,
        ImportIndexType importIndexType, boolean onlyDegraded)
    {
      if (index != null && (!onlyDegraded || !index.isTrusted())
      if (index != null
          && (!onlyDegraded || !index.isTrusted())
          && (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0))
      {
        putInIdContainerMap(index);
@@ -3148,7 +3196,7 @@
      }
    }
    private void setRebuildListIndexesTrusted(boolean trusted) throws StorageRuntimeException
    private void setRebuildListIndexesTrusted(WriteableStorage txn, boolean trusted) throws StorageRuntimeException
    {
      try
      {
@@ -3158,7 +3206,7 @@
          ec.getID2Children().setTrusted(txn, trusted);
          ec.getID2Subtree().setTrusted(txn, trusted);
        }
        setTrusted(indexMap.values(), trusted);
        setTrusted(txn, indexMap.values(), trusted);
        if (!vlvIndexes.isEmpty())
        {
          for (VLVIndex vlvIndex : vlvIndexes)
@@ -3170,7 +3218,7 @@
        {
          for (Collection<Index> subIndexes : extensibleIndexMap.values())
          {
            setTrusted(subIndexes, trusted);
            setTrusted(txn, subIndexes, trusted);
          }
        }
      }
@@ -3180,7 +3228,7 @@
      }
    }
    private void setTrusted(final Collection<Index> indexes, boolean trusted)
    private void setTrusted(WriteableStorage txn, final Collection<Index> indexes, boolean trusted)
    {
      if (indexes != null && !indexes.isEmpty())
      {
@@ -3191,7 +3239,8 @@
      }
    }
    private void phaseOne() throws StorageRuntimeException, InterruptedException,
    /** @see Importer#importPhaseOne(WriteableStorage) */
    private void rebuildIndexesPhaseOne() throws StorageRuntimeException, InterruptedException,
        ExecutionException
    {
      initializeIndexBuffers();
@@ -3217,12 +3266,12 @@
      indexKeyQueueMap.clear();
    }
    private void phaseTwo() throws InterruptedException, ExecutionException
    private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
    {
      final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
      try
      {
        processIndexFiles(txn);
        processIndexFiles();
      }
      finally
      {