From f888fe7a1b88a5c280c619ffe13c91dcec1ff35b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 26 Mar 2015 13:14:20 +0000
Subject: [PATCH] OPENDJ-1707 Persistit: various import problems

---
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java |   65 +++++++++-------
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java   |   64 +++++++++++++++
 opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java      |  108 ++++++++++++++++++++-------
 3 files changed, 179 insertions(+), 58 deletions(-)

diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
index 93b2483..f4b286c 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
@@ -58,6 +58,7 @@
 import org.opends.server.backends.pluggable.spi.WriteableStorage;
 import org.opends.server.core.*;
 import org.opends.server.types.*;
+import org.opends.server.util.RuntimeInformation;
 
 /**
  * This is an implementation of a Directory Server Backend which stores entries locally in a
@@ -642,13 +643,74 @@
   @Override
   public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException
   {
+    RuntimeInformation.logInfo();
+
     // If the rootContainer is open, the backend is initialized by something else.
     // We can't do import while the backend is online.
     if (rootContainer != null)
     {
       throw new DirectoryException(getServerErrorResultCode(), ERR_JEB_IMPORT_BACKEND_ONLINE.get());
     }
-    return new RootContainer(this, cfg).importLDIF(importConfig);
+
+    try
+    {
+      if (Importer.mustClearBackend(importConfig, cfg))
+      {
+        try
+        {
+          // clear all files before opening the root container
+          storage.removeStorageFiles();
+        }
+        catch (Exception e)
+        {
+          LocalizableMessage m = ERR_JEB_REMOVE_FAIL.get(e.getMessage());
+          throw new DirectoryException(getServerErrorResultCode(), m, e);
+        }
+      }
+
+      rootContainer = initializeRootContainer();
+      return rootContainer.importLDIF(importConfig);
+    }
+    catch (StorageRuntimeException e)
+    {
+      logger.traceException(e);
+      throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()), e);
+    }
+    catch (DirectoryException e)
+    {
+      throw e;
+    }
+    catch (OpenDsException e)
+    {
+      logger.traceException(e);
+      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
+    }
+    catch (ConfigException e)
+    {
+      logger.traceException(e);
+      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
+    }
+    finally
+    {
+      try
+      {
+        if (rootContainer != null)
+        {
+          long startTime = System.currentTimeMillis();
+          rootContainer.close();
+          long finishTime = System.currentTimeMillis();
+          long closeTime = (finishTime - startTime) / 1000;
+          logger.info(NOTE_JEB_IMPORT_LDIF_ROOTCONTAINER_CLOSE, closeTime);
+          rootContainer = null;
+        }
+
+        logger.info(NOTE_JEB_IMPORT_CLOSING_DATABASE);
+      }
+      catch (StorageRuntimeException de)
+      {
+        logger.traceException(de);
+      }
+    }
   }
 
   /** {@inheritDoc} */
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 804c395..0a70da3 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -339,6 +339,7 @@
     {
       this.threadCount = importConfiguration.getThreadCount();
     }
+    this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads
 
     // Determine the number of indexes.
     this.indexCount = getTotalIndexCount(backendCfg);
@@ -384,6 +385,11 @@
   {
     return !importCfg.appendToExistingData()
         && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
+    /*
+     * Why do we clear when there is only one baseDN?
+     * any baseDN for which data is imported will be cleared anyway (see getSuffix()),
+     * so if there is only one baseDN for this backend, then clear it now.
+     */
   }
 
   private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
@@ -930,13 +936,11 @@
    *
    * @param rootContainer
    *          The root container to use during the import.
-   * @param txn
-   *          The database transaction
    * @return A LDIF result.
    * @throws Exception
    *           If the import failed
    */
-  public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) throws Exception
+  public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
   {
     this.rootContainer = rootContainer;
     try {
@@ -953,11 +957,20 @@
       logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(),
               BUILD_ID, REVISION_NUMBER);
       logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount);
-      initializeSuffixes(txn);
-      setIndexesTrusted(txn, false);
+
+      final Storage storage = rootContainer.getStorage();
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableStorage txn) throws Exception
+        {
+          initializeSuffixes(txn);
+          setIndexesTrusted(txn, false);
+        }
+      });
 
       final long startTime = System.currentTimeMillis();
-      importPhaseOne(txn);
+      importPhaseOne();
       isPhaseOneDone = true;
       final long phaseOneFinishTime = System.currentTimeMillis();
 
@@ -978,8 +991,15 @@
       }
       final long phaseTwoFinishTime = System.currentTimeMillis();
 
-      setIndexesTrusted(txn, true);
-      switchEntryContainers(txn);
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableStorage txn) throws Exception
+        {
+          setIndexesTrusted(txn, true);
+          switchEntryContainers(txn);
+        }
+      });
       recursiveDelete(tempDir);
       final long finishTime = System.currentTimeMillis();
       final long importTime = finishTime - startTime;
@@ -1077,7 +1097,7 @@
    * The scratch files will be read by phaseTwo to perform on-disk merge</li>
    * </ol>
    */
-  private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
+  private void importPhaseOne() throws Exception
   {
     initializeIndexBuffers();
 
@@ -1087,7 +1107,15 @@
     bufferSortService = Executors.newFixedThreadPool(threadCount);
     final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
 
-    execService.submit(new MigrateExistingTask(txn)).get();
+    final Storage storage = rootContainer.getStorage();
+    storage.write(new WriteOperation()
+    {
+      @Override
+      public void run(WriteableStorage txn) throws Exception
+      {
+        execService.submit(new MigrateExistingTask(txn)).get();
+      }
+    });
 
     final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
     if (importConfiguration.appendToExistingData()
@@ -1095,20 +1123,27 @@
     {
       for (int i = 0; i < threadCount; i++)
       {
-        tasks.add(new AppendReplaceTask(txn));
+        tasks.add(new AppendReplaceTask(storage.getWriteableStorage()));
       }
     }
     else
     {
       for (int i = 0; i < threadCount; i++)
       {
-        tasks.add(new ImportTask(txn));
+        tasks.add(new ImportTask(storage.getWriteableStorage()));
       }
     }
     getAll(execService.invokeAll(tasks));
     tasks.clear();
 
-    execService.submit(new MigrateExcludedTask(txn)).get();
+    storage.write(new WriteOperation()
+    {
+      @Override
+      public void run(WriteableStorage txn) throws Exception
+      {
+        execService.submit(new MigrateExcludedTask(txn)).get();
+      }
+    });
 
     stopScratchFileWriters();
     getAll(scratchFileWriterFutures);
@@ -1466,6 +1501,10 @@
         isCanceled = true;
         throw e;
       }
+      finally
+      {
+        txn.close();
+      }
     }
 
     void processEntry(Entry entry, Suffix suffix)
@@ -1586,6 +1625,10 @@
         isCanceled = true;
         throw e;
       }
+      finally
+      {
+        txn.close();
+      }
     }
 
     void processEntry(Entry entry, EntryID entryID, Suffix suffix)
@@ -1807,6 +1850,7 @@
   {
     private final IndexManager indexMgr;
     private final int cacheSize;
+    /** indexID => DNState map */
     private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
     private final Semaphore permits;
     private final int maxPermits;
@@ -2101,16 +2145,12 @@
 
     private void addDN2ID(int indexID, ImportIDSet idSet) throws DirectoryException
     {
-      DNState dnState;
-      if (!dnStateMap.containsKey(indexID))
+      DNState dnState = dnStateMap.get(indexID);
+      if (dnState == null)
       {
         dnState = new DNState(indexIDToECMap.get(indexID));
         dnStateMap.put(indexID, dnState);
       }
-      else
-      {
-        dnState = dnStateMap.get(indexID);
-      }
       if (dnState.checkParent(txn, idSet))
       {
         dnState.writeToDN2ID(idSet);
@@ -3926,12 +3966,20 @@
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
     {
-      final Object returnValue = returnValues.get(method.getName());
+      final String methodName = method.getName();
+      if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
+      {
+        // ignore calls to (add|remove)*ChangeListener() methods
+        return null;
+      }
+
+      final Object returnValue = returnValues.get(methodName);
       if (returnValue != null)
       {
         return returnValue;
       }
-      throw new IllegalArgumentException("Unhandled method call on object (" + proxy
+      throw new IllegalArgumentException("Unhandled method call on proxy ("
+          + BackendCfgHandler.class.getSimpleName()
           + ") for method (" + method
           + ") with arguments (" + Arrays.toString(args) + ")");
     }
@@ -3943,11 +3991,10 @@
    */
   private final class TmpEnv implements DNCache
   {
-    private final Storage storage;
-    private WriteableStorage txn;
-    private org.opends.server.backends.pluggable.spi.Importer importer;
     private static final String DB_NAME = "dn_cache";
     private final TreeName dnCache = new TreeName("", DB_NAME);
+    private final Storage storage;
+    private final WriteableStorage txn;
 
     /**
      * Create a temporary DB environment and database to be used as a cache of
@@ -3963,20 +4010,25 @@
       final Map<String, Object> returnValues = new HashMap<String, Object>();
       returnValues.put("getDBDirectory", envPath.getAbsolutePath());
       returnValues.put("getBackendId", DB_NAME);
-      // returnValues.put("getDBCacheSize", 10L);
+      returnValues.put("getDBCacheSize", 0L);
       returnValues.put("getDBCachePercent", 10);
       returnValues.put("isDBTxnNoSync", true);
+      returnValues.put("getDBDirectoryPermissions", "700");
+      returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
+      returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
       try
       {
+        returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importTmpEnvForDN,cn=Backends,cn=config"));
         storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
             DirectoryServer.getInstance().getServerContext());
+        storage.open();
+        txn = storage.getWriteableStorage();
+        txn.openTree(dnCache);
       }
       catch (Exception e)
       {
         throw new StorageRuntimeException(e);
       }
-
-      importer.createTree(dnCache);
     }
 
     private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
@@ -4012,7 +4064,7 @@
     {
       try
       {
-        importer.close();
+        storage.close();
       }
       finally
       {
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
index 41a28f1..768d88f 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
@@ -26,18 +26,11 @@
  */
 package org.opends.server.backends.pluggable;
 
-import static org.opends.messages.BackendMessages.ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER;
-import static org.opends.messages.BackendMessages.ERR_LDIF_BACKEND_ERROR_READING_LDIF;
-import static org.opends.messages.JebMessages.ERR_JEB_CACHE_PRELOAD;
-import static org.opends.messages.JebMessages.ERR_JEB_REMOVE_FAIL;
-import static org.opends.messages.JebMessages.ERR_JEB_ENTRY_CONTAINER_ALREADY_REGISTERED;
-import static org.opends.messages.JebMessages.ERR_JEB_IMPORT_PARENT_NOT_FOUND;
-import static org.opends.messages.JebMessages.NOTE_JEB_IMPORT_FINAL_STATUS;
-import static org.opends.messages.JebMessages.NOTE_JEB_IMPORT_PROGRESS_REPORT;
-import static org.opends.messages.JebMessages.WARN_JEB_IMPORT_ENTRY_EXISTS;
-import static org.opends.messages.UtilityMessages.ERR_LDIF_SKIP;
-import static org.opends.server.core.DirectoryServer.getServerErrorResultCode;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.messages.BackendMessages.*;
+import static org.opends.messages.JebMessages.*;
+import static org.opends.messages.UtilityMessages.*;
+import static org.opends.server.core.DirectoryServer.*;
+import static org.opends.server.util.StaticUtils.*;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -55,6 +48,7 @@
 import org.forgerock.opendj.config.server.ConfigException;
 import org.forgerock.opendj.ldap.ResultCode;
 import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.server.PersistitBackendCfg;
 import org.opends.server.admin.std.server.PluggableBackendCfg;
 import org.opends.server.api.CompressedSchema;
 import org.opends.server.backends.pluggable.spi.ReadOperation;
@@ -77,8 +71,6 @@
 import org.opends.server.types.Privilege;
 import org.opends.server.util.LDIFException;
 import org.opends.server.util.LDIFReader;
-import org.opends.server.util.RuntimeInformation;
-
 
 /**
  * Wrapper class for the JE environment. Root container holds all the entry
@@ -188,24 +180,14 @@
    *           If a problem occurs while performing the LDIF import.
    */
   LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException
+  {//TODO JNR may call importLDIFWithSuccessiveAdds(importConfig) depending on configured import strategy
+    return importLDIFWithOnDiskMerge(importConfig);
+  }
+
+  private LDIFImportResult importLDIFWithSuccessiveAdds(LDIFImportConfig importConfig) throws DirectoryException
   {
-    RuntimeInformation.logInfo();
-    if (Importer.mustClearBackend(importConfig, config))
-    {
-      try
-      {
-        backend.getStorage().removeStorageFiles();
-      }
-      catch (Exception e)
-      {
-        LocalizableMessage m = ERR_JEB_REMOVE_FAIL.get(e.getMessage());
-        throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
-      }
-    }
     try
     {
-      open();
-
       ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
       try
       {
@@ -330,6 +312,31 @@
     timerService.awaitTermination(20, TimeUnit.SECONDS);
   }
 
+  private LDIFImportResult importLDIFWithOnDiskMerge(final LDIFImportConfig importConfig) throws DirectoryException
+  {
+    try
+    {
+      final Importer importer = new Importer(importConfig, (PersistitBackendCfg) config); // TODO JNR remove cast
+      return importer.processImport(this);
+    }
+    catch (DirectoryException e)
+    {
+      logger.traceException(e);
+      throw e;
+    }
+    catch (OpenDsException e)
+    {
+      logger.traceException(e);
+      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
+    }
+    catch (Exception e)
+    {
+      logger.traceException(e);
+      throw new DirectoryException(getServerErrorResultCode(),
+          LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
+    }
+  }
+
   /**
    * Opens the root container.
    *

--
Gitblit v1.10.0