From 06e460f4e1b092e615753982b4080f1deaeeec4c Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 26 Mar 2015 13:14:20 +0000
Subject: [PATCH] OPENDJ-1707 Persistit: various import problems
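
Make the LDIF import work with the Persistit-backed pluggable backend:

* Force threadCount to 1 for now: Persistit exchanges cannot be shared
  across threads (see the FIXME below).
* Stop threading a single WriteableStorage through processImport() and
  importPhaseOne(); instead run each write phase inside a
  Storage.write(WriteOperation) callback so the storage owns the
  transaction lifecycle.
* Close the per-task transaction in a finally block in ImportTask and
  AppendReplaceTask.
* Simplify the DNState lookup in addDN2ID() to a single map get.
* In BackendCfgHandler, ignore (add|remove)*ChangeListener() calls and
  include the handler class in the "unhandled method" error message.
* In TmpEnv, open the PersistItStorage directly (supplying the extra
  config values it requires) instead of going through an Importer.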
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 108 ++++++++++++++++++++++++++++++++++++++++--------------
1 file changed, 80 insertions(+), 28 deletions(-)
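
Reviewer note: TmpEnv builds its PersistitBackendCfg via a java.lang.reflect.Proxy
whose InvocationHandler answers getters from a Map. A minimal, self-contained
sketch of that trick follows; the class and method names here are hypothetical
and are not part of this patch:

    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.Map;

    public final class MapBackedCfgProxy
    {
      /** Returns an implementation of cfgInterface whose getters read from returnValues. */
      public static <T> T make(final Class<T> cfgInterface, final Map<String, Object> returnValues)
      {
        return cfgInterface.cast(Proxy.newProxyInstance(
            cfgInterface.getClassLoader(),
            new Class<?>[] { cfgInterface },
            new InvocationHandler()
            {
              @Override
              public Object invoke(Object proxy, Method method, Object[] args)
              {
                final String name = method.getName();
                if ((name.startsWith("add") || name.startsWith("remove"))
                    && name.endsWith("ChangeListener"))
                {
                  return null; // listener registration is a no-op for a throw-away config
                }
                final Object value = returnValues.get(name);
                if (value != null)
                {
                  return value;
                }
                throw new IllegalArgumentException("Unhandled method call (" + name + ")");
              }
            }));
      }
    }
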
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 804c395..0a70da3 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -339,6 +339,7 @@
{
this.threadCount = importConfiguration.getThreadCount();
}
+ this.threadCount = 1; // FIXME JNR: single-threaded for now; Persistit exchanges cannot be shared across threads
// Determine the number of indexes.
this.indexCount = getTotalIndexCount(backendCfg);
@@ -384,6 +385,11 @@
{
return !importCfg.appendToExistingData()
&& (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
+ /*
+ * Why clear when there is only one baseDN? Any baseDN for which data
+ * is imported will be cleared anyway (see getSuffix()), so if this
+ * backend has only one baseDN, it might as well be cleared now.
+ */
}
private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
@@ -930,13 +936,11 @@
*
* @param rootContainer
* The root container to use during the import.
- * @param txn
- * The database transaction
* @return A LDIF result.
* @throws Exception
* If the import failed
*/
- public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) throws Exception
+ public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
{
this.rootContainer = rootContainer;
try {
@@ -953,11 +957,20 @@
logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(),
BUILD_ID, REVISION_NUMBER);
logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount);
- initializeSuffixes(txn);
- setIndexesTrusted(txn, false);
+
+ final Storage storage = rootContainer.getStorage();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ initializeSuffixes(txn);
+ setIndexesTrusted(txn, false);
+ }
+ });
final long startTime = System.currentTimeMillis();
- importPhaseOne(txn);
+ importPhaseOne();
isPhaseOneDone = true;
final long phaseOneFinishTime = System.currentTimeMillis();
@@ -978,8 +991,15 @@
}
final long phaseTwoFinishTime = System.currentTimeMillis();
- setIndexesTrusted(txn, true);
- switchEntryContainers(txn);
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ setIndexesTrusted(txn, true);
+ switchEntryContainers(txn);
+ }
+ });
recursiveDelete(tempDir);
final long finishTime = System.currentTimeMillis();
final long importTime = finishTime - startTime;
@@ -1077,7 +1097,7 @@
* The scratch files will be read by phaseTwo to perform on-disk merge</li>
* </ol>
*/
- private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
+ private void importPhaseOne() throws Exception
{
initializeIndexBuffers();
@@ -1087,7 +1107,15 @@
bufferSortService = Executors.newFixedThreadPool(threadCount);
final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
- execService.submit(new MigrateExistingTask(txn)).get();
+ final Storage storage = rootContainer.getStorage();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ execService.submit(new MigrateExistingTask(txn)).get();
+ }
+ });
final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
if (importConfiguration.appendToExistingData()
@@ -1095,20 +1123,27 @@
{
for (int i = 0; i < threadCount; i++)
{
- tasks.add(new AppendReplaceTask(txn));
+ tasks.add(new AppendReplaceTask(storage.getWriteableStorage()));
}
}
else
{
for (int i = 0; i < threadCount; i++)
{
- tasks.add(new ImportTask(txn));
+ tasks.add(new ImportTask(storage.getWriteableStorage()));
}
}
getAll(execService.invokeAll(tasks));
tasks.clear();
- execService.submit(new MigrateExcludedTask(txn)).get();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ execService.submit(new MigrateExcludedTask(txn)).get();
+ }
+ });
stopScratchFileWriters();
getAll(scratchFileWriterFutures);
@@ -1466,6 +1501,10 @@
isCanceled = true;
throw e;
}
+ finally
+ {
+ txn.close();
+ }
}
void processEntry(Entry entry, Suffix suffix)
@@ -1586,6 +1625,10 @@
isCanceled = true;
throw e;
}
+ finally
+ {
+ txn.close();
+ }
}
void processEntry(Entry entry, EntryID entryID, Suffix suffix)
@@ -1807,6 +1850,7 @@
{
private final IndexManager indexMgr;
private final int cacheSize;
+ /** Map from indexID to its DNState. */
private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
private final Semaphore permits;
private final int maxPermits;
@@ -2101,16 +2145,12 @@
private void addDN2ID(int indexID, ImportIDSet idSet) throws DirectoryException
{
- DNState dnState;
- if (!dnStateMap.containsKey(indexID))
+ DNState dnState = dnStateMap.get(indexID);
+ if (dnState == null)
{
dnState = new DNState(indexIDToECMap.get(indexID));
dnStateMap.put(indexID, dnState);
}
- else
- {
- dnState = dnStateMap.get(indexID);
- }
if (dnState.checkParent(txn, idSet))
{
dnState.writeToDN2ID(idSet);
@@ -3926,12 +3966,20 @@
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
- final Object returnValue = returnValues.get(method.getName());
+ final String methodName = method.getName();
+ if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
+ {
+ // ignore calls to (add|remove)*ChangeListener() methods
+ return null;
+ }
+
+ final Object returnValue = returnValues.get(methodName);
if (returnValue != null)
{
return returnValue;
}
- throw new IllegalArgumentException("Unhandled method call on object (" + proxy
+ throw new IllegalArgumentException("Unhandled method call on proxy ("
+ + BackendCfgHandler.class.getSimpleName()
+ ") for method (" + method
+ ") with arguments (" + Arrays.toString(args) + ")");
}
@@ -3943,11 +3991,10 @@
*/
private final class TmpEnv implements DNCache
{
- private final Storage storage;
- private WriteableStorage txn;
- private org.opends.server.backends.pluggable.spi.Importer importer;
private static final String DB_NAME = "dn_cache";
private final TreeName dnCache = new TreeName("", DB_NAME);
+ private final Storage storage;
+ private final WriteableStorage txn;
/**
* Create a temporary DB environment and database to be used as a cache of
@@ -3963,20 +4010,25 @@
final Map<String, Object> returnValues = new HashMap<String, Object>();
returnValues.put("getDBDirectory", envPath.getAbsolutePath());
returnValues.put("getBackendId", DB_NAME);
- // returnValues.put("getDBCacheSize", 10L);
+ returnValues.put("getDBCacheSize", 0L);
returnValues.put("getDBCachePercent", 10);
returnValues.put("isDBTxnNoSync", true);
+ returnValues.put("getDBDirectoryPermissions", "700");
+ returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
+ returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
try
{
+ returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importTmpEnvForDN,cn=Backends,cn=config"));
storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
DirectoryServer.getInstance().getServerContext());
+ storage.open();
+ txn = storage.getWriteableStorage();
+ txn.openTree(dnCache);
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
-
- importer.createTree(dnCache);
}
private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
@@ -4012,7 +4064,7 @@
{
try
{
- importer.close();
+ storage.close();
}
finally
{
--
Gitblit v1.10.0