| | |
| | | import com.sleepycat.je.StatsConfig; |
| | | import com.sleepycat.je.Transaction; |
| | | |
| | | import org.opends.server.api.Backend; |
| | | import org.opends.server.types.DebugLogLevel; |
| | | import org.opends.server.messages.JebMessages; |
| | | import org.opends.server.types.AttributeType; |
| | | import org.opends.server.types.DirectoryException; |
| | | import org.opends.server.types.DN; |
| | | import org.opends.server.types.Entry; |
| | |
| | | import org.opends.server.types.ResultCode; |
| | | import org.opends.server.util.LDIFException; |
| | | import org.opends.server.util.LDIFReader; |
| | | import org.opends.server.util.StaticUtils; |
| | | import static org.opends.server.util.StaticUtils.getFileForPath; |
| | | |
| | | import java.io.File; |
| | | import java.io.IOException; |
| | | import java.util.ArrayList; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.Timer; |
| | | import java.util.TimerTask; |
| | | import java.util.concurrent.LinkedBlockingQueue; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | import static org.opends.server.messages.JebMessages. |
| | | MSGID_JEB_IMPORT_ENTRY_EXISTS; |
| | | MSGID_JEB_IMPORT_ENTRY_EXISTS; |
| | | import static org.opends.server.messages.MessageHandler.getMessage; |
| | | import static org.opends.server.messages.JebMessages. |
| | | MSGID_JEB_IMPORT_PARENT_NOT_FOUND; |
| | | MSGID_JEB_IMPORT_PARENT_NOT_FOUND; |
| | | import static org.opends.server.loggers.ErrorLogger.logError; |
| | | import static org.opends.server.loggers.debug.DebugLogger.*; |
| | | import org.opends.server.loggers.debug.DebugTracer; |
| | | import static org.opends.server.messages.JebMessages.*; |
| | | import org.opends.server.admin.std.server.JEBackendCfg; |
| | | |
| | | /** |
| | | * Import from LDIF to a JE backend. |
| | |
| | | private static final DebugTracer TRACER = getTracer(); |
| | | |
| | | /** |
| | | * The backend instance we are importing into. |
| | | */ |
| | | private Backend backend; |
| | | |
| | | /** |
| | | * The JE backend configuration. |
| | | */ |
| | | private Config config; |
| | | private JEBackendCfg config; |
| | | |
| | | /** |
| | | * The root container used for this import job. |
| | |
| | | * Map of base DNs to their import context. |
| | | */ |
| | | private HashMap<DN,ImportContext> importMap = |
| | | new HashMap<DN, ImportContext>(); |
| | | new HashMap<DN, ImportContext>(); |
| | | |
| | | /** |
| | | * The number of entries imported. |
| | |
| | | */ |
| | | private long progressInterval = 10000; |
| | | |
| | | |
| | | /** |
| | | * The import worker threads. |
| | | */ |
| | | private CopyOnWriteArrayList<ImportThread> threads; |
| | | |
| | | /** |
| | | * Create a new import job. |
| | | * |
| | | * @param backend The backend performing the import. |
| | | * @param config The backend configuration. |
| | | * @param ldifImportConfig The LDIF import configuration. |
| | | */ |
| | | public ImportJob(Backend backend, Config config, |
| | | LDIFImportConfig ldifImportConfig) |
| | | public ImportJob(LDIFImportConfig ldifImportConfig) |
| | | { |
| | | this.backend = backend; |
| | | this.config = config; |
| | | this.ldifImportConfig = ldifImportConfig; |
| | | this.threads = new CopyOnWriteArrayList<ImportThread>(); |
| | | } |
| | | |
| | | /** |
| | |
| | | * processes the LDIF file, then merges the resulting intermediate |
| | | * files to load the index databases. |
| | | * |
| | | * @param rootContainer The root container to import into. |
| | | * @throws DatabaseException If an error occurs in the JE database. |
| | | * @throws IOException If a problem occurs while opening the LDIF file for |
| | | * reading, or while reading from the LDIF file. |
| | | * @throws JebException If an error occurs in the JE backend. |
| | | */ |
| | | public void importLDIF() |
| | | public void importLDIF(RootContainer rootContainer) |
| | | throws DatabaseException, IOException, JebException |
| | | { |
| | | /* |
| | | envConfig.setConfigParam("je.env.runCleaner", "false"); |
| | | envConfig.setConfigParam("je.log.numBuffers", "2"); |
| | | envConfig.setConfigParam("je.log.bufferSize", "15000000"); |
| | | envConfig.setConfigParam("je.log.totalBufferBytes", "30000000"); |
| | | envConfig.setConfigParam("je.log.fileMax", "100000000"); |
| | | */ |
| | | |
| | | // Create an LDIF reader. Throws an exception if the file does not exist. |
| | | reader = new LDIFReader(ldifImportConfig); |
| | | this.rootContainer = rootContainer; |
| | | this.config = rootContainer.getConfiguration(); |
| | | |
| | | int msgID; |
| | | String message; |
| | | long startTime; |
| | | |
| | | try |
| | | { |
| | | rootContainer = new RootContainer(config, backend); |
| | | if (ldifImportConfig.appendToExistingData()) |
| | | { |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, true, true, true, false); |
| | | } |
| | | else |
| | | { |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, false, false, false, false); |
| | | } |
| | | |
| | | if (!ldifImportConfig.appendToExistingData()) |
| | | { |
| | | // We have the writer lock on the environment, now delete the |
| | | // environment and re-open it. Only do this when we are |
| | | // importing to all the base DNs in the backend. |
| | | rootContainer.close(); |
| | | EnvManager.removeFiles(config.getBackendDirectory().getPath()); |
| | | rootContainer.open(config.getBackendDirectory(), |
| | | config.getBackendPermission(), |
| | | false, true, false, false, false, false); |
| | | } |
| | | |
| | | // Divide the total buffer size by the number of threads |
| | | // and give that much to each thread. |
| | | int importThreadCount = config.getImportThreadCount(); |
| | | long bufferSize = config.getImportBufferSize() / |
| | | (importThreadCount*config.getBaseDNs().length); |
| | | int importThreadCount = config.getBackendImportThreadCount(); |
| | | long bufferSize = config.getBackendImportBufferSize() / |
| | | (importThreadCount*rootContainer.getBaseDNs().size()); |
| | | |
| | | msgID = MSGID_JEB_IMPORT_THREAD_COUNT; |
| | | message = getMessage(msgID, importThreadCount); |
| | |
| | | message, msgID); |
| | | |
| | | TRACER.debugInfo( |
| | | rootContainer.getEnvironmentConfig().toString()); |
| | | |
| | | |
| | | rootContainer.openEntryContainers(config.getBaseDNs()); |
| | | rootContainer.getEnvironmentConfig().toString()); |
| | | |
| | | // Create the import contexts for each base DN. |
| | | DN baseDN; |
| | |
| | | |
| | | // Create an entry queue. |
| | | LinkedBlockingQueue<Entry> queue = |
| | | new LinkedBlockingQueue<Entry>(config.getImportQueueSize()); |
| | | new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize()); |
| | | importContext.setQueue(queue); |
| | | |
| | | importMap.put(baseDN, importContext); |
| | |
| | | // Make a note of the time we started. |
| | | startTime = System.currentTimeMillis(); |
| | | |
| | | // Create a temporary work directory. |
| | | File tempDir = getFileForPath(config.getBackendImportTempDirectory()); |
| | | if(!tempDir.exists() && !tempDir.mkdir()) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_CREATE_TMPDIR_ERROR; |
| | | String msg = getMessage(msgID, tempDir); |
| | | throw new IOException(msg); |
| | | } |
| | | |
| | | if (tempDir.listFiles() != null) |
| | | { |
| | | for (File f : tempDir.listFiles()) |
| | | { |
| | | f.delete(); |
| | | } |
| | | } |
| | | |
| | | try |
| | | { |
| | | // Create a temporary work directory. |
| | | File tempDir = new File(config.getImportTempDirectory()); |
| | | tempDir.mkdir(); |
| | | if (tempDir.listFiles() != null) |
| | | importedCount = 0; |
| | | int passNumber = 1; |
| | | boolean moreData = true; |
| | | while (moreData) |
| | | { |
| | | for (File f : tempDir.listFiles()) |
| | | moreData = processLDIF(); |
| | | if (moreData) |
| | | { |
| | | f.delete(); |
| | | msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE; |
| | | message = getMessage(msgID, passNumber++); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | |
| | | try |
| | | { |
| | | importedCount = 0; |
| | | int passNumber = 1; |
| | | boolean moreData = true; |
| | | while (moreData) |
| | | else |
| | | { |
| | | moreData = processLDIF(); |
| | | if (moreData) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE; |
| | | message = getMessage(msgID, passNumber++); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | else |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE; |
| | | message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | |
| | | |
| | | long mergeStartTime = System.currentTimeMillis(); |
| | | merge(); |
| | | long mergeEndTime = System.currentTimeMillis(); |
| | | |
| | | if (moreData) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING; |
| | | message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | else |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED; |
| | | message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE; |
| | | message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | tempDir.delete(); |
| | | |
| | | |
| | | long mergeStartTime = System.currentTimeMillis(); |
| | | merge(); |
| | | long mergeEndTime = System.currentTimeMillis(); |
| | | |
| | | if (moreData) |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING; |
| | | message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | else |
| | | { |
| | | msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED; |
| | | message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | } |
| | | } |
| | | } |
| | | finally |
| | | { |
| | | rootContainer.close(); |
| | | |
| | | // Sync the environment to disk. |
| | | msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE; |
| | | message = getMessage(msgID); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE, |
| | | message, msgID); |
| | | tempDir.delete(); |
| | | } |
| | | } |
| | | finally |
| | |
| | | */ |
| | | public void merge() |
| | | { |
| | | Map<AttributeType,IndexConfig> |
| | | indexConfigs = config.getIndexConfigMap(); |
| | | ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>(); |
| | | |
| | | // Create merge threads for each base DN. |
| | |
| | | EntryContainer entryContainer = importContext.getEntryContainer(); |
| | | |
| | | // For each configured attribute index. |
| | | for (IndexConfig indexConfig : indexConfigs.values()) |
| | | for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes()) |
| | | { |
| | | AttributeIndex attrIndex = |
| | | entryContainer.getAttributeIndex(indexConfig.getAttributeType()); |
| | | if (indexConfig.isEqualityIndex()) |
| | | int indexEntryLimit = config.getBackendIndexEntryLimit(); |
| | | if(attrIndex.getConfiguration().getIndexEntryLimit() != null) |
| | | { |
| | | indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit(); |
| | | } |
| | | |
| | | if (attrIndex.equalityIndex != null) |
| | | { |
| | | Index index = attrIndex.equalityIndex; |
| | | String name = containerName + "_" + index.toString(); |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(name, config, ldifImportConfig, index, |
| | | indexConfig.getEqualityEntryLimit()); |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | if (indexConfig.isPresenceIndex()) |
| | | if (attrIndex.presenceIndex != null) |
| | | { |
| | | Index index = attrIndex.presenceIndex; |
| | | String name = containerName + "_" + index.toString(); |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(name, config, ldifImportConfig, index, |
| | | indexConfig.getPresenceEntryLimit()); |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | if (indexConfig.isSubstringIndex()) |
| | | if (attrIndex.substringIndex != null) |
| | | { |
| | | Index index = attrIndex.substringIndex; |
| | | String name = containerName + "_" + index.toString(); |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(name, config, ldifImportConfig, index, |
| | | indexConfig.getSubstringEntryLimit()); |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | if (indexConfig.isOrderingIndex()) |
| | | if (attrIndex.orderingIndex != null) |
| | | { |
| | | Index index = attrIndex.orderingIndex; |
| | | String name = containerName + "_" + index.toString(); |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(name, config, ldifImportConfig, index, |
| | | indexConfig.getEqualityEntryLimit()); |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | if (indexConfig.isApproximateIndex()) |
| | | if (attrIndex.approximateIndex != null) |
| | | { |
| | | Index index = attrIndex.approximateIndex; |
| | | String name = containerName + "_" + index.toString(); |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(name, config, ldifImportConfig, index, |
| | | indexConfig.getEqualityEntryLimit()); |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, index, |
| | | indexEntryLimit); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | } |
| | | |
| | | // Id2Children index. |
| | | Index id2Children = entryContainer.getID2Children(); |
| | | String name = containerName + "_" + id2Children.toString(); |
| | | IndexMergeThread indexMergeThread = |
| | | new IndexMergeThread(name, config, ldifImportConfig, |
| | | id2Children, |
| | | config.getBackendIndexEntryLimit()); |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, |
| | | id2Children, |
| | | config.getBackendIndexEntryLimit()); |
| | | mergers.add(indexMergeThread); |
| | | |
| | | // Id2Subtree index. |
| | | Index id2Subtree = entryContainer.getID2Subtree(); |
| | | name = containerName + "_" + id2Subtree.toString(); |
| | | indexMergeThread = |
| | | new IndexMergeThread(name, config, ldifImportConfig, |
| | | id2Subtree, |
| | | config.getBackendIndexEntryLimit()); |
| | | new IndexMergeThread(config, |
| | | ldifImportConfig, |
| | | id2Subtree, |
| | | config.getBackendIndexEntryLimit()); |
| | | mergers.add(indexMergeThread); |
| | | } |
| | | |
| | |
| | | * reading, or while reading from the LDIF file. |
| | | */ |
| | | private boolean processLDIF() |
| | | throws JebException, DatabaseException, IOException |
| | | throws JebException, DatabaseException, IOException |
| | | { |
| | | boolean moreData = false; |
| | | |
| | | ArrayList<ImportThread> threads; |
| | | |
| | | // Create one set of worker threads for each base DN. |
| | | int importThreadCount = config.getImportThreadCount(); |
| | | threads = new ArrayList<ImportThread>(importThreadCount*importMap.size()); |
| | | int importThreadCount = config.getBackendImportThreadCount(); |
| | | for (ImportContext ic : importMap.values()) |
| | | { |
| | | for (int i = 0; i < importThreadCount; i++) |
| | |
| | | // Create a counter to use to determine whether we've hit the import |
| | | // pass size. |
| | | int entriesProcessed = 0; |
| | | int importPassSize = config.getImportPassSize(); |
| | | int importPassSize = config.getBackendImportPassSize(); |
| | | if (importPassSize <= 0) |
| | | { |
| | | importPassSize = Integer.MAX_VALUE; |
| | |
| | | { |
| | | do |
| | | { |
| | | if(threads.size() <= 0) |
| | | { |
| | | int msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS; |
| | | String msg = getMessage(msgID); |
| | | throw new JebException(msgID, msg); |
| | | } |
| | | |
| | | try |
| | | { |
| | | // Read the next entry. |
| | |
| | | } |
| | | } while (true); |
| | | |
| | | // Wait for the queues to be drained. |
| | | for (ImportContext ic : importMap.values()) |
| | | if(threads.size() > 0) |
| | | { |
| | | while (ic.getQueue().size() > 0) |
| | | // Wait for the queues to be drained. |
| | | for (ImportContext ic : importMap.values()) |
| | | { |
| | | try |
| | | while (ic.getQueue().size() > 0) |
| | | { |
| | | Thread.sleep(100); |
| | | } catch (Exception e) |
| | | { |
| | | // No action needed. |
| | | try |
| | | { |
| | | Thread.sleep(100); |
| | | } catch (Exception e) |
| | | { |
| | | // No action needed. |
| | | } |
| | | } |
| | | } |
| | | } |
| | |
| | | * @throws JebException If an error occurs in the JE backend. |
| | | */ |
| | | public void processEntry(ImportContext importContext, Entry entry) |
| | | throws JebException, DatabaseException |
| | | throws JebException, DatabaseException |
| | | { |
| | | DN entryDN = entry.getDN(); |
| | | LDIFImportConfig ldifImportConfig = importContext.getLDIFImportConfig(); |
| | |
| | | { |
| | | // See if we are allowed to replace the entry that exists. |
| | | if (ldifImportConfig.appendToExistingData() && |
| | | ldifImportConfig.replaceExistingEntries()) |
| | | ldifImportConfig.replaceExistingEntries()) |
| | | { |
| | | // Read the existing entry contents. |
| | | Entry oldEntry = id2entry.get(txn, entryID); |
| | |
| | | // Make sure the parent entry exists, unless this entry is a base DN. |
| | | EntryID parentID = null; |
| | | DN parentDN = importContext.getEntryContainer(). |
| | | getParentWithinBase(entryDN); |
| | | getParentWithinBase(entryDN); |
| | | if (parentDN != null) |
| | | { |
| | | parentID = dn2id.get(txn, parentDN); |
| | |
| | | // Put the entry on the queue. |
| | | try |
| | | { |
| | | importContext.getQueue().put(entry); |
| | | while(!importContext.getQueue().offer(entry, 1000, |
| | | TimeUnit.MILLISECONDS)) |
| | | { |
| | | if(threads.size() <= 0) |
| | | { |
| | | // All worker threads died. We must stop now. |
| | | return; |
| | | } |
| | | } |
| | | } |
| | | catch (InterruptedException e) |
| | | { |
| | |
| | | */ |
| | | public void uncaughtException(Thread t, Throwable e) |
| | | { |
| | | e.printStackTrace(); |
| | | threads.remove(t); |
| | | int msgID = MSGID_JEB_IMPORT_THREAD_EXCEPTION; |
| | | String msg = getMessage(msgID, t.getName(), |
| | | StaticUtils.stackTraceToSingleLineString(e)); |
| | | logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, msg, |
| | | msgID); |
| | | } |
| | | |
| | | /** |
| | |
| | | EnvironmentStats envStats = |
| | | rootContainer.getEnvironmentStats(new StatsConfig()); |
| | | long nCacheMiss = |
| | | envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); |
| | | envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss(); |
| | | |
| | | float cacheMissRate = 0; |
| | | if (deltaCount > 0) |