mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
26.14.2015 06e460f4e1b092e615753982b4080f1deaeeec4c
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -339,6 +339,7 @@
    {
      this.threadCount = importConfiguration.getThreadCount();
    }
    this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads
    // Determine the number of indexes.
    this.indexCount = getTotalIndexCount(backendCfg);
@@ -384,6 +385,11 @@
  {
    return !importCfg.appendToExistingData()
        && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
    /*
     * Why do we clear when there is only one baseDN?
     * any baseDN for which data is imported will be cleared anyway (see getSuffix()),
     * so if there is only one baseDN for this backend, then clear it now.
     */
  }
  private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
@@ -930,13 +936,11 @@
   *
   * @param rootContainer
   *          The root container to use during the import.
   * @param txn
   *          The database transaction
   * @return A LDIF result.
   * @throws Exception
   *           If the import failed
   */
  public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) throws Exception
  public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
  {
    this.rootContainer = rootContainer;
    try {
@@ -953,11 +957,20 @@
      logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(),
              BUILD_ID, REVISION_NUMBER);
      logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount);
      initializeSuffixes(txn);
      setIndexesTrusted(txn, false);
      final Storage storage = rootContainer.getStorage();
      storage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableStorage txn) throws Exception
        {
          initializeSuffixes(txn);
          setIndexesTrusted(txn, false);
        }
      });
      final long startTime = System.currentTimeMillis();
      importPhaseOne(txn);
      importPhaseOne();
      isPhaseOneDone = true;
      final long phaseOneFinishTime = System.currentTimeMillis();
@@ -978,8 +991,15 @@
      }
      final long phaseTwoFinishTime = System.currentTimeMillis();
      setIndexesTrusted(txn, true);
      switchEntryContainers(txn);
      storage.write(new WriteOperation()
      {
        @Override
        public void run(WriteableStorage txn) throws Exception
        {
          setIndexesTrusted(txn, true);
          switchEntryContainers(txn);
        }
      });
      recursiveDelete(tempDir);
      final long finishTime = System.currentTimeMillis();
      final long importTime = finishTime - startTime;
@@ -1077,7 +1097,7 @@
   * The scratch files will be read by phaseTwo to perform on-disk merge</li>
   * </ol>
   */
  private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
  private void importPhaseOne() throws Exception
  {
    initializeIndexBuffers();
@@ -1087,7 +1107,15 @@
    bufferSortService = Executors.newFixedThreadPool(threadCount);
    final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
    execService.submit(new MigrateExistingTask(txn)).get();
    final Storage storage = rootContainer.getStorage();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        execService.submit(new MigrateExistingTask(txn)).get();
      }
    });
    final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
    if (importConfiguration.appendToExistingData()
@@ -1095,20 +1123,27 @@
    {
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(new AppendReplaceTask(txn));
        tasks.add(new AppendReplaceTask(storage.getWriteableStorage()));
      }
    }
    else
    {
      for (int i = 0; i < threadCount; i++)
      {
        tasks.add(new ImportTask(txn));
        tasks.add(new ImportTask(storage.getWriteableStorage()));
      }
    }
    getAll(execService.invokeAll(tasks));
    tasks.clear();
    execService.submit(new MigrateExcludedTask(txn)).get();
    storage.write(new WriteOperation()
    {
      @Override
      public void run(WriteableStorage txn) throws Exception
      {
        execService.submit(new MigrateExcludedTask(txn)).get();
      }
    });
    stopScratchFileWriters();
    getAll(scratchFileWriterFutures);
@@ -1466,6 +1501,10 @@
        isCanceled = true;
        throw e;
      }
      finally
      {
        txn.close();
      }
    }
    void processEntry(Entry entry, Suffix suffix)
@@ -1586,6 +1625,10 @@
        isCanceled = true;
        throw e;
      }
      finally
      {
        txn.close();
      }
    }
    void processEntry(Entry entry, EntryID entryID, Suffix suffix)
@@ -1807,6 +1850,7 @@
  {
    private final IndexManager indexMgr;
    private final int cacheSize;
    /** indexID => DNState map */
    private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
    private final Semaphore permits;
    private final int maxPermits;
@@ -2101,16 +2145,12 @@
    private void addDN2ID(int indexID, ImportIDSet idSet) throws DirectoryException
    {
      DNState dnState;
      if (!dnStateMap.containsKey(indexID))
      DNState dnState = dnStateMap.get(indexID);
      if (dnState == null)
      {
        dnState = new DNState(indexIDToECMap.get(indexID));
        dnStateMap.put(indexID, dnState);
      }
      else
      {
        dnState = dnStateMap.get(indexID);
      }
      if (dnState.checkParent(txn, idSet))
      {
        dnState.writeToDN2ID(idSet);
@@ -3926,12 +3966,20 @@
    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
      final Object returnValue = returnValues.get(method.getName());
      final String methodName = method.getName();
      if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
      {
        // ignore calls to (add|remove)*ChangeListener() methods
        return null;
      }
      final Object returnValue = returnValues.get(methodName);
      if (returnValue != null)
      {
        return returnValue;
      }
      throw new IllegalArgumentException("Unhandled method call on object (" + proxy
      throw new IllegalArgumentException("Unhandled method call on proxy ("
          + BackendCfgHandler.class.getSimpleName()
          + ") for method (" + method
          + ") with arguments (" + Arrays.toString(args) + ")");
    }
@@ -3943,11 +3991,10 @@
   */
  private final class TmpEnv implements DNCache
  {
    private final Storage storage;
    private WriteableStorage txn;
    private org.opends.server.backends.pluggable.spi.Importer importer;
    private static final String DB_NAME = "dn_cache";
    private final TreeName dnCache = new TreeName("", DB_NAME);
    private final Storage storage;
    private final WriteableStorage txn;
    /**
     * Create a temporary DB environment and database to be used as a cache of
@@ -3963,20 +4010,25 @@
      final Map<String, Object> returnValues = new HashMap<String, Object>();
      returnValues.put("getDBDirectory", envPath.getAbsolutePath());
      returnValues.put("getBackendId", DB_NAME);
      // returnValues.put("getDBCacheSize", 10L);
      returnValues.put("getDBCacheSize", 0L);
      returnValues.put("getDBCachePercent", 10);
      returnValues.put("isDBTxnNoSync", true);
      returnValues.put("getDBDirectoryPermissions", "700");
      returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
      returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
      try
      {
        returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importTmpEnvForDN,cn=Backends,cn=config"));
        storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
            DirectoryServer.getInstance().getServerContext());
        storage.open();
        txn = storage.getWriteableStorage();
        txn.openTree(dnCache);
      }
      catch (Exception e)
      {
        throw new StorageRuntimeException(e);
      }
      importer.createTree(dnCache);
    }
    private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
@@ -4012,7 +4064,7 @@
    {
      try
      {
        importer.close();
        storage.close();
      }
      finally
      {