From f87c24caf25515d43b05ebe6baad655c7abaa0b7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 22 Dec 2014 16:21:34 +0000
Subject: [PATCH] OPENDJ-1707 Persistit: various import problems

---
 opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java |   96 +++++++++++++++++++++++++++++++++++++----------
 1 files changed, 75 insertions(+), 21 deletions(-)

diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java b/opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java
index 05985cc..918bde3 100644
--- a/opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java
@@ -36,6 +36,8 @@
 import java.io.File;
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.forgerock.i18n.LocalizableMessage;
@@ -73,17 +75,52 @@
 public class RootContainer
      implements ConfigurationChangeListener<PersistitBackendCfg>
 {
+  private static final class ImportProgress implements Runnable
+  {
+    private final LDIFReader reader;
+    private long previousCount;
+    private long previousTime;
+
+    public ImportProgress(LDIFReader reader)
+    {
+      this.reader = reader;
+    }
+
+    @Override
+    public void run()
+    {
+      long latestCount = reader.getEntriesRead() + 0;
+      long deltaCount = latestCount - previousCount;
+      long latestTime = System.currentTimeMillis();
+      long deltaTime = latestTime - previousTime;
+      if (deltaTime == 0)
+      {
+        return;
+      }
+      long entriesRead = reader.getEntriesRead();
+      long entriesIgnored = reader.getEntriesIgnored();
+      long entriesRejected = reader.getEntriesRejected();
+      float rate = 1000f * deltaCount / deltaTime;
+      logger.info(NOTE_JEB_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
+
+      previousCount = latestCount;
+      previousTime = latestTime;
+    }
+  }
+
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
+  private static final int IMPORT_PROGRESS_INTERVAL = 10000;
+  private static final int KB = 1024;
+
   /** The JE database environment. */
   private PersistItStorage storage; // FIXME JNR do not hardcode here
 
-  /** The backend configuration. */
-  private PersistitBackendCfg config;
-
+  private final File backendDirectory;
   /** The backend to which this entry root container belongs. */
   private final Backend<?> backend;
-
+  /** The backend configuration. */
+  private PersistitBackendCfg config;
   /** The database environment monitor for this JE environment. */
   private DatabaseEnvironmentMonitor monitor;
 
@@ -96,9 +133,6 @@
   /** The compressed schema manager for this backend. */
   private JECompressedSchema compressedSchema;
 
-  private File backendDirectory;
-
-
 
   /**
    * Creates a new RootContainer object. Each root container represents a JE
@@ -136,6 +170,8 @@
     try
     {
       open();
+
+      ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
       try
       {
         final LDIFReader reader;
@@ -145,12 +181,13 @@
         }
         catch (Exception e)
         {
-          LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(
-                           stackTraceToSingleLineString(e));
-          throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
-                                       m, e);
+          LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e));
+          throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
         }
 
+        long importCount = 0;
+        final long startTime = System.currentTimeMillis();
+        timerService.scheduleAtFixedRate(new ImportProgress(reader), IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
         while (true)
         {
           final Entry entry;
@@ -166,10 +203,8 @@
           {
             if (!le.canContinueReading())
             {
-              LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF
-                  .get(stackTraceToSingleLineString(le));
-              throw new DirectoryException(
-                  DirectoryServer.getServerErrorResultCode(), m, le);
+              LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le));
+              throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le);
             }
             continue;
           }
@@ -187,6 +222,7 @@
           try
           {
             ec.addEntry(entry, null);
+            importCount++;
           }
           catch (DirectoryException e)
           {
@@ -213,12 +249,26 @@
             }
           }
         }
-        return new LDIFImportResult(reader.getEntriesRead(),
-            reader.getEntriesRejected(), reader.getEntriesIgnored());
+        final long finishTime = System.currentTimeMillis();
+
+        waitForShutdown(timerService);
+
+        final long importTime = finishTime - startTime;
+        float rate = 0;
+        if (importTime > 0)
+        {
+          rate = 1000f * reader.getEntriesRead() / importTime;
+        }
+        logger.info(NOTE_JEB_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount,
+            reader.getEntriesIgnored(), reader.getEntriesRejected(), 0, importTime / 1000, rate);
+        return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored());
       }
       finally
       {
         close();
+
+        // if not already stopped, then stop it
+        waitForShutdown(timerService);
       }
     }
     catch (DirectoryException e)
@@ -229,17 +279,21 @@
     catch (OpenDsException e)
     {
       logger.traceException(e);
-      throw new DirectoryException(getServerErrorResultCode(),
-          e.getMessageObject());
+      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject());
     }
     catch (Exception e)
     {
       logger.traceException(e);
-      throw new DirectoryException(getServerErrorResultCode(),
-          LocalizableMessage.raw(e.getMessage()));
+      throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()));
     }
   }
 
+  private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException
+  {
+    timerService.shutdown();
+    timerService.awaitTermination(20, TimeUnit.SECONDS);
+  }
+
   private void removeFiles() throws StorageRuntimeException
   {
     if (!backendDirectory.isDirectory())

--
Gitblit v1.10.0