From aaec0227c12c81b76899eb20ff99c947c7715df0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 12 May 2015 15:41:46 +0000
Subject: [PATCH] Partial OPENDJ-2016 Implement new on disk merge import strategy based on storage engine
---
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java | 226 +++-----------------------------------------------------
1 files changed, 14 insertions(+), 212 deletions(-)
diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
index 91907da..a53ef67 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
@@ -27,8 +27,6 @@
package org.opends.server.backends.pluggable;
import static org.opends.messages.BackendMessages.*;
-import static org.opends.messages.UtilityMessages.*;
-import static org.opends.server.core.DirectoryServer.*;
import static org.opends.server.util.StaticUtils.*;
import java.util.ArrayList;
@@ -37,8 +35,6 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.LocalizableMessage;
@@ -56,20 +52,15 @@
import org.opends.server.backends.pluggable.spi.StorageStatus;
import org.opends.server.backends.pluggable.spi.WriteOperation;
import org.opends.server.backends.pluggable.spi.WriteableTransaction;
-import org.opends.server.core.DirectoryServer;
import org.opends.server.core.SearchOperation;
import org.opends.server.core.ServerContext;
import org.opends.server.types.DN;
import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
import org.opends.server.types.InitializationException;
import org.opends.server.types.LDIFImportConfig;
import org.opends.server.types.LDIFImportResult;
-import org.opends.server.types.OpenDsException;
import org.opends.server.types.Operation;
import org.opends.server.types.Privilege;
-import org.opends.server.util.LDIFException;
-import org.opends.server.util.LDIFReader;
/**
* Wrapper class for a backend "container". Root container holds all the entry
@@ -78,44 +69,8 @@
*/
public class RootContainer implements ConfigurationChangeListener<PluggableBackendCfg>
{
- /** Logs the progress of the import. */
- private static final class ImportProgress implements Runnable
- {
- private final LDIFReader reader;
- private long previousCount;
- private long previousTime;
-
- public ImportProgress(LDIFReader reader)
- {
- this.reader = reader;
- }
-
- @Override
- public void run()
- {
- long latestCount = reader.getEntriesRead() + 0;
- long deltaCount = latestCount - previousCount;
- long latestTime = System.currentTimeMillis();
- long deltaTime = latestTime - previousTime;
- if (deltaTime == 0)
- {
- return;
- }
- long entriesRead = reader.getEntriesRead();
- long entriesIgnored = reader.getEntriesIgnored();
- long entriesRejected = reader.getEntriesRejected();
- float rate = 1000f * deltaCount / deltaTime;
- logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
-
- previousCount = latestCount;
- previousTime = latestTime;
- }
- }
-
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
- private static final int IMPORT_PROGRESS_INTERVAL = 10000;
-
/** The tree storage. */
private Storage storage;
@@ -129,8 +84,8 @@
/** The base DNs contained in this root container. */
private final ConcurrentHashMap<DN, EntryContainer> entryContainers = new ConcurrentHashMap<DN, EntryContainer>();
- /** The cached value of the next entry identifier to be assigned. */
- private AtomicLong nextid = new AtomicLong(1);
+ /** Value of the next entryID to be assigned. */
+ private AtomicLong nextEntryID = new AtomicLong(1);
/** The compressed schema manager for this backend. */
private PersistentCompressedSchema compressedSchema;
@@ -166,161 +121,14 @@
}
LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) throws DirectoryException
- {//TODO JNR may call importLDIFWithSuccessiveAdds(importConfig) depending on configured import strategy
- return importLDIFWithOnDiskMerge(importConfig, serverContext);
+ {
+ return getImportStrategy().importLDIF(importConfig, this, serverContext);
}
- private LDIFImportResult importLDIFWithSuccessiveAdds(LDIFImportConfig importConfig) throws DirectoryException
+ private ImportStrategy getImportStrategy() throws DirectoryException
{
- try
- {
- ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
- try
- {
- final LDIFReader reader;
- try
- {
- reader = new LDIFReader(importConfig);
- }
- catch (Exception e)
- {
- LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e));
- throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
- }
-
- long importCount = 0;
- final long startTime = System.currentTimeMillis();
- timerService.scheduleAtFixedRate(new ImportProgress(reader),
- IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
- while (true)
- {
- final Entry entry;
- try
- {
- entry = reader.readEntry();
- if (entry == null)
- {
- break;
- }
- }
- catch (LDIFException le)
- {
- if (!le.canContinueReading())
- {
- LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le));
- throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le);
- }
- continue;
- }
-
- final DN dn = entry.getName();
- final EntryContainer ec = getEntryContainer(dn);
- if (ec == null)
- {
- final LocalizableMessage m = ERR_LDIF_SKIP.get(dn);
- logger.error(m);
- reader.rejectLastEntry(m);
- continue;
- }
-
- try
- {
- ec.addEntry(entry, null);
- importCount++;
- }
- catch (DirectoryException e)
- {
- switch (e.getResultCode().asEnum())
- {
- case ENTRY_ALREADY_EXISTS:
- if (importConfig.replaceExistingEntries())
- {
- final Entry oldEntry = ec.getEntry(entry.getName());
- ec.replaceEntry(oldEntry, entry, null);
- }
- else
- {
- reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get());
- }
- break;
- case NO_SUCH_OBJECT:
- reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent()));
- break;
- default:
- // Not sure why it failed.
- reader.rejectLastEntry(e.getMessageObject());
- break;
- }
- }
- }
- final long finishTime = System.currentTimeMillis();
-
- waitForShutdown(timerService);
-
- final long importTime = finishTime - startTime;
- float rate = 0;
- if (importTime > 0)
- {
- rate = 1000f * reader.getEntriesRead() / importTime;
- }
- logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(),
- reader.getEntriesRejected(), 0, importTime / 1000, rate);
- return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored());
- }
- finally
- {
- close();
-
- // if not already stopped, then stop it
- waitForShutdown(timerService);
- }
- }
- catch (DirectoryException e)
- {
- logger.traceException(e);
- throw e;
- }
- catch (OpenDsException e)
- {
- logger.traceException(e);
- throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject());
- }
- catch (Exception e)
- {
- logger.traceException(e);
- throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()));
- }
- }
-
- private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException
- {
- timerService.shutdown();
- timerService.awaitTermination(20, TimeUnit.SECONDS);
- }
-
- private LDIFImportResult importLDIFWithOnDiskMerge(final LDIFImportConfig importConfig, ServerContext serverContext)
- throws DirectoryException
- {
- try
- {
- return new Importer(this, importConfig, config, serverContext).processImport();
- }
- catch (DirectoryException e)
- {
- logger.traceException(e);
- throw e;
- }
- catch (OpenDsException e)
- {
- logger.traceException(e);
- throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
- }
- catch (Exception e)
- {
- logger.traceException(e);
- throw new DirectoryException(getServerErrorResultCode(),
- LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
- }
+ //TODO JNR may call new SuccessiveAddsImportStrategy() depending on configured import strategy
+ return new Importer.StrategyImpl(config);
}
/**
@@ -391,15 +199,11 @@
*/
void registerEntryContainer(DN baseDN, EntryContainer entryContainer) throws InitializationException
{
- EntryContainer ec1 = this.entryContainers.get(baseDN);
-
- // If an entry container for this baseDN is already open we don't allow
- // another to be opened.
- if (ec1 != null)
+ EntryContainer ec = this.entryContainers.get(baseDN);
+ if (ec != null)
{
- throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec1.getTreePrefix(), baseDN));
+ throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec.getTreePrefix(), baseDN));
}
-
this.entryContainers.put(baseDN, entryContainer);
}
@@ -432,7 +236,7 @@
}
}
- nextid = new AtomicLong(highestID.longValue() + 1);
+ nextEntryID = new AtomicLong(highestID.longValue() + 1);
}
/**
@@ -470,7 +274,6 @@
String monitorName = backend.getBackendID() + " Storage";
monitor = new BackendMonitor(monitorName, this);
}
-
return monitor;
}
@@ -503,8 +306,7 @@
// Sort the list in order of priority.
Collections.sort(trees, new TreePreloadComparator());
- // Preload each tree until we reach the time limit or the cache
- // is filled.
+ // Preload each tree until we reach the time limit or the cache is filled.
try
{
throw new UnsupportedOperationException("Not implemented exception");
@@ -638,7 +440,7 @@
*/
EntryID getNextEntryID()
{
- return new EntryID(nextid.getAndIncrement());
+ return new EntryID(nextEntryID.getAndIncrement());
}
/**
@@ -647,7 +449,7 @@
*/
public void resetNextEntryID()
{
- nextid.set(1);
+ nextEntryID.set(1);
}
/** {@inheritDoc} */
--
Gitblit v1.10.0