From 06e460f4e1b092e615753982b4080f1deaeeec4c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 26 Mar 2015 13:14:20 +0000
Subject: [PATCH] OPENDJ-1707 Persistit: various import problems

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java |  108 ++++++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 80 insertions(+), 28 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 804c395..0a70da3 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -339,6 +339,7 @@
     {
       this.threadCount = importConfiguration.getThreadCount();
     }
+    this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads
 
     // Determine the number of indexes.
     this.indexCount = getTotalIndexCount(backendCfg);
@@ -384,6 +385,11 @@
   {
     return !importCfg.appendToExistingData()
         && (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
+    /*
+     * Why do we clear when there is only one baseDN?
+     * any baseDN for which data is imported will be cleared anyway (see getSuffix()),
+     * so if there is only one baseDN for this backend, then clear it now.
+     */
   }
 
   private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
@@ -930,13 +936,11 @@
    *
    * @param rootContainer
    *          The root container to use during the import.
-   * @param txn
-   *          The database transaction
    * @return A LDIF result.
    * @throws Exception
    *           If the import failed
    */
-  public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) throws Exception
+  public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
   {
     this.rootContainer = rootContainer;
     try {
@@ -953,11 +957,20 @@
       logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(),
               BUILD_ID, REVISION_NUMBER);
       logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount);
-      initializeSuffixes(txn);
-      setIndexesTrusted(txn, false);
+
+      final Storage storage = rootContainer.getStorage();
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableStorage txn) throws Exception
+        {
+          initializeSuffixes(txn);
+          setIndexesTrusted(txn, false);
+        }
+      });
 
       final long startTime = System.currentTimeMillis();
-      importPhaseOne(txn);
+      importPhaseOne();
       isPhaseOneDone = true;
       final long phaseOneFinishTime = System.currentTimeMillis();
 
@@ -978,8 +991,15 @@
       }
       final long phaseTwoFinishTime = System.currentTimeMillis();
 
-      setIndexesTrusted(txn, true);
-      switchEntryContainers(txn);
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableStorage txn) throws Exception
+        {
+          setIndexesTrusted(txn, true);
+          switchEntryContainers(txn);
+        }
+      });
       recursiveDelete(tempDir);
       final long finishTime = System.currentTimeMillis();
       final long importTime = finishTime - startTime;
@@ -1077,7 +1097,7 @@
    * The scratch files will be read by phaseTwo to perform on-disk merge</li>
    * </ol>
    */
-  private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
+  private void importPhaseOne() throws Exception
   {
     initializeIndexBuffers();
 
@@ -1087,7 +1107,15 @@
     bufferSortService = Executors.newFixedThreadPool(threadCount);
     final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
 
-    execService.submit(new MigrateExistingTask(txn)).get();
+    final Storage storage = rootContainer.getStorage();
+    storage.write(new WriteOperation()
+    {
+      @Override
+      public void run(WriteableStorage txn) throws Exception
+      {
+        execService.submit(new MigrateExistingTask(txn)).get();
+      }
+    });
 
     final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
     if (importConfiguration.appendToExistingData()
@@ -1095,20 +1123,27 @@
     {
       for (int i = 0; i < threadCount; i++)
       {
-        tasks.add(new AppendReplaceTask(txn));
+        tasks.add(new AppendReplaceTask(storage.getWriteableStorage()));
       }
     }
     else
     {
       for (int i = 0; i < threadCount; i++)
       {
-        tasks.add(new ImportTask(txn));
+        tasks.add(new ImportTask(storage.getWriteableStorage()));
       }
     }
     getAll(execService.invokeAll(tasks));
     tasks.clear();
 
-    execService.submit(new MigrateExcludedTask(txn)).get();
+    storage.write(new WriteOperation()
+    {
+      @Override
+      public void run(WriteableStorage txn) throws Exception
+      {
+        execService.submit(new MigrateExcludedTask(txn)).get();
+      }
+    });
 
     stopScratchFileWriters();
     getAll(scratchFileWriterFutures);
@@ -1466,6 +1501,10 @@
         isCanceled = true;
         throw e;
       }
+      finally
+      {
+        txn.close();
+      }
     }
 
     void processEntry(Entry entry, Suffix suffix)
@@ -1586,6 +1625,10 @@
         isCanceled = true;
         throw e;
       }
+      finally
+      {
+        txn.close();
+      }
     }
 
     void processEntry(Entry entry, EntryID entryID, Suffix suffix)
@@ -1807,6 +1850,7 @@
   {
     private final IndexManager indexMgr;
     private final int cacheSize;
+    /** indexID => DNState map */
     private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
     private final Semaphore permits;
     private final int maxPermits;
@@ -2101,16 +2145,12 @@
 
     private void addDN2ID(int indexID, ImportIDSet idSet) throws DirectoryException
     {
-      DNState dnState;
-      if (!dnStateMap.containsKey(indexID))
+      DNState dnState = dnStateMap.get(indexID);
+      if (dnState == null)
       {
         dnState = new DNState(indexIDToECMap.get(indexID));
         dnStateMap.put(indexID, dnState);
       }
-      else
-      {
-        dnState = dnStateMap.get(indexID);
-      }
       if (dnState.checkParent(txn, idSet))
       {
         dnState.writeToDN2ID(idSet);
@@ -3926,12 +3966,20 @@
     @Override
     public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
     {
-      final Object returnValue = returnValues.get(method.getName());
+      final String methodName = method.getName();
+      if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
+      {
+        // ignore calls to (add|remove)*ChangeListener() methods
+        return null;
+      }
+
+      final Object returnValue = returnValues.get(methodName);
       if (returnValue != null)
       {
         return returnValue;
       }
-      throw new IllegalArgumentException("Unhandled method call on object (" + proxy
+      throw new IllegalArgumentException("Unhandled method call on proxy ("
+          + BackendCfgHandler.class.getSimpleName()
           + ") for method (" + method
           + ") with arguments (" + Arrays.toString(args) + ")");
     }
@@ -3943,11 +3991,10 @@
    */
   private final class TmpEnv implements DNCache
   {
-    private final Storage storage;
-    private WriteableStorage txn;
-    private org.opends.server.backends.pluggable.spi.Importer importer;
     private static final String DB_NAME = "dn_cache";
     private final TreeName dnCache = new TreeName("", DB_NAME);
+    private final Storage storage;
+    private final WriteableStorage txn;
 
     /**
      * Create a temporary DB environment and database to be used as a cache of
@@ -3963,20 +4010,25 @@
       final Map<String, Object> returnValues = new HashMap<String, Object>();
       returnValues.put("getDBDirectory", envPath.getAbsolutePath());
       returnValues.put("getBackendId", DB_NAME);
-      // returnValues.put("getDBCacheSize", 10L);
+      returnValues.put("getDBCacheSize", 0L);
       returnValues.put("getDBCachePercent", 10);
       returnValues.put("isDBTxnNoSync", true);
+      returnValues.put("getDBDirectoryPermissions", "700");
+      returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
+      returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
       try
       {
+        returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importTmpEnvForDN,cn=Backends,cn=config"));
         storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
             DirectoryServer.getInstance().getServerContext());
+        storage.open();
+        txn = storage.getWriteableStorage();
+        txn.openTree(dnCache);
       }
       catch (Exception e)
       {
         throw new StorageRuntimeException(e);
       }
-
-      importer.createTree(dnCache);
     }
 
     private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
@@ -4012,7 +4064,7 @@
     {
       try
       {
-        importer.close();
+        storage.close();
       }
       finally
       {

--
Gitblit v1.10.0