mirror of https://github.com/OpenIdentityPlatform/OpenDJ.git

Jean-Noel Rouvignac
22.21.2014 f87c24caf25515d43b05ebe6baad655c7abaa0b7
opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java
@@ -36,6 +36,8 @@
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.LocalizableMessage;
@@ -73,17 +75,52 @@
public class RootContainer
     implements ConfigurationChangeListener<PersistitBackendCfg>
{
  private static final class ImportProgress implements Runnable
  {
    private final LDIFReader reader;
    private long previousCount;
    private long previousTime;
    public ImportProgress(LDIFReader reader)
    {
      this.reader = reader;
    }
    @Override
    public void run()
    {
      long latestCount = reader.getEntriesRead() + 0;
      long deltaCount = latestCount - previousCount;
      long latestTime = System.currentTimeMillis();
      long deltaTime = latestTime - previousTime;
      if (deltaTime == 0)
      {
        return;
      }
      long entriesRead = reader.getEntriesRead();
      long entriesIgnored = reader.getEntriesIgnored();
      long entriesRejected = reader.getEntriesRejected();
      float rate = 1000f * deltaCount / deltaTime;
      logger.info(NOTE_JEB_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
      previousCount = latestCount;
      previousTime = latestTime;
    }
  }
  /** Logger for this class; also used by the nested import progress task. */
  private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
  /**
   * Initial delay before the first import progress report and period between subsequent reports,
   * in milliseconds.
   */
  private static final int IMPORT_PROGRESS_INTERVAL = 10000;
  /** Multiplier of 1024; presumably bytes per kilobyte (usage not visible here, to be confirmed). */
  private static final int KB = 1024;
  /** The JE database environment. */
  private PersistItStorage storage; // FIXME JNR do not hardcode here
  /** The backend configuration. */
  private PersistitBackendCfg config;
  private final File backendDirectory;
  /** The backend to which this entry root container belongs. */
  private final Backend<?> backend;
  /** The backend configuration. */
  private PersistitBackendCfg config;
  /** The database environment monitor for this JE environment. */
  private DatabaseEnvironmentMonitor monitor;
@@ -96,9 +133,6 @@
  /** The compressed schema manager for this backend. */
  private JECompressedSchema compressedSchema;
  private File backendDirectory;
  /**
   * Creates a new RootContainer object. Each root container represents a JE
@@ -136,6 +170,8 @@
    try
    {
      open();
      ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
      try
      {
        final LDIFReader reader;
@@ -145,12 +181,13 @@
        }
        catch (Exception e)
        {
          LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(
                           stackTraceToSingleLineString(e));
          throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
                                       m, e);
          LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e));
          throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
        }
        long importCount = 0;
        final long startTime = System.currentTimeMillis();
        timerService.scheduleAtFixedRate(new ImportProgress(reader), IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
        while (true)
        {
          final Entry entry;
@@ -166,10 +203,8 @@
          {
            if (!le.canContinueReading())
            {
              LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF
                  .get(stackTraceToSingleLineString(le));
              throw new DirectoryException(
                  DirectoryServer.getServerErrorResultCode(), m, le);
              LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le));
              throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le);
            }
            continue;
          }
@@ -187,6 +222,7 @@
          try
          {
            ec.addEntry(entry, null);
            importCount++;
          }
          catch (DirectoryException e)
          {
@@ -213,12 +249,26 @@
            }
          }
        }
        return new LDIFImportResult(reader.getEntriesRead(),
            reader.getEntriesRejected(), reader.getEntriesIgnored());
        final long finishTime = System.currentTimeMillis();
        waitForShutdown(timerService);
        final long importTime = finishTime - startTime;
        float rate = 0;
        if (importTime > 0)
        {
          rate = 1000f * reader.getEntriesRead() / importTime;
        }
        logger.info(NOTE_JEB_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount,
            reader.getEntriesIgnored(), reader.getEntriesRejected(), 0, importTime / 1000, rate);
        return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored());
      }
      finally
      {
        close();
        // if not already stopped, then stop it
        waitForShutdown(timerService);
      }
    }
    catch (DirectoryException e)
@@ -229,17 +279,21 @@
    catch (OpenDsException e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(),
          e.getMessageObject());
      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject());
    }
    catch (Exception e)
    {
      logger.traceException(e);
      throw new DirectoryException(getServerErrorResultCode(),
          LocalizableMessage.raw(e.getMessage()));
      throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()));
    }
  }
  private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException
  {
    timerService.shutdown();
    timerService.awaitTermination(20, TimeUnit.SECONDS);
  }
  private void removeFiles() throws StorageRuntimeException
  {
    if (!backendDirectory.isDirectory())