mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
17.08.2015 e73561d3b0db47696c578736a50489a454ad6f9c
OPENDJ-1708 Persistit: no rebuild-index support

r11899 fixed issues with index buffers and scratch files encoding/decoding.
After that, rebuild-index was still not working.
The cause appears to be that a single WriteableStorage (equivalent to a DB transaction) was created and then used to:
- clean the indexes to rebuild (rebuild index thread, t1)
- set index state (rebuild index thread, t2)
- rebuild indexes (executor thread, t3)
Apparently, despite all the indexes having been cleaned at time t1 (but not committed yet), the executor thread at time t3 could still see the data that had been cleaned.
Once the rebuild had finished and the commit occurred, the data stored during rebuild-index had disappeared.

To solve this problem, I split RebuildManager.rebuildIndexes() into several methods, which allows committing after each step and exposes the data to all threads for the final on-disk merge.
What remains is to make the process truly multi-threaded by removing every place where the number of threads has been hard-coded to 1.



ReadableStorage.java:
Now implements Closeable.

Storage.java:
Added getWriteableStorage().

PersistItStorage.java:
Renamed StorageImpl.release() to close().
Consequence of the changes to ReadableStorage and Storage.

TracedStorage.java:
Consequence of the changes to ReadableStorage and Storage.

Importer.java:
In rebuildIndexes(), separated the clear degraded state case from the rebuild indexes case.
In the new rebuildIndexes() method, commit the changes before phases one and two.
In submitIndexDBWriteTasks(), each task receives its own WriteableStorage to avoid problems with transactions used over several separate threads.
In RebuildManager:
- split rebuildIndexes() into several other methods: clearDegradedState() for clearing the degraded state, and preRebuildIndexes(), throwIfCancelled() and postRebuildIndexes() for rebuilding indexes.
- renamed phaseOne() and phaseTwo() to rebuildIndexesPhaseOne() and rebuildIndexesPhaseTwo()
- passed transaction objects as method parameters to easily trace down their origin
In Importer, renamed phaseOne() and phaseTwo() to importPhaseOne() and importPhaseTwo().
5 files modified
270 ■■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java 24 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java 203 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java 23 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/ReadableStorage.java 7 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java 13 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/persistit/PersistItStorage.java
@@ -27,13 +27,10 @@
import static com.persistit.Transaction.CommitPolicy.*;
import static java.util.Arrays.*;
import static org.opends.messages.ConfigMessages.ERR_CONFIG_BACKEND_INSANE_MODE;
import static org.opends.messages.ConfigMessages.ERR_CONFIG_BACKEND_MODE_INVALID;
import static org.opends.messages.ConfigMessages.*;
import static org.opends.messages.JebMessages.*;
import static org.opends.server.util.ServerConstants.ALERT_DESCRIPTION_DISK_FULL;
import static org.opends.server.util.ServerConstants.ALERT_DESCRIPTION_DISK_SPACE_LOW;
import static org.opends.server.util.ServerConstants.ALERT_TYPE_DISK_FULL;
import static org.opends.server.util.ServerConstants.ALERT_TYPE_DISK_SPACE_LOW;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
import java.io.File;
@@ -482,7 +479,8 @@
      return exchange;
    }
    private void release()
    @Override
    public void close()
    {
      for (final Exchange ex : exchanges.values())
      {
@@ -632,7 +630,7 @@
        }
        finally
        {
          storageImpl.release();
          storageImpl.close();
        }
      }
      catch (final RollbackException e)
@@ -704,7 +702,7 @@
        }
        finally
        {
          storageImpl.release();
          storageImpl.close();
        }
      }
      catch (final RollbackException e)
@@ -724,6 +722,12 @@
  }
  @Override
  public WriteableStorage getWriteableStorage()
  {
    return new StorageImpl();
  }
  @Override
  public boolean supportsBackupAndRestore()
  {
    return true;
@@ -962,7 +966,7 @@
    setDBDirPermissions(config, backendDirectory);
  }
  /** {@inheritDoc} */
  @Override
  public void removeStorageFiles() throws StorageRuntimeException
  {
    if (!backendDirectory.isDirectory())
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -859,27 +859,70 @@
      InterruptedException, ExecutionException
  {
    this.rootContainer = rootContainer;
    final long startTime = System.currentTimeMillis();
    try
    {
      if (rebuildManager.rebuildConfig.isClearDegradedState())
      {
        clearDegradedState();
      }
      else
      {
        rebuildIndexes();
      }
    }
    catch (Exception e)
    {
      logger.traceException(e);
    }
  }
  private void clearDegradedState() throws Exception
  {
      rootContainer.getStorage().write(new WriteOperation()
      {
        @Override
        public void run(WriteableStorage txn) throws Exception
        {
        final long startTime = System.currentTimeMillis();
          rebuildManager.initialize();
          rebuildManager.printStartMessage(txn);
          rebuildManager.rebuildIndexes(txn);
        rebuildManager.clearDegradedState(txn);
          recursiveDelete(tempDir);
          rebuildManager.printStopMessage(startTime);
        }
      });
    }
    catch (Exception e)
  private void rebuildIndexes() throws Exception
    {
      logger.traceException(e);
    final long startTime = System.currentTimeMillis();
    final Storage storage = rootContainer.getStorage();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        rebuildManager.initialize();
        rebuildManager.printStartMessage(txn);
        rebuildManager.preRebuildIndexes(txn);
    }
    });
    rebuildManager.rebuildIndexesPhaseOne();
    rebuildManager.throwIfCancelled();
    rebuildManager.rebuildIndexesPhaseTwo();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        rebuildManager.postRebuildIndexes(txn);
      }
    });
    recursiveDelete(tempDir);
    rebuildManager.printStopMessage(startTime);
  }
  /**
@@ -924,7 +967,7 @@
      setIndexesTrusted(false);
      final long startTime = System.currentTimeMillis();
      phaseOne(txn);
      importPhaseOne(txn);
      isPhaseOneDone = true;
      final long phaseOneFinishTime = System.currentTimeMillis();
@@ -938,7 +981,7 @@
      }
      final long phaseTwoTime = System.currentTimeMillis();
      phaseTwo(txn);
      importPhaseTwo();
      if (isCanceled)
      {
        throw new InterruptedException("Import processing canceled.");
@@ -1035,7 +1078,16 @@
    }
  }
  private void phaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
  /**
   * Reads all entries from id2entry, and:
   * <ol>
   * <li>compute how the entry is indexed for each index</li>
   * <li>store the result of indexing entries into in-memory index buffers</li>
   * <li>each time an in-memory index buffer is filled, sort it and write it to scratch files.
   * The scratch files will be read by phaseTwo to perform on-disk merge</li>
   * </ol>
   */
  private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
  {
    initializeIndexBuffers();
@@ -1106,13 +1158,13 @@
    }
  }
  private void phaseTwo(WriteableStorage txn) throws InterruptedException, ExecutionException
  private void importPhaseTwo() throws InterruptedException, ExecutionException
  {
    ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
    scheduleAtFixedRate(timerService, new SecondPhaseProgressTask(reader.getEntriesRead()));
    try
    {
      processIndexFiles(txn);
      processIndexFiles();
    }
    finally
    {
@@ -1120,7 +1172,11 @@
    }
  }
  private void processIndexFiles(WriteableStorage txn) throws InterruptedException, ExecutionException
  /**
   * Performs on-disk merge by reading several scratch files at once
   * and write their ordered content into the target indexes.
   */
  private void processIndexFiles() throws InterruptedException, ExecutionException
  {
    if (bufferCount.get() == 0)
    {
@@ -1190,17 +1246,20 @@
    Semaphore permits = new Semaphore(buffers);
    // Start DN processing first.
    submitIndexDBWriteTasks(DNIndexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
    submitIndexDBWriteTasks(indexMgrList, txn, dbService, permits, buffers, readAheadSize, futures);
    submitIndexDBWriteTasks(DNIndexMgrList, dbService, permits, buffers, readAheadSize, futures);
    submitIndexDBWriteTasks(indexMgrList, dbService, permits, buffers, readAheadSize, futures);
    getAll(futures);
    shutdownAll(dbService);
  }
  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, WriteableStorage txn, ExecutorService dbService,
  private void submitIndexDBWriteTasks(List<IndexManager> indexMgrs, ExecutorService dbService,
      Semaphore permits, int buffers, int readAheadSize, List<Future<Void>> futures)
  {
    for (IndexManager indexMgr : indexMgrs)
    {
      // avoid threading issues by allocating one writeable storage per thread
      // DB transactions are generally tied to a single thread
      WriteableStorage txn = this.rootContainer.getStorage().getWriteableStorage();
      futures.add(dbService.submit(new IndexDBWriteTask(indexMgr, txn, permits, buffers, readAheadSize)));
    }
  }
@@ -1257,7 +1316,8 @@
                while (success
                    && ByteSequence.COMPARATOR.compare(key, end) < 0
                    && !importConfiguration.isCancelled() && !isCanceled)
                    && !importConfiguration.isCancelled()
                    && !isCanceled)
                {
                  EntryID id = new EntryID(cursor.getValue());
                  Entry entry = entryContainer.getID2Entry().get(txn, id);
@@ -1908,7 +1968,7 @@
      }
      finally
      {
        close(bufferFile, bufferIndexFile);
        close(bufferFile, bufferIndexFile, txn);
        indexMgr.getBufferFile().delete();
        indexMgr.getBufferIndexFile().delete();
@@ -2767,8 +2827,7 @@
  /**
   * The rebuild index manager handles all rebuild index related processing.
   */
  private class RebuildIndexManager extends ImportTask implements
      DiskSpaceMonitorHandler
  private class RebuildIndexManager extends ImportTask implements DiskSpaceMonitorHandler
  {
    /** Rebuild index configuration. */
@@ -2927,55 +2986,43 @@
      }
    }
    /**
     * Perform rebuild index processing.
     *
     * @param txn
     *          The database transaction
     * @throws InterruptedException
     *           If an interrupted error occurred.
     * @throws ExecutionException
     *           If an Execution error occurred.
     * @throws StorageRuntimeException
     *           If an JEB error occurred.
     */
    public void rebuildIndexes(WriteableStorage txn)
        throws InterruptedException, ExecutionException, StorageRuntimeException
    private void clearDegradedState(WriteableStorage txn)
    {
      this.txn = txn;
      // Sets only the needed indexes.
      setIndexesListsToBeRebuilt();
      setIndexesListsToBeRebuilt(txn);
      logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
      postRebuildIndexes(txn);
    }
      if (!rebuildConfig.isClearDegradedState())
    private void preRebuildIndexes(WriteableStorage txn)
      {
        // If not in a 'clear degraded state' operation,
        // need to rebuild the indexes.
        setRebuildListIndexesTrusted(false);
      setIndexesListsToBeRebuilt(txn);
      setRebuildListIndexesTrusted(txn, false);
        clearIndexes(txn, true);
        phaseOne();
    }
    private void throwIfCancelled() throws InterruptedException
    {
        if (isCanceled)
        {
          throw new InterruptedException("Rebuild Index canceled.");
        }
        phaseTwo();
      }
      else
      {
        logger.info(NOTE_JEB_REBUILD_CLEARDEGRADEDSTATE_FINAL_STATUS, rebuildConfig.getRebuildList());
      }
      setRebuildListIndexesTrusted(true);
    private void postRebuildIndexes(WriteableStorage txn)
    {
      setRebuildListIndexesTrusted(txn, true);
    }
    @SuppressWarnings("fallthrough")
    private void setIndexesListsToBeRebuilt() throws StorageRuntimeException
    private void setIndexesListsToBeRebuilt(WriteableStorage txn) throws StorageRuntimeException
    {
      // Depends on rebuild mode, (re)building indexes' lists.
      final RebuildMode mode = rebuildConfig.getRebuildMode();
      switch (mode)
      {
      case ALL:
        rebuildIndexMap(false);
        rebuildIndexMap(txn, false);
        // falls through
      case DEGRADED:
        if (mode == RebuildMode.ALL
@@ -2991,7 +3038,7 @@
        if (mode == RebuildMode.DEGRADED
            || entryContainer.getAttributeIndexes().isEmpty())
        {
          rebuildIndexMap(true); // only degraded.
          rebuildIndexMap(txn, true); // only degraded.
        }
        if (mode == RebuildMode.ALL || vlvIndexes.isEmpty())
        {
@@ -3001,14 +3048,14 @@
      case USER_DEFINED:
        // false may be required if the user wants to rebuild specific index.
        rebuildIndexMap(false);
        rebuildIndexMap(txn, false);
        break;
      default:
        break;
      }
    }
    private void rebuildIndexMap(final boolean onlyDegraded)
    private void rebuildIndexMap(WriteableStorage txn, boolean onlyDegraded)
    {
      // rebuildList contains the user-selected index(in USER_DEFINED mode).
      final List<String> rebuildList = rebuildConfig.getRebuildList();
@@ -3020,7 +3067,7 @@
            || rebuildConfig.getRebuildMode() == RebuildMode.DEGRADED)
        {
          // Get all existing indexes for all && degraded mode.
          rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
          rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
        }
        else if (!rebuildList.isEmpty())
        {
@@ -3029,46 +3076,46 @@
          {
            if (attributeType.getNameOrOID().toLowerCase().equals(index.toLowerCase()))
            {
              rebuildAttributeIndexes(attributeIndex, attributeType, onlyDegraded);
              rebuildAttributeIndexes(txn, attributeIndex, attributeType, onlyDegraded);
            }
          }
        }
      }
    }
    private void rebuildAttributeIndexes(final AttributeIndex attrIndex, final AttributeType attrType,
        final boolean onlyDegraded) throws StorageRuntimeException
    private void rebuildAttributeIndexes(WriteableStorage txn, AttributeIndex attrIndex, AttributeType attrType,
        boolean onlyDegraded) throws StorageRuntimeException
    {
      fillIndexMap(attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
      fillIndexMap(attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
      fillIndexMap(attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
      fillIndexMap(attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
      fillIndexMap(attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getSubstringIndex(), ImportIndexType.SUBSTRING, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getOrderingIndex(), ImportIndexType.ORDERING, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getEqualityIndex(), ImportIndexType.EQUALITY, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getPresenceIndex(), ImportIndexType.PRESENCE, onlyDegraded);
      fillIndexMap(txn, attrType, attrIndex.getApproximateIndex(), ImportIndexType.APPROXIMATE, onlyDegraded);
      final Map<String, Collection<Index>> extensibleMap = attrIndex.getExtensibleIndexes();
      if (!extensibleMap.isEmpty())
      {
        final Collection<Index> subIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SUBSTRING);
        fillIndexMap(attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
        fillIndexMap(txn, attrType, subIndexes, ImportIndexType.EX_SUBSTRING, onlyDegraded);
        final Collection<Index> sharedIndexes = extensibleMap.get(EXTENSIBLE_INDEXER_ID_SHARED);
        fillIndexMap(attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
        fillIndexMap(txn, attrType, sharedIndexes, ImportIndexType.EX_SHARED, onlyDegraded);
      }
    }
    private void fillIndexMap(final AttributeType attrType, final Collection<Index> indexes,
        final ImportIndexType importIndexType, final boolean onlyDegraded)
    private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Collection<Index> indexes,
        ImportIndexType importIndexType, boolean onlyDegraded)
    {
      if (indexes != null && !indexes.isEmpty())
      {
        final List<Index> mutableCopy = new LinkedList<Index>(indexes);
        for (final Iterator<Index> it = mutableCopy.iterator(); it.hasNext();)
        {
          final Index sharedIndex = it.next();
          if (!onlyDegraded || !sharedIndex.isTrusted())
          final Index index = it.next();
          if (!onlyDegraded || !index.isTrusted())
          {
            if (!rebuildConfig.isClearDegradedState() || sharedIndex.getRecordCount(txn) == 0)
            if (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0)
            {
              putInIdContainerMap(sharedIndex);
              putInIdContainerMap(index);
            }
          }
          else
@@ -3084,10 +3131,11 @@
      }
    }
    private void fillIndexMap(final AttributeType attrType, final Index index,
        final ImportIndexType importIndexType, final boolean onlyDegraded)
    private void fillIndexMap(WriteableStorage txn, AttributeType attrType, Index index,
        ImportIndexType importIndexType, boolean onlyDegraded)
    {
      if (index != null && (!onlyDegraded || !index.isTrusted())
      if (index != null
          && (!onlyDegraded || !index.isTrusted())
          && (!rebuildConfig.isClearDegradedState() || index.getRecordCount(txn) == 0))
      {
        putInIdContainerMap(index);
@@ -3148,7 +3196,7 @@
      }
    }
    private void setRebuildListIndexesTrusted(boolean trusted) throws StorageRuntimeException
    private void setRebuildListIndexesTrusted(WriteableStorage txn, boolean trusted) throws StorageRuntimeException
    {
      try
      {
@@ -3158,7 +3206,7 @@
          ec.getID2Children().setTrusted(txn, trusted);
          ec.getID2Subtree().setTrusted(txn, trusted);
        }
        setTrusted(indexMap.values(), trusted);
        setTrusted(txn, indexMap.values(), trusted);
        if (!vlvIndexes.isEmpty())
        {
          for (VLVIndex vlvIndex : vlvIndexes)
@@ -3170,7 +3218,7 @@
        {
          for (Collection<Index> subIndexes : extensibleIndexMap.values())
          {
            setTrusted(subIndexes, trusted);
            setTrusted(txn, subIndexes, trusted);
          }
        }
      }
@@ -3180,7 +3228,7 @@
      }
    }
    private void setTrusted(final Collection<Index> indexes, boolean trusted)
    private void setTrusted(WriteableStorage txn, final Collection<Index> indexes, boolean trusted)
    {
      if (indexes != null && !indexes.isEmpty())
      {
@@ -3191,7 +3239,8 @@
      }
    }
    private void phaseOne() throws StorageRuntimeException, InterruptedException,
    /** @see Importer#importPhaseOne(WriteableStorage) */
    private void rebuildIndexesPhaseOne() throws StorageRuntimeException, InterruptedException,
        ExecutionException
    {
      initializeIndexBuffers();
@@ -3217,12 +3266,12 @@
      indexKeyQueueMap.clear();
    }
    private void phaseTwo() throws InterruptedException, ExecutionException
    private void rebuildIndexesPhaseTwo() throws InterruptedException, ExecutionException
    {
      final Timer timer = scheduleAtFixedRate(new SecondPhaseProgressTask(entriesProcessed.get()));
      try
      {
        processIndexFiles(txn);
        processIndexFiles();
      }
      finally
      {
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/TracedStorage.java
@@ -138,6 +138,12 @@
      return value;
    }
    @Override
    public void close()
    {
      logger.trace("Storage@%s.ReadableStorage@%s.close()", storageId(), id());
    }
    private int id()
    {
      return System.identityHashCode(this);
@@ -251,6 +257,12 @@
      return isUpdated;
    }
    @Override
    public void close()
    {
      logger.trace("Storage@%s.WriteableStorage@%s.close()", storageId(), id());
    }
    private int id()
    {
      return System.identityHashCode(this);
@@ -370,6 +382,17 @@
    storage.write(op);
  }
  @Override
  public WriteableStorage getWriteableStorage()
  {
    final WriteableStorage writeableStorage = storage.getWriteableStorage();
    if (logger.isTraceEnabled())
    {
      return new TracedWriteableStorage(writeableStorage);
    }
    return writeableStorage;
  }
  private String hex(final ByteSequence bytes)
  {
    return bytes != null ? bytes.toByteString().toHexString() : null;
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/ReadableStorage.java
@@ -25,13 +25,15 @@
 */
package org.opends.server.backends.pluggable.spi;
import java.io.Closeable;
import org.forgerock.opendj.ldap.ByteSequence;
import org.forgerock.opendj.ldap.ByteString;
/**
 * Represents a readable transaction on a storage engine.
 */
public interface ReadableStorage
public interface ReadableStorage extends Closeable
{
  /**
   * Reads the record's value associated to the provided key, in the tree whose name is provided.
@@ -75,4 +77,7 @@
   * @return the number of key/value pairs in the provided tree.
   */
  long getRecordCount(TreeName treeName);
  @Override
  public void close();
}
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/spi/Storage.java
@@ -79,9 +79,12 @@
   */
  void write(WriteOperation writeOperation) throws Exception;
  /** {@inheritDoc} */
  @Override
  void close();
  /**
   * Returns a new writeable storage.
   *
   * @return a new writeable storage
   */
  WriteableStorage getWriteableStorage();
  /**
   * Remove all files for a backend of this storage.
@@ -123,4 +126,8 @@
   *           If backup and restore is not supported by this storage.
   */
  FilenameFilter getFilesToBackupFilter();
  /** {@inheritDoc} */
  @Override
  void close();
}