From aaec0227c12c81b76899eb20ff99c947c7715df0 Mon Sep 17 00:00:00 2001
From: Jean-Noel Rouvignac <jean-noel.rouvignac@forgerock.com>
Date: Tue, 12 May 2015 15:41:46 +0000
Subject: [PATCH] Partial OPENDJ-2016 Implement new on disk merge import strategy based on storage engine

---
 opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java |  226 +++-----------------------------------------------------
 1 files changed, 14 insertions(+), 212 deletions(-)

diff --git a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
index 91907da..a53ef67 100644
--- a/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
+++ b/opendj-server-legacy/src/main/java/org/opends/server/backends/pluggable/RootContainer.java
@@ -27,8 +27,6 @@
 package org.opends.server.backends.pluggable;
 
 import static org.opends.messages.BackendMessages.*;
-import static org.opends.messages.UtilityMessages.*;
-import static org.opends.server.core.DirectoryServer.*;
 import static org.opends.server.util.StaticUtils.*;
 
 import java.util.ArrayList;
@@ -37,8 +35,6 @@
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.forgerock.i18n.LocalizableMessage;
@@ -56,20 +52,15 @@
 import org.opends.server.backends.pluggable.spi.StorageStatus;
 import org.opends.server.backends.pluggable.spi.WriteOperation;
 import org.opends.server.backends.pluggable.spi.WriteableTransaction;
-import org.opends.server.core.DirectoryServer;
 import org.opends.server.core.SearchOperation;
 import org.opends.server.core.ServerContext;
 import org.opends.server.types.DN;
 import org.opends.server.types.DirectoryException;
-import org.opends.server.types.Entry;
 import org.opends.server.types.InitializationException;
 import org.opends.server.types.LDIFImportConfig;
 import org.opends.server.types.LDIFImportResult;
-import org.opends.server.types.OpenDsException;
 import org.opends.server.types.Operation;
 import org.opends.server.types.Privilege;
-import org.opends.server.util.LDIFException;
-import org.opends.server.util.LDIFReader;
 
 /**
  * Wrapper class for a backend "container". Root container holds all the entry
@@ -78,44 +69,8 @@
  */
 public class RootContainer implements ConfigurationChangeListener<PluggableBackendCfg>
 {
-  /** Logs the progress of the import. */
-  private static final class ImportProgress implements Runnable
-  {
-    private final LDIFReader reader;
-    private long previousCount;
-    private long previousTime;
-
-    public ImportProgress(LDIFReader reader)
-    {
-      this.reader = reader;
-    }
-
-    @Override
-    public void run()
-    {
-      long latestCount = reader.getEntriesRead() + 0;
-      long deltaCount = latestCount - previousCount;
-      long latestTime = System.currentTimeMillis();
-      long deltaTime = latestTime - previousTime;
-      if (deltaTime == 0)
-      {
-        return;
-      }
-      long entriesRead = reader.getEntriesRead();
-      long entriesIgnored = reader.getEntriesIgnored();
-      long entriesRejected = reader.getEntriesRejected();
-      float rate = 1000f * deltaCount / deltaTime;
-      logger.info(NOTE_IMPORT_PROGRESS_REPORT, entriesRead, entriesIgnored, entriesRejected, rate);
-
-      previousCount = latestCount;
-      previousTime = latestTime;
-    }
-  }
-
   private static final LocalizedLogger logger = LocalizedLogger.getLoggerForThisClass();
 
-  private static final int IMPORT_PROGRESS_INTERVAL = 10000;
-
   /** The tree storage. */
   private Storage storage;
 
@@ -129,8 +84,8 @@
   /** The base DNs contained in this root container. */
   private final ConcurrentHashMap<DN, EntryContainer> entryContainers = new ConcurrentHashMap<DN, EntryContainer>();
 
-  /** The cached value of the next entry identifier to be assigned. */
-  private AtomicLong nextid = new AtomicLong(1);
+  /** Value of the next entryID to be assigned. */
+  private AtomicLong nextEntryID = new AtomicLong(1);
 
   /** The compressed schema manager for this backend. */
   private PersistentCompressedSchema compressedSchema;
@@ -166,161 +121,14 @@
   }
 
   LDIFImportResult importLDIF(LDIFImportConfig importConfig, ServerContext serverContext) throws DirectoryException
-  {//TODO JNR may call importLDIFWithSuccessiveAdds(importConfig) depending on configured import strategy
-    return importLDIFWithOnDiskMerge(importConfig, serverContext);
+  {
+    return getImportStrategy().importLDIF(importConfig, this, serverContext);
   }
 
-  private LDIFImportResult importLDIFWithSuccessiveAdds(LDIFImportConfig importConfig) throws DirectoryException
+  private ImportStrategy getImportStrategy() throws DirectoryException
   {
-    try
-    {
-      ScheduledThreadPoolExecutor timerService = new ScheduledThreadPoolExecutor(1);
-      try
-      {
-        final LDIFReader reader;
-        try
-        {
-          reader = new LDIFReader(importConfig);
-        }
-        catch (Exception e)
-        {
-          LocalizableMessage m = ERR_LDIF_BACKEND_CANNOT_CREATE_LDIF_READER.get(stackTraceToSingleLineString(e));
-          throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, e);
-        }
-
-        long importCount = 0;
-        final long startTime = System.currentTimeMillis();
-        timerService.scheduleAtFixedRate(new ImportProgress(reader),
-            IMPORT_PROGRESS_INTERVAL, IMPORT_PROGRESS_INTERVAL, TimeUnit.MILLISECONDS);
-        while (true)
-        {
-          final Entry entry;
-          try
-          {
-            entry = reader.readEntry();
-            if (entry == null)
-            {
-              break;
-            }
-          }
-          catch (LDIFException le)
-          {
-            if (!le.canContinueReading())
-            {
-              LocalizableMessage m = ERR_LDIF_BACKEND_ERROR_READING_LDIF.get(stackTraceToSingleLineString(le));
-              throw new DirectoryException(DirectoryServer.getServerErrorResultCode(), m, le);
-            }
-            continue;
-          }
-
-          final DN dn = entry.getName();
-          final EntryContainer ec = getEntryContainer(dn);
-          if (ec == null)
-          {
-            final LocalizableMessage m = ERR_LDIF_SKIP.get(dn);
-            logger.error(m);
-            reader.rejectLastEntry(m);
-            continue;
-          }
-
-          try
-          {
-            ec.addEntry(entry, null);
-            importCount++;
-          }
-          catch (DirectoryException e)
-          {
-            switch (e.getResultCode().asEnum())
-            {
-            case ENTRY_ALREADY_EXISTS:
-              if (importConfig.replaceExistingEntries())
-              {
-                final Entry oldEntry = ec.getEntry(entry.getName());
-                ec.replaceEntry(oldEntry, entry, null);
-              }
-              else
-              {
-                reader.rejectLastEntry(WARN_IMPORT_ENTRY_EXISTS.get());
-              }
-              break;
-            case NO_SUCH_OBJECT:
-              reader.rejectLastEntry(ERR_IMPORT_PARENT_NOT_FOUND.get(dn.parent()));
-              break;
-            default:
-              // Not sure why it failed.
-              reader.rejectLastEntry(e.getMessageObject());
-              break;
-            }
-          }
-        }
-        final long finishTime = System.currentTimeMillis();
-
-        waitForShutdown(timerService);
-
-        final long importTime = finishTime - startTime;
-        float rate = 0;
-        if (importTime > 0)
-        {
-          rate = 1000f * reader.getEntriesRead() / importTime;
-        }
-        logger.info(NOTE_IMPORT_FINAL_STATUS, reader.getEntriesRead(), importCount, reader.getEntriesIgnored(),
-            reader.getEntriesRejected(), 0, importTime / 1000, rate);
-        return new LDIFImportResult(reader.getEntriesRead(), reader.getEntriesRejected(), reader.getEntriesIgnored());
-      }
-      finally
-      {
-        close();
-
-        // if not already stopped, then stop it
-        waitForShutdown(timerService);
-      }
-    }
-    catch (DirectoryException e)
-    {
-      logger.traceException(e);
-      throw e;
-    }
-    catch (OpenDsException e)
-    {
-      logger.traceException(e);
-      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject());
-    }
-    catch (Exception e)
-    {
-      logger.traceException(e);
-      throw new DirectoryException(getServerErrorResultCode(), LocalizableMessage.raw(e.getMessage()));
-    }
-  }
-
-  private void waitForShutdown(ScheduledThreadPoolExecutor timerService) throws InterruptedException
-  {
-    timerService.shutdown();
-    timerService.awaitTermination(20, TimeUnit.SECONDS);
-  }
-
-  private LDIFImportResult importLDIFWithOnDiskMerge(final LDIFImportConfig importConfig, ServerContext serverContext)
-      throws DirectoryException
-  {
-    try
-    {
-      return new Importer(this, importConfig, config, serverContext).processImport();
-    }
-    catch (DirectoryException e)
-    {
-      logger.traceException(e);
-      throw e;
-    }
-    catch (OpenDsException e)
-    {
-      logger.traceException(e);
-      throw new DirectoryException(getServerErrorResultCode(), e.getMessageObject(), e);
-    }
-    catch (Exception e)
-    {
-      logger.traceException(e);
-      throw new DirectoryException(getServerErrorResultCode(),
-          LocalizableMessage.raw(stackTraceToSingleLineString(e)), e);
-    }
+    //TODO JNR may call new SuccessiveAddsImportStrategy() depending on configured import strategy
+    return new Importer.StrategyImpl(config);
   }
 
   /**
@@ -391,15 +199,11 @@
    */
   void registerEntryContainer(DN baseDN, EntryContainer entryContainer) throws InitializationException
   {
-    EntryContainer ec1 = this.entryContainers.get(baseDN);
-
-    // If an entry container for this baseDN is already open we don't allow
-    // another to be opened.
-    if (ec1 != null)
+    EntryContainer ec = this.entryContainers.get(baseDN);
+    if (ec != null)
     {
-      throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec1.getTreePrefix(), baseDN));
+      throw new InitializationException(ERR_ENTRY_CONTAINER_ALREADY_REGISTERED.get(ec.getTreePrefix(), baseDN));
     }
-
     this.entryContainers.put(baseDN, entryContainer);
   }
 
@@ -432,7 +236,7 @@
       }
     }
 
-    nextid = new AtomicLong(highestID.longValue() + 1);
+    nextEntryID = new AtomicLong(highestID.longValue() + 1);
   }
 
   /**
@@ -470,7 +274,6 @@
       String monitorName = backend.getBackendID() + " Storage";
       monitor = new BackendMonitor(monitorName, this);
     }
-
     return monitor;
   }
 
@@ -503,8 +306,7 @@
       // Sort the list in order of priority.
       Collections.sort(trees, new TreePreloadComparator());
 
-      // Preload each tree until we reach the time limit or the cache
-      // is filled.
+      // Preload each tree until we reach the time limit or the cache is filled.
       try
       {
         throw new UnsupportedOperationException("Not implemented exception");
@@ -638,7 +440,7 @@
    */
   EntryID getNextEntryID()
   {
-    return new EntryID(nextid.getAndIncrement());
+    return new EntryID(nextEntryID.getAndIncrement());
   }
 
   /**
@@ -647,7 +449,7 @@
    */
   public void resetNextEntryID()
   {
-    nextid.set(1);
+    nextEntryID.set(1);
   }
 
   /** {@inheritDoc} */

--
Gitblit v1.10.0