From f888fe7a1b88a5c280c619ffe13c91dcec1ff35b Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Thu, 26 Mar 2015 13:14:20 +0000
Subject: [PATCH] OPENDJ-1707 Persistit: various import problems
---
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java | 65 +++++++++-------
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java | 64 +++++++++++++++
opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | 108 ++++++++++++++++++++-------
3 files changed, 179 insertions(+), 58 deletions(-)
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
index 93b2483..f4b286c 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/BackendImpl.java
@@ -58,6 +58,7 @@
import org.opends.server.backends.pluggable.spi.WriteableStorage;
import org.opends.server.core.*;
import org.opends.server.types.*;
+import org.opends.server.util.RuntimeInformation;
/**
* This is an implementation of a Directory Server Backend which stores entries locally in a
@@ -642,13 +643,74 @@
@Override
public LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException
{
+ RuntimeInformation.logInfo();
+
// If the rootContainer is open, the backend is initialized by something else.
// We can't do import while the backend is online.
if (rootContainer != null)
{
throw new DirectoryException(getServerErrorResultCode(), ERR_JEB_IMPORT_BACKEND_ONLINE.get());
}
- return new RootContainer(this, cfg).importLDIF(importConfig);
+
+ try
+ {
+ if (Importer.mustClearBackend(importConfig, cfg))
+ {
+ try
+ {
+ // clear all files before opening the root container
+ storage.removeStorageFiles();
+ }
+ catch (Exception e)
+ {
+ LocalizableMessage m = ERR_JEB_REMOVE_FAIL.get(e.getMessage());
+ throw new DirectoryException(getServerErrorResultCode(), m, e);
+ }
+ }
+
+ rootContainer = initializeRootContainer();
+ return rootContainer.importLDIF(importConfig);
+ }
+ catch (StorageRuntimeException e)
+ {
+ logger.traceException(e);
+ throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()), e);
+ }
+ catch (DirectoryException e)
+ {
+ throw e;
+ }
+ catch (OpenDsException e)
+ {
+ logger.traceException(e);
+ throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
+ }
+ catch (ConfigException e)
+ {
+ logger.traceException(e);
+ throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
+ }
+ finally
+ {
+ try
+ {
+ if (rootContainer != null)
+ {
+ long startTime = System.currentTimeMillis();
+ rootContainer.close();
+ long finishTime = System.currentTimeMillis();
+ long closeTime = (finishTime - startTime) / 1000;
+ logger.info(NOTE_JEB_IMPORT_LDIF_ROOTCONTAINER_CLOSE, closeTime);
+ rootContainer = null;
+ }
+
+ logger.info(NOTE_JEB_IMPORT_CLOSING_DATABASE);
+ }
+ catch (StorageRuntimeException de)
+ {
+ logger.traceException(de);
+ }
+ }
}
/** {@inheritDoc} */
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
index 804c395..0a70da3 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -339,6 +339,7 @@
{
this.threadCount = importConfiguration.getThreadCount();
}
+ this.threadCount = 1; // FIXME JNR. For the moment, cannot share exchanges across threads
// Determine the number of indexes.
this.indexCount = getTotalIndexCount(backendCfg);
@@ -384,6 +385,11 @@
{
return !importCfg.appendToExistingData()
&& (importCfg.clearBackend() || backendCfg.getBaseDN().size() <= 1);
+ /*
+ * Why do we clear when there is only one baseDN?
+ * any baseDN for which data is imported will be cleared anyway (see getSuffix()),
+ * so if there is only one baseDN for this backend, then clear it now.
+ */
}
private File getTempDir(PluggableBackendCfg backendCfg, String tmpDirectory)
@@ -930,13 +936,11 @@
*
* @param rootContainer
* The root container to use during the import.
- * @param txn
- * The database transaction
* @return A LDIF result.
* @throws Exception
* If the import failed
*/
- public LDIFImportResult processImport(RootContainer rootContainer, WriteableStorage txn) throws Exception
+ public LDIFImportResult processImport(RootContainer rootContainer) throws Exception
{
this.rootContainer = rootContainer;
try {
@@ -953,11 +957,20 @@
logger.info(NOTE_JEB_IMPORT_STARTING, DirectoryServer.getVersionString(),
BUILD_ID, REVISION_NUMBER);
logger.info(NOTE_JEB_IMPORT_THREAD_COUNT, threadCount);
- initializeSuffixes(txn);
- setIndexesTrusted(txn, false);
+
+ final Storage storage = rootContainer.getStorage();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ initializeSuffixes(txn);
+ setIndexesTrusted(txn, false);
+ }
+ });
final long startTime = System.currentTimeMillis();
- importPhaseOne(txn);
+ importPhaseOne();
isPhaseOneDone = true;
final long phaseOneFinishTime = System.currentTimeMillis();
@@ -978,8 +991,15 @@
}
final long phaseTwoFinishTime = System.currentTimeMillis();
- setIndexesTrusted(txn, true);
- switchEntryContainers(txn);
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ setIndexesTrusted(txn, true);
+ switchEntryContainers(txn);
+ }
+ });
recursiveDelete(tempDir);
final long finishTime = System.currentTimeMillis();
final long importTime = finishTime - startTime;
@@ -1077,7 +1097,7 @@
* The scratch files will be read by phaseTwo to perform on-disk merge</li>
* </ol>
*/
- private void importPhaseOne(WriteableStorage txn) throws InterruptedException, ExecutionException
+ private void importPhaseOne() throws Exception
{
initializeIndexBuffers();
@@ -1087,7 +1107,15 @@
bufferSortService = Executors.newFixedThreadPool(threadCount);
final ExecutorService execService = Executors.newFixedThreadPool(threadCount);
- execService.submit(new MigrateExistingTask(txn)).get();
+ final Storage storage = rootContainer.getStorage();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ execService.submit(new MigrateExistingTask(txn)).get();
+ }
+ });
final List<Callable<Void>> tasks = new ArrayList<Callable<Void>>(threadCount);
if (importConfiguration.appendToExistingData()
@@ -1095,20 +1123,27 @@
{
for (int i = 0; i < threadCount; i++)
{
- tasks.add(new AppendReplaceTask(txn));
+ tasks.add(new AppendReplaceTask(storage.getWriteableStorage()));
}
}
else
{
for (int i = 0; i < threadCount; i++)
{
- tasks.add(new ImportTask(txn));
+ tasks.add(new ImportTask(storage.getWriteableStorage()));
}
}
getAll(execService.invokeAll(tasks));
tasks.clear();
- execService.submit(new MigrateExcludedTask(txn)).get();
+ storage.write(new WriteOperation()
+ {
+ @Override
+ public void run(WriteableStorage txn) throws Exception
+ {
+ execService.submit(new MigrateExcludedTask(txn)).get();
+ }
+ });
stopScratchFileWriters();
getAll(scratchFileWriterFutures);
@@ -1466,6 +1501,10 @@
isCanceled = true;
throw e;
}
+ finally
+ {
+ txn.close();
+ }
}
void processEntry(Entry entry, Suffix suffix)
@@ -1586,6 +1625,10 @@
isCanceled = true;
throw e;
}
+ finally
+ {
+ txn.close();
+ }
}
void processEntry(Entry entry, EntryID entryID, Suffix suffix)
@@ -1807,6 +1850,7 @@
{
private final IndexManager indexMgr;
private final int cacheSize;
+ /** indexID => DNState map */
private final Map<Integer, DNState> dnStateMap = new HashMap<Integer, DNState>();
private final Semaphore permits;
private final int maxPermits;
@@ -2101,16 +2145,12 @@
private void addDN2ID(int indexID, ImportIDSet idSet) throws DirectoryException
{
- DNState dnState;
- if (!dnStateMap.containsKey(indexID))
+ DNState dnState = dnStateMap.get(indexID);
+ if (dnState == null)
{
dnState = new DNState(indexIDToECMap.get(indexID));
dnStateMap.put(indexID, dnState);
}
- else
- {
- dnState = dnStateMap.get(indexID);
- }
if (dnState.checkParent(txn, idSet))
{
dnState.writeToDN2ID(idSet);
@@ -3926,12 +3966,20 @@
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
{
- final Object returnValue = returnValues.get(method.getName());
+ final String methodName = method.getName();
+ if ((methodName.startsWith("add") || methodName.startsWith("remove")) && methodName.endsWith("ChangeListener"))
+ {
+ // ignore calls to (add|remove)*ChangeListener() methods
+ return null;
+ }
+
+ final Object returnValue = returnValues.get(methodName);
if (returnValue != null)
{
return returnValue;
}
- throw new IllegalArgumentException("Unhandled method call on object (" + proxy
+ throw new IllegalArgumentException("Unhandled method call on proxy ("
+ + BackendCfgHandler.class.getSimpleName()
+ ") for method (" + method
+ ") with arguments (" + Arrays.toString(args) + ")");
}
@@ -3943,11 +3991,10 @@
*/
private final class TmpEnv implements DNCache
{
- private final Storage storage;
- private WriteableStorage txn;
- private org.opends.server.backends.pluggable.spi.Importer importer;
private static final String DB_NAME = "dn_cache";
private final TreeName dnCache = new TreeName("", DB_NAME);
+ private final Storage storage;
+ private final WriteableStorage txn;
/**
* Create a temporary DB environment and database to be used as a cache of
@@ -3963,20 +4010,25 @@
final Map<String, Object> returnValues = new HashMap<String, Object>();
returnValues.put("getDBDirectory", envPath.getAbsolutePath());
returnValues.put("getBackendId", DB_NAME);
- // returnValues.put("getDBCacheSize", 10L);
+ returnValues.put("getDBCacheSize", 0L);
returnValues.put("getDBCachePercent", 10);
returnValues.put("isDBTxnNoSync", true);
+ returnValues.put("getDBDirectoryPermissions", "700");
+ returnValues.put("getDiskLowThreshold", Long.valueOf(200 * MB));
+ returnValues.put("getDiskFullThreshold", Long.valueOf(100 * MB));
try
{
+ returnValues.put("dn", DN.valueOf("ds-cfg-backend-id=importTmpEnvForDN,cn=Backends,cn=config"));
storage = new PersistItStorage(newPersistitBackendCfgProxy(returnValues),
DirectoryServer.getInstance().getServerContext());
+ storage.open();
+ txn = storage.getWriteableStorage();
+ txn.openTree(dnCache);
}
catch (Exception e)
{
throw new StorageRuntimeException(e);
}
-
- importer.createTree(dnCache);
}
private PersistitBackendCfg newPersistitBackendCfgProxy(Map<String, Object> returnValues)
@@ -4012,7 +4064,7 @@
{
try
{
- importer.close();
+ storage.close();
}
finally
{
diff --git a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
index 41a28f1..768d88f 100644
--- a/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
+++ b/opendj-sdk/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
@@ -26,18 +26,11 @@
*/
package org.opends.server.backends.pluggable;
-import static org.opends.messages.BackendMessages.ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER;
-import static org.opends.messages.BackendMessages.ERR_LDIF_BACKEND_ERROR_READING_LDIF;
-import static org.opends.messages.JebMessages.ERR_JEB_CACHE_PRELOAD;
-import static org.opends.messages.JebMessages.ERR_JEB_REMOVE_FAIL;
-import static org.opends.messages.JebMessages.ERR_JEB_ENTRY_CONTAINER_ALREADY_REGISTERED;
-import static org.opends.messages.JebMessages.ERR_JEB_IMPORT_PARENT_NOT_FOUND;
-import static org.opends.messages.JebMessages.NOTE_JEB_IMPORT_FINAL_STATUS;
-import static org.opends.messages.JebMessages.NOTE_JEB_IMPORT_PROGRESS_REPORT;
-import static org.opends.messages.JebMessages.WARN_JEB_IMPORT_ENTRY_EXISTS;
-import static org.opends.messages.UtilityMessages.ERR_LDIF_SKIP;
-import static org.opends.server.core.DirectoryServer.getServerErrorResultCode;
-import static org.opends.server.util.StaticUtils.stackTraceToSingleLineString;
+import static org.opends.messages.BackendMessages.*;
+import static org.opends.messages.JebMessages.*;
+import static org.opends.messages.UtilityMessages.*;
+import static org.opends.server.core.DirectoryServer.*;
+import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
import java.util.Collection;
@@ -55,6 +48,7 @@
import org.forgerock.opendj.config.server.ConfigException;
import org.forgerock.opendj.ldap.ResultCode;
import org.opends.server.admin.server.ConfigurationChangeListener;
+import org.opends.server.admin.std.server.PersistitBackendCfg;
import org.opends.server.admin.std.server.PluggableBackendCfg;
import org.opends.server.api.CompressedSchema;
import org.opends.server.backends.pluggable.spi.ReadOperation;
@@ -77,8 +71,6 @@
import org.opends.server.types.Privilege;
import org.opends.server.util.LDIFException;
import org.opends.server.util.LDIFReader;
-import org.opends.server.util.RuntimeInformation;
-
/**
* Wrapper class for the JE environment. Root container holds all the entry
@@ -188,24 +180,14 @@
* If a problem occurs while performing the LDIF import.
*/
LDIFImportResult importLDIF(LDIFImportConfig importConfig) throws DirectoryException
+ {//TODO JNR may call importLDIFWithSuccessiveAdds(importConfig) depending on configured import strategy
+ return importLDIFWithOnDiskMerge(importConfig);
+ }
+
+ private LDIFImportResult importLDIFWithSuccessiveAdds(LDIFImportConfig importConfig) throws DirectoryException
{
- RuntimeInformation.logInfo();
- if (Importer.mustClearBackend(importConfig, config))
- {
- try
- {
- backend.getStorage().removeStorageFiles();
- }
- catch (Exception e)
- {
- LocalizableMessage m = ERR_JEB_REMOVE_FAIL.get(e.getMessage());
- throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
- }
- }
try
{
- open();
-
ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
try
{
@@ -330,6 +312,31 @@
timerService.awaitTermination(20, TimeUnit.SECONDS);
}
+ private LDIFImportResult importLDIFWithOnDiskMerge(final LDIFImportConfig importConfig) throws DirectoryException
+ {
+ try
+ {
+ final Importer importer = new Importer(importConfig, (PersistitBackendCfg) config); // TODO JNR remove cast
+ return importer.processImport(this);
+ }
+ catch (DirectoryException e)
+ {
+ logger.traceException(e);
+ throw e;
+ }
+ catch (OpenDsException e)
+ {
+ logger.traceException(e);
+ throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
+ }
+ catch (Exception e)
+ {
+ logger.traceException(e);
+ throw new DirectoryException(getServerErrorResultCode(),
+ LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
+ }
+ }
+
/**
* Opens the root container.
*
--
Gitblit v1.10.0