From c1234a77530eb2c68c06c62c9a69f899f0e4a6e6 Mon Sep 17 00:00:00 2001
From: Yannick Lecaillez <yannick.lecaillez@forgerock.com>
Date: Thu, 16 Apr 2015 11:49:32 +0000
Subject: [PATCH] Storage's transaction must be created and used in the same thread (CR-6665)
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 129 +++++++++++++++++++++----------------------
1 files changed, 63 insertions(+), 66 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 786d502..91c22e7 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -187,7 +187,7 @@
private final TmpEnv tmpEnv;
/** Root container. */
- private RootContainer rootContainer;
+ private final RootContainer rootContainer;
/** Import configuration. */
private final LDIFImportConfig importConfiguration;
@@ -286,9 +286,10 @@
* @throws ConfigException
* If a problem occurs during initialization.
*/
- Importer(RebuildConfig rebuildConfig, PluggableBackendCfg cfg, ServerContext serverContext)
- throws InitializationException, StorageRuntimeException, ConfigException
+ Importer(RootContainer rootContainer, RebuildConfig rebuildConfig, PluggableBackendCfg cfg,
+ ServerContext serverContext) throws InitializationException, StorageRuntimeException, ConfigException
{
+ this.rootContainer = rootContainer;
this.importConfiguration = null;
this.serverContext = serverContext;
this.tmpEnv = null;
@@ -324,9 +325,10 @@
* @throws StorageRuntimeException
* If an error occurred when opening the DB.
*/
- Importer(LDIFImportConfig importConfiguration, PluggableBackendCfg backendCfg, ServerContext serverContext)
- throws InitializationException, ConfigException, StorageRuntimeException
+ Importer(RootContainer rootContainer, LDIFImportConfig importConfiguration, PluggableBackendCfg backendCfg,
+ ServerContext serverContext) throws InitializationException, ConfigException, StorageRuntimeException
{
+ this.rootContainer = rootContainer;
this.rebuildManager = null;
this.importConfiguration = importConfiguration;
this.serverContext = serverContext;
@@ -822,12 +824,9 @@
* @throws ExecutionException
* If an execution error occurred.
*/
- public void rebuildIndexes(RootContainer rootContainer)
- throws ConfigException, InitializationException, StorageRuntimeException,
+ public void rebuildIndexes() throws ConfigException, InitializationException, StorageRuntimeException,
InterruptedException, ExecutionException
{
- this.rootContainer = rootContainer;
-
try
{
if (rebuildManager.rebuildConfig.isClearDegradedState())
@@ -836,7 +835,7 @@
}
else
{
- rebuildIndexes();
+ doRebuildIndexes();
}
}
catch (Exception e)
@@ -862,7 +861,7 @@
});
}
- private void rebuildIndexes() throws Exception
+ private void doRebuildIndexes() throws Exception
{
final long startTime = System.currentTimeMillis();
final Storage storage = rootContainer.getStorage();
@@ -902,9 +901,8 @@
* @throws Exception
* If the import failed
*/
- public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
+ public LDIFImportResult processImport() throws Exception
{
- this.rootContainer = rootContainer;
try {
try
{
@@ -1070,14 +1068,7 @@
final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
final Storage storage = rootContainer.getStorage();
- storage.write(new WriteOperation()
- {
- @Override
- public void run(WriteableTransaction txn) throws Exception
- {
- execService.submit(new MigrateExistingTask(txn)).get();
- }
- });
+ execService.submit(new MigrateExistingTask(storage)).get();
final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
if (importConfiguration.appendToExistingData()
@@ -1085,27 +1076,20 @@
{
for (int i = 0; i < threadCount; i++)
{
- tasks.add(new AppendReplaceTask(storage.getWriteableTransaction()));
+ tasks.add(new AppendReplaceTask(storage));
}
}
else
{
for (int i = 0; i < threadCount; i++)
{
- tasks.add(new ImportTask(storage.getWriteableTransaction()));
+ tasks.add(new ImportTask(storage));
}
}
execService.invokeAll(tasks);
tasks.clear();
- storage.write(new WriteOperation()
- {
- @Override
- public void run(WriteableTransaction txn) throws Exception
- {
- execService.submit(new MigrateExcludedTask(txn)).get();
- }
- });
+ execService.submit(new MigrateExcludedTask(storage)).get();
stopScratchFileWriters();
getAll(scratchFileWriterFutures);
@@ -1267,14 +1251,14 @@
/** Task used to migrate excluded branch. */
private final class MigrateExcludedTask extends ImportTask
{
- private MigrateExcludedTask(final WriteableTransaction txn)
+ private MigrateExcludedTask(final Storage storage)
{
- super(txn);
+ super(storage);
}
/** {@inheritDoc} */
@Override
- public Void call() throws Exception
+ Void call0(WriteableTransaction txn) throws Exception
{
for (Suffix suffix : dnSuffixMap.values())
{
@@ -1304,7 +1288,7 @@
{
EntryID id = new EntryID(cursor.getValue());
Entry entry = entryContainer.getID2Entry().get(txn, id);
- processEntry(entry, rootContainer.getNextEntryID(), suffix);
+ processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
migratedCount++;
success = cursor.next();
}
@@ -1331,14 +1315,14 @@
/** Task to migrate existing entries. */
private final class MigrateExistingTask extends ImportTask
{
- private MigrateExistingTask(final WriteableTransaction txn)
+ private MigrateExistingTask(final Storage storage)
{
- super(txn);
+ super(storage);
}
/** {@inheritDoc} */
@Override
- public Void call() throws Exception
+ Void call0(WriteableTransaction txn) throws Exception
{
for (Suffix suffix : dnSuffixMap.values())
{
@@ -1360,7 +1344,7 @@
{
EntryID id = new EntryID(key);
Entry entry = entryContainer.getID2Entry().get(txn, id);
- processEntry(entry, rootContainer.getNextEntryID(), suffix);
+ processEntry(txn, entry, rootContainer.getNextEntryID(), suffix);
migratedCount++;
success = cursor.next();
}
@@ -1420,9 +1404,9 @@
*/
private class AppendReplaceTask extends ImportTask
{
- public AppendReplaceTask(final WriteableTransaction txn)
+ public AppendReplaceTask(final Storage storage)
{
- super(txn);
+ super(storage);
}
private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
@@ -1433,7 +1417,7 @@
/** {@inheritDoc} */
@Override
- public Void call() throws Exception
+ Void call0(WriteableTransaction txn) throws Exception
{
try
{
@@ -1452,7 +1436,7 @@
}
entryID = entryInfo.getEntryID();
Suffix suffix = entryInfo.getSuffix();
- processEntry(entry, suffix);
+ processEntry(txn, entry, suffix);
}
flushIndexBuffers();
return null;
@@ -1469,7 +1453,7 @@
}
}
- void processEntry(Entry entry, Suffix suffix)
+ void processEntry(WriteableTransaction txn, Entry entry, Suffix suffix)
throws DirectoryException, StorageRuntimeException, InterruptedException
{
DN entryDN = entry.getName();
@@ -1481,7 +1465,7 @@
}
if (oldEntry == null)
{
- if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
+ if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix))
{
suffix.removePending(entryDN);
return;
@@ -1494,7 +1478,7 @@
suffix.removePending(entryDN);
entryID = oldID;
}
- processDN2URI(suffix, oldEntry, entry);
+ processDN2URI(txn, suffix, oldEntry, entry);
suffix.getID2Entry().put(txn, entryID, entry);
if (oldEntry != null)
{
@@ -1504,7 +1488,7 @@
{
processIndexes(suffix, entry, entryID);
}
- processVLVIndexes(suffix, entry, entryID);
+ processVLVIndexes(txn, suffix, entry, entryID);
importCount.getAndIncrement();
}
@@ -1545,21 +1529,33 @@
*/
private class ImportTask implements Callable<Void>
{
- WriteableTransaction txn;
+ private final Storage storage;
private final Map<IndexKey, IndexOutputBuffer> indexBufferMap = new HashMap<IndexKey, IndexOutputBuffer>();
private final Set<ByteString> insertKeySet = new HashSet<ByteString>();
private final EntryInformation entryInfo = new EntryInformation();
private final IndexKey dnIndexKey = new IndexKey(DN_TYPE, DN2ID, 1);
- public ImportTask(final WriteableTransaction txn)
+ public ImportTask(final Storage storage)
{
- this.txn = txn;
+ this.storage = storage;
}
/** {@inheritDoc} */
@Override
- public Void call() throws Exception
+ public final Void call() throws Exception
{
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableTransaction txn) throws Exception
+ {
+ call0(txn);
+ }
+ });
+ return null;
+ }
+
+ Void call0(WriteableTransaction txn) throws Exception {
try
{
while (true)
@@ -1576,7 +1572,7 @@
}
EntryID entryID = entryInfo.getEntryID();
Suffix suffix = entryInfo.getSuffix();
- processEntry(entry, entryID, suffix);
+ processEntry(txn, entry, entryID, suffix);
}
flushIndexBuffers();
return null;
@@ -1593,26 +1589,26 @@
}
}
- void processEntry(Entry entry, EntryID entryID, Suffix suffix)
+ void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID, Suffix suffix)
throws DirectoryException, StorageRuntimeException, InterruptedException
{
DN entryDN = entry.getName();
- if (!skipDNValidation && !dnSanityCheck(entryDN, entry, suffix))
+ if (!skipDNValidation && !dnSanityCheck(txn, entryDN, entry, suffix))
{
suffix.removePending(entryDN);
return;
}
suffix.removePending(entryDN);
processDN2ID(suffix, entryDN, entryID);
- processDN2URI(suffix, null, entry);
+ processDN2URI(txn, suffix, null, entry);
processIndexes(suffix, entry, entryID);
- processVLVIndexes(suffix, entry, entryID);
+ processVLVIndexes(txn, suffix, entry, entryID);
suffix.getID2Entry().put(txn, entryID, entry);
importCount.getAndIncrement();
}
/** Examine the DN for duplicates and missing parents. */
- boolean dnSanityCheck(DN entryDN, Entry entry, Suffix suffix)
+ boolean dnSanityCheck(WriteableTransaction txn, DN entryDN, Entry entry, Suffix suffix)
throws StorageRuntimeException, InterruptedException
{
//Perform parent checking.
@@ -1666,7 +1662,8 @@
}
}
- void processVLVIndexes(Suffix suffix, Entry entry, EntryID entryID) throws DirectoryException
+ void processVLVIndexes(WriteableTransaction txn, Suffix suffix, Entry entry, EntryID entryID)
+ throws DirectoryException
{
final EntryContainer entryContainer = suffix.getEntryContainer();
final IndexBuffer buffer = new IndexBuffer(entryContainer);
@@ -1766,7 +1763,8 @@
indexIDToECMap.putIfAbsent(indexID, suffix.getEntryContainer());
}
- void processDN2URI(Suffix suffix, Entry oldEntry, Entry newEntry) throws StorageRuntimeException
+ void processDN2URI(WriteableTransaction txn, Suffix suffix, Entry oldEntry, Entry newEntry)
+ throws StorageRuntimeException
{
DN2URI dn2uri = suffix.getDN2URI();
if (oldEntry != null)
@@ -2850,7 +2848,6 @@
*/
void printStartMessage(WriteableTransaction txn) throws StorageRuntimeException
{
- this.txn = txn;
totalEntries = suffix.getID2Entry().getRecordCount(txn);
switch (rebuildConfig.getRebuildMode())
@@ -2896,7 +2893,7 @@
/** {@inheritDoc} */
@Override
- public Void call() throws Exception
+ Void call0(WriteableTransaction txn) throws Exception
{
ID2Entry id2entry = entryContainer.getID2Entry();
Cursor<ByteString, ByteString> cursor = txn.openCursor(id2entry.getName());
@@ -2912,7 +2909,7 @@
Entry entry =
ID2Entry.entryFromDatabase(cursor.getValue(),
entryContainer.getRootContainer().getCompressedSchema());
- processEntry(entry, entryID);
+ processEntry(txn, entry, entryID);
entriesProcessed.getAndIncrement();
}
flushIndexBuffers();
@@ -3339,7 +3336,7 @@
return result;
}
- private void processEntry(Entry entry, EntryID entryID)
+ private void processEntry(WriteableTransaction txn, Entry entry, EntryID entryID)
throws DirectoryException, StorageRuntimeException, InterruptedException
{
if (dn2id != null)
@@ -3348,13 +3345,13 @@
}
if (dn2uri != null)
{
- processDN2URI(suffix, null, entry);
+ processDN2URI(txn, suffix, null, entry);
}
processIndexes(entry, entryID);
- processVLVIndexes(entry, entryID);
+ processVLVIndexes(txn, entry, entryID);
}
- private void processVLVIndexes(Entry entry, EntryID entryID)
+ private void processVLVIndexes(WriteableTransaction txn, Entry entry, EntryID entryID)
throws StorageRuntimeException, DirectoryException
{
final IndexBuffer buffer = new IndexBuffer(entryContainer);
--
Gitblit v1.10.0