From c1234a77530eb2c68c06c62c9a69f899f0e4a6e6 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 16 Apr 2015 11:49:32 +0000
Subject: [PATCH] Storage's transaction must be created and used in the same thread (CR-6665)

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java |  129 +++++++++++++++++++++----------------------
 1 files changed, 63 insertions(+), 66 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 786d502..91c22e7 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -187,7 +187,7 @@
   private final TmpEnv tmpEnv;
 
   /** Root container. */
-  private RootContainer rootContainer;
+  private final RootContainer rootContainer;
 
   /** Import configuration. */
   private final LDIFImportConfig importConfiguration;
@@ -286,9 +286,10 @@
    * @throws ConfigException
    *           If a problem occurs during initialization.
    */
-  Importer(RebuildConfig rebuildConfig, PluggableBackendCfg cfg, ServerContext serverContext)
-      throws InitializationException, StorageRuntimeException, ConfigException
+  Importer(RootContainer rootContainer, RebuildConfig rebuildConfig, PluggableBackendCfg cfg,
+      ServerContext serverContext) throws InitializationException, StorageRuntimeException, ConfigException
   {
+    this.rootContainer = rootContainer;
     this.importConfiguration = null;
     this.serverContext = serverContext;
     this.tmpEnv = null;
@@ -324,9 +325,10 @@
    * @throws StorageRuntimeException
    *           If an error occurred when opening the DB.
    */
-  Importer(LDIFImportConfig importConfiguration, PluggableBackendCfg backendCfg, ServerContext serverContext)
-      throws InitializationException, ConfigException, StorageRuntimeException
+  Importer(RootContainer rootContainer, LDIFImportConfig importConfiguration, PluggableBackendCfg backendCfg,
+      ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException
   {
+    this.rootContainer = rootContainer;
     this.rebuildManager = null;
     this.importConfiguration = importConfiguration;
     this.serverContext = serverContext;
@@ -822,12 +824,9 @@
    * @throws ExecutionException
    *           If an execution error occurred.
    */
-  public void rebuildIndexes(RootContainer rootContainer)
-      throws ConfigException, InitializationException, StorageRuntimeException,
+  public void rebuildIndexes() throws ConfigException, InitializationException, StorageRuntimeException,
       InterruptedException, ExecutionException
   {
-    this.rootContainer = rootContainer;
-
     try
     {
       if (rebuildManager.rebuildConfig.isClearDegradedState())
@@ -836,7 +835,7 @@
       }
       else
       {
-        rebuildIndexes();
+        doRebuildIndexes();
       }
     }
     catch (Exception e)
@@ -862,7 +861,7 @@
     });
   }
 
-  private void rebuildIndexes() throws Exception
+  private void doRebuildIndexes() throws Exception
   {
     final long startTime = System.currentTimeMillis();
     final Storage storage = rootContainer.getStorage();
@@ -902,9 +901,8 @@
    * @throws Exception
    *           If the import failed
    */
-  public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
+  public LDIFImportResult processImport() throws Exception
   {
-    this.rootContainer = rootContainer;
     try {
       try
       {
@@ -1070,14 +1068,7 @@
     final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
 
     final Storage storage = rootContainer.getStorage();
-    storage.write(new WriteOperation()
-    {
-      @Override
-      public void run(WriteableTransaction txn) throws Exception
-      {
-        execService.submit(new MigrateExistingTask(txn)).get();
-      }
-    });
+    execService.submit(new MigrateExistingTask(storage)).get();
 
     final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
     if (importConfiguration.appendToExistingData()
@@ -1085,27 +1076,20 @@
     {
       for (int i = 0; i < threadCount; i++)
       {
-        tasks.add(new AppendReplaceTask(storage.getWriteableTransaction()));
+        tasks.add(new AppendReplaceTask(storage));
       }
     }
     else
     {
       for (int i = 0; i < threadCount; i++)
       {
-        tasks.add(new ImportTask(storage.getWriteableTransaction()));
+        tasks.add(new ImportTask(storage));
       }
     }
     execService.invokeAll(tasks);
     tasks.clear();
 
-    storage.write(new WriteOperation()
-    {
-      @Override
-      public void run(WriteableTransaction txn) throws Exception
-      {
-        execService.submit(new MigrateExcludedTask(txn)).get();
-      }
-    });
+    execService.submit(new MigrateExcludedTask(storage)).get();
 
     stopScratchFileWriters();
     getAll(scratchFileWriterFutures);
@@ -1267,14 +1251,14 @@
   /** Task used to migrate excluded branch. */
   private final class MigrateExcludedTask extends ImportTask
   {
-    private MigrateExcludedTask(final WriteableTransaction txn)
+    private MigrateExcludedTask(final Storage storage)
     {
-      super(txn);
+      super(storage);
     }
 
     /** {@inheritDoc} */
     @Override
-    public Void call() throws Exception
+    Void call0(WriteableTransaction txn) throws Exception
     {
       for (Suffix suffix : dnSuffixMap.values())
       {
@@ -1304,7 +1288,7 @@
                 {
                   EntryID id = new EntryID(cursor.getValue());
                   Entry entry = entryContainer.getID2Entry().get(txn, id);
-                  processEntry(entry, rootContainer.getNextEntryID(), suffix);
+                  processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
                   migratedCount++;
                   success = cursor.next();
                 }
@@ -1331,14 +1315,14 @@
   /** Task to migrate existing entries. */
   private final class MigrateExistingTask extends ImportTask
   {
-    private MigrateExistingTask(final WriteableTransaction txn)
+    private MigrateExistingTask(final Storage storage)
     {
-      super(txn);
+      super(storage);
     }
 
     /** {@inheritDoc} */
     @Override
-    public Void call() throws Exception
+    Void call0(WriteableTransaction txn) throws Exception
     {
       for (Suffix suffix : dnSuffixMap.values())
       {
@@ -1360,7 +1344,7 @@
               {
                 EntryID id = new EntryID(key);
                 Entry entry = entryContainer.getID2Entry().get(txn, id);
-                processEntry(entry, rootContainer.getNextEntryID(), suffix);
+                processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
                 migratedCount++;
                 success = cursor.next();
               }
@@ -1420,9 +1404,9 @@
    */
   private class AppendReplaceTask extends ImportTask
   {
-    public AppendReplaceTask(final WriteableTransaction txn)
+    public AppendReplaceTask(final Storage storage)
     {
-      super(txn);
+      super(storage);
     }
 
     private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
@@ -1433,7 +1417,7 @@
 
     /** {@inheritDoc} */
     @Override
-    public Void call() throws Exception
+    Void call0(WriteableTransaction txn) throws Exception
     {
       try
       {
@@ -1452,7 +1436,7 @@
           }
           entryID = entryInfo.getEntryID();
           Suffix suffix = entryInfo.getSuffix();
-          processEntry(entry, suffix);
+          processEntry(txn, entry, suffix);
         }
         flushIndexBuffers();
         return null;
@@ -1469,7 +1453,7 @@
       }
     }
 
-    void processEntry(Entry entry, Suffix suffix)
+    void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix)
         throws DirectoryException, StorageRuntimeException, InterruptedException
     {
       DN entryDN = entry.getName();
@@ -1481,7 +1465,7 @@
       }
       if (oldEntry == null)
       {
-        if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
+        if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix))
         {
           suffix.removePending(entryDN);
           return;
@@ -1494,7 +1478,7 @@
         suffix.removePending(entryDN);
         entryID = oldID;
       }
-      processDN2URI(suffix, oldEntry, entry);
+      processDN2URI(txn, suffix, oldEntry, entry);
       suffix.getID2Entry().put(txn, entryID, entry);
       if (oldEntry != null)
       {
@@ -1504,7 +1488,7 @@
       {
         processIndexes(suffix, entry, entryID);
       }
-      processVLVIndexes(suffix, entry, entryID);
+      processVLVIndexes(txn, suffix, entry, entryID);
       importCount.getAndIncrement();
     }
 
@@ -1545,21 +1529,33 @@
    */
   private class ImportTask implements Callable<Void>
   {
-    WriteableTransaction txn;
+    private final Storage storage;
     private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
     private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
     private final EntryInformation entryInfo = new EntryInformation();
     private final IndexKey dnIndexKey = new IndexKey(DN_TYPE, DN2ID, 1);
 
-    public ImportTask(final WriteableTransaction txn)
+    public ImportTask(final Storage storage)
     {
-      this.txn = txn;
+      this.storage = storage;
     }
 
     /** {@inheritDoc} */
     @Override
-    public Void call() throws Exception
+    public final Void call() throws Exception
     {
+      storage.write(new WriteOperation()
+      {
+        @Override
+        public void run(WriteableTransaction txn) throws Exception
+        {
+          call0(txn);
+        }
+      });
+      return null;
+    }
+
+    Void call0(WriteableTransaction txn) throws Exception {
       try
       {
         while (true)
@@ -1576,7 +1572,7 @@
           }
           EntryID entryID = entryInfo.getEntryID();
           Suffix suffix = entryInfo.getSuffix();
-          processEntry(entry, entryID, suffix);
+          processEntry(txn, entry, entryID, suffix);
         }
         flushIndexBuffers();
         return null;
@@ -1593,26 +1589,26 @@
       }
     }
 
-    void processEntry(Entry entry, EntryID entryID, Suffix suffix)
+    void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix)
         throws DirectoryException, StorageRuntimeException, InterruptedException
     {
       DN entryDN = entry.getName();
-      if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
+      if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix))
       {
         suffix.removePending(entryDN);
         return;
       }
       suffix.removePending(entryDN);
       processDN2ID(suffix, entryDN, entryID);
-      processDN2URI(suffix, null, entry);
+      processDN2URI(txn, suffix, null, entry);
       processIndexes(suffix, entry, entryID);
-      processVLVIndexes(suffix, entry, entryID);
+      processVLVIndexes(txn, suffix, entry, entryID);
       suffix.getID2Entry().put(txn, entryID, entry);
       importCount.getAndIncrement();
     }
 
     /** Examine the DN for duplicates and missing parents. */
-    boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
+    boolean dnSanityCheck(WriteableTransaction txn, DN entryDN, Entry entry, Suffix suffix)
         throws StorageRuntimeException, InterruptedException
     {
       //Perform parent checking.
@@ -1666,7 +1662,8 @@
       }
     }
 
-    void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID) throws DirectoryException
+    void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID)
+        throws DirectoryException
     {
       final EntryContainer entryContainer = suffix.getEntryContainer();
       final IndexBuffer buffer = new IndexBuffer(entryContainer);
@@ -1766,7 +1763,8 @@
       indexIDToECMap.putIfAbsent(indexID, suffix.getEntryContainer());
     }
 
-    void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry) throws StorageRuntimeException
+    void processDN2URI(WriteableTransaction txn, Suffix suffix, Entry oldEntry, Entry newEntry)
+        throws StorageRuntimeException
     {
       DN2URI dn2uri = suffix.getDN2URI();
       if (oldEntry != null)
@@ -2850,7 +2848,6 @@
      */
     void printStartMessage(WriteableTransaction txn) throws StorageRuntimeException
     {
-      this.txn = txn;
       totalEntries = suffix.getID2Entry().getRecordCount(txn);
 
       switch (rebuildConfig.getRebuildMode())
@@ -2896,7 +2893,7 @@
 
     /** {@inheritDoc} */
     @Override
-    public Void call() throws Exception
+    Void call0(WriteableTransaction txn) throws Exception
     {
       ID2Entry id2entry = entryContainer.getID2Entry();
       Cursor<ByteString, ByteString> cursor = txn.openCursor(id2entry.getName());
@@ -2912,7 +2909,7 @@
           Entry entry =
               ID2Entry.entryFromDatabase(cursor.getValue(),
                   entryContainer.getRootContainer().getCompressedSchema());
-          processEntry(entry, entryID);
+          processEntry(txn, entry, entryID);
           entriesProcessed.getAndIncrement();
         }
         flushIndexBuffers();
@@ -3339,7 +3336,7 @@
       return result;
     }
 
-    private void processEntry(Entry entry, EntryID entryID)
+    private void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID)
         throws DirectoryException, StorageRuntimeException, InterruptedException
     {
       if (dn2id != null)
@@ -3348,13 +3345,13 @@
       }
       if (dn2uri != null)
       {
-        processDN2URI(suffix, null, entry);
+        processDN2URI(txn, suffix, null, entry);
       }
       processIndexes(entry, entryID);
-      processVLVIndexes(entry, entryID);
+      processVLVIndexes(txn, entry, entryID);
     }
 
-    private void processVLVIndexes(Entry entry, EntryID entryID)
+    private void processVLVIndexes(WriteableTransaction txn, Entry entry, EntryID entryID)
         throws StorageRuntimeException, DirectoryException
     {
       final IndexBuffer buffer = new IndexBuffer(entryContainer);

--
Gitblit v1.10.0