mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.14.2015 06e460f4e1b092e615753982b4080f1deaeeec4c
OPENDJ-1707 Persistit: various import problems

Switched the import to use on-disk merge.
Still to be fixed: the multithreading issue (threadCount is currently hardcoded to 1).


BackendImpl.java:
In importLDIF(), made sure the RootContainer is always closed once the import ends.
Moved import preliminary code here from RootContainer.

Importer.java:
In processImport(), applied the same fix as in r11932: split processImport() into several transactions so that each step is committed and its data is exposed to all threads for the final on-disk merge.
In the TmpEnv constructor, expanded the scope of the proxy.

RootContainer.java:
In importLDIF(), extracted method importLDIFWithSuccessiveAdds() + added importLDIFWithOnDiskMerge() to offer several import strategies.
Import initialization code has been moved to BackendImpl.importLDIF().
3 files modified
237 ■■■■ changed files
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java 64 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java 108 ●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java 65 ●●●●● patch | view | raw | blame | history
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
@@ -58,6 +58,7 @@
import org.opends.server.backends.pluggable.spi.WriteableStorage;
import org.opends.server.core.*;
import org.opends.server.types.*;
import org.opends.server.util.RuntimeInformation;
/**
 * This is an implementation of a Directory Server Backend which stores entries locally in a
@@ -642,13 +643,74 @@
  @Override
  public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException
  {
    RuntimeInformation.logInfo();
    // If the rootContainer is open, the backend is initialized by something else.
    // We can't do import while the backend is online.
    if (rootContainer != null)
    {
      throw new DirectoryException(getServerErrorResultCode(), ERR_JEB_IMPORT_BACKEND_ONLINE.get());
    }
    // NOTE(review): this listing shows the direct delegation below next to the try block
    // that follows it; presumably the former is the pre-change statement and the latter
    // its replacement - confirm against the actual file.
    return new RootContainer(this, cfg).importLDIF(importConfig);
    try
    {
      // Storage files are only removed when Importer.mustClearBackend() says so
      // for this import configuration and backend configuration.
      if (Importer.mustClearBackend(importConfig, cfg))
      {
        try
        {
          // clear all files before opening the root container
          storage.removeStorageFiles();
        }
        catch (Exception e)
        {
          // Any failure to remove the files aborts the import, keeping the cause attached.
          LocalizableMessage m = ERR_JEB_REMOVE_FAIL.get(e.getMessage());
          throw new DirectoryException(getServerErrorResultCode(), m, e);
        }
      }
      // Assign the field (not a local) so that the finally block can close the container.
      rootContainer = initializeRootContainer();
      return rootContainer.importLDIF(importConfig);
    }
    catch (StorageRuntimeException e)
    {
      // Storage failures are traced, then reported as a DirectoryException carrying the raw message.
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()), e);
    }
    catch (DirectoryException e)
    {
      // Already the declared exception type: rethrow unchanged.
      throw e;
    }
    catch (OpenDsException e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
    }
    catch (ConfigException e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
    }
    finally
    {
      // Runs on success and on failure: close the container opened above (if any) and
      // reset the field to null, since a non-null rootContainer is what the guard at the
      // top of this method treats as "backend online".
      try
      {
        if (rootContainer != null)
        {
          long startTime = System.currentTimeMillis();
          rootContainer.close();
          long finishTime = System.currentTimeMillis();
          // Elapsed close time, converted from milliseconds to whole seconds for the log message.
          long closeTime = (finishTime - startTime) / 1000;
          logger.info(NOTE_JEB_IMPORT_LDIF_ROOTCONTAINER_CLOSE, closeTime);
          rootContainer = null;
        }
        logger.info(NOTE_JEB_IMPORT_CLOSING_DATABASE);
      }
      catch (StorageRuntimeException de)
      {
        // A failure while closing is only traced; it is not propagated to the caller.
        // Note that in this case the rootContainer field is not reset to null.
        logger.traceException(de);
      }
    }
  }
  /** {@inheritDoc} */
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -339,6 +339,7 @@
    {
      this.threadCount = importConfiguration.getThreadCount();
    }
    this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads
    // Determine the number of indexes.
    this.indexCount = getTotalIndexCount(backendCfg);
@@ -384,6 +385,11 @@
  {
    return !importCfg.appendToExistingData()
        && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
    /*
     * Why do we clear when there is only one baseDN?
     * any baseDN for which data is imported will be cleared anyway (see getSuffix()),
     * so if there is only one baseDN for this backend, then clear it now.
     */
  }
  private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
@@ -930,13 +936,11 @@
   *
   * @param rootContainer
   *          The root container to use during the import.
   * @param txn
   *          The database transaction
   * @return A LDIF result.
   * @throws Exception
   *           If the import failed
   */
  public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) throws Exception
  public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
  {
    this.rootContainer = rootContainer;
    try {
@@ -953,11 +957,20 @@
      logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(),
              BUILD_ID, REVISION_NUMBER);
      logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount);
      initializeSuffixes(txn);
      setIndexesTrusted(txn, false);
      final Storage storage = rootContainer.getStorage();
      storage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableStorage txn) throws Exception
        {
          initializeSuffixes(txn);
          setIndexesTrusted(txn, false);
        }
      });
      final long startTime = System.currentTimeMillis();
      importPhaseOne(txn);
      importPhaseOne();
      isPhaseOneDone = true;
      final long phaseOneFinishTime = System.currentTimeMillis();
@@ -978,8 +991,15 @@
      }
      final long phaseTwoFinishTime = System.currentTimeMillis();
      setIndexesTrusted(txn, true);
      switchEntryContainers(txn);
      storage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableStorage txn) throws Exception
        {
          setIndexesTrusted(txn, true);
          switchEntryContainers(txn);
        }
      });
      recursiveDelete(tempDir);
      final long finishTime = System.currentTimeMillis();
      final long importTime = finishTime - startTime;
@@ -1077,7 +1097,7 @@
   * The scratch files will be read by phaseTwo to perform on-disk merge</li>
   * </ol>
   */
  private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
  private void importPhaseOne() throws Exception
  {
    initializeIndexBuffers();
@@ -1087,7 +1107,15 @@
    bufferSortService = Executors.newFixedThreadPool(threadCount);
    final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
    execService.submit(new MigrateExistingTask(txn)).get();
    final Storage storage = rootContainer.getStorage();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        execService.submit(new MigrateExistingTask(txn)).get();
      }
    });
    final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
    if (importConfiguration.appendToExistingData()
@@ -1095,20 +1123,27 @@
    {
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(new AppendReplaceTask(txn));
        tasks.add(new AppendReplaceTask(storage.getWriteableStorage()));
      }
    }
    else
    {
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(new ImportTask(txn));
        tasks.add(new ImportTask(storage.getWriteableStorage()));
      }
    }
    getAll(execService.invokeAll(tasks));
    tasks.clear();
    execService.submit(new MigrateExcludedTask(txn)).get();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        execService.submit(new MigrateExcludedTask(txn)).get();
      }
    });
    stopScratchFileWriters();
    getAll(scratchFileWriterFutures);
@@ -1466,6 +1501,10 @@
        isCanceled = true;
        throw e;
      }
      finally
      {
        txn.close();
      }
    }
    void processEntry(Entry entry, Suffix suffix)
@@ -1586,6 +1625,10 @@
        isCanceled = true;
        throw e;
      }
      finally
      {
        txn.close();
      }
    }
    void processEntry(Entry entry, EntryID entryID, Suffix suffix)
@@ -1807,6 +1850,7 @@
  {
    private final IndexManager indexMgr;
    private final int cacheSize;
    /** indexID => DNState map */
    private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
    private final Semaphore permits;
    private final int maxPermits;
@@ -2101,16 +2145,12 @@
    private void addDN2ID(int indexID, ImportIDSet idSet) throws DirectoryException
    {
      DNState dnState;
      if (!dnStateMap.containsKey(indexID))
      DNState dnState = dnStateMap.get(indexID);
      if (dnState == null)
      {
        dnState = new DNState(indexIDToECMap.get(indexID));
        dnStateMap.put(indexID, dnState);
      }
      else
      {
        dnState = dnStateMap.get(indexID);
      }
      if (dnState.checkParent(txn, idSet))
      {
        dnState.writeToDN2ID(idSet);
@@ -3926,12 +3966,20 @@
    /*
     * Answers every call made on the configuration proxy from the canned values held in
     * returnValues, keyed by method name. Listener (de)registration calls are ignored and
     * any other method without a canned value is rejected with an IllegalArgumentException.
     *
     * NOTE(review): this listing shows two declarations of returnValue and two "throw"
     * openings; presumably these are the pre- and post-change lines of the same
     * statements - confirm against the actual file.
     */
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
      final Object returnValue = returnValues.get(method.getName());
      final String methodName = method.getName();
      if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
      {
        // ignore calls to (add|remove)*ChangeListener() methods
        return null;
      }
      // Look up the canned value registered under the invoked method's name.
      final Object returnValue = returnValues.get(methodName);
      if (returnValue != null)
      {
        return returnValue;
      }
      // No value registered (or a null one): fail loudly, reporting the method and its arguments.
      throw new IllegalArgumentException("Unhandled method call on object (" + proxy
      throw new IllegalArgumentException("Unhandled method call on proxy ("
          + BackendCfgHandler.class.getSimpleName()
          + ") for method (" + method
          + ") with arguments (" + Arrays.toString(args) + ")");
    }
@@ -3943,11 +3991,10 @@
   */
  private final class TmpEnv implements DNCache
  {
    private final Storage storage;
    private WriteableStorage txn;
    private org.opends.server.backends.pluggable.spi.Importer importer;
    private static final String DB_NAME = "dn_cache";
    private final TreeName dnCache = new TreeName("", DB_NAME);
    private final Storage storage;
    private final WriteableStorage txn;
    /**
     * Create a temporary DB environment and database to be used as a cache of
@@ -3963,20 +4010,25 @@
      final Map<String, Object> returnValues = new HashMap<String, Object>();
      returnValues.put("getDBDirectory", envPath.getAbsolutePath());
      returnValues.put("getBackendId", DB_NAME);
      // returnValues.put("getDBCacheSize", 10L);
      returnValues.put("getDBCacheSize", 0L);
      returnValues.put("getDBCachePercent", 10);
      returnValues.put("isDBTxnNoSync", true);
      returnValues.put("getDBDirectoryPermissions", "700");
      returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
      returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
      try
      {
        returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importTmpEnvForDN,cn=Backends,cn=config"));
        storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
            DirectoryServer.getInstance().getServerContext());
        storage.open();
        txn = storage.getWriteableStorage();
        txn.openTree(dnCache);
      }
      catch (Exception e)
      {
        throw new StorageRuntimeException(e);
      }
      importer.createTree(dnCache);
    }
    private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
@@ -4012,7 +4064,7 @@
    {
      try
      {
        importer.close();
        storage.close();
      }
      finally
      {
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
@@ -26,18 +26,11 @@
 */
package org.opends.server.backends.pluggable;
import static org.opends.messages.BackendMessages.ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER;
import static org.opends.messages.BackendMessages.ERR_LDIF_BACKEND_ERROR_READING_LDIF;
import static org.opends.messages.JebMessages.ERR_JEB_CACHE_PRELOAD;
import static org.opends.messages.JebMessages.ERR_JEB_REMOVE_FAIL;
import static org.opends.messages.JebMessages.ERR_JEB_ENTRY_CONTAINER_ALREADY_REGISTERED;
import static org.opends.messages.JebMessages.ERR_JEB_IMPORT_PARENT_NOT_FOUND;
import static org.opends.messages.JebMessages.NOTE_JEB_IMPORT_FINAL_STATUS;
import static org.opends.messages.JebMessages.NOTE_JEB_IMPORT_PROGRESS_REPORT;
import static org.opends.messages.JebMessages.WARN_JEB_IMPORT_ENTRY_EXISTS;
import static org.opends.messages.UtilityMessages.ERR_LDIF_SKIP;
import static org.opends.server.core.DirectoryServer.getServerErrorResultCode;
import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
import static org.opends.messages.BackendMessages.*;
import static org.opends.messages.JebMessages.*;
import static org.opends.messages.UtilityMessages.*;
import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
import java.util.Collection;
@@ -55,6 +48,7 @@
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.admin.server.ConfigurationChangeListener;
import org.opends.server.admin.std.server.PersistitBackendCfg;
import org.opends.server.admin.std.server.PluggableBackendCfg;
import org.opends.server.api.CompressedSchema;
import org.opends.server.backends.pluggable.spi.ReadOperation;
@@ -77,8 +71,6 @@
import org.opends.server.types.Privilege;
import org.opends.server.util.LDIFException;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.RuntimeInformation;
/**
 * Wrapper class for the JE environment. Root container holds all the entry
@@ -188,24 +180,14 @@
   *           If a problem occurs while performing the LDIF import.
   */
  LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException
  {
    // TODO JNR: may call importLDIFWithSuccessiveAdds(importConfig) instead,
    // depending on the configured import strategy.
    // For now the on-disk merge strategy is always used.
    return importLDIFWithOnDiskMerge(importConfig);
  }
  private LDIFImportResult importLDIFWithSuccessiveAdds(LDIFImportConfig importConfig) throws DirectoryException
  {
    RuntimeInformation.logInfo();
    if (Importer.mustClearBackend(importConfig, config))
    {
      try
      {
        backend.getStorage().removeStorageFiles();
      }
      catch (Exception e)
      {
        LocalizableMessage m = ERR_JEB_REMOVE_FAIL.get(e.getMessage());
        throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
      }
    }
    try
    {
      open();
      ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
      try
      {
@@ -330,6 +312,31 @@
    timerService.awaitTermination(20, TimeUnit.SECONDS);
  }
  private LDIFImportResult importLDIFWithOnDiskMerge(final LDIFImportConfig importConfig) throws DirectoryException
  {
    try
    {
      final Importer importer = new Importer(importConfig, (PersistitBackendCfg) config); // TODO JNR remove cast
      return importer.processImport(this);
    }
    catch (DirectoryException e)
    {
      logger.traceException(e);
      throw e;
    }
    catch (OpenDsException e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
    }
    catch (Exception e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(),
          LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
    }
  }
  /**
   * Opens the root container.
   *