From f87c24caf25515d43b05ebe6baad655c7abaa0b7 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Mon, 22 Dec 2014 16:21:34 +0000
Subject: [PATCH] OPENDJ-1707 Persistit: various import problems
---
opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java | 96 +++++++++++++++++++++++++++++++++++++----------
1 files changed, 75 insertions(+), 21 deletions(-)
diff --git a/opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java b/opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java
index 05985cc..918bde3 100644
--- a/opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java
+++ b/opendj3-server-dev/src/server/org/opends/server/backends/pluggable/RootContainer.java
@@ -36,6 +36,8 @@
import java.io.File;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.forgerock.i18n.LocalizableMessage;
@@ -73,17 +75,52 @@
public class RootContainer
implements ConfigurationChangeListener<PersistitBackendCfg>
{
+ private static final class ImportProgress implements Runnable
+ {
+ private final LDIFReader reader;
+ private long previousCount;
+ private long previousTime;
+
+ public ImportProgress(LDIFReader reader)
+ {
+ this.reader = reader;
+ }
+
+ @Override
+ public void run()
+ {
+ long latestCount = reader.getEntriesRead() + 0;
+ long deltaCount = latestCount - previousCount;
+ long latestTime = System.currentTimeMillis();
+ long deltaTime = latestTime - previousTime;
+ if (deltaTime == 0)
+ {
+ return;
+ }
+ long entriesRead = reader.getEntriesRead();
+ long entriesIgnored = reader.getEntriesIgnored();
+ long entriesRejected = reader.getEntriesRejected();
+ float rate = 1000f * deltaCount / deltaTime;
+ logger.info(NOTE_JEB_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
+
+ previousCount = latestCount;
+ previousTime = latestTime;
+ }
+ }
+
private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
+ private static final int IMPORT_PROGRESS_INTERVAL = 10000;
+ private static final int KB = 1024;
+
/** The JE database environment. */
private PersistItStorage storage; // FIXME JNR do not hardcode here
- /** The backend configuration. */
- private PersistitBackendCfg config;
-
+ private final File backendDirectory;
/** The backend to which this entry root container belongs. */
private final Backend<?> backend;
-
+ /** The backend configuration. */
+ private PersistitBackendCfg config;
/** The database environment monitor for this JE environment. */
private DatabaseEnvironmentMonitor monitor;
@@ -96,9 +133,6 @@
/** The compressed schema manager for this backend. */
private JECompressedSchema compressedSchema;
- private File backendDirectory;
-
-
/**
* Creates a new RootContainer object. Each root container represents a JE
@@ -136,6 +170,8 @@
try
{
open();
+
+ ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
try
{
final LDIFReader reader;
@@ -145,12 +181,13 @@
}
catch (Exception e)
{
- LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(
- stackTraceToSingleLineString(e));
- throw new DirectoryException(DirectoryServer.getServerErrorResultCode(),
- m, e);
+ LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e));
+ throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
}
+ long importCount = 0;
+ final long startTime = System.currentTimeMillis();
+ timerService.scheduleAtFixedRate(new ImportProgress(reader), IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
while (true)
{
final Entry entry;
@@ -166,10 +203,8 @@
{
if (!le.canContinueReading())
{
- LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF
- .get(stackTraceToSingleLineString(le));
- throw new DirectoryException(
- DirectoryServer.getServerErrorResultCode(), m, le);
+ LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le));
+ throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le);
}
continue;
}
@@ -187,6 +222,7 @@
try
{
ec.addEntry(entry, null);
+ importCount++;
}
catch (DirectoryException e)
{
@@ -213,12 +249,26 @@
}
}
}
- return new LDIFImportResult(reader.getEntriesRead(),
- reader.getEntriesRejected(), reader.getEntriesIgnored());
+ final long finishTime = System.currentTimeMillis();
+
+ waitForShutdown(timerService);
+
+ final long importTime = finishTime - startTime;
+ float rate = 0;
+ if (importTime > 0)
+ {
+ rate = 1000f * reader.getEntriesRead() / importTime;
+ }
+ logger.info(NOTE_JEB_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount,
+ reader.getEntriesIgnored(), reader.getEntriesRejected(), 0, importTime / 1000, rate);
+ return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored());
}
finally
{
close();
+
+ // if not already stopped, then stop it
+ waitForShutdown(timerService);
}
}
catch (DirectoryException e)
@@ -229,17 +279,21 @@
catch (OpenDsException e)
{
logger.traceException(e);
- throw new DirectoryException(getServerErrorResultCode(),
- e.getMessageObject());
+ throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject());
}
catch (Exception e)
{
logger.traceException(e);
- throw new DirectoryException(getServerErrorResultCode(),
- LocalizableMessage.raw(e.getMessage()));
+ throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()));
}
}
+ private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException
+ {
+ timerService.shutdown();
+ timerService.awaitTermination(20, TimeUnit.SECONDS);
+ }
+
private void removeFiles() throws StorageRuntimeException
{
if (!backendDirectory.isDirectory())
--
Gitblit v1.10.0