| opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java | ●●●●● patch | view | raw | blame | history | |
| opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java | ●●●●● patch | view | raw | blame | history | |
| opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java | ●●●●● patch | view | raw | blame | history | |
| opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/SuccessiveAddsImportStrategy.java | ●●●●● patch | view | raw | blame | history |
opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/ImportStrategy.java
New file @@ -0,0 +1,49 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2015 ForgeRock AS */ package org.opends.server.backends.pluggable; import org.opends.server.core.ServerContext; import org.opends.server.types.DirectoryException; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.LDIFImportResult; /** The strategy to use for importing LDIF files. */ interface ImportStrategy { /** * Imports information from an LDIF file into the supplied root container. * * @param importConfig * The configuration to use when performing the import. * @param serverContext * The server context * @return Information about the result of the import processing. * @throws DirectoryException * If a problem occurs while performing the LDIF import. * @see {@link Backend#importLDIF(LDIFImportConfig, ServerContext)} */ LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer, ServerContext serverContext) throws DirectoryException; } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/Importer.java
@@ -31,6 +31,7 @@ import static org.opends.server.backends.pluggable.DnKeyFormat.*; import static org.opends.server.backends.pluggable.EntryIDSet.*; import static org.opends.server.backends.pluggable.SuffixContainer.*; import static org.opends.server.core.DirectoryServer.*; import static org.opends.server.util.DynamicConstants.*; import static org.opends.server.util.ServerConstants.*; import static org.opends.server.util.StaticUtils.*; @@ -85,6 +86,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.forgerock.opendj.config.server.ConfigException; import org.forgerock.opendj.ldap.ByteSequence; @@ -127,6 +129,46 @@ */ final class Importer { /** * Shim that allows properly constructing an {@link Importer} without polluting * {@link ImportStrategy} and {@link RootContainer} with this importer inner workings. */ static final class StrategyImpl implements ImportStrategy { private final PluggableBackendCfg backendCfg; public StrategyImpl(PluggableBackendCfg backendCfg) { this.backendCfg = backendCfg; } @Override public LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer, ServerContext serverContext) throws DirectoryException { try { return new Importer(rootContainer, importConfig, backendCfg, serverContext).processImport(); } catch (DirectoryException e) { logger.traceException(e); throw e; } catch (InitializationException | ConfigException e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e); } catch (Exception e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(stackTraceToSingleLineString(e)), e); } } } private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private static final int TIMER_INTERVAL = 10000; @@ -853,14 +895,7 @@ rebuildManager.printStopMessage(startTime); } /** * Import a LDIF using the specified root container. * * @return A LDIF result. * @throws Exception * If the import failed */ public LDIFImportResult processImport() throws Exception private LDIFImportResult processImport() throws Exception { try { try @@ -2989,17 +3024,15 @@ return timer; } private int getIndexCount() throws ConfigException, StorageRuntimeException, InitializationException private int getIndexCount() throws ConfigException, StorageRuntimeException, InitializationException { switch (rebuildConfig.getRebuildMode()) { case ALL: return getTotalIndexCount(cfg); case DEGRADED: // FIXME: since the storgae is not opened we cannot determine which // indexes are degraded. As a workaround, be conservative and assume all // indexes need rebuilding. // FIXME: since the storage is not opened we cannot determine which indexes are degraded. // As a workaround, be conservative and assume all indexes need rebuilding. return getTotalIndexCount(cfg); default: return getRebuildListIndexCount(cfg); @@ -3023,7 +3056,7 @@ { indexCount += 3; } else if ("dn2uri".equals(lowerName)) else if (DN2URI_INDEX_NAME.equals(lowerName)) { indexCount++; } @@ -3057,11 +3090,11 @@ final String indexType = attrIndexParts[1]; if (attrIndexParts.length == 2) { if ("presence".equals(indexType) || "equality".equals(indexType) || "ordering".equals(indexType) || "substring".equals(indexType) || "approximate".equals(indexType)) if (PRESENCE.toString().equals(indexType) || EQUALITY.toString().equals(indexType) || ORDERING.toString().equals(indexType) || SUBSTRING.toString().equals(indexType) || APPROXIMATE.toString().equals(indexType)) { indexCount++; } opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
@@ -27,8 +27,6 @@ package org.opends.server.backends.pluggable; import static org.opends.messages.BackendMessages.*; import static org.opends.messages.UtilityMessages.*; import static org.opends.server.core.DirectoryServer.*; import static org.opends.server.util.StaticUtils.*; import java.util.ArrayList; @@ -37,8 +35,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.forgerock.i18n.LocalizableMessage; @@ -56,20 +52,15 @@ import org.opends.server.backends.pluggable.spi.StorageStatus; import org.opends.server.backends.pluggable.spi.WriteOperation; import org.opends.server.backends.pluggable.spi.WriteableTransaction; import org.opends.server.core.DirectoryServer; import org.opends.server.core.SearchOperation; import org.opends.server.core.ServerContext; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.InitializationException; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.LDIFImportResult; import org.opends.server.types.OpenDsException; import org.opends.server.types.Operation; import org.opends.server.types.Privilege; import org.opends.server.util.LDIFException; import org.opends.server.util.LDIFReader; /** * Wrapper class for a backend "container". Root container holds all the entry @@ -78,44 +69,8 @@ */ public class RootContainer implements ConfigurationChangeListener<PluggableBackendCfg> { /** Logs the progress of the import. */ private static final class ImportProgress implements Runnable { private final LDIFReader reader; private long previousCount; private long previousTime; public ImportProgress(LDIFReader reader) { this.reader = reader; } @Override public void run() { long latestCount = reader.getEntriesRead() + 0; long deltaCount = latestCount - previousCount; long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; if (deltaTime == 0) { return; } long entriesRead = reader.getEntriesRead(); long entriesIgnored = reader.getEntriesIgnored(); long entriesRejected = reader.getEntriesRejected(); float rate = 1000f * deltaCount / deltaTime; logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate); previousCount = latestCount; previousTime = latestTime; } } private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private static final int IMPORT_PROGRESS_INTERVAL = 10000; /** The tree storage. */ private Storage storage; @@ -129,8 +84,8 @@ /** The base DNs contained in this root container. */ private final ConcurrentHashMap<DN, EntryContainer> entryContainers = new ConcurrentHashMap<DN, EntryContainer>(); /** The cached value of the next entry identifier to be assigned. */ private AtomicLong nextid = new AtomicLong(1); /** Value of the next entryID to be assigned. */ private AtomicLong nextEntryID = new AtomicLong(1); /** The compressed schema manager for this backend. */ private PersistentCompressedSchema compressedSchema; @@ -166,161 +121,14 @@ } LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) throws DirectoryException {//TODO JNR may call importLDIFWithSuccessiveAdds(importConfig) depending on configured import strategy return importLDIFWithOnDiskMerge(importConfig, serverContext); { return getImportStrategy().importLDIF(importConfig, this, serverContext); } private LDIFImportResult importLDIFWithSuccessiveAdds(LDIFImportConfig importConfig) throws DirectoryException private ImportStrategy getImportStrategy() throws DirectoryException { try { ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); try { final LDIFReader reader; try { reader = new LDIFReader(importConfig); } catch (Exception e) { LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e); } long importCount = 0; final long startTime = System.currentTimeMillis(); timerService.scheduleAtFixedRate(new ImportProgress(reader), IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS); while (true) { final Entry entry; try { entry = reader.readEntry(); if (entry == null) { break; } } catch (LDIFException le) { if (!le.canContinueReading()) { LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le); } continue; } final DN dn = entry.getName(); final EntryContainer ec = getEntryContainer(dn); if (ec == null) { final LocalizableMessage m = ERR_LDIF_SKIP.get(dn); logger.error(m); reader.rejectLastEntry(m); continue; } try { ec.addEntry(entry, null); importCount++; } catch (DirectoryException e) { switch (e.getResultCode().asEnum()) { case ENTRY_ALREADY_EXISTS: if (importConfig.replaceExistingEntries()) { final Entry oldEntry = ec.getEntry(entry.getName()); ec.replaceEntry(oldEntry, entry, null); } else { reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get()); } break; case NO_SUCH_OBJECT: reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent())); break; default: // Not sure why it failed. reader.rejectLastEntry(e.getMessageObject()); break; } } } final long finishTime = System.currentTimeMillis(); waitForShutdown(timerService); final long importTime = finishTime - startTime; float rate = 0; if (importTime > 0) { rate = 1000f * reader.getEntriesRead() / importTime; } logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(), reader.getEntriesRejected(), 0, importTime / 1000, rate); return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored()); } finally { close(); // if not already stopped, then stop it waitForShutdown(timerService); } } catch (DirectoryException e) { logger.traceException(e); throw e; } catch (OpenDsException e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject()); } catch (Exception e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage())); } } private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException { timerService.shutdown(); timerService.awaitTermination(20, TimeUnit.SECONDS); } private LDIFImportResult importLDIFWithOnDiskMerge(final LDIFImportConfig importConfig, ServerContext serverContext) throws DirectoryException { try { return new Importer(this, importConfig, config, serverContext).processImport(); } catch (DirectoryException e) { logger.traceException(e); throw e; } catch (OpenDsException e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e); } catch (Exception e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(stackTraceToSingleLineString(e)), e); } //TODO JNR may call new SuccessiveAddsImportStrategy() depending on configured import strategy return new Importer.StrategyImpl(config); } /** @@ -391,15 +199,11 @@ */ void registerEntryContainer(DN baseDN, EntryContainer entryContainer) throws InitializationException { EntryContainer ec1 = this.entryContainers.get(baseDN); // If an entry container for this baseDN is already open we don't allow // another to be opened. if (ec1 != null) EntryContainer ec = this.entryContainers.get(baseDN); if (ec != null) { throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec1.getTreePrefix(), baseDN)); throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec.getTreePrefix(), baseDN)); } this.entryContainers.put(baseDN, entryContainer); } @@ -432,7 +236,7 @@ } } nextid = new AtomicLong(highestID.longValue() + 1); nextEntryID = new AtomicLong(highestID.longValue() + 1); } /** @@ -470,7 +274,6 @@ String monitorName = backend.getBackendID() + " Storage"; monitor = new BackendMonitor(monitorName, this); } return monitor; } @@ -503,8 +306,7 @@ // Sort the list in order of priority. Collections.sort(trees, new TreePreloadComparator()); // Preload each tree until we reach the time limit or the cache // is filled. // Preload each tree until we reach the time limit or the cache is filled. try { throw new UnsupportedOperationException("Not implemented exception"); @@ -638,7 +440,7 @@ */ EntryID getNextEntryID() { return new EntryID(nextid.getAndIncrement()); return new EntryID(nextEntryID.getAndIncrement()); } /** @@ -647,7 +449,7 @@ */ public void resetNextEntryID() { nextid.set(1); nextEntryID.set(1); } /** {@inheritDoc} */ opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/SuccessiveAddsImportStrategy.java
New file @@ -0,0 +1,222 @@ /* * CDDL HEADER START * * The contents of this file are subject to the terms of the * Common Development and Distribution License, Version 1.0 only * (the "License"). You may not use this file except in compliance * with the License. * * You can obtain a copy of the license at legal-notices/CDDLv1_0.txt * or http://forgerock.org/license/CDDLv1.0.html. * See the License for the specific language governing permissions * and limitations under the License. * * When distributing Covered Code, include this CDDL HEADER in each * file and include the License file at legal-notices/CDDLv1_0.txt. * If applicable, add the following below this CDDL HEADER, with the * fields enclosed by brackets "[]" replaced with your own identifying * information: * Portions Copyright [yyyy] [name of copyright owner] * * CDDL HEADER END * * Copyright 2015 ForgeRock AS */ package org.opends.server.backends.pluggable; import static org.opends.messages.BackendMessages.*; import static org.opends.messages.UtilityMessages.*; import static org.opends.server.core.DirectoryServer.*; import static org.opends.server.util.StaticUtils.*; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.forgerock.i18n.LocalizableMessage; import org.forgerock.i18n.slf4j.LocalizedLogger; import org.opends.server.core.DirectoryServer; import org.opends.server.core.ServerContext; import org.opends.server.types.DN; import org.opends.server.types.DirectoryException; import org.opends.server.types.Entry; import org.opends.server.types.LDIFImportConfig; import org.opends.server.types.LDIFImportResult; import org.opends.server.types.OpenDsException; import org.opends.server.util.LDIFException; import org.opends.server.util.LDIFReader; /** * Imports LDIF entries one by one, by calling * {@link EntryContainer#addEntry(Entry, org.opends.server.core.AddOperation)}. */ final class SuccessiveAddsImportStrategy implements ImportStrategy { /** Logs the progress of the import. */ private static final class ImportProgress implements Runnable { private final LDIFReader reader; private long previousCount; private long previousTime; public ImportProgress(LDIFReader reader) { this.reader = reader; } @Override public void run() { long latestCount = reader.getEntriesRead() + 0; long deltaCount = latestCount - previousCount; long latestTime = System.currentTimeMillis(); long deltaTime = latestTime - previousTime; if (deltaTime == 0) { return; } long entriesRead = reader.getEntriesRead(); long entriesIgnored = reader.getEntriesIgnored(); long entriesRejected = reader.getEntriesRejected(); float rate = 1000f * deltaCount / deltaTime; logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate); previousCount = latestCount; previousTime = latestTime; } } private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); private static final int IMPORT_PROGRESS_INTERVAL = 10000; /** {@inheritDoc} */ @Override public LDIFImportResult importLDIF(LDIFImportConfig importConfig, RootContainer rootContainer, ServerContext serverContext) throws DirectoryException { try { ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); try { final LDIFReader reader; try { reader = new LDIFReader(importConfig); } catch (Exception e) { LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e); } long importCount = 0; final long startTime = System.currentTimeMillis(); timerService.scheduleAtFixedRate(new ImportProgress(reader), IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS); while (true) { final Entry entry; try { entry = reader.readEntry(); if (entry == null) { break; } } catch (LDIFException le) { if (!le.canContinueReading()) { LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le)); throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le); } continue; } final DN dn = entry.getName(); final EntryContainer ec = rootContainer.getEntryContainer(dn); if (ec == null) { final LocalizableMessage m = ERR_LDIF_SKIP.get(dn); logger.error(m); reader.rejectLastEntry(m); continue; } try { ec.addEntry(entry, null); importCount++; } catch (DirectoryException e) { switch (e.getResultCode().asEnum()) { case ENTRY_ALREADY_EXISTS: if (importConfig.replaceExistingEntries()) { final Entry oldEntry = ec.getEntry(entry.getName()); ec.replaceEntry(oldEntry, entry, null); } else { reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get()); } break; case NO_SUCH_OBJECT: reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent())); break; default: // Not sure why it failed. reader.rejectLastEntry(e.getMessageObject()); break; } } } final long finishTime = System.currentTimeMillis(); waitForShutdown(timerService); final long importTime = finishTime - startTime; float rate = 0; if (importTime > 0) { rate = 1000f * reader.getEntriesRead() / importTime; } logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(), reader.getEntriesRejected(), 0, importTime / 1000, rate); return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored()); } finally { rootContainer.close(); // if not already stopped, then stop it waitForShutdown(timerService); } } catch (DirectoryException e) { logger.traceException(e); throw e; } catch (OpenDsException e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject()); } catch (Exception e) { logger.traceException(e); throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage())); } } private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException { timerService.shutdown(); timerService.awaitTermination(20, TimeUnit.SECONDS); } }