mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

coulbeck
01.20.2007 31153ccec78ed6187b9543babccdaf996968d6bb
Fix for issue #1262: database becomes empty when importing a not existing ldif file.
3 files modified
499 ■■■■ changed files
opends/src/server/org/opends/server/backends/jeb/ImportJob.java 470 ●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/messages/ToolMessages.java 13 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/tools/ImportLDIF.java 16 ●●●●● patch | view | raw | blame | history
opends/src/server/org/opends/server/backends/jeb/ImportJob.java
@@ -150,160 +150,175 @@
    envConfig.setConfigParam("je.log.totalBufferBytes", "30000000");
    envConfig.setConfigParam("je.log.fileMax", "100000000");
*/
    rootContainer = new RootContainer(config, backend);
    if (ldifImportConfig.appendToExistingData())
    // Create an LDIF reader. Throws an exception if the file does not exist.
    reader = new LDIFReader(ldifImportConfig);
    int msgID;
    String message;
    long startTime;
    try
    {
      rootContainer.open(config.getBackendDirectory(),
                         config.getBackendPermission(),
                         false, true, true, true, true, false);
    }
    else
    {
      rootContainer.open(config.getBackendDirectory(),
                         config.getBackendPermission(),
                         false, true, false, false, false, false);
    }
      rootContainer = new RootContainer(config, backend);
      if (ldifImportConfig.appendToExistingData())
      {
        rootContainer.open(config.getBackendDirectory(),
                           config.getBackendPermission(),
                           false, true, true, true, true, false);
      }
      else
      {
        rootContainer.open(config.getBackendDirectory(),
                           config.getBackendPermission(),
                           false, true, false, false, false, false);
      }
    if (!ldifImportConfig.appendToExistingData())
    {
      // We have the writer lock on the environment, now delete the
      // environment and re-open it. Only do this when we are
      // importing to all the base DNs in the backend.
      rootContainer.close();
      EnvManager.removeFiles(config.getBackendDirectory().getPath());
      rootContainer.open(config.getBackendDirectory(),
                         config.getBackendPermission(),
                         false, true, false, false, false, false);
    }
      if (!ldifImportConfig.appendToExistingData())
      {
        // We have the writer lock on the environment, now delete the
        // environment and re-open it. Only do this when we are
        // importing to all the base DNs in the backend.
        rootContainer.close();
        EnvManager.removeFiles(config.getBackendDirectory().getPath());
        rootContainer.open(config.getBackendDirectory(),
                           config.getBackendPermission(),
                           false, true, false, false, false, false);
      }
    // Divide the total buffer size by the number of threads
    // and give that much to each thread.
    int importThreadCount = config.getImportThreadCount();
    long bufferSize = config.getImportBufferSize() /
         (importThreadCount*config.getBaseDNs().length);
      // Divide the total buffer size by the number of threads
      // and give that much to each thread.
      int importThreadCount = config.getImportThreadCount();
      long bufferSize = config.getImportBufferSize() /
           (importThreadCount*config.getBaseDNs().length);
    int msgID = MSGID_JEB_IMPORT_THREAD_COUNT;
    String message = getMessage(msgID, importThreadCount);
    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
             message, msgID);
      msgID = MSGID_JEB_IMPORT_THREAD_COUNT;
      message = getMessage(msgID, importThreadCount);
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
               message, msgID);
    msgID = MSGID_JEB_IMPORT_BUFFER_SIZE;
    message = getMessage(msgID, bufferSize);
    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
             message, msgID);
      msgID = MSGID_JEB_IMPORT_BUFFER_SIZE;
      message = getMessage(msgID, bufferSize);
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
               message, msgID);
    msgID = MSGID_JEB_IMPORT_ENVIRONMENT_CONFIG;
    message = getMessage(msgID,
                         rootContainer.getEnvironmentConfig().toString());
    logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
             message, msgID);
      msgID = MSGID_JEB_IMPORT_ENVIRONMENT_CONFIG;
      message = getMessage(msgID,
                           rootContainer.getEnvironmentConfig().toString());
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
               message, msgID);
    DebugLogger.debugInfo(
      DebugLogger.debugInfo(
        rootContainer.getEnvironmentConfig().toString());
    rootContainer.openEntryContainers(config.getBaseDNs());
      rootContainer.openEntryContainers(config.getBaseDNs());
    // Create the import contexts for each base DN.
    DN baseDN;
      // Create the import contexts for each base DN.
      DN baseDN;
    for (EntryContainer entryContainer : rootContainer.getEntryContainers())
    {
      baseDN = entryContainer.getBaseDN();
      // Create an import context.
      ImportContext importContext = new ImportContext();
      importContext.setBufferSize(bufferSize);
      importContext.setConfig(config);
      importContext.setLDIFImportConfig(this.ldifImportConfig);
      importContext.setBaseDN(baseDN);
      importContext.setContainerName(entryContainer.getContainerName());
      importContext.setEntryContainer(entryContainer);
      importContext.setBufferSize(bufferSize);
      // Create an entry queue.
      LinkedBlockingQueue<Entry> queue =
           new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
      importContext.setQueue(queue);
      importMap.put(baseDN, importContext);
    }
    // Make a note of the time we started.
    long startTime = System.currentTimeMillis();
    try
    {
      // Create a temporary work directory.
      File tempDir = new File(config.getImportTempDirectory());
      tempDir.mkdir();
      if (tempDir.listFiles() != null)
      for (EntryContainer entryContainer : rootContainer.getEntryContainers())
      {
        for (File f : tempDir.listFiles())
        {
          f.delete();
        }
        baseDN = entryContainer.getBaseDN();
        // Create an import context.
        ImportContext importContext = new ImportContext();
        importContext.setBufferSize(bufferSize);
        importContext.setConfig(config);
        importContext.setLDIFImportConfig(this.ldifImportConfig);
        importContext.setLDIFReader(reader);
        importContext.setBaseDN(baseDN);
        importContext.setContainerName(entryContainer.getContainerName());
        importContext.setEntryContainer(entryContainer);
        importContext.setBufferSize(bufferSize);
        // Create an entry queue.
        LinkedBlockingQueue<Entry> queue =
             new LinkedBlockingQueue<Entry>(config.getImportQueueSize());
        importContext.setQueue(queue);
        importMap.put(baseDN, importContext);
      }
      // Make a note of the time we started.
      startTime = System.currentTimeMillis();
      try
      {
        importedCount = 0;
        int     passNumber = 1;
        boolean moreData   = true;
        while (moreData)
        // Create a temporary work directory.
        File tempDir = new File(config.getImportTempDirectory());
        tempDir.mkdir();
        if (tempDir.listFiles() != null)
        {
          moreData = processLDIF();
          if (moreData)
          for (File f : tempDir.listFiles())
          {
            msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
            message = getMessage(msgID, passNumber++);
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
            f.delete();
          }
          else
        }
        try
        {
          importedCount = 0;
          int     passNumber = 1;
          boolean moreData   = true;
          while (moreData)
          {
            msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
            message = getMessage(msgID);
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
            moreData = processLDIF();
            if (moreData)
            {
              msgID = MSGID_JEB_IMPORT_BEGINNING_INTERMEDIATE_MERGE;
              message = getMessage(msgID, passNumber++);
              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                       message, msgID);
            }
            else
            {
              msgID = MSGID_JEB_IMPORT_BEGINNING_FINAL_MERGE;
              message = getMessage(msgID);
              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                       message, msgID);
            }
          long mergeStartTime = System.currentTimeMillis();
          merge();
          long mergeEndTime = System.currentTimeMillis();
            long mergeStartTime = System.currentTimeMillis();
            merge();
            long mergeEndTime = System.currentTimeMillis();
          if (moreData)
          {
            msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
            if (moreData)
            {
              msgID = MSGID_JEB_IMPORT_RESUMING_LDIF_PROCESSING;
              message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                       message, msgID);
            }
            else
            {
              msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
              message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
              logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                       message, msgID);
            }
          }
          else
          {
            msgID = MSGID_JEB_IMPORT_FINAL_MERGE_COMPLETED;
            message = getMessage(msgID, ((mergeEndTime-mergeStartTime)/1000));
            logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                     message, msgID);
          }
        }
        finally
        {
          tempDir.delete();
        }
      }
      finally
      {
        tempDir.delete();
        rootContainer.close();
        // Sync the environment to disk.
        msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE;
        message = getMessage(msgID);
        logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
                 message, msgID);
      }
    }
    finally
    {
      rootContainer.close();
      // Sync the environment to disk.
      msgID = MSGID_JEB_IMPORT_CLOSING_DATABASE;
      message = getMessage(msgID);
      logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.NOTICE,
               message, msgID);
      reader.close();
    }
    long finishTime = System.currentTimeMillis();
@@ -438,14 +453,14 @@
  }
  /**
   * Open a reader for the LDIF file and create a set of worker threads, one
   * set for each base DN. Read each entry from the LDIF and determine which
   * Create a set of worker threads, one set for each base DN.
   * Read each entry from the LDIF and determine which
   * base DN the entry belongs to. Write the dn2id database, then put the
   * entry on the appropriate queue for the worker threads to consume.
   * Record the entry count for each base DN when all entries have been
   * processed.
   *
   * @return true if is more data to be read from the LDIF file (the import
   * @return true if thre is more data to be read from the LDIF file (the import
   * pass size was reached), false if the entire LDIF file has been read.
   *
   * @throws JebException If an error occurs in the JE backend.
@@ -458,143 +473,124 @@
  {
    boolean moreData = false;
    // Create an LDIF reader if necessary.
    if (reader == null)
    ArrayList<ImportThread> threads;
    // Create one set of worker threads for each base DN.
    int importThreadCount = config.getImportThreadCount();
    threads = new ArrayList<ImportThread>(importThreadCount*importMap.size());
    for (ImportContext ic : importMap.values())
    {
      reader = new LDIFReader(ldifImportConfig);
      for (ImportContext ic : importMap.values())
      for (int i = 0; i < importThreadCount; i++)
      {
        ic.setLDIFReader(reader);
        ImportThread t = new ImportThread(ic, i);
        t.setUncaughtExceptionHandler(this);
        threads.add(t);
        t.start();
      }
    }
    ArrayList<ImportThread> threads;
    try
    {
      // Create one set of worker threads for each base DN.
      int importThreadCount = config.getImportThreadCount();
      threads = new ArrayList<ImportThread>(importThreadCount*importMap.size());
      for (ImportContext ic : importMap.values())
      // Create a counter to use to determine whether we've hit the import
      // pass size.
      int entriesProcessed = 0;
      int importPassSize   = config.getImportPassSize();
      if (importPassSize <= 0)
      {
        for (int i = 0; i < importThreadCount; i++)
        {
          ImportThread t = new ImportThread(ic, i);
          t.setUncaughtExceptionHandler(this);
          threads.add(t);
          t.start();
        }
        importPassSize = Integer.MAX_VALUE;
      }
      // Start a timer for the progress report.
      Timer timer = new Timer();
      TimerTask progressTask = new ImportJob.ProgressTask();
      timer.scheduleAtFixedRate(progressTask, progressInterval,
                                progressInterval);
      try
      {
        // Create a counter to use to determine whether we've hit the import
        // pass size.
        int entriesProcessed = 0;
        int importPassSize   = config.getImportPassSize();
        if (importPassSize <= 0)
        {
          importPassSize = Integer.MAX_VALUE;
        }
        // Start a timer for the progress report.
        Timer timer = new Timer();
        TimerTask progressTask = new ImportJob.ProgressTask();
        timer.scheduleAtFixedRate(progressTask, progressInterval,
                                  progressInterval);
        try
        {
          do
          {
            try
            {
              // Read the next entry.
              Entry entry = reader.readEntry();
              // Check for end of file.
              if (entry == null)
              {
                break;
              }
              // Route it according to base DN.
              ImportContext importContext = getImportConfig(entry.getDN());
              processEntry(importContext, entry);
              entriesProcessed++;
              if (entriesProcessed >= importPassSize)
              {
                moreData = true;
                break;
              }
            }
            catch (LDIFException e)
            {
              if (debugEnabled())
              {
                debugCaught(DebugLogLevel.ERROR, e);
              }
            }
            catch (DirectoryException e)
            {
              if (debugEnabled())
              {
                debugCaught(DebugLogLevel.ERROR, e);
              }
            }
          } while (true);
          // Wait for the queues to be drained.
          for (ImportContext ic : importMap.values())
          {
            while (ic.getQueue().size() > 0)
            {
              try
              {
                Thread.sleep(100);
              } catch (Exception e)
              {
                // No action needed.
              }
            }
          }
        }
        finally
        {
          timer.cancel();
        }
      }
      finally
      {
        // Order the threads to stop.
        for (ImportThread t : threads)
        {
          t.stopProcessing();
        }
        // Wait for each thread to stop.
        for (ImportThread t : threads)
        do
        {
          try
          {
            t.join();
            importedCount += t.getImportedCount();
            // Read the next entry.
            Entry entry = reader.readEntry();
            // Check for end of file.
            if (entry == null)
            {
              break;
            }
            // Route it according to base DN.
            ImportContext importContext = getImportConfig(entry.getDN());
            processEntry(importContext, entry);
            entriesProcessed++;
            if (entriesProcessed >= importPassSize)
            {
              moreData = true;
              break;
            }
          }
          catch (InterruptedException ie)
          catch (LDIFException e)
          {
            // No action needed?
            if (debugEnabled())
            {
              debugCaught(DebugLogLevel.ERROR, e);
            }
          }
          catch (DirectoryException e)
          {
            if (debugEnabled())
            {
              debugCaught(DebugLogLevel.ERROR, e);
            }
          }
        } while (true);
        // Wait for the queues to be drained.
        for (ImportContext ic : importMap.values())
        {
          while (ic.getQueue().size() > 0)
          {
            try
            {
              Thread.sleep(100);
            } catch (Exception e)
            {
              // No action needed.
            }
          }
        }
      }
      finally
      {
        timer.cancel();
      }
    }
    finally
    {
      if (! moreData)
      // Order the threads to stop.
      for (ImportThread t : threads)
      {
        reader.close();
        t.stopProcessing();
      }
      // Wait for each thread to stop.
      for (ImportThread t : threads)
      {
        try
        {
          t.join();
          importedCount += t.getImportedCount();
        }
        catch (InterruptedException ie)
        {
          // No action needed?
        }
      }
    }
opends/src/server/org/opends/server/messages/ToolMessages.java
@@ -7060,6 +7060,16 @@
  /**
   * The message ID for the message that will be used if one of the LDIF files
   * to be imported cannot be read.  This takes one argument, which is the
   * pathname of a file that cannot be read.
   */
  public static final int MSGID_LDIFIMPORT_CANNOT_READ_FILE =
       CATEGORY_MASK_TOOLS | SEVERITY_MASK_SEVERE_ERROR | 887;
  /**
   * Associates a set of generic messages with the message IDs defined in this
   * class.
   */
@@ -9281,6 +9291,9 @@
                    "shared lock for backend %s:  %s.  This lock should " +
                    "automatically be cleared when the rebuild process " +
                    "exits, so no further action should be required");
    registerMessage(MSGID_LDIFIMPORT_CANNOT_READ_FILE,
                    "The specified LDIF file %s cannot be read");
  }
}
opends/src/server/org/opends/server/tools/ImportLDIF.java
@@ -61,7 +61,7 @@
import static org.opends.server.config.ConfigConstants.*;
import static org.opends.server.loggers.Error.*;
import static org.opends.server.messages.MessageHandler.*;
import static org.opends.server.messages.MessageHandler.getMessage;
import static org.opends.server.messages.ToolMessages.*;
import static org.opends.server.util.ServerConstants.*;
import static org.opends.server.util.StaticUtils.*;
@@ -858,6 +858,20 @@
    if (ldifFiles.isPresent())
    {
      ArrayList<String> fileList = new ArrayList<String>(ldifFiles.getValues());
      int badFileCount = 0;
      for (String pathname : fileList)
      {
        File f = new File(pathname);
        if (!f.canRead())
        {
          int    msgID   = MSGID_LDIFIMPORT_CANNOT_READ_FILE;
          String message = getMessage(msgID, pathname);
          logError(ErrorLogCategory.BACKEND, ErrorLogSeverity.SEVERE_ERROR,
                   message, msgID);
          badFileCount++;
        }
      }
      if (badFileCount > 0) return 1;
      importConfig = new LDIFImportConfig(fileList);
    }
    else