| | |
| | | package org.opends.server.backends.pluggable; |
| | | |
| | | import static org.opends.messages.BackendMessages.*; |
| | | import static org.opends.messages.UtilityMessages.*; |
| | | import static org.opends.server.core.DirectoryServer.*; |
| | | import static org.opends.server.util.StaticUtils.*; |
| | | |
| | | import java.util.ArrayList; |
| | |
| | | import java.util.List; |
| | | import java.util.Set; |
| | | import java.util.concurrent.ConcurrentHashMap; |
| | | import java.util.concurrent.ScheduledThreadPoolExecutor; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.concurrent.atomic.AtomicLong; |
| | | |
| | | import org.forgerock.i18n.LocalizableMessage; |
| | |
| | | import org.opends.server.backends.pluggable.spi.StorageStatus; |
| | | import org.opends.server.backends.pluggable.spi.WriteOperation; |
| | | import org.opends.server.backends.pluggable.spi.WriteableTransaction; |
| | | import org.opends.server.core.DirectoryServer; |
| | | import org.opends.server.core.SearchOperation; |
| | | import org.opends.server.core.ServerContext; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.Entry; |
| | | import org.opends.server.types.InitializationException; |
| | | import org.opends.server.types.LDIFImportConfig; |
| | | import org.opends.server.types.LDIFImportResult; |
| | | import org.opends.server.types.OpenDsException; |
| | | import org.opends.server.types.Operation; |
| | | import org.opends.server.types.Privilege; |
| | | import org.opends.server.util.LDIFException; |
| | | import org.opends.server.util.LDIFReader; |
| | | |
| | | /** |
| | | * Wrapper class for a backend "container". Root container holds all the entry |
| | |
| | | */ |
| | | public class RootContainer implements ConfigurationChangeListener<PluggableBackendCfg> |
| | | { |
| | | /** Logs the progress of the import. */ |
| | | private static final class ImportProgress implements Runnable |
| | | { |
| | | private final LDIFReader reader; |
| | | private long previousCount; |
| | | private long previousTime; |
| | | |
| | | public ImportProgress(LDIFReader reader) |
| | | { |
| | | this.reader = reader; |
| | | } |
| | | |
| | | @Override |
| | | public void run() |
| | | { |
| | | long latestCount = reader.getEntriesRead() + 0; |
| | | long deltaCount = latestCount - previousCount; |
| | | long latestTime = System.currentTimeMillis(); |
| | | long deltaTime = latestTime - previousTime; |
| | | if (deltaTime == 0) |
| | | { |
| | | return; |
| | | } |
| | | long entriesRead = reader.getEntriesRead(); |
| | | long entriesIgnored = reader.getEntriesIgnored(); |
| | | long entriesRejected = reader.getEntriesRejected(); |
| | | float rate = 1000f * deltaCount / deltaTime; |
| | | logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate); |
| | | |
| | | previousCount = latestCount; |
| | | previousTime = latestTime; |
| | | } |
| | | } |
| | | |
| | | private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass(); |
| | | |
| | | private static final int IMPORT_PROGRESS_INTERVAL = 10000; |
| | | |
| | | /** The tree storage. */ |
| | | private Storage storage; |
| | | |
| | |
| | | /** The base DNs contained in this root container. */ |
| | | private final ConcurrentHashMap<DN, EntryContainer> entryContainers = new ConcurrentHashMap<DN, EntryContainer>(); |
| | | |
| | | /** The cached value of the next entry identifier to be assigned. */ |
| | | private AtomicLong nextid = new AtomicLong(1); |
| | | /** Value of the next entryID to be assigned. */ |
| | | private AtomicLong nextEntryID = new AtomicLong(1); |
| | | |
| | | /** The compressed schema manager for this backend. */ |
| | | private PersistentCompressedSchema compressedSchema; |
| | |
| | | } |
| | | |
| | | LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) throws DirectoryException |
| | | {//TODO JNR may call importLDIFWithSuccessiveAdds(importConfig) depending on configured import strategy |
| | | return importLDIFWithOnDiskMerge(importConfig, serverContext); |
| | | { |
| | | return getImportStrategy().importLDIF(importConfig, this, serverContext); |
| | | } |
| | | |
| | | private LDIFImportResult importLDIFWithSuccessiveAdds(LDIFImportConfig importConfig) throws DirectoryException |
| | | private ImportStrategy getImportStrategy() throws DirectoryException |
| | | { |
| | | try |
| | | { |
| | | ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1); |
| | | try |
| | | { |
| | | final LDIFReader reader; |
| | | try |
| | | { |
| | | reader = new LDIFReader(importConfig); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e)); |
| | | throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e); |
| | | } |
| | | |
| | | long importCount = 0; |
| | | final long startTime = System.currentTimeMillis(); |
| | | timerService.scheduleAtFixedRate(new ImportProgress(reader), |
| | | IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS); |
| | | while (true) |
| | | { |
| | | final Entry entry; |
| | | try |
| | | { |
| | | entry = reader.readEntry(); |
| | | if (entry == null) |
| | | { |
| | | break; |
| | | } |
| | | } |
| | | catch (LDIFException le) |
| | | { |
| | | if (!le.canContinueReading()) |
| | | { |
| | | LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le)); |
| | | throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le); |
| | | } |
| | | continue; |
| | | } |
| | | |
| | | final DN dn = entry.getName(); |
| | | final EntryContainer ec = getEntryContainer(dn); |
| | | if (ec == null) |
| | | { |
| | | final LocalizableMessage m = ERR_LDIF_SKIP.get(dn); |
| | | logger.error(m); |
| | | reader.rejectLastEntry(m); |
| | | continue; |
| | | } |
| | | |
| | | try |
| | | { |
| | | ec.addEntry(entry, null); |
| | | importCount++; |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | switch (e.getResultCode().asEnum()) |
| | | { |
| | | case ENTRY_ALREADY_EXISTS: |
| | | if (importConfig.replaceExistingEntries()) |
| | | { |
| | | final Entry oldEntry = ec.getEntry(entry.getName()); |
| | | ec.replaceEntry(oldEntry, entry, null); |
| | | } |
| | | else |
| | | { |
| | | reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get()); |
| | | } |
| | | break; |
| | | case NO_SUCH_OBJECT: |
| | | reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent())); |
| | | break; |
| | | default: |
| | | // Not sure why it failed. |
| | | reader.rejectLastEntry(e.getMessageObject()); |
| | | break; |
| | | } |
| | | } |
| | | } |
| | | final long finishTime = System.currentTimeMillis(); |
| | | |
| | | waitForShutdown(timerService); |
| | | |
| | | final long importTime = finishTime - startTime; |
| | | float rate = 0; |
| | | if (importTime > 0) |
| | | { |
| | | rate = 1000f * reader.getEntriesRead() / importTime; |
| | | } |
| | | logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(), |
| | | reader.getEntriesRejected(), 0, importTime / 1000, rate); |
| | | return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored()); |
| | | } |
| | | finally |
| | | { |
| | | close(); |
| | | |
| | | // if not already stopped, then stop it |
| | | waitForShutdown(timerService); |
| | | } |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | logger.traceException(e); |
| | | throw e; |
| | | } |
| | | catch (OpenDsException e) |
| | | { |
| | | logger.traceException(e); |
| | | throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject()); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logger.traceException(e); |
| | | throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage())); |
| | | } |
| | | } |
| | | |
| | | private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException |
| | | { |
| | | timerService.shutdown(); |
| | | timerService.awaitTermination(20, TimeUnit.SECONDS); |
| | | } |
| | | |
| | | private LDIFImportResult importLDIFWithOnDiskMerge(final LDIFImportConfig importConfig, ServerContext serverContext) |
| | | throws DirectoryException |
| | | { |
| | | try |
| | | { |
| | | return new Importer(this, importConfig, config, serverContext).processImport(); |
| | | } |
| | | catch (DirectoryException e) |
| | | { |
| | | logger.traceException(e); |
| | | throw e; |
| | | } |
| | | catch (OpenDsException e) |
| | | { |
| | | logger.traceException(e); |
| | | throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e); |
| | | } |
| | | catch (Exception e) |
| | | { |
| | | logger.traceException(e); |
| | | throw new DirectoryException(getServerErrorResultCode(), |
| | | LocalizableMessage.raw(stackTraceToSingleLineString(e)), e); |
| | | } |
| | | //TODO JNR may call new SuccessiveAddsImportStrategy() depending on configured import strategy |
| | | return new Importer.StrategyImpl(config); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | void registerEntryContainer(DN baseDN, EntryContainer entryContainer) throws InitializationException |
| | | { |
| | | EntryContainer ec1 = this.entryContainers.get(baseDN); |
| | | |
| | | // If an entry container for this baseDN is already open we don't allow |
| | | // another to be opened. |
| | | if (ec1 != null) |
| | | EntryContainer ec = this.entryContainers.get(baseDN); |
| | | if (ec != null) |
| | | { |
| | | throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec1.getTreePrefix(), baseDN)); |
| | | throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec.getTreePrefix(), baseDN)); |
| | | } |
| | | |
| | | this.entryContainers.put(baseDN, entryContainer); |
| | | } |
| | | |
| | |
| | | } |
| | | } |
| | | |
| | | nextid = new AtomicLong(highestID.longValue() + 1); |
| | | nextEntryID = new AtomicLong(highestID.longValue() + 1); |
| | | } |
| | | |
| | | /** |
| | |
| | | String monitorName = backend.getBackendID() + " Storage"; |
| | | monitor = new BackendMonitor(monitorName, this); |
| | | } |
| | | |
| | | return monitor; |
| | | } |
| | | |
| | |
| | | // Sort the list in order of priority. |
| | | Collections.sort(trees, new TreePreloadComparator()); |
| | | |
| | | // Preload each tree until we reach the time limit or the cache |
| | | // is filled. |
| | | // Preload each tree until we reach the time limit or the cache is filled. |
| | | try |
| | | { |
| | | throw new UnsupportedOperationException("Not implemented exception"); |
| | |
| | | */ |
| | | EntryID getNextEntryID() |
| | | { |
| | | return new EntryID(nextid.getAndIncrement()); |
| | | return new EntryID(nextEntryID.getAndIncrement()); |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | public void resetNextEntryID() |
| | | { |
| | | nextid.set(1); |
| | | nextEntryID.set(1); |
| | | } |
| | | |
| | | /** {@inheritDoc} */ |