mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

boli
20.27.2007 fceec4773fdc91fa7e11334a56f0403fd69292ed
opendj-sdk/opends/src/server/org/opends/server/backends/jeb/ImportJob.java
@@ -31,10 +31,8 @@
import com.sleepycat.je.StatsConfig;
import com.sleepycat.je.Transaction;
import org.opends.server.api.Backend;
import org.opends.server.types.DebugLogLevel;
import org.opends.server.messages.JebMessages;
import org.opends.server.types.AttributeType;
import org.opends.server.types.DirectoryException;
import org.opends.server.types.DN;
import org.opends.server.types.Entry;
@@ -44,25 +42,29 @@
import org.opends.server.types.ResultCode;
import org.opends.server.util.LDIFException;
import org.opends.server.util.LDIFReader;
import org.opends.server.util.StaticUtils;
import static org.opends.server.util.StaticUtils.getFileForPath;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import static org.opends.server.messages.JebMessages.
     MSGID_JEB_IMPORT_ENTRY_EXISTS;
    MSGID_JEB_IMPORT_ENTRY_EXISTS;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.JebMessages.
     MSGID_JEB_IMPORT_PARENT_NOT_FOUND;
    MSGID_JEB_IMPORT_PARENT_NOT_FOUND;
import static org.opends.server.loggers.ErrorLogger.logError;
import static org.opends.server.loggers.debug.DebugLogger.*;
import org.opends.server.loggers.debug.DebugTracer;
import static org.opends.server.messages.JebMessages.*;
import org.opends.server.admin.std.server.JEBackendCfg;
/**
 * Import from LDIF to a JE backend.
@@ -75,14 +77,9 @@
  private static final DebugTracer TRACER = getTracer();
  /**
   * The backend instance we are importing into.
   */
  private Backend backend;
  /**
   * The JE backend configuration.
   */
  private Config config;
  private JEBackendCfg config;
  /**
   * The root container used for this import job.
@@ -103,7 +100,7 @@
   * Map of base DNs to their import context.
   */
  private HashMap<DN,ImportContext> importMap =
       new HashMap<DN, ImportContext>();
      new HashMap<DN, ImportContext>();
  /**
   * The number of entries imported.
@@ -116,19 +113,21 @@
   */
  private long progressInterval = 10000;
  /**
   * The import worker threads.
   */
  private CopyOnWriteArrayList<ImportThread> threads;
  /**
   * Create a new import job.
   *
   * @param backend The backend performing the import.
   * @param config The backend configuration.
   * @param ldifImportConfig The LDIF import configuration.
   */
  public ImportJob(Backend backend, Config config,
                   LDIFImportConfig ldifImportConfig)
  public ImportJob(LDIFImportConfig ldifImportConfig)
  {
    this.backend = backend;
    this.config = config;
    this.ldifImportConfig = ldifImportConfig;
    this.threads = new CopyOnWriteArrayList<ImportThread>();
  }
  /**
@@ -138,61 +137,32 @@
   * processes the LDIF file, then merges the resulting intermediate
   * files to load the index databases.
   *
   * @param rootContainer The root container to import into.
   * @throws DatabaseException If an error occurs in the JE database.
   * @throws IOException  If a problem occurs while opening the LDIF file for
   *                      reading, or while reading from the LDIF file.
   * @throws JebException If an error occurs in the JE backend.
   */
  public void importLDIF()
  public void importLDIF(RootContainer rootContainer)
      throws DatabaseException, IOException, JebException
  {
/*
    envConfig.setConfigParam("je.env.runCleaner", "false");
    envConfig.setConfigParam("je.log.numBuffers", "2");
    envConfig.setConfigParam("je.log.bufferSize", "15000000");
    envConfig.setConfigParam("je.log.totalBufferBytes", "30000000");
    envConfig.setConfigParam("je.log.fileMax", "100000000");
*/
    // Create an LDIF reader. Throws an exception if the file does not exist.
    reader = new LDIFReader(ldifImportConfig);
    this.rootContainer = rootContainer;
    this.config = rootContainer.getConfiguration();
    int msgID;
    String message;
    long startTime;
    try
    {
      rootContainer = new RootContainer(config, backend);
      if (ldifImportConfig.appendToExistingData())
      {
        rootContainer.open(config.getBackendDirectory(),
                           config.getBackendPermission(),
                           false, true, true, true, true, false);
      }
      else
      {
        rootContainer.open(config.getBackendDirectory(),
                           config.getBackendPermission(),
                           false, true, false, false, false, false);
      }
      if (!ldifImportConfig.appendToExistingData())
      {
        // We have the writer lock on the environment, now delete the
        // environment and re-open it. Only do this when we are
        // importing to all the base DNs in the backend.
        rootContainer.close();
        EnvManager.removeFiles(config.getBackendDirectory().getPath());
        rootContainer.open(config.getBackendDirectory(),
                           config.getBackendPermission(),
                           false, true, false, false, false, false);
      }
      // Divide the total buffer size by the number of threads
      // and give that much to each thread.
      int importThreadCount = config.getImportThreadCount();
      long bufferSize = config.getImportBufferSize() /
           (importThreadCount*config.getBaseDNs().length);
      int importThreadCount = config.getBackendImportThreadCount();
      long bufferSize = config.getBackendImportBufferSize() /
          (importThreadCount*rootContainer.getBaseDNs().size());
      msgID = MSGID_JEB_IMPORT_THREAD_COUNT;
      message = getMessage(msgID, importThreadCount);
@@ -211,10 +181,7 @@
               message, msgID);
      TRACER.debugInfo(
        rootContainer.getEnvironmentConfig().toString());
      rootContainer.openEntryContainers(config.getBaseDNs());
          rootContainer.getEnvironmentConfig().toString());
      // Create the import contexts for each base DN.
      DN baseDN;
@@ -237,7 +204,7 @@
        // Create an entry queue.
        LinkedBlockingQueue<Entry> queue =
             new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
            new LinkedBlockingQueue<Entry>(config.getBackendImportQueueSize());
        importContext.setQueue(queue);
        importMap.put(baseDN, importContext);
@@ -246,77 +213,70 @@
      // Make a note of the time we started.
      startTime = System.currentTimeMillis();
      // Create a temporary work directory.
      File tempDir = getFileForPath(config.getBackendImportTempDirectory());
      if(!tempDir.exists() && !tempDir.mkdir())
      {
        msgID = MSGID_JEB_IMPORT_CREATE_TMPDIR_ERROR;
        String msg = getMessage(msgID, tempDir);
        throw new IOException(msg);
      }
      if (tempDir.listFiles() != null)
      {
        for (File f : tempDir.listFiles())
        {
          f.delete();
        }
      }
      try
      {
        // Create a temporary work directory.
        File tempDir = new File(config.getImportTempDirectory());
        tempDir.mkdir();
        if (tempDir.listFiles() != null)
        importedCount = 0;
        int     passNumber = 1;
        boolean moreData   = true;
        while (moreData)
        {
          for (File f : tempDir.listFiles())
          moreData = processLDIF();
          if (moreData)
          {
            f.delete();
            msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
            message = getMessage(msgID, passNumber++);
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
        }
        try
        {
          importedCount = 0;
          int     passNumber = 1;
          boolean moreData   = true;
          while (moreData)
          else
          {
            moreData = processLDIF();
            if (moreData)
            {
              msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
              message = getMessage(msgID, passNumber++);
              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                       message, msgID);
            }
            else
            {
              msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
              message = getMessage(msgID);
              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                       message, msgID);
            }
            long mergeStartTime = System.currentTimeMillis();
            merge();
            long mergeEndTime = System.currentTimeMillis();
            if (moreData)
            {
              msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
              message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                       message, msgID);
            }
            else
            {
              msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
              message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                       message, msgID);
            }
            msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
            message = getMessage(msgID);
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
        }
        finally
        {
          tempDir.delete();
          long mergeStartTime = System.currentTimeMillis();
          merge();
          long mergeEndTime = System.currentTimeMillis();
          if (moreData)
          {
            msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
          else
          {
            msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
        }
      }
      finally
      {
        rootContainer.close();
        // Sync the environment to disk.
        msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE;
        message = getMessage(msgID);
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                 message, msgID);
        tempDir.delete();
      }
    }
    finally
@@ -351,8 +311,6 @@
   */
  public void merge()
  {
    Map<AttributeType,IndexConfig>
         indexConfigs = config.getIndexConfigMap();
    ArrayList<IndexMergeThread> mergers = new ArrayList<IndexMergeThread>();
    // Create merge threads for each base DN.
@@ -362,73 +320,77 @@
      EntryContainer entryContainer = importContext.getEntryContainer();
      // For each configured attribute index.
      for (IndexConfig indexConfig : indexConfigs.values())
      for (AttributeIndex attrIndex : entryContainer.getAttributeIndexes())
      {
        AttributeIndex attrIndex =
             entryContainer.getAttributeIndex(indexConfig.getAttributeType());
        if (indexConfig.isEqualityIndex())
        int indexEntryLimit = config.getBackendIndexEntryLimit();
        if(attrIndex.getConfiguration().getIndexEntryLimit() != null)
        {
          indexEntryLimit = attrIndex.getConfiguration().getIndexEntryLimit();
        }
        if (attrIndex.equalityIndex != null)
        {
          Index index = attrIndex.equalityIndex;
          String name = containerName + "_" + index.toString();
          IndexMergeThread indexMergeThread =
               new IndexMergeThread(name, config, ldifImportConfig, index,
                                    indexConfig.getEqualityEntryLimit());
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
        }
        if (indexConfig.isPresenceIndex())
        if (attrIndex.presenceIndex != null)
        {
          Index index = attrIndex.presenceIndex;
          String name = containerName + "_" + index.toString();
          IndexMergeThread indexMergeThread =
               new IndexMergeThread(name, config, ldifImportConfig, index,
                                    indexConfig.getPresenceEntryLimit());
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
        }
        if (indexConfig.isSubstringIndex())
        if (attrIndex.substringIndex != null)
        {
          Index index = attrIndex.substringIndex;
          String name = containerName + "_" + index.toString();
          IndexMergeThread indexMergeThread =
               new IndexMergeThread(name, config, ldifImportConfig, index,
                                    indexConfig.getSubstringEntryLimit());
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
        }
        if (indexConfig.isOrderingIndex())
        if (attrIndex.orderingIndex != null)
        {
          Index index = attrIndex.orderingIndex;
          String name = containerName + "_" + index.toString();
          IndexMergeThread indexMergeThread =
               new IndexMergeThread(name, config, ldifImportConfig, index,
                                    indexConfig.getEqualityEntryLimit());
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
        }
        if (indexConfig.isApproximateIndex())
        if (attrIndex.approximateIndex != null)
        {
          Index index = attrIndex.approximateIndex;
          String name = containerName + "_" + index.toString();
          IndexMergeThread indexMergeThread =
              new IndexMergeThread(name, config, ldifImportConfig, index,
                                   indexConfig.getEqualityEntryLimit());
              new IndexMergeThread(config,
                                   ldifImportConfig, index,
                                   indexEntryLimit);
          mergers.add(indexMergeThread);
        }
      }
      // Id2Children index.
      Index id2Children = entryContainer.getID2Children();
      String name = containerName + "_" + id2Children.toString();
      IndexMergeThread indexMergeThread =
           new IndexMergeThread(name, config, ldifImportConfig,
                                id2Children,
                                config.getBackendIndexEntryLimit());
          new IndexMergeThread(config,
                               ldifImportConfig,
                               id2Children,
                               config.getBackendIndexEntryLimit());
      mergers.add(indexMergeThread);
      // Id2Subtree index.
      Index id2Subtree = entryContainer.getID2Subtree();
      name = containerName + "_" + id2Subtree.toString();
      indexMergeThread =
           new IndexMergeThread(name, config, ldifImportConfig,
                           id2Subtree,
                           config.getBackendIndexEntryLimit());
          new IndexMergeThread(config,
                               ldifImportConfig,
                               id2Subtree,
                               config.getBackendIndexEntryLimit());
      mergers.add(indexMergeThread);
    }
@@ -472,15 +434,12 @@
   *                       reading, or while reading from the LDIF file.
   */
  private boolean processLDIF()
          throws JebException, DatabaseException, IOException
      throws JebException, DatabaseException, IOException
  {
    boolean moreData = false;
    ArrayList<ImportThread> threads;
    // Create one set of worker threads for each base DN.
    int importThreadCount = config.getImportThreadCount();
    threads = new ArrayList<ImportThread>(importThreadCount*importMap.size());
    int importThreadCount = config.getBackendImportThreadCount();
    for (ImportContext ic : importMap.values())
    {
      for (int i = 0; i < importThreadCount; i++)
@@ -498,7 +457,7 @@
      // Create a counter to use to determine whether we've hit the import
      // pass size.
      int entriesProcessed = 0;
      int importPassSize   = config.getImportPassSize();
      int importPassSize   = config.getBackendImportPassSize();
      if (importPassSize <= 0)
      {
        importPassSize = Integer.MAX_VALUE;
@@ -514,6 +473,13 @@
      {
        do
        {
          if(threads.size() <= 0)
          {
            int msgID = MSGID_JEB_IMPORT_NO_WORKER_THREADS;
            String msg = getMessage(msgID);
            throw new JebException(msgID, msg);
          }
          try
          {
            // Read the next entry.
@@ -553,17 +519,20 @@
          }
        } while (true);
        // Wait for the queues to be drained.
        for (ImportContext ic : importMap.values())
        if(threads.size() > 0)
        {
          while (ic.getQueue().size() > 0)
          // Wait for the queues to be drained.
          for (ImportContext ic : importMap.values())
          {
            try
            while (ic.getQueue().size() > 0)
            {
              Thread.sleep(100);
            } catch (Exception e)
            {
              // No action needed.
              try
              {
                Thread.sleep(100);
              } catch (Exception e)
              {
                // No action needed.
              }
            }
          }
        }
@@ -612,7 +581,7 @@
   * @throws JebException If an error occurs in the JE backend.
   */
  public void processEntry(ImportContext importContext, Entry entry)
       throws JebException, DatabaseException
      throws JebException, DatabaseException
  {
    DN entryDN = entry.getDN();
    LDIFImportConfig ldifImportConfig = importContext.getLDIFImportConfig();
@@ -634,7 +603,7 @@
      {
        // See if we are allowed to replace the entry that exists.
        if (ldifImportConfig.appendToExistingData() &&
             ldifImportConfig.replaceExistingEntries())
            ldifImportConfig.replaceExistingEntries())
        {
          // Read the existing entry contents.
          Entry oldEntry = id2entry.get(txn, entryID);
@@ -672,7 +641,7 @@
        // Make sure the parent entry exists, unless this entry is a base DN.
        EntryID parentID = null;
        DN parentDN = importContext.getEntryContainer().
             getParentWithinBase(entryDN);
            getParentWithinBase(entryDN);
        if (parentDN != null)
        {
          parentID = dn2id.get(txn, parentDN);
@@ -725,7 +694,15 @@
        // Put the entry on the queue.
        try
        {
          importContext.getQueue().put(entry);
          while(!importContext.getQueue().offer(entry, 1000,
                                                TimeUnit.MILLISECONDS))
          {
            if(threads.size() <= 0)
            {
              // All worker threads died. We must stop now.
              return;
            }
          }
        }
        catch (InterruptedException e)
        {
@@ -775,7 +752,12 @@
   */
  public void uncaughtException(Thread t, Throwable e)
  {
    e.printStackTrace();
    threads.remove(t);
    int msgID = MSGID_JEB_IMPORT_THREAD_EXCEPTION;
    String msg = getMessage(msgID, t.getName(),
                            StaticUtils.stackTraceToSingleLineString(e));
    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR, msg,
             msgID);
  }
  /**
@@ -883,7 +865,7 @@
        EnvironmentStats envStats =
            rootContainer.getEnvironmentStats(new StatsConfig());
        long nCacheMiss =
             envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
            envStats.getNCacheMiss() - prevEnvStats.getNCacheMiss();
        float cacheMissRate = 0;
        if (deltaCount > 0)